fix bug in node level persistent
[charm.git] / src / arch / util / machine-common-core.c
1 /*
2  *created by Chao Mei
3  *revised by Yanhua, Gengbin
4  */
5
6 #if CMK_C_INLINE
7 #define INLINE_KEYWORD inline
8 #else
9 #define INLINE_KEYWORD
10 #endif
11
12 /******* broadcast related  */
13 #ifndef CMK_BROADCAST_SPANNING_TREE
14 #define CMK_BROADCAST_SPANNING_TREE    1
15 #endif
16
17 #ifndef CMK_BROADCAST_HYPERCUBE
18 #define CMK_BROADCAST_HYPERCUBE        0
19 #endif
20
21 #define BROADCAST_SPANNING_FACTOR      4
22 /* The number of children used when a msg is broadcast inside a node */
23 #define BROADCAST_SPANNING_INTRA_FACTOR  8
24
25 /* Root of broadcast:
26  * non-bcast msg: root = 0;
27  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
28  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
29  */
30 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
31 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
32
33 /**
34  * For some machine layers such as on Active Message framework,
35  * the receiver callback is usally executed on an internal
36  * thread (i.e. not the flow managed by ours). Therefore, for
37  * forwarding broadcast messages, we could have a choice whether
38  * to offload such function to the flow we manage such as the
39  * communication thread. -
40  */
41
42 #ifndef CMK_OFFLOAD_BCAST_PROCESS
43 #define CMK_OFFLOAD_BCAST_PROCESS 0
44 #endif
45
46 #if CMK_OFFLOAD_BCAST_PROCESS
47 CsvDeclare(PCQueue, procBcastQ);
48 #if CMK_NODE_QUEUE_AVAILABLE
49 CsvDeclare(PCQueue, nodeBcastQ);
50 #endif
51 #endif
52
53 #if CMK_WITH_STATS
54 static int  MSG_STATISTIC = 0;
55 int     msg_histogram[22];
56 static int _cmi_log2(int size)
57 {
58     int ret = 1;
59     size = size-1;
60     while( (size=size>>1)>0) ret++;
61     return ret;
62 }
63 #endif
64
65 #if CMK_BROADCAST_HYPERCUBE
66 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
67 static int CmiNodesDim;
68 #endif
69 /* ###End of Broadcast related definitions ### */
70
71
72 static void handleOneBcastMsg(int size, char *msg);
73 static void processBcastQs();
74
75 /* Utility functions for forwarding broadcast messages,
76  * should not be used in machine-specific implementations
77  * except in some special occasions.
78  */
79 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg);
80 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg);
81 static void SendSpanningChildrenProc(int size, char *msg);
82 static void SendHyperCubeProc(int size, char *msg);
83 #if CMK_NODE_QUEUE_AVAILABLE
84 static void SendSpanningChildrenNode(int size, char *msg);
85 static void SendHyperCubeNode(int size, char *msg);
86 #endif
87
88 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
89 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
90
91 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
92 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
93 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
94 #endif
95 #endif
96
97
98 void CmiSyncBroadcastFn(int size, char *msg);
99 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
100 void CmiFreeBroadcastFn(int size, char *msg);
101
102 void CmiSyncBroadcastAllFn(int size, char *msg);
103 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
104 void CmiFreeBroadcastAllFn(int size, char *msg);
105
106 #if CMK_NODE_QUEUE_AVAILABLE
107 void CmiSyncNodeBroadcastFn(int size, char *msg);
108 CmiCommHandle CmiAsyncNodeeroadcastFn(int size, char *msg);
109 void CmiFreeNodeBroadcastFn(int size, char *msg);
110
111 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
112 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
113 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
114 #endif
115
116 /************** Done with Broadcast related */
117
118 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
119
120 #ifndef CMK_HAS_SIZE_IN_MSGHDR
121 #define CMK_HAS_SIZE_IN_MSGHDR 1
122 #endif
123 #if CMK_HAS_SIZE_IN_MSGHDR
124 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
125 #else
126 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
127 #endif
128
129 #if CMK_NODE_QUEUE_AVAILABLE
130 /* This value should be larger than the number of cores used
131  * per charm smp node. So it's currently set to such a large
132  * value.
133  */
134 #define DGRAM_NODEMESSAGE   (0x1FFB)
135 #endif
136
137 /* Node state structure */
138 int               _Cmi_mynode;    /* Which address space am I */
139 int               _Cmi_mynodesize;/* Number of processors in my address space */
140 int               _Cmi_numnodes;  /* Total number of address spaces */
141 int               _Cmi_numpes;    /* Total number of processors */
142
143 CpvDeclare(void*, CmiLocalQueue);
144
145 /* different modes for sending a message */
146 #define P2P_SYNC      0x1
147 #define P2P_ASYNC     0x2
148 #define BCAST_SYNC    0x4
149 #define BCAST_ASYNC   0x8
150 #define OUT_OF_BAND   0x10
151
152 enum MACHINE_SMP_MODE {
153     INVALID_MODE,
154 #if CMK_BLUEGENEQ
155     COMM_THREAD_SEND_RECV = 1,
156 #else 
157     COMM_THREAD_SEND_RECV = 0,
158 #endif
159     COMM_THREAD_ONLY_RECV, /* work threads will do the send */
160     COMM_WORK_THREADS_SEND_RECV, /* work and comm threads do the both send/recv */
161     COMM_THREAD_NOT_EXIST /* work threads will do both send and recv */
162 };
163 /* The default mode of smp charm runtime */
164 static enum MACHINE_SMP_MODE Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV;
165
166
167 #if CMK_SMP
168 static volatile int commThdExit = 0;
169 static CmiNodeLock  commThdExitLock = 0;
170
171 /**
172  *  The macro defines whether to have a comm thd to offload some
173  *  work such as forwarding bcast messages etc. This macro
174  *  should be defined before including "machine-smp.c". Note
175  *  that whether a machine layer in SMP mode could run w/o comm
176  *  thread depends on the support of the underlying
177  *  communication library.
178  *
179  */
180 #ifndef CMK_SMP_NO_COMMTHD
181 #define CMK_SMP_NO_COMMTHD 0
182 #endif
183
184 #if CMK_SMP_NO_COMMTHD
185 int Cmi_commthread = 0;
186 #else
187 int Cmi_commthread = 1;
188 #endif
189
190 #endif
191
192 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
193 static int Cmi_nodestart;
194
195 /*
196  * Network progress utility variables. Period controls the rate at
197  * which the network poll is called
198  */
199 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
200 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
201 #endif
202
203 CpvDeclare(unsigned , networkProgressCount);
204 int networkProgressPeriod;
205
206
207 /* ===== Beginning of Common Function Declarations ===== */
208 void CmiAbort(const char *message);
209 static void PerrorExit(const char *msg);
210
211 /* This function handles the msg received as which queue to push into */
212 static void handleOneRecvedMsg(int size, char *msg);
213
214 /* Utility functions for forwarding broadcast messages,
215  * should not be used in machine-specific implementations
216  * except in some special occasions.
217  */
218 static void SendToPeers(int size, char *msg);
219
220
221 void CmiPushPE(int rank, void *msg);
222
223 #if CMK_NODE_QUEUE_AVAILABLE
224 void CmiPushNode(void *msg);
225 #endif
226
227 /* Functions regarding send ops declared in converse.h */
228
229 /* In default, using the common codes for msg sending */
230 #ifndef USE_COMMON_SYNC_P2P
231 #define USE_COMMON_SYNC_P2P 1
232 #endif
233 #ifndef USE_COMMON_ASYNC_P2P
234 #define USE_COMMON_ASYNC_P2P 1
235 #endif
236 #ifndef USE_COMMON_SYNC_BCAST
237 #define USE_COMMON_SYNC_BCAST 1
238 #endif
239 #ifndef USE_COMMON_ASYNC_BCAST
240 #define USE_COMMON_ASYNC_BCAST 1
241 #endif
242
243 static void CmiSendSelf(char *msg);
244
245 void CmiSyncSendFn(int destPE, int size, char *msg);
246 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
247 void CmiFreeSendFn(int destPE, int size, char *msg);
248
249 #if CMK_NODE_QUEUE_AVAILABLE
250 static void CmiSendNodeSelf(char *msg);
251
252 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
253 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
254 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
255
256 #endif
257
258 /* Functions and variables regarding machine startup */
259 static char     **Cmi_argv;
260 static char     **Cmi_argvcopy;
261 static CmiStartFn Cmi_startfn;   /* The start function */
262 static int        Cmi_usrsched;  /* Continue after start function finishes? */
263 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
264 static void ConverseRunPE(int everReturn);
265
266 /* Functions regarding machine running on every proc */
267 static void AdvanceCommunication(int whenidle);
268 static void CommunicationServer(int sleepTime);
269 static void CommunicationServerThread(int sleepTime);
270 void ConverseExit(void);
271
272 /* Functions providing incoming network messages */
273 void *CmiGetNonLocal(void);
274 #if CMK_NODE_QUEUE_AVAILABLE
275 void *CmiGetNonLocalNodeQ(void);
276 #endif
277 /* Utiltiy functions */
278 static char *CopyMsg(char *msg, int len);
279
280 /* ===== End of Common Function Declarations ===== */
281
282 #include "machine-smp.c"
283
284 /* ===== Beginning of Idle-state Related Declarations =====  */
285 typedef struct {
286     int sleepMs; /*Milliseconds to sleep while idle*/
287     int nIdles; /*Number of times we've been idle in a row*/
288     CmiState cs; /*Machine state*/
289 } CmiIdleState;
290
291 static CmiIdleState *CmiNotifyGetState(void);
292
293 /**
294  *  Generally,
295  *
296  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
297  *  When the proc is idle, AdvanceCommunication needs to be
298  *  called.
299  *
300  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
301  *
302  *  Different layers have choices of registering different callbacks for
303  *  idle state.
304  */
305 static void CmiNotifyBeginIdle(CmiIdleState *s);
306 static void CmiNotifyStillIdle(CmiIdleState *s);
307 void CmiNotifyIdle(void);
308 /* ===== End of Idle-state Related Declarations =====  */
309
310 /* ===== Beginning of Processor/Node State-related Stuff =====*/
311 #if !CMK_SMP
312 /************ non SMP **************/
313 static struct CmiStateStruct Cmi_state;
314 int _Cmi_mype;
315 int _Cmi_myrank;
316
317 void CmiMemLock() {}
318 void CmiMemUnlock() {}
319
320 #define CmiGetState() (&Cmi_state)
321 #define CmiGetStateN(n) (&Cmi_state)
322
323 void CmiYield(void) {
324     sleep(0);
325 }
326
327 static void CmiStartThreads(char **argv) {
328     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
329     _Cmi_mype = Cmi_nodestart;
330     _Cmi_myrank = 0;
331 }
332 #else
333 /************** SMP *******************/
334 INLINE_KEYWORD int CmiMyPe() {
335     return CmiGetState()->pe;
336 }
337 INLINE_KEYWORD int CmiMyRank() {
338     return CmiGetState()->rank;
339 }
340 INLINE_KEYWORD int CmiNodeFirst(int node) {
341     return node*_Cmi_mynodesize;
342 }
343 INLINE_KEYWORD int CmiNodeSize(int node) {
344     return _Cmi_mynodesize;
345 }
346 INLINE_KEYWORD int CmiNodeOf(int pe) {
347     return (pe/_Cmi_mynodesize);
348 }
349 INLINE_KEYWORD int CmiRankOf(int pe) {
350     return pe%_Cmi_mynodesize;
351 }
352 #endif
353 CsvDeclare(CmiNodeState, NodeState);
354 /* ===== End of Processor/Node State-related Stuff =====*/
355
356 #include "machine-broadcast.c"
357 #include "immediate.c"
358 #include "machine-commthd-util.c"
359
360 /* ===== Beginning of Common Function Definitions ===== */
361 static void PerrorExit(const char *msg) {
362     perror(msg);
363     exit(1);
364 }
365
366 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
367 /*Add a message to this processor's receive queue, pe is a rank */
368 void CmiPushPE(int rank,void *msg) {
369     CmiState cs = CmiGetStateN(rank);
370     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
371 #if CMK_IMMEDIATE_MSG
372     if (CmiIsImmediate(msg)) {
373         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
374         CMI_DEST_RANK(msg) = rank;
375         CmiPushImmediateMsg(msg);
376         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
377         return;
378     }
379 #endif
380
381     PCQueuePush(cs->recv,(char*)msg);
382
383 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
384   if (_Cmi_noprocforcommthread)
385 #endif
386     CmiIdleLock_addMessage(&cs->idle);
387     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
388 }
389
390 #if CMK_NODE_QUEUE_AVAILABLE
391 /*Add a message to this processor's receive queue */
392 void CmiPushNode(void *msg) {
393     MACHSTATE(3,"Pushing message into NodeRecv queue");
394 #if CMK_IMMEDIATE_MSG
395     if (CmiIsImmediate(msg)) {
396         CMI_DEST_RANK(msg) = 0;
397         CmiPushImmediateMsg(msg);
398         return;
399     }
400 #endif
401     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
402     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
403     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
404
405 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
406     if (_Cmi_noprocforcommthread)
407 #endif
408     {
409         CmiState cs=CmiGetStateN(0);
410         CmiIdleLock_addMessage(&cs->idle);
411     }
412 }
413 #endif
414
415 /* This function handles the msg received as which queue to push into */
416 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
417     int isBcastMsg = 0;
418 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
419     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
420 #endif
421
422     if (isBcastMsg) {
423         handleOneBcastMsg(size, msg);
424         return;
425     }
426
427 #if CMK_NODE_QUEUE_AVAILABLE
428     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE){
429         CmiPushNode(msg);
430         return;
431     }
432 #endif
433     CmiPushPE(CMI_DEST_RANK(msg), msg);
434
435 }
436
437
438 static void SendToPeers(int size, char *msg) {
439     /* FIXME: now it's just a flat p2p send!! When node size is large,
440     * it should also be sent in a tree
441     */
442     int exceptRank = CMI_DEST_RANK(msg);
443     int i;
444     for (i=0; i<exceptRank; i++) {
445         CmiPushPE(i, CopyMsg(msg, size));
446     }
447     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
448         CmiPushPE(i, CopyMsg(msg, size));
449     }
450 }
451
452
453 /* Functions regarding sending operations */
454 static void CmiSendSelf(char *msg) {
455 #if CMK_IMMEDIATE_MSG
456     if (CmiIsImmediate(msg)) {
457         /* CmiBecomeNonImmediate(msg); */
458         CmiPushImmediateMsg(msg);
459         CmiHandleImmediate();
460         return;
461     }
462 #endif
463     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
464 }
465
466 /* Functions regarding P2P send op */
467 #if USE_COMMON_SYNC_P2P
468 void CmiSyncSendFn(int destPE, int size, char *msg) {
469     char *dupmsg = CopyMsg(msg, size);
470     CmiFreeSendFn(destPE, size, dupmsg);
471 }
472
473 #if CMK_USE_PXSHM
474 #include "machine-pxshm.c"
475 #endif
476 #if CMK_USE_XPMEM
477 #include "machine-xpmem.c"
478 #endif
479
480 static int refcount = 0;
481
482 #if CMK_USE_OOB
483 CpvExtern(int, _urgentSend);
484 #endif
485
486 /* a wrapper of LrtsSendFunc */
487 #if CMK_C_INLINE
488 inline 
489 #endif
490 CmiCommHandle CmiSendNetworkFunc(int destNode, int size, char *msg, int mode)
491 {
492         int rank;
493 #if CMK_USE_PXSHM
494         if (CmiValidPxshm(destNode, size)) {
495           CmiSendMessagePxshm(msg, size, destNode, &refcount);
496           //for (int i=0; i<refcount; i++) CmiReference(msg);
497           return 0;
498         }
499 #endif
500 #if CMK_USE_XPMEM
501         if (CmiValidXpmem(destNode, size)) {
502           CmiSendMessageXpmem(msg, size, destNode, &refcount);
503           //for (int i=0; i<refcount; i++) CmiReference(msg);
504           return 0;
505         }
506 #endif
507 #if CMK_PERSISTENT_COMM
508         if (CpvAccess(phs)) {
509           if (size > PERSIST_MIN_SIZE) {
510             CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
511             LrtsSendPersistentMsg(CpvAccess(phs)[CpvAccess(curphs)], destNode, size, msg);
512             return 0;
513           }
514         }
515 #endif
516
517 #if CMK_WITH_STATS
518 if (MSG_STATISTIC)
519 {
520     int ret_log = _cmi_log2(size);
521     if(ret_log >21) ret_log = 21;
522     msg_histogram[ret_log]++;
523 }
524 #endif
525 #if CMK_USE_OOB
526     if (CpvAccess(_urgentSend)) mode |= OUT_OF_BAND;
527 #endif
528     return LrtsSendFunc(destNode, size, msg, mode);
529 }
530
531 void CmiFreeSendFn(int destPE, int size, char *msg) {
532     CMI_SET_BROADCAST_ROOT(msg, 0);
533     CQdCreate(CpvAccess(cQdState), 1);
534     if (CmiMyPe()==destPE) {
535         CmiSendSelf(msg);
536 #if CMK_PERSISTENT_COMM
537         if (CpvAccess(phs)) CpvAccess(curphs)++;
538 #endif
539     } 
540     else {
541         int destNode = CmiNodeOf(destPE);
542         int destRank = CmiRankOf(destPE);
543 #if CMK_SMP
544         if (CmiMyNode()==destNode) {
545             CmiPushPE(destRank, msg);
546 #if CMK_PERSISTENT_COMM
547             if (CpvAccess(phs)) CpvAccess(curphs)++;
548 #endif
549             return;
550         }
551 #endif
552         CMI_DEST_RANK(msg) = destRank;
553         CmiSendNetworkFunc(destNode, size, msg, P2P_SYNC);
554 #if CMK_PERSISTENT_COMM
555         if (CpvAccess(phs)) CpvAccess(curphs)++;
556 #endif
557     }
558 }
559 #endif
560
561 #if USE_COMMON_ASYNC_P2P
562 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
563     int destNode = CmiNodeOf(destPE);
564     if (destNode == CmiMyNode()) {
565         CmiSyncSendFn(destPE,size,msg);
566         return 0;
567     } else {
568 #if CMK_WITH_STATS
569 if (  MSG_STATISTIC)
570 {
571     int ret_log = _cmi_log2(size);
572         if(ret_log >21) ret_log = 21;
573         msg_histogram[ret_log]++;
574 }
575 #endif
576         return CmiSendNetworkFunc(destPE, size, msg, P2P_ASYNC);
577     }
578 }
579 #endif
580
581 #if CMK_NODE_QUEUE_AVAILABLE
582 static void CmiSendNodeSelf(char *msg) {
583 #if CMK_IMMEDIATE_MSG
584     if (CmiIsImmediate(msg)) {
585         CmiPushImmediateMsg(msg);
586         if (!_immRunning) CmiHandleImmediate();
587         return;
588     }
589 #endif
590     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
591     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
592     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
593 }
594
595 #if USE_COMMON_ASYNC_P2P
596 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
597     char *dupmsg = CopyMsg(msg, size);
598     CmiFreeNodeSendFn(destNode, size, dupmsg);
599 }
600
601 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
602     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
603     CQdCreate(CpvAccess(cQdState), 1);
604     CMI_SET_BROADCAST_ROOT(msg, 0);
605     if (destNode == CmiMyNode()) {
606         CmiSendNodeSelf(msg);
607     } else {
608 #if CMK_WITH_STATS
609 if (  MSG_STATISTIC)
610 {
611     int ret_log = _cmi_log2(size);
612     if(ret_log >21) ret_log = 21;
613     msg_histogram[ret_log]++;
614 }
615 #endif
616         CmiSendNetworkFunc(destNode, size, msg, P2P_SYNC);
617     }
618 #if CMK_PERSISTENT_COMM
619     if (CpvAccess(phs)) CpvAccess(curphs)++;
620 #endif
621 }
622 #endif
623
624 #if USE_COMMON_ASYNC_P2P
625 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
626     if (destNode == CmiMyNode()) {
627         CmiSyncNodeSendFn(destNode, size, msg);
628         return 0;
629     } else {
630 #if CMK_WITH_STATS
631 if (  MSG_STATISTIC)
632 {
633         int ret_log = _cmi_log2(size);
634         if(ret_log >21) ret_log = 21;
635         msg_histogram[ret_log]++;
636 }
637 #endif
638         return CmiSendNetworkFunc(destNode, size, msg, P2P_ASYNC);
639     }
640 }
641 #endif
642 #endif
643
644 /* ##### Beginning of Functions Related with Machine Startup ##### */
645 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
646     int _ii;
647     int tmp;
648 #if CMK_WITH_STATS
649     MSG_STATISTIC = CmiGetArgFlag(argv, "+msgstatistic");
650 #endif
651     /* processor per node */
652     _Cmi_mynodesize = 1;
653     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
654         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
655 #if ! CMK_SMP
656     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
657         CmiAbort("+ppn cannot be used in non SMP version!\n");
658 #endif
659
660     /* Network progress function is used to poll the network when for
661     messages. This flushes receive buffers on some  implementations*/
662     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
663     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
664
665     /* _Cmi_mynodesize has to be obtained before LrtsInit
666      * because it may be used inside LrtsInit
667      */
668     /* argv could be changed inside LrtsInit */
669     /* Inside this function, the number of nodes and my node id are obtained */
670 #if CMK_WITH_STATS
671 if (  MSG_STATISTIC)
672 {
673     for(_ii=0; _ii<22; _ii++)
674         msg_histogram[_ii] = 0;
675 }
676 #endif
677
678     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
679    
680         if (_Cmi_mynode==0) {
681 #if !CMK_SMP 
682                 printf("Charm++> Running on non-SMP mode\n");
683 #else
684                 printf("Charm++> Running on SMP mode, %d worker threads per process\n", _Cmi_mynodesize);
685                 if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
686                         printf("Charm++> The comm. thread both sends and receives messages\n");
687                 } else if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
688                         printf("Charm++> The comm. thread only receives messages, while work threads send messages\n");
689                 } else if (Cmi_smp_mode_setting == COMM_WORK_THREADS_SEND_RECV) {
690                         printf("Charm++> Both  comm. thread and worker thread send and messages\n");
691                 } else if (Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST) {
692                         printf("Charm++> There's no comm. thread. Work threads both send and receive messages\n");
693                 } else {
694                         CmiAbort("Charm++> Invalid SMP mode setting\n");
695                 }
696 #endif
697         }
698
699     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
700     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
701     Cmi_argvcopy = CmiCopyArgs(argv);
702     Cmi_argv = argv;
703     Cmi_startfn = fn;
704     Cmi_usrsched = usched;
705
706 #if CMK_USE_PXSHM
707     CmiInitPxshm(argv);
708 #endif
709 #if CMK_USE_XPMEM
710     CmiInitXpmem(argv);
711 #endif
712
713     /* CmiTimerInit(); */
714 #if CMK_BROADCAST_HYPERCUBE
715     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
716     tmp = CmiNumNodes()-1;
717     CmiNodesDim = 0;
718     while (tmp>0) {
719         CmiNodesDim++;
720         tmp = tmp >> 1;
721     }
722     if (CmiNumNodes()==1) CmiNodesDim=1;
723 #endif
724
725     CsvInitialize(CmiNodeState, NodeState);
726     CmiNodeStateInit(&CsvAccess(NodeState));
727 #if CMK_SMP
728     commThdExitLock = CmiCreateLock();
729 #endif
730
731 #if CMK_OFFLOAD_BCAST_PROCESS
732     /* the actual queues should be created on comm thread considering NUMA in SMP */
733     CsvInitialize(PCQueue, procBcastQ);
734 #if CMK_NODE_QUEUE_AVAILABLE
735     CsvInitialize(PCQueue, nodeBcastQ);
736 #endif
737 #endif
738
739 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
740     CsvInitialize(PCQueue, notifyCommThdMsgBuffer);
741 #endif
742
743     CmiStartThreads(argv);
744
745     ConverseRunPE(initret);
746 }
747
748 extern void ConverseCommonInit(char **argv);
749 extern void CthInit(char **argv);
750
751 static void ConverseRunPE(int everReturn) {
752     CmiState cs;
753     char** CmiMyArgv;
754
755     LrtsPreCommonInit(everReturn);
756
757 #if CMK_OFFLOAD_BCAST_PROCESS
758     int createQueue = 1;
759 #if CMK_SMP
760 #if CMK_SMP_NO_COMMTHD
761     /* If there's no comm thread, then the queue is created on rank 0 */
762     if (CmiMyRank()) createQueue = 0;
763 #else
764     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
765 #endif
766 #endif
767
768     if (createQueue) {
769         CsvAccess(procBcastQ) = PCQueueCreate();
770 #if CMK_NODE_QUEUE_AVAILABLE
771         CsvAccess(nodeBcastQ) = PCQueueCreate();
772 #endif
773     }
774 #endif
775
776     CmiNodeAllBarrier();
777
778     cs = CmiGetState();
779     CpvInitialize(void *,CmiLocalQueue);
780     CpvAccess(CmiLocalQueue) = cs->localqueue;
781
782     if (CmiMyRank())
783         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
784     else
785         CmiMyArgv=Cmi_argv;
786
787     CthInit(CmiMyArgv);
788
789     /* initialize the network progress counter*/
790     /* Network progress function is used to poll the network when for
791        messages. This flushes receive buffers on some  implementations*/
792     CpvInitialize(unsigned , networkProgressCount);
793     CpvAccess(networkProgressCount) = 0;
794
795     ConverseCommonInit(CmiMyArgv);
796
797     LrtsPostCommonInit(everReturn);
798
799 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
800     CmiInitNotifyCommThdScheme();
801 #endif
802     /* Converse initialization finishes, immediate messages can be processed.
803        node barrier previously should take care of the node synchronization */
804     _immediateReady = 1;
805
806     if(CharmLibInterOperate) {
807         /* !!! Not considering SMP mode now */
808         /* TODO: make interoperability working in SMP!!! */
809         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
810         CsdScheduler(-1);
811     } else {
812       /* communication thread */
813       if (CmiMyRank() == CmiMyNodeSize()) {
814         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
815         while (1) CommunicationServerThread(5);
816       } else { /* worker thread */
817         if (!everReturn) {
818           Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
819           if (Cmi_usrsched==0) CsdScheduler(-1);
820           ConverseExit();
821         }
822       }
823     }
824 }
825 /* ##### End of Functions Related with Machine Startup ##### */
826
827 /* ##### Beginning of Functions Related with Machine Running ##### */
828 static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
829     int doProcessBcast = 1;
830
831 #if CMK_USE_PXSHM
832     CommunicationServerPxshm();
833 #endif
834 #if CMK_USE_XPMEM
835     CommunicationServerXpmem();
836 #endif
837
838     LrtsAdvanceCommunication(whenidle);
839
840 #if CMK_OFFLOAD_BCAST_PROCESS
841 #if CMK_SMP_NO_COMMTHD
842     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
843     if (CmiMyRank()) doProcessBcast = 0;
844 #endif
845     if (doProcessBcast) processBcastQs();
846 #endif
847
848 #if CMK_IMMEDIATE_MSG
849 #if !CMK_SMP
850     CmiHandleImmediate();
851 #endif
852 #if CMK_SMP && CMK_SMP_NO_COMMTHD
853     if (CmiMyRank()==0) CmiHandleImmediate();
854 #endif
855 #endif
856 }
857
858 extern void ConverseCommonExit();
859
860 static void CommunicationServer(int sleepTime) {
861 #if CMK_SMP
862     AdvanceCommunication(1);
863
864     if (commThdExit == CmiMyNodeSize()) {
865         MACHSTATE(2, "CommunicationServer exiting {");
866         LrtsDrainResources();
867         MACHSTATE(2, "} CommunicationServer EXIT");
868
869         ConverseCommonExit();
870
871 #if CMK_USE_PXSHM
872         CmiExitPxshm();
873 #endif
874 #if CMK_USE_XPMEM
875         CmiExitXpmem();
876 #endif
877         LrtsExit();
878     }
879 #endif
880 }
881
882 static void CommunicationServerThread(int sleepTime) {
883     CommunicationServer(sleepTime);
884 #if CMK_IMMEDIATE_MSG
885     CmiHandleImmediate();
886 #endif
887 }
888
889 void ConverseExit(void) {
890     int i;
891 #if !CMK_SMP
892     LrtsDrainResources();
893 #else
894         if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV
895            || Cmi_smp_mode_setting == COMM_THREAD_NOT_EXIST)
896                 LrtsDrainResources();
897 #endif
898
899     ConverseCommonExit();
900
901 #if CMK_WITH_STATS
902 if (MSG_STATISTIC)
903 {
904     for(i=0; i<22; i++)
905     {
906         CmiPrintf("[MSG PE:%d]", CmiMyPe());
907         if(msg_histogram[i] >0)
908             CmiPrintf("(%d:%d) ", 1<<i, msg_histogram[i]);
909     }
910     CmiPrintf("\n");
911 }
912 #endif
913
914 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
915     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
916 #endif
917
918 #if !CMK_SMP
919 #if CMK_USE_PXSHM
920     CmiExitPxshm();
921 #endif
922 #if CMK_USE_XPMEM
923     CmiExitXpmem();
924 #endif
925     LrtsExit();
926 #else
927     /* In SMP, the communication thread will exit */
928     /* atomic increment */
929     CmiLock(commThdExitLock);
930     commThdExit++;
931     CmiUnlock(commThdExitLock);
932     while (1) CmiYield();
933 #endif
934 }
935 /* ##### End of Functions Related with Machine Running ##### */
936
937 void CmiAbort(const char *message) {
938 #if CMK_USE_PXSHM
939     CmiExitPxshm();
940 #endif
941 #if CMK_USE_XPMEM
942     CmiExitXpmem();
943 #endif
944     LrtsAbort(message);
945 }
946
947 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
948 void *CmiGetNonLocal(void) {
949     CmiState cs = CmiGetState();
950     void *msg = NULL;
951
952 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
953     /**
954       * In SMP mode with comm thread, it's possible a normal
955       * msg is sent from an immediate msg which is executed
956       * on comm thread. In this case, the msg is sent to
957       * the network queue of the work thread. Therefore,
958       * even there's only one worker thread, the polling of
959       * network queue is still required.
960       */
961     if (CmiNumPes() == 1) return NULL;
962 #endif
963
964     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
965     CmiIdleLock_checkMessage(&cs->idle);
966     /* ?????although it seems that lock is not needed, I found it crashes very often
967        on mpi-smp without lock */
968     msg = PCQueuePop(cs->recv);
969 #if !CMK_SMP
970     if (!msg) {
971        AdvanceCommunication(0);
972        msg = PCQueuePop(cs->recv);
973     }
974 #else
975 //    LrtsPostNonLocal();
976 #endif
977
978     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
979
980     return msg;
981 }
982
983 #if CMK_NODE_QUEUE_AVAILABLE
984 void *CmiGetNonLocalNodeQ(void) {
985     CmiState cs = CmiGetState();
986     char *result = 0;
987     CmiIdleLock_checkMessage(&cs->idle);
988     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
989         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
990         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
991         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
992         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
993         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
994     }
995
996     return result;
997 }
998 #endif
999 /* ##### End of Functions Providing Incoming Network Messages ##### */
1000
1001 static CmiIdleState *CmiNotifyGetState(void) {
1002     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1003     s->sleepMs=0;
1004     s->nIdles=0;
1005     s->cs=CmiGetState();
1006     return s;
1007 }
1008
1009 static void CmiNotifyBeginIdle(CmiIdleState *s) {
1010     s->sleepMs=0;
1011     s->nIdles=0;
1012 }
1013
1014 /*Number of times to spin before sleeping*/
1015 #define SPINS_BEFORE_SLEEP 20
1016 static void CmiNotifyStillIdle(CmiIdleState *s) {
1017     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1018 #if !CMK_SMP
1019     AdvanceCommunication(1);
1020 #else
1021     LrtsPostNonLocal();
1022
1023     if (_Cmi_noprocforcommthread) {
1024     s->nIdles++;
1025     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1026         s->sleepMs+=2;
1027         if (s->sleepMs>10) s->sleepMs=10;
1028     }
1029
1030     if (s->sleepMs>0) {
1031         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1032         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1033         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1034     }
1035     }
1036 #endif
1037
1038     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1039 }
1040
1041 /* usually called in non-smp mode */
1042 void CmiNotifyIdle(void) {
1043     AdvanceCommunication(1);
1044     CmiYield();
1045 }
1046
1047 /* Utiltiy functions */
1048 static char *CopyMsg(char *msg, int len) {
1049     char *copy = (char *)CmiAlloc(len);
1050 #if CMK_ERROR_CHECKING
1051     if (!copy) {
1052         CmiAbort("Error: out of memory in machine layer\n");
1053     }
1054 #endif
1055     memcpy(copy, msg, len);
1056     return copy;
1057 }
1058
1059
1060