fixed a bug in pumpMsgs.
[charm.git] / src / arch / util / machine-common.c
1 #if CMK_C_INLINE
2 #define INLINE_KEYWORD inline
3 #else
4 #define INLINE_KEYWORD
5 #endif
6
7 /* ###Beginning of Broadcast related definitions ### */
8 #ifndef CMK_BROADCAST_SPANNING_TREE
9 #define CMK_BROADCAST_SPANNING_TREE    1
10 #endif
11
12 #ifndef CMK_BROADCAST_HYPERCUBE
13 #define CMK_BROADCAST_HYPERCUBE        0
14 #endif
15
16 #define BROADCAST_SPANNING_FACTOR      4
17 /* The number of children used when a msg is broadcast inside a node */
18 #define BROADCAST_SPANNING_INTRA_FACTOR  8
19
20 /* Root of broadcast:
21  * non-bcast msg: root = 0;
22  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
23  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
24  */
25 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
26 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
27 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
28
29 /**
30  * For some machine layers such as on Active Message framework,
31  * the receiver callback is usally executed on an internal
32  * thread (i.e. not the flow managed by ours). Therefore, for
33  * forwarding broadcast messages, we could have a choice whether
34  * to offload such function to the flow we manage such as the
35  * communication thread. -Chao Mei
36  */
37
38 #ifndef CMK_OFFLOAD_BCAST_PROCESS
39 #define CMK_OFFLOAD_BCAST_PROCESS 0
40 #endif
41 #if CMK_OFFLOAD_BCAST_PROCESS
42 CsvDeclare(PCQueue, procBcastQ);
43 #if CMK_NODE_QUEUE_AVAILABLE
44 CsvDeclare(PCQueue, nodeBcastQ);
45 #endif
46 #endif
47
48 #if CMK_BROADCAST_HYPERCUBE
49 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
50 static int CmiNodesDim;
51 #endif
52 /* ###End of Broadcast related definitions ### */
53
54
55 #ifndef CMK_HAS_SIZE_IN_MSGHDR
56 #define CMK_HAS_SIZE_IN_MSGHDR 1
57 #endif
58 #if CMK_HAS_SIZE_IN_MSGHDR
59 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
60 #else
61 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
62 #endif
63
64 #if CMK_NODE_QUEUE_AVAILABLE
65 /* This value should be larger than the number of cores used
66  * per charm smp node. So it's currently set to such a large
67  * value.
68  */
69 #define DGRAM_NODEMESSAGE   (0x1FFB)
70 #endif
71
72 /* Node state structure */
73 int               _Cmi_mynode;    /* Which address space am I */
74 int               _Cmi_mynodesize;/* Number of processors in my address space */
75 int               _Cmi_numnodes;  /* Total number of address spaces */
76 int               _Cmi_numpes;    /* Total number of processors */
77
78 CpvDeclare(void*, CmiLocalQueue);
79
80 /* different modes for sending a message */
81 #define P2P_SYNC 0x1
82 #define P2P_ASYNC 0x2
83 #define BCAST_SYNC 0x3
84 #define BCAST_ASYNC 0x4
85
86 #if CMK_SMP
87 static volatile int commThdExit = 0;
88 static CmiNodeLock  commThdExitLock = 0;
89
90 /**
91  *  The macro defines whether to have a comm thd to offload some
92  *  work such as forwarding bcast messages etc. This macro
93  *  should be defined before including "machine-smp.c". Note
94  *  that whether a machine layer in SMP mode could run w/o comm
95  *  thread depends on the support of the underlying
96  *  communication library.
97  *
98  *  --Chao Mei
99  */
100 #ifndef CMK_SMP_NO_COMMTHD
101 #define CMK_SMP_NO_COMMTHD 0
102 #endif
103
104 #if CMK_SMP_NO_COMMTHD
105 int Cmi_commthread = 0;
106 #else
107 int Cmi_commthread = 1;
108 #endif
109
110 #endif
111
112 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
113 static int Cmi_nodestart;
114
115 /*
116  * Network progress utility variables. Period controls the rate at
117  * which the network poll is called
118  */
119 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
120 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
121 #endif
122
123 CpvDeclare(unsigned , networkProgressCount);
124 int networkProgressPeriod;
125
126
127 /* ===== Beginning of Common Function Declarations ===== */
128 void CmiAbort(const char *message);
129 static void PerrorExit(const char *msg);
130
131 /* This function handles the msg received as which queue to push into */
132 static void handleOneRecvedMsg(int size, char *msg);
133
134 static void handleOneBcastMsg(int size, char *msg);
135 static void processBcastQs();
136
137 /* Utility functions for forwarding broadcast messages,
138  * should not be used in machine-specific implementations
139  * except in some special occasions.
140  */
141 static void processProcBcastMsg(int size, char *msg);
142 static void processNodeBcastMsg(int size, char *msg);
143 static void SendSpanningChildrenProc(int size, char *msg);
144 static void SendHyperCubeProc(int size, char *msg);
145 #if CMK_NODE_QUEUE_AVAILABLE
146 static void SendSpanningChildrenNode(int size, char *msg);
147 static void SendHyperCubeNode(int size, char *msg);
148 #endif
149
150 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
151 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
152 /* send to other ranks except me on the same node*/
153 static void SendToPeers(int size, char *msg);
154
155
156 void CmiPushPE(int rank, void *msg);
157
158 #if CMK_NODE_QUEUE_AVAILABLE
159 void CmiPushNode(void *msg);
160 #endif
161
162 /* Functions regarding send ops declared in converse.h */
163
164 /* In default, using the common codes for msg sending */
165 #ifndef USE_COMMON_SYNC_P2P
166 #define USE_COMMON_SYNC_P2P 1
167 #endif
168 #ifndef USE_COMMON_ASYNC_P2P
169 #define USE_COMMON_ASYNC_P2P 1
170 #endif
171 #ifndef USE_COMMON_SYNC_BCAST
172 #define USE_COMMON_SYNC_BCAST 1
173 #endif
174 #ifndef USE_COMMON_ASYNC_BCAST
175 #define USE_COMMON_ASYNC_BCAST 1
176 #endif
177
178 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
179 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
180 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
181 #endif
182 #endif
183
184 static void CmiSendSelf(char *msg);
185
186 void CmiSyncSendFn(int destPE, int size, char *msg);
187 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
188 void CmiFreeSendFn(int destPE, int size, char *msg);
189
190 void CmiSyncBroadcastFn(int size, char *msg);
191 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
192 void CmiFreeBroadcastFn(int size, char *msg);
193
194 void CmiSyncBroadcastAllFn(int size, char *msg);
195 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
196 void CmiFreeBroadcastAllFn(int size, char *msg);
197
198 #if CMK_NODE_QUEUE_AVAILABLE
199 static void CmiSendNodeSelf(char *msg);
200
201 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
202 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
203 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
204
205 void CmiSyncNodeBroadcastFn(int size, char *msg);
206 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg);
207 void CmiFreeNodeBroadcastFn(int size, char *msg);
208
209 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
210 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
211 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
212 #endif
213
214 /* Functions and variables regarding machine startup */
215 static char     **Cmi_argv;
216 static char     **Cmi_argvcopy;
217 static CmiStartFn Cmi_startfn;   /* The start function */
218 static int        Cmi_usrsched;  /* Continue after start function finishes? */
219 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
220 static void ConverseRunPE(int everReturn);
221
222 /* Functions regarding machine running on every proc */
223 static void AdvanceCommunication();
224 static void CommunicationServer(int sleepTime);
225 static void CommunicationServerThread(int sleepTime);
226 void ConverseExit(void);
227
228 /* Functions providing incoming network messages */
229 void *CmiGetNonLocal(void);
230 #if CMK_NODE_QUEUE_AVAILABLE
231 void *CmiGetNonLocalNodeQ(void);
232 #endif
233 /* Utiltiy functions */
234 static char *CopyMsg(char *msg, int len);
235
236 /* ===== End of Common Function Declarations ===== */
237
238 #include "machine-smp.c"
239
240 /* ===== Beginning of Idle-state Related Declarations =====  */
241 typedef struct {
242     int sleepMs; /*Milliseconds to sleep while idle*/
243     int nIdles; /*Number of times we've been idle in a row*/
244     CmiState cs; /*Machine state*/
245 } CmiIdleState;
246
247 static CmiIdleState *CmiNotifyGetState(void);
248
249 /**
250  *  Generally,
251  *
252  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
253  *  When the proc is idle, AdvanceCommunication needs to be
254  *  called.
255  *
256  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
257  *
258  *  Different layers have choices of registering different callbacks for
259  *  idle state.
260  */
261 static void CmiNotifyBeginIdle(CmiIdleState *s);
262 static void CmiNotifyStillIdle(CmiIdleState *s);
263 void CmiNotifyIdle(void);
264 /* ===== End of Idle-state Related Declarations =====  */
265
266 /* ===== Beginning of Processor/Node State-related Stuff =====*/
267 #if !CMK_SMP
268 /************ non SMP **************/
269 static struct CmiStateStruct Cmi_state;
270 int _Cmi_mype;
271 int _Cmi_myrank;
272
273 void CmiMemLock() {}
274 void CmiMemUnlock() {}
275
276 #define CmiGetState() (&Cmi_state)
277 #define CmiGetStateN(n) (&Cmi_state)
278
279 void CmiYield(void) {
280     sleep(0);
281 }
282
283 static void CmiStartThreads(char **argv) {
284     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
285     _Cmi_mype = Cmi_nodestart;
286     _Cmi_myrank = 0;
287 }
288 #else
289 /************** SMP *******************/
290 INLINE_KEYWORD int CmiMyPe(void) {
291     return CmiGetState()->pe;
292 }
293 INLINE_KEYWORD int CmiMyRank(void) {
294     return CmiGetState()->rank;
295 }
296 INLINE_KEYWORD int CmiNodeFirst(int node) {
297     return node*_Cmi_mynodesize;
298 }
299 INLINE_KEYWORD int CmiNodeSize(int node) {
300     return _Cmi_mynodesize;
301 }
302 INLINE_KEYWORD int CmiNodeOf(int pe) {
303     return (pe/_Cmi_mynodesize);
304 }
305 INLINE_KEYWORD int CmiRankOf(int pe) {
306     return pe%_Cmi_mynodesize;
307 }
308 #endif
309 CsvDeclare(CmiNodeState, NodeState);
310 /* ===== End of Processor/Node State-related Stuff =====*/
311
312 #include "immediate.c"
313
314 /* ===== Beginning of Common Function Definitions ===== */
315 static void PerrorExit(const char *msg) {
316     perror(msg);
317     exit(1);
318 }
319
320 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
321 /*Add a message to this processor's receive queue, pe is a rank */
322 void CmiPushPE(int rank,void *msg) {
323     CmiState cs = CmiGetStateN(rank);
324     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
325 #if CMK_IMMEDIATE_MSG
326     if (CmiIsImmediate(msg)) {
327         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
328         CMI_DEST_RANK(msg) = rank;
329         CmiPushImmediateMsg(msg);
330         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
331         return;
332     }
333 #endif
334
335     PCQueuePush(cs->recv,msg);
336     CmiIdleLock_addMessage(&cs->idle);
337     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
338 }
339
340 #if CMK_NODE_QUEUE_AVAILABLE
341 /*Add a message to this processor's receive queue */
342 void CmiPushNode(void *msg) {
343     MACHSTATE(3,"Pushing message into NodeRecv queue");
344 #if CMK_IMMEDIATE_MSG
345     if (CmiIsImmediate(msg)) {
346         CMI_DEST_RANK(msg) = 0;
347         CmiPushImmediateMsg(msg);
348         return;
349     }
350 #endif
351     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
352     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
353     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
354     {
355         CmiState cs=CmiGetStateN(0);
356         CmiIdleLock_addMessage(&cs->idle);
357     }
358 }
359 #endif
360
361 /* This function handles the msg received as which queue to push into */
362 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
363     int isBcastMsg = 0;
364 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
365     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
366 #endif
367
368     if (isBcastMsg) {
369         handleOneBcastMsg(size, msg);
370         return;
371     }
372
373 #if CMK_NODE_QUEUE_AVAILABLE
374     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
375         CmiPushNode(msg);
376     else
377 #endif
378         CmiPushPE(CMI_DEST_RANK(msg), msg);
379
380 }
381
382
383 /* ##### Beginning of Broadcast-related functions' defitions ##### */
384 static void handleOneBcastMsg(int size, char *msg) {
385     CmiAssert(CMI_BROADCAST_ROOT(msg)!=0);
386 #if CMK_OFFLOAD_BCAST_PROCESS
387     if (CMI_BROADCAST_ROOT(msg)>0) {
388         PCQueuePush(CsvAccess(procBcastQ), msg);
389     } else {
390 #if CMK_NODE_QUEUE_AVAILABLE
391         PCQueuePush(CsvAccess(nodeBcastQ), msg);
392 #endif
393     }
394 #else
395     if (CMI_BROADCAST_ROOT(msg)>0) {
396         processProcBcastMsg(size, msg);
397     } else {
398 #if CMK_NODE_QUEUE_AVAILABLE
399         processNodeBcastMsg(size, msg);
400 #endif
401     }
402 #endif
403 }
404
405 static void processBcastQs() {
406 #if CMK_OFFLOAD_BCAST_PROCESS
407     char *msg;
408     do {
409         msg = PCQueuePop(CsvAccess(procBcastQ));
410         if (!msg) break;
411         MACHSTATE2(4, "[%d]: process a proc-level bcast msg %p begin{", CmiMyNode(), msg);
412         processProcBcastMsg(CMI_MSG_SIZE(msg), msg);
413         MACHSTATE2(4, "[%d]: process a proc-level bcast msg %p end}", CmiMyNode(), msg);
414     } while (1);
415 #if CMK_NODE_QUEUE_AVAILABLE
416     do {
417         msg = PCQueuePop(CsvAccess(nodeBcastQ));
418         if (!msg) break;
419         MACHSTATE2(4, "[%d]: process a node-level bcast msg %p begin{", CmiMyNode(), msg);
420         processNodeBcastMsg(CMI_MSG_SIZE(msg), msg);
421         MACHSTATE2(4, "[%d]: process a node-level bcast msg %p end}", CmiMyNode(), msg);
422     } while (1);
423 #endif
424 #endif
425 }
426
427 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg) {
428 #if CMK_BROADCAST_SPANNING_TREE
429     SendSpanningChildrenProc(size, msg);
430 #elif CMK_BROADCAST_HYPERCUBE
431     SendHyperCubeProc(size, msg);
432 #endif
433
434     /* Since this function is only called on intermediate nodes,
435      * the rank of this msg should be 0.
436      */
437     CmiAssert(CMI_DEST_RANK(msg)==0);
438     /*CmiPushPE(CMI_DEST_RANK(msg), msg);*/
439     CmiPushPE(0, msg);
440 }
441
442 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg) {
443 #if CMK_BROADCAST_SPANNING_TREE
444     SendSpanningChildrenNode(size, msg);
445 #elif CMK_BROADCAST_HYPERCUBE
446     SendHyperCubeNode(size, msg);
447 #endif
448
449     /* In SMP mode, this push operation needs to be executed
450      * after forwarding broadcast messages. If it is executed
451      * earlier, then during the bcast msg forwarding period,
452      * the msg could be already freed on the worker thread.
453      * As a result, the forwarded message could be wrong!
454      * --Chao Mei
455      */
456     CmiPushNode(msg);
457 }
458
459 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode) {
460 #if CMK_BROADCAST_SPANNING_TREE
461     int i, oldRank;
462     char *newmsg;
463
464     oldRank = CMI_DEST_RANK(msg);
465     /* doing this is to avoid the multiple assignment in the following for loop */
466     CMI_DEST_RANK(msg) = rankToAssign;
467     /* first send msgs to other nodes */
468     CmiAssert(startNode >=0 &&  startNode<CmiNumNodes());
469     for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
470         int nd = CmiMyNode()-startNode;
471         if (nd<0) nd+=CmiNumNodes();
472         nd = BROADCAST_SPANNING_FACTOR*nd + i;
473         if (nd > CmiNumNodes() - 1) break;
474         nd += startNode;
475         nd = nd%CmiNumNodes();
476         CmiAssert(nd>=0 && nd!=CmiMyNode());
477 #if CMK_BROADCAST_USE_CMIREFERENCE
478         CmiReference(msg);
479         LrtsSendFunc(nd, size, msg, P2P_SYNC);
480 #else
481         newmsg = CopyMsg(msg, size);
482         LrtsSendFunc(nd, size, newmsg, P2P_SYNC);
483 #endif
484     }
485     CMI_DEST_RANK(msg) = oldRank;
486 #endif
487 }
488 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode) {
489 #if CMK_BROADCAST_HYPERCUBE
490     int i, cnt, tmp, relDist, oldRank;
491     const int dims=CmiNodesDim;
492
493     oldRank = CMI_DEST_RANK(msg);
494     /* doing this is to avoid the multiple assignment in the following for loop */
495     CMI_DEST_RANK(msg) = rankToAssign;
496
497     /* first send msgs to other nodes */
498     relDist = CmiMyNode()-startNode;
499     if (relDist < 0) relDist += CmiNumNodes();
500
501     /* Sending scheme example: say we have 9 nodes, and the msg is sent from 0
502      * The overall sending steps will be as follows:
503      * 0-->8, 0-->4, 0-->2, 0-->1
504      *               4-->6, 4-->5
505      *                      2-->3
506      *                      6-->7
507      * So for node id as N=A+2^B, it will forward the broadcast (B-1) msg to in
508      * the order as: N+2^(B-1), N+2^(B-2),..., N+1 except node 0, where B is
509      * the first position of bit 1 in the binary format of the number of N
510      * counting from the right with count starting from 0.
511      * On node 0, the value "B" should be CmiNodesDim
512      */
513     /* Calculate 2^B */
514     if(relDist==0) cnt = 1<<dims;
515     else cnt = relDist & ((~relDist)+1);
516     /*CmiPrintf("ND[%d]: send bcast msg with cnt=%d\n", CmiMyNode(), cnt);*/
517     /* Begin to send msgs */
518     for(cnt>>=1; cnt>0; cnt>>=1){
519         int nd = relDist + cnt;
520         if (nd >= CmiNumNodes()) continue;
521         nd = (nd+startNode)%CmiNumNodes();
522         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
523         CmiAssert(nd>=0 && nd!=CmiMyNode());
524 #if CMK_BROADCAST_USE_CMIREFERENCE
525         CmiReference(msg);
526         LrtsSendFunc(nd, size, msg, P2P_SYNC);
527 #else
528         char *newmsg = CopyMsg(msg, size);
529         LrtsSendFunc(nd, size, newmsg, P2P_SYNC);
530 #endif
531     }
532     CMI_DEST_RANK(msg) = oldRank;
533 #endif
534 }
535
536 static void SendSpanningChildrenProc(int size, char *msg) {
537     int startpe = CMI_BROADCAST_ROOT(msg)-1;
538     int startnode = CmiNodeOf(startpe);
539 #if CMK_SMP
540     if (startpe > CmiNumPes()) startnode = startpe - CmiNumPes();
541 #endif
542     SendSpanningChildren(size, msg, 0, startnode);
543 #if CMK_SMP
544     /* second send msgs to my peers on this node */
545     SendToPeers(size, msg);
546 #endif
547 }
548
549 /* send msg along the hypercube in broadcast. (Sameer) */
550 static void SendHyperCubeProc(int size, char *msg) {
551     int startpe = CMI_BROADCAST_ROOT(msg)-1;
552     int startnode = CmiNodeOf(startpe);
553 #if CMK_SMP
554     if (startpe > CmiNumPes()) startnode = startpe - CmiNumPes();
555 #endif
556     SendHyperCube(size, msg, 0, startnode);
557 #if CMK_SMP
558     /* second send msgs to my peers on this node */
559     SendToPeers(size, msg);
560 #endif
561 }
562
563 static void SendToPeers(int size, char *msg) {
564     /* FIXME: now it's just a flat p2p send!! When node size is large,
565     * it should also be sent in a tree
566     */
567     int exceptRank = CMI_DEST_RANK(msg);
568     int i;
569     for (i=0; i<exceptRank; i++) {
570         CmiPushPE(i, CopyMsg(msg, size));
571     }
572     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
573         CmiPushPE(i, CopyMsg(msg, size));
574     }
575 }
576
577 #if CMK_NODE_QUEUE_AVAILABLE
578 static void SendSpanningChildrenNode(int size, char *msg) {
579     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
580     SendSpanningChildren(size, msg, DGRAM_NODEMESSAGE, startnode);
581 }
582 static void SendHyperCubeNode(int size, char *msg) {
583     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
584     SendHyperCube(size, msg, DGRAM_NODEMESSAGE, startnode);
585 }
586 #endif
587 /*##### End of Broadcast-related functions' defitions #####*/
588
589 /* Functions regarding sending operations */
590 static void CmiSendSelf(char *msg) {
591 #if CMK_IMMEDIATE_MSG
592     if (CmiIsImmediate(msg)) {
593         /* CmiBecomeNonImmediate(msg); */
594         CmiPushImmediateMsg(msg);
595         CmiHandleImmediate();
596         return;
597     }
598 #endif
599     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
600 }
601
602 /* Functions regarding P2P send op */
603 #if USE_COMMON_SYNC_P2P
604 void CmiSyncSendFn(int destPE, int size, char *msg) {
605     char *dupmsg = CopyMsg(msg, size);
606     CmiFreeSendFn(destPE, size, dupmsg);
607 }
608
609 void CmiFreeSendFn(int destPE, int size, char *msg) {
610     CMI_SET_BROADCAST_ROOT(msg, 0);
611     CQdCreate(CpvAccess(cQdState), 1);
612     if (CmiMyPe()==destPE) {
613         CmiSendSelf(msg);
614     } else {
615         int destNode = CmiNodeOf(destPE);
616 #if CMK_SMP
617         if (CmiMyNode()==destNode) {
618             CmiPushPE(CmiRankOf(destPE), msg);
619             return;
620         }
621 #endif
622         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
623         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
624     }
625 }
626 #endif
627
628 #if USE_COMMON_ASYNC_P2P
629 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
630     int destNode = CmiNodeOf(destPE);
631     if (destNode == CmiMyNode()) {
632         CmiSyncSendFn(destPE,size,msg);
633         return 0;
634     } else {
635         return LrtsSendFunc(destPE, size, msg, P2P_ASYNC);
636     }
637 }
638 #endif
639
640 #if USE_COMMON_SYNC_BCAST
641 /* Functions regarding broadcat op that sends to every one else except me */
642 void CmiSyncBroadcastFn(int size, char *msg) {
643     int mype = CmiMyPe();
644     int i;
645
646     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
647 #if CMK_SMP
648     /*record the rank to avoid re-sending the msg in  spanning tree or hypercube*/
649     CMI_DEST_RANK(msg) = CmiMyRank();
650 #endif
651
652 #if CMK_BROADCAST_SPANNING_TREE
653     CMI_SET_BROADCAST_ROOT(msg, mype+1);
654     SendSpanningChildrenProc(size, msg);
655 #elif CMK_BROADCAST_HYPERCUBE
656     CMI_SET_BROADCAST_ROOT(msg, mype+1);
657     SendHyperCubeProc(size, msg);
658 #else
659     for ( i=mype+1; i<_Cmi_numpes; i++ )
660         CmiSyncSendFn(i, size, msg) ;
661     for ( i=0; i<mype; i++ )
662         CmiSyncSendFn(i, size, msg) ;
663 #endif
664
665     /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
666 }
667
668 void CmiFreeBroadcastFn(int size, char *msg) {
669     CmiSyncBroadcastFn(size,msg);
670     CmiFree(msg);
671 }
672 #endif
673
674 #if USE_COMMON_ASYNC_BCAST
675 /* FIXME: should use spanning or hypercube, but luckily async is never used */
676 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
677     /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
678     CmiAbort("CmiAsyncBroadcastFn should never be called");
679     return 0;
680 }
681 #endif
682
683 /* Functions regarding broadcat op that sends to every one */
684 void CmiSyncBroadcastAllFn(int size, char *msg) {
685     CmiSyncSendFn(CmiMyPe(), size, msg) ;
686     CmiSyncBroadcastFn(size, msg);
687 }
688
689 void CmiFreeBroadcastAllFn(int size, char *msg) {
690     CmiSendSelf(msg);
691     CmiSyncBroadcastFn(size, msg);
692 }
693
694 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
695     CmiSendSelf(CopyMsg(msg, size));
696     return CmiAsyncBroadcastFn(size, msg);
697 }
698
699 #if CMK_NODE_QUEUE_AVAILABLE
700 static void CmiSendNodeSelf(char *msg) {
701 #if CMK_IMMEDIATE_MSG
702     if (CmiIsImmediate(msg)) {
703         CmiPushImmediateMsg(msg);
704         if (!_immRunning) CmiHandleImmediate();
705         return;
706     }
707 #endif
708     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
709     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
710     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
711 }
712
713 #if USE_COMMON_ASYNC_P2P
714 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
715     char *dupmsg = CopyMsg(msg, size);
716     CmiFreeNodeSendFn(destNode, size, dupmsg);
717 }
718
719 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
720     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
721     CQdCreate(CpvAccess(cQdState), 1);
722     CMI_SET_BROADCAST_ROOT(msg, 0);
723     if (destNode == CmiMyNode()) {
724         CmiSendNodeSelf(msg);
725     } else {
726         LrtsSendFunc(destNode, size, msg, P2P_SYNC);
727     }
728 }
729 #endif
730
731 #if USE_COMMON_ASYNC_P2P
732 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
733     if (destNode == CmiMyNode()) {
734         CmiSyncNodeSendFn(destNode, size, msg);
735         return 0;
736     } else {
737         return LrtsSendFunc(destNode, size, msg, P2P_ASYNC);
738     }
739 }
740 #endif
741
742 #if USE_COMMON_SYNC_BCAST
743 void CmiSyncNodeBroadcastFn(int size, char *msg) {
744     int mynode = CmiMyNode();
745     int i;
746     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
747 #if CMK_BROADCAST_SPANNING_TREE
748     CMI_SET_BROADCAST_ROOT(msg, -CmiMyNode()-1);
749     SendSpanningChildrenNode(size, msg);
750 #elif CMK_BROADCAST_HYPERCUBE
751     CMI_SET_BROADCAST_ROOT(msg, -CmiMyNode()-1);
752     SendHyperCubeNode(size, msg);
753 #else
754     for (i=mynode+1; i<CmiNumNodes(); i++)
755         CmiSyncNodeSendFn(i, size, msg);
756     for (i=0; i<mynode; i++)
757         CmiSyncNodeSendFn(i, size, msg);
758 #endif
759 }
760
761 void CmiFreeNodeBroadcastFn(int size, char *msg) {
762     CmiSyncNodeBroadcastFn(size, msg);
763     CmiFree(msg);
764 }
765 #endif
766
767 #if USE_COMMON_ASYNC_BCAST
768 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg) {
769     CmiSyncNodeBroadcastFn(size, msg);
770     return 0;
771 }
772 #endif
773
774 void CmiSyncNodeBroadcastAllFn(int size, char *msg) {
775     CmiSyncNodeSendFn(CmiMyNode(), size, msg);
776     CmiSyncNodeBroadcastFn(size, msg);
777 }
778
779 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg) {
780     CmiSendNodeSelf(CopyMsg(msg, size));
781     return CmiAsyncNodeBroadcastFn(size, msg);
782 }
783
784 void CmiFreeNodeBroadcastAllFn(int size, char *msg) {
785     CmiSyncNodeBroadcastFn(size, msg);
786     /* Since it's a node-level msg, the msg could be executed on any other
787      * procs on the same node. This means, the push of this msg to the
788      * node-level queue could be immediately followed a pop of this msg on
789      * other cores on the same node even when this msg has not been sent to
790      * other nodes. This is the reason CmiSendNodeSelf must be called after
791      * CmiSyncNodeBroadcastFn -Chao Mei
792      */
793     CmiSendNodeSelf(msg);
794 }
795 #endif
796 /* ##### End of Functions Related with Message Sending OPs ##### */
797
798 /* ##### Beginning of Functions Related with Machine Startup ##### */
799 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
800     int tmp;
801     /* processor per node */
802     _Cmi_mynodesize = 1;
803     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
804         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
805 #if ! CMK_SMP
806     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
807         CmiAbort("+ppn cannot be used in non SMP version!\n");
808 #endif
809
810     /* Network progress function is used to poll the network when for
811     messages. This flushes receive buffers on some  implementations*/
812     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
813     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
814
815     /* _Cmi_mynodesize has to be obtained before LrtsInit
816      * because it may be used inside LrtsInit
817      */
818     /* argv could be changed inside LrtsInit */
819     /* Inside this function, the number of nodes and my node id are obtained */
820     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
821
822     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
823     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
824     Cmi_argvcopy = CmiCopyArgs(argv);
825     Cmi_argv = argv;
826     Cmi_startfn = fn;
827     Cmi_usrsched = usched;
828
829     /* CmiTimerInit(); */
830 #if CMK_BROADCAST_HYPERCUBE
831     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
832     tmp = CmiNumNodes()-1;
833     CmiNodesDim = 0;
834     while (tmp>0) {
835         CmiNodesDim++;
836         tmp = tmp >> 1;
837     }
838     if (CmiNumNodes()==1) CmiNodesDim=1;
839 #endif
840
841     CsvInitialize(CmiNodeState, NodeState);
842     CmiNodeStateInit(&CsvAccess(NodeState));
843 #if CMK_SMP
844     commThdExitLock = CmiCreateLock();
845 #endif
846
847 #if CMK_OFFLOAD_BCAST_PROCESS
848     /* the actual queues should be created on comm thread considering NUMA in SMP */
849     CsvInitialize(PCQueue, procBcastQ);
850 #if CMK_NODE_QUEUE_AVAILABLE
851     CsvInitialize(PCQueue, nodeBcastQ);
852 #endif
853 #endif
854
855     CmiStartThreads(argv);
856     ConverseRunPE(initret);
857 }
858
859 static void ConverseRunPE(int everReturn) {
860     CmiState cs;
861     char** CmiMyArgv;
862
863     LrtsPreCommonInit(everReturn);
864
865 #if CMK_OFFLOAD_BCAST_PROCESS
866     int createQueue = 1;
867 #if CMK_SMP
868 #if CMK_SMP_NO_COMMTHD
869     /* If there's no comm thread, then the queue is created on rank 0 */
870     if (CmiMyRank()) createQueue = 0;
871 #else
872     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
873 #endif
874 #endif
875
876     if (createQueue) {
877         CsvAccess(procBcastQ) = PCQueueCreate();
878 #if CMK_NODE_QUEUE_AVAILABLE
879         CsvAccess(nodeBcastQ) = PCQueueCreate();
880 #endif
881     }
882 #endif
883
884     CmiNodeAllBarrier();
885
886     cs = CmiGetState();
887     CpvInitialize(void *,CmiLocalQueue);
888     CpvAccess(CmiLocalQueue) = cs->localqueue;
889
890     if (CmiMyRank())
891         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
892     else
893         CmiMyArgv=Cmi_argv;
894
895     CthInit(CmiMyArgv);
896
897     /* initialize the network progress counter*/
898     /* Network progress function is used to poll the network when for
899        messages. This flushes receive buffers on some  implementations*/
900     CpvInitialize(unsigned , networkProgressCount);
901     CpvAccess(networkProgressCount) = 0;
902
903     ConverseCommonInit(CmiMyArgv);
904
905     LrtsPostCommonInit(everReturn);
906
907     /* Converse initialization finishes, immediate messages can be processed.
908        node barrier previously should take care of the node synchronization */
909     _immediateReady = 1;
910
911     /* communication thread */
912     if (CmiMyRank() == CmiMyNodeSize()) {
913         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
914         while (1) CommunicationServerThread(5);
915     } else { /* worker thread */
916         if (!everReturn) {
917             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
918             if (Cmi_usrsched==0) CsdScheduler(-1);
919             ConverseExit();
920         }
921     }
922 }
923 /* ##### End of Functions Related with Machine Startup ##### */
924
925 /* ##### Beginning of Functions Related with Machine Running ##### */
926 static INLINE_KEYWORD void AdvanceCommunication() {
927     int doProcessBcast = 1;
928
929     LrtsAdvanceCommunication();
930
931 #if CMK_OFFLOAD_BCAST_PROCESS
932 #if CMK_SMP_NO_COMMTHD
933     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
934     if (CmiMyRank()) doProcessBcast = 0;
935 #endif
936     if (doProcessBcast) processBcastQs();
937 #endif
938
939 #if CMK_IMMEDIATE_MSG
940 #if !CMK_SMP
941     CmiHandleImmediate();
942 #endif
943 #if CMK_SMP && CMK_SMP_NO_COMMTHD
944     if (CmiMyRank()==0) CmiHandleImmediate();
945 #endif
946 #endif
947
948 }
949
950 static void CommunicationServer(int sleepTime) {
951 #if CMK_SMP
952     AdvanceCommunication();
953
954     if (commThdExit == CmiMyNodeSize()) {
955         MACHSTATE(2, "CommunicationServer exiting {");
956         LrtsDrainResources();
957         MACHSTATE(2, "} CommunicationServer EXIT");
958
959         ConverseCommonExit();
960
961         LrtsExit();
962     }
963 #endif
964 }
965
966 static void CommunicationServerThread(int sleepTime) {
967     CommunicationServer(sleepTime);
968 #if CMK_IMMEDIATE_MSG
969     CmiHandleImmediate();
970 #endif
971 }
972
973 void ConverseExit(void) {
974 #if !CMK_SMP
975     LrtsDrainResources();
976 #endif
977
978     ConverseCommonExit();
979
980 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
981     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
982 #endif
983
984 #if !CMK_SMP
985     LrtsExit();
986 #else
987     /* In SMP, the communication thread will exit */
988     /* atomic increment */
989     CmiLock(commThdExitLock);
990     commThdExit++;
991     CmiUnlock(commThdExitLock);
992     while (1) CmiYield();
993 #endif
994 }
995 /* ##### End of Functions Related with Machine Running ##### */
996
997
998 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
999 void *CmiGetNonLocal(void) {
1000     CmiState cs = CmiGetState();
1001     void *msg = NULL;
1002
1003 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
1004     /**
1005       * In SMP mode with comm thread, it's possible a normal
1006       * msg is sent from an immediate msg which is executed
1007       * on comm thread. In this case, the msg is sent to
1008       * the network queue of the work thread. Therefore,
1009       * even there's only one worker thread, the polling of
1010       * network queue is still required. -Chao Mei
1011       */
1012     if (CmiNumPes() == 1) return NULL;
1013 #endif
1014
1015     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
1016     CmiIdleLock_checkMessage(&cs->idle);
1017     /* ?????although it seems that lock is not needed, I found it crashes very often
1018        on mpi-smp without lock */
1019 #if !CMK_SMP
1020     AdvanceCommunication();
1021 #endif
1022
1023     msg = PCQueuePop(cs->recv);
1024
1025 #if !CMK_SMP
1026     LrtsPostNonLocal();
1027 #endif
1028
1029     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
1030
1031     return msg;
1032 }
1033 #if CMK_NODE_QUEUE_AVAILABLE
1034 void *CmiGetNonLocalNodeQ(void) {
1035     CmiState cs = CmiGetState();
1036     char *result = 0;
1037     CmiIdleLock_checkMessage(&cs->idle);
1038     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1039         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1040         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1041         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1042         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1043         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1044     }
1045
1046     return result;
1047 }
1048 #endif
1049 /* ##### End of Functions Providing Incoming Network Messages ##### */
1050
1051 /* ##### Beginning of Functions Related with Idle-state ##### */
1052 static CmiIdleState *CmiNotifyGetState(void) {
1053     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1054     s->sleepMs=0;
1055     s->nIdles=0;
1056     s->cs=CmiGetState();
1057     return s;
1058 }
1059
1060 static void CmiNotifyBeginIdle(CmiIdleState *s) {
1061     s->sleepMs=0;
1062     s->nIdles=0;
1063 }
1064
1065 /*Number of times to spin before sleeping*/
1066 #define SPINS_BEFORE_SLEEP 20
1067 static void CmiNotifyStillIdle(CmiIdleState *s) {
1068     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1069     s->nIdles++;
1070     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1071         s->sleepMs+=2;
1072         if (s->sleepMs>10) s->sleepMs=10;
1073     }
1074
1075     if (s->sleepMs>0) {
1076         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1077         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1078         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1079     }
1080
1081 #if !CMK_SMP
1082     AdvanceCommunication();
1083 #endif
1084
1085     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1086 }
1087
1088 /* usually called in non-smp mode */
1089 void CmiNotifyIdle(void) {
1090     AdvanceCommunication();
1091     CmiYield();
1092 }
1093 /* ##### End of Functions Related with Idle-state ##### */
1094
1095 /* Utiltiy functions */
1096 static char *CopyMsg(char *msg, int len) {
1097     char *copy = (char *)CmiAlloc(len);
1098 #if CMK_ERROR_CHECKING
1099     if (!copy) {
1100         CmiAbort("Error: out of memory in machine layer\n");
1101     }
1102 #endif
1103     memcpy(copy, msg, len);
1104     return copy;
1105 }
1106 /* ===== End of Common Function Definitions ===== */