541715c2872dda9496ccbce34f99ed8a9e7b43fe
[charm.git] / src / arch / util / machine-common.c
1 #if CMK_C_INLINE
2 #define INLINE_KEYWORD inline
3 #else
4 #define INLINE_KEYWORD
5 #endif
6
7 /* ###Beginning of Broadcast related definitions ### */
8 #ifndef CMK_BROADCAST_SPANNING_TREE
9 #define CMK_BROADCAST_SPANNING_TREE    1
10 #endif
11
12 #ifndef CMK_BROADCAST_HYPERCUBE
13 #define CMK_BROADCAST_HYPERCUBE        0
14 #endif
15
16 #define BROADCAST_SPANNING_FACTOR      4
17 /* The number of children used when a msg is broadcast inside a node */
18 #define BROADCAST_SPANNING_INTRA_FACTOR  8
19
20 /* Root of broadcast:
21  * non-bcast msg: root = 0;
22  * proc-level bcast msg: root >=1; (CmiMyPe()+1)
23  * node-level bcast msg: root <=-1; (-CmiMyNode()-1)
24  */
25 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
26 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
27 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
28
29 #if USE_COMMON_SYNC_BCAST || USE_COMMON_ASYNC_BCAST
30 #if !CMK_BROADCAST_SPANNING_TREE && !CMK_BROADCAST_HYPERCUBE
31 #warning "Broadcast function is based on the plain P2P O(P)-message scheme!!!"
32 #endif
33 #endif
34
35 /**
36  * For some machine layers such as on Active Message framework,
37  * the receiver callback is usally executed on an internal
38  * thread (i.e. not the flow managed by ours). Therefore, for
39  * forwarding broadcast messages, we could have a choice whether
40  * to offload such function to the flow we manage such as the
41  * communication thread. -Chao Mei
42  */
43
44 #ifndef CMK_OFFLOAD_BCAST_PROCESS
45 #define CMK_OFFLOAD_BCAST_PROCESS 0
46 #endif
47 #if CMK_OFFLOAD_BCAST_PROCESS
48 CsvDeclare(PCQueue, procBcastQ);
49 #if CMK_NODE_QUEUE_AVAILABLE
50 CsvDeclare(PCQueue, nodeBcastQ);
51 #endif
52 #endif
53
54 #if CMK_BROADCAST_HYPERCUBE
55 /* ceil(log2(CmiNumNodes)) except when _Cmi_numnodes is 1, used for hypercube */
56 static int CmiNodesDim;
57 #endif
58 /* ###End of Broadcast related definitions ### */
59
60
61 #ifndef CMK_HAS_SIZE_IN_MSGHDR
62 #define CMK_HAS_SIZE_IN_MSGHDR 1
63 #endif
64 #if CMK_HAS_SIZE_IN_MSGHDR
65 #define CMI_MSG_SIZE(msg)  ((CmiMsgHeaderBasic *)msg)->size
66 #else
67 #define CMI_MSG_SIZE(msg)  (CmiAbort("Has no msg size in header"))
68 #endif
69
70 #if CMK_NODE_QUEUE_AVAILABLE
71 /* This value should be larger than the number of cores used
72  * per charm smp node. So it's currently set to such a large
73  * value.
74  */
75 #define DGRAM_NODEMESSAGE   (0x1FFB)
76 #endif
77
78 /* Node state structure */
79 int               _Cmi_mynode;    /* Which address space am I */
80 int               _Cmi_mynodesize;/* Number of processors in my address space */
81 int               _Cmi_numnodes;  /* Total number of address spaces */
82 int               _Cmi_numpes;    /* Total number of processors */
83
84 CpvDeclare(void*, CmiLocalQueue);
85
86 /* different modes for sending a message */
87 #define P2P_SYNC 0x1
88 #define P2P_ASYNC 0x2
89 #define BCAST_SYNC 0x3
90 #define BCAST_ASYNC 0x4
91
92 #if CMK_SMP
93 static volatile int commThdExit = 0;
94 static CmiNodeLock  commThdExitLock = 0;
95
96 /**
97  *  The macro defines whether to have a comm thd to offload some
98  *  work such as forwarding bcast messages etc. This macro
99  *  should be defined before including "machine-smp.c". Note
100  *  that whether a machine layer in SMP mode could run w/o comm
101  *  thread depends on the support of the underlying
102  *  communication library.
103  *
104  *  --Chao Mei
105  */
106 #ifndef CMK_SMP_NO_COMMTHD
107 #define CMK_SMP_NO_COMMTHD 0
108 #endif
109
110 #if CMK_SMP_NO_COMMTHD
111 int Cmi_commthread = 0;
112 #else
113 int Cmi_commthread = 1;
114 #endif
115
116 #endif
117
118 /*SHOULD BE MOVED TO MACHINE-SMP.C ??*/
119 static int Cmi_nodestart;
120
121 /*
122  * Network progress utility variables. Period controls the rate at
123  * which the network poll is called
124  */
125 #ifndef NETWORK_PROGRESS_PERIOD_DEFAULT
126 #define NETWORK_PROGRESS_PERIOD_DEFAULT 1000
127 #endif
128
129 CpvDeclare(unsigned , networkProgressCount);
130 int networkProgressPeriod;
131
132
133 /* ===== Beginning of Common Function Declarations ===== */
134 void CmiAbort(const char *message);
135 static void PerrorExit(const char *msg);
136
137 /* This function handles the msg received as which queue to push into */
138 static void handleOneRecvedMsg(int size, char *msg);
139
140 static void handleOneBcastMsg(int size, char *msg);
141 static void processBcastQs();
142
143 /* Utility functions for forwarding broadcast messages,
144  * should not be used in machine-specific implementations
145  * except in some special occasions.
146  */
147 static void processProcBcastMsg(int size, char *msg);
148 static void processNodeBcastMsg(int size, char *msg);
149 static void SendSpanningChildrenProc(int size, char *msg);
150 static void SendHyperCubeProc(int size, char *msg);
151 #if CMK_NODE_QUEUE_AVAILABLE
152 static void SendSpanningChildrenNode(int size, char *msg);
153 static void SendHyperCubeNode(int size, char *msg);
154 #endif
155
156 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode);
157 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode);
158 /* send to other ranks except me on the same node*/
159 static void SendToPeers(int size, char *msg);
160
161
162 void CmiPushPE(int rank, void *msg);
163
164 #if CMK_NODE_QUEUE_AVAILABLE
165 void CmiPushNode(void *msg);
166 #endif
167
168 /* Functions regarding send ops declared in converse.h */
169
170 /* In default, using the common codes for msg sending */
171 #ifndef USE_COMMON_SYNC_P2P
172 #define USE_COMMON_SYNC_P2P 1
173 #endif
174 #ifndef USE_COMMON_ASYNC_P2P
175 #define USE_COMMON_ASYNC_P2P 1
176 #endif
177 #ifndef USE_COMMON_SYNC_BCAST
178 #define USE_COMMON_SYNC_BCAST 1
179 #endif
180 #ifndef USE_COMMON_ASYNC_BCAST
181 #define USE_COMMON_ASYNC_BCAST 1
182 #endif
183
184 static void CmiSendSelf(char *msg);
185
186 void CmiSyncSendFn(int destPE, int size, char *msg);
187 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg);
188 void CmiFreeSendFn(int destPE, int size, char *msg);
189
190 void CmiSyncBroadcastFn(int size, char *msg);
191 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg);
192 void CmiFreeBroadcastFn(int size, char *msg);
193
194 void CmiSyncBroadcastAllFn(int size, char *msg);
195 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg);
196 void CmiFreeBroadcastAllFn(int size, char *msg);
197
198 #if CMK_NODE_QUEUE_AVAILABLE
199 static void CmiSendNodeSelf(char *msg);
200
201 void CmiSyncNodeSendFn(int destNode, int size, char *msg);
202 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg);
203 void CmiFreeNodeSendFn(int destNode, int size, char *msg);
204
205 void CmiSyncNodeBroadcastFn(int size, char *msg);
206 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg);
207 void CmiFreeNodeBroadcastFn(int size, char *msg);
208
209 void CmiSyncNodeBroadcastAllFn(int size, char *msg);
210 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg);
211 void CmiFreeNodeBroadcastAllFn(int size, char *msg);
212 #endif
213
214 /* Functions and variables regarding machine startup */
215 static char     **Cmi_argv;
216 static char     **Cmi_argvcopy;
217 static CmiStartFn Cmi_startfn;   /* The start function */
218 static int        Cmi_usrsched;  /* Continue after start function finishes? */
219 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret);
220 static void ConverseRunPE(int everReturn);
221
222 static void MachineSpecificInit(int argc, char **argv, int *numNodes, int *myNodeID);
223 /* Used in ConverseRunPE: one is called before ConverseCommonInit;
224   * The other is called after ConverseCommonInit as some data structures
225   * have been initialized, such as the tracing-relatd stuff --Chao Mei
226   */
227 static void MachineSpecificPreCommonInit(int everReturn);
228 static void MachineSpecificPostCommonInit(int everReturn);
229
230 /* Functions regarding machine running on every proc */
231 static void AdvanceCommunication();
232 static void CommunicationServer(int sleepTime);
233 static void CommunicationServerThread(int sleepTime);
234 void ConverseExit(void);
235
236 static void MachineSpecificAdvanceCommunication();
237 static void MachineSpecificDrainResources(); /* used when exit */
238 static void MachineSpecificExit();
239
240 /* Functions providing incoming network messages */
241 void *CmiGetNonLocal(void);
242 #if CMK_NODE_QUEUE_AVAILABLE
243 void *CmiGetNonLocalNodeQ(void);
244 #endif
245 void MachineSpecificPostNonLocal(void);
246
247 /* Utiltiy functions */
248 static char *CopyMsg(char *msg, int len);
249
250 /* ===== End of Common Function Declarations ===== */
251
252 #include "machine-smp.c"
253
254 /* ===== Beginning of Idle-state Related Declarations =====  */
255 typedef struct {
256     int sleepMs; /*Milliseconds to sleep while idle*/
257     int nIdles; /*Number of times we've been idle in a row*/
258     CmiState cs; /*Machine state*/
259 } CmiIdleState;
260
261 static CmiIdleState *CmiNotifyGetState(void);
262
263 /**
264  *  Generally,
265  *
266  *  CmiNotifyIdle is used in non-SMP mode when the proc is idle.
267  *  When the proc is idle, AdvanceCommunication needs to be
268  *  called.
269  *
270  *  CmiNotifyStillIdle and CmiNotifyBeginIdle are used in SMP mode.
271  *
272  *  Different layers have choices of registering different callbacks for
273  *  idle state.
274  */
275 static void CmiNotifyBeginIdle(CmiIdleState *s);
276 static void CmiNotifyStillIdle(CmiIdleState *s);
277 void CmiNotifyIdle(void);
278 /* ===== End of Idle-state Related Declarations =====  */
279
280 /* ===== Beginning of Processor/Node State-related Stuff =====*/
281 #if !CMK_SMP
282 /************ non SMP **************/
283 static struct CmiStateStruct Cmi_state;
284 int _Cmi_mype;
285 int _Cmi_myrank;
286
287 void CmiMemLock() {}
288 void CmiMemUnlock() {}
289
290 #define CmiGetState() (&Cmi_state)
291 #define CmiGetStateN(n) (&Cmi_state)
292
293 void CmiYield(void) {
294     sleep(0);
295 }
296
297 static void CmiStartThreads(char **argv) {
298     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
299     _Cmi_mype = Cmi_nodestart;
300     _Cmi_myrank = 0;
301 }
302 #else
303 /************** SMP *******************/
304 INLINE_KEYWORD int CmiMyPe(void) {
305     return CmiGetState()->pe;
306 }
307 INLINE_KEYWORD int CmiMyRank(void) {
308     return CmiGetState()->rank;
309 }
310 INLINE_KEYWORD int CmiNodeFirst(int node) {
311     return node*_Cmi_mynodesize;
312 }
313 INLINE_KEYWORD int CmiNodeSize(int node) {
314     return _Cmi_mynodesize;
315 }
316 INLINE_KEYWORD int CmiNodeOf(int pe) {
317     return (pe/_Cmi_mynodesize);
318 }
319 INLINE_KEYWORD int CmiRankOf(int pe) {
320     return pe%_Cmi_mynodesize;
321 }
322 #endif
323 CsvDeclare(CmiNodeState, NodeState);
324 /* ===== End of Processor/Node State-related Stuff =====*/
325
326 #include "immediate.c"
327
328 /* ===== Beginning of Common Function Definitions ===== */
329 static void PerrorExit(const char *msg) {
330     perror(msg);
331     exit(1);
332 }
333
334 /* ##### Beginning of Functions Related with Message Sending OPs ##### */
335 /*Add a message to this processor's receive queue, pe is a rank */
336 void CmiPushPE(int rank,void *msg) {
337     CmiState cs = CmiGetStateN(rank);
338     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",rank, cs->recv);
339 #if CMK_IMMEDIATE_MSG
340     if (CmiIsImmediate(msg)) {
341         MACHSTATE1(3, "[%p] Push Immediate Message begin{",CmiGetState());
342         CMI_DEST_RANK(msg) = rank;
343         CmiPushImmediateMsg(msg);
344         MACHSTATE1(3, "[%p] Push Immediate Message end}",CmiGetState());
345         return;
346     }
347 #endif
348
349     PCQueuePush(cs->recv,msg);
350     CmiIdleLock_addMessage(&cs->idle);
351     MACHSTATE1(3,"} Pushing message into rank %d's queue done",rank);
352 }
353
354 #if CMK_NODE_QUEUE_AVAILABLE
355 /*Add a message to this processor's receive queue */
356 void CmiPushNode(void *msg) {
357     MACHSTATE(3,"Pushing message into NodeRecv queue");
358 #if CMK_IMMEDIATE_MSG
359     if (CmiIsImmediate(msg)) {
360         CMI_DEST_RANK(msg) = 0;
361         CmiPushImmediateMsg(msg);
362         return;
363     }
364 #endif
365     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
366     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
367     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
368     {
369         CmiState cs=CmiGetStateN(0);
370         CmiIdleLock_addMessage(&cs->idle);
371     }
372 }
373 #endif
374
375 /* This function handles the msg received as which queue to push into */
376 static INLINE_KEYWORD void handleOneRecvedMsg(int size, char *msg) {
377     int isBcastMsg = 0;
378 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
379     isBcastMsg = (CMI_BROADCAST_ROOT(msg)!=0);
380 #endif
381
382     if (isBcastMsg) {
383         handleOneBcastMsg(size, msg);
384         return;
385     }
386
387 #if CMK_NODE_QUEUE_AVAILABLE
388     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
389         CmiPushNode(msg);
390     else
391 #endif
392         CmiPushPE(CMI_DEST_RANK(msg), msg);
393
394 }
395
396
397 /* ##### Beginning of Broadcast-related functions' defitions ##### */
398 static void handleOneBcastMsg(int size, char *msg) {
399     CmiAssert(CMI_BROADCAST_ROOT(msg)!=0);
400 #if CMK_OFFLOAD_BCAST_PROCESS
401     if (CMI_BROADCAST_ROOT(msg)>0) {
402         PCQueuePush(CsvAccess(procBcastQ), msg);
403     } else {
404 #if CMK_NODE_QUEUE_AVAILABLE
405         PCQueuePush(CsvAccess(nodeBcastQ), msg);
406 #endif
407     }
408 #else
409     if (CMI_BROADCAST_ROOT(msg)>0) {
410         processProcBcastMsg(size, msg);
411     } else {
412 #if CMK_NODE_QUEUE_AVAILABLE
413         processNodeBcastMsg(size, msg);
414 #endif
415     }
416 #endif
417 }
418
419 static void processBcastQs() {
420 #if CMK_OFFLOAD_BCAST_PROCESS
421     char *msg;
422     do {
423         msg = PCQueuePop(CsvAccess(procBcastQ));
424         if (!msg) break;
425         MACHSTATE2(4, "[%d]: process a proc-level bcast msg %p begin{", CmiMyNode(), msg);
426         processProcBcastMsg(CMI_MSG_SIZE(msg), msg);
427         MACHSTATE2(4, "[%d]: process a proc-level bcast msg %p end}", CmiMyNode(), msg);
428     } while (1);
429 #if CMK_NODE_QUEUE_AVAILABLE
430     do {
431         msg = PCQueuePop(CsvAccess(nodeBcastQ));
432         if (!msg) break;
433         MACHSTATE2(4, "[%d]: process a node-level bcast msg %p begin{", CmiMyNode(), msg);
434         processNodeBcastMsg(CMI_MSG_SIZE(msg), msg);
435         MACHSTATE2(4, "[%d]: process a node-level bcast msg %p end}", CmiMyNode(), msg);
436     } while (1);
437 #endif
438 #endif
439 }
440
441 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg) {
442 #if CMK_BROADCAST_SPANNING_TREE
443     SendSpanningChildrenProc(size, msg);
444 #elif CMK_BROADCAST_HYPERCUBE
445     SendHyperCubeProc(size, msg);
446 #endif
447
448     /* Since this function is only called on intermediate nodes,
449      * the rank of this msg should be 0.
450      */
451     CmiAssert(CMI_DEST_RANK(msg)==0);
452     /*CmiPushPE(CMI_DEST_RANK(msg), msg);*/
453     CmiPushPE(0, msg);
454 }
455
456 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg) {
457 #if CMK_BROADCAST_SPANNING_TREE
458     SendSpanningChildrenNode(size, msg);
459 #elif CMK_BROADCAST_HYPERCUBE
460     SendHyperCubeNode(size, msg);
461 #endif
462
463     /* In SMP mode, this push operation needs to be executed
464      * after forwarding broadcast messages. If it is executed
465      * earlier, then during the bcast msg forwarding period,
466      * the msg could be already freed on the worker thread.
467      * As a result, the forwarded message could be wrong!
468      * --Chao Mei
469      */
470     CmiPushNode(msg);
471 }
472
473 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode) {
474 #if CMK_BROADCAST_SPANNING_TREE
475     int i, oldRank;
476
477     oldRank = CMI_DEST_RANK(msg);
478     /* doing this is to avoid the multiple assignment in the following for loop */
479     CMI_DEST_RANK(msg) = rankToAssign;
480     /* first send msgs to other nodes */
481     CmiAssert(startNode >=0 &&  startNode<CmiNumNodes());
482     for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
483         int nd = CmiMyNode()-startNode;
484         if (nd<0) nd+=CmiNumNodes();
485         nd = BROADCAST_SPANNING_FACTOR*nd + i;
486         if (nd > CmiNumNodes() - 1) break;
487         nd += startNode;
488         nd = nd%CmiNumNodes();
489         CmiAssert(nd>=0 && nd!=CmiMyNode());
490 #if CMK_BROADCAST_USE_CMIREFERENCE
491         CmiReference(msg);
492         CmiMachineSpecificSendFunc(nd, size, msg, P2P_SYNC);
493 #else
494         char *newmsg = CopyMsg(msg, size);
495         CmiMachineSpecificSendFunc(nd, size, newmsg, P2P_SYNC);
496 #endif
497     }
498     CMI_DEST_RANK(msg) = oldRank;
499 #endif
500 }
501 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode) {
502 #if CMK_BROADCAST_HYPERCUBE
503     int i, cnt, tmp, relDist, oldRank;
504     const int dims=CmiNodesDim;
505
506     oldRank = CMI_DEST_RANK(msg);
507     /* doing this is to avoid the multiple assignment in the following for loop */
508     CMI_DEST_RANK(msg) = rankToAssign;
509
510     /* first send msgs to other nodes */
511     relDist = CmiMyNode()-startNode;
512     if (relDist < 0) relDist += CmiNumNodes();
513     cnt=0;
514     tmp = relDist;
515     /* count how many zeros (in binary format) relDist has */
516     for (i=0; i<dims; i++, cnt++) {
517         if (tmp & 1 == 1) break;
518         tmp = tmp >> 1;
519     }
520
521     /*CmiPrintf("ND[%d]: SendHypercube with snd=%d, relDist=%d, cnt=%d\n", CmiMyNode(), startnode, relDist, cnt);*/
522     for (i = cnt-1; i >= 0; i--) {
523         int nd = relDist + (1 << i);
524         if (nd >= CmiNumNodes()) continue;
525         nd = (nd+startNode)%CmiNumNodes();
526         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
527         CmiAssert(nd>=0 && nd!=CmiMyNode());
528 #if CMK_BROADCAST_USE_CMIREFERENCE
529         CmiReference(msg);
530         CmiMachineSpecificSendFunc(nd, size, msg, P2P_SYNC);
531 #else
532         char *newmsg = CopyMsg(msg, size);
533         CmiMachineSpecificSendFunc(nd, size, newmsg, P2P_SYNC);
534 #endif
535     }
536     CMI_DEST_RANK(msg) = oldRank;
537 #endif
538 }
539
540 static void SendSpanningChildrenProc(int size, char *msg) {
541     int startpe = CMI_BROADCAST_ROOT(msg)-1;
542     int startnode = CmiNodeOf(startpe);
543 #if CMK_SMP
544     if (startpe > CmiNumPes()) startnode = startpe - CmiNumPes();
545 #endif
546     SendSpanningChildren(size, msg, 0, startnode);
547 #if CMK_SMP
548     /* second send msgs to my peers on this node */
549     SendToPeers(size, msg);
550 #endif
551 }
552
553 /* send msg along the hypercube in broadcast. (Sameer) */
554 static void SendHyperCubeProc(int size, char *msg) {
555     int startpe = CMI_BROADCAST_ROOT(msg)-1;
556     int startnode = CmiNodeOf(startpe);
557 #if CMK_SMP
558     if (startpe > CmiNumPes()) startnode = startpe - CmiNumPes();
559 #endif
560     SendHyperCube(size, msg, 0, startnode);
561 #if CMK_SMP
562     /* second send msgs to my peers on this node */
563     SendToPeers(size, msg);
564 #endif
565 }
566
567 static void SendToPeers(int size, char *msg) {
568     /* FIXME: now it's just a flat p2p send!! When node size is large,
569     * it should also be sent in a tree
570     */
571     int exceptRank = CMI_DEST_RANK(msg);
572     int i;
573     for (i=0; i<exceptRank; i++) {
574         CmiPushPE(i, CopyMsg(msg, size));
575     }
576     for (i=exceptRank+1; i<CmiMyNodeSize(); i++) {
577         CmiPushPE(i, CopyMsg(msg, size));
578     }
579 }
580
581 #if CMK_NODE_QUEUE_AVAILABLE
582 static void SendSpanningChildrenNode(int size, char *msg) {
583     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
584     SendSpanningChildren(size, msg, DGRAM_NODEMESSAGE, startnode);
585 }
586 static void SendHyperCubeNode(int size, char *msg) {
587     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
588     SendHyperCube(size, msg, DGRAM_NODEMESSAGE, startnode);
589 }
590 #endif
591 /*##### End of Broadcast-related functions' defitions #####*/
592
593 /* Functions regarding sending operations */
594 static void CmiSendSelf(char *msg) {
595 #if CMK_IMMEDIATE_MSG
596     if (CmiIsImmediate(msg)) {
597         /* CmiBecomeNonImmediate(msg); */
598         CmiPushImmediateMsg(msg);
599         CmiHandleImmediate();
600         return;
601     }
602 #endif
603     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
604 }
605
606 /* Functions regarding P2P send op */
607 #if USE_COMMON_SYNC_P2P
608 void CmiSyncSendFn(int destPE, int size, char *msg) {
609     char *dupmsg = CopyMsg(msg, size);
610     CmiFreeSendFn(destPE, size, dupmsg);
611 }
612
613 void CmiFreeSendFn(int destPE, int size, char *msg) {
614     CMI_SET_BROADCAST_ROOT(msg, 0);
615     CQdCreate(CpvAccess(cQdState), 1);
616     if (CmiMyPe()==destPE) {
617         CmiSendSelf(msg);
618     } else {
619         int destNode = CmiNodeOf(destPE);
620 #if CMK_SMP
621         if (CmiMyNode()==destNode) {
622             CmiPushPE(CmiRankOf(destPE), msg);
623             return;
624         }
625 #endif
626         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
627         CmiMachineSpecificSendFunc(destNode, size, msg, P2P_SYNC);
628     }
629 }
630 #endif
631
632 #if USE_COMMON_ASYNC_P2P
633 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
634     int destNode = CmiNodeOf(destPE);
635     if (destNode == CmiMyNode()) {
636         CmiSyncSendFn(destPE,size,msg);
637         return 0;
638     } else {
639         return CmiMachineSpecificSendFunc(destPE, size, msg, P2P_ASYNC);
640     }
641 }
642 #endif
643
644 #if USE_COMMON_SYNC_BCAST
645 /* Functions regarding broadcat op that sends to every one else except me */
646 void CmiSyncBroadcastFn(int size, char *msg) {
647     int mype = CmiMyPe();
648     int i;
649
650     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
651 #if CMK_SMP
652     /*record the rank to avoid re-sending the msg in  spanning tree or hypercube*/
653     CMI_DEST_RANK(msg) = CmiMyRank();
654 #endif
655
656 #if CMK_BROADCAST_SPANNING_TREE
657     CMI_SET_BROADCAST_ROOT(msg, mype+1);
658     SendSpanningChildrenProc(size, msg);
659 #elif CMK_BROADCAST_HYPERCUBE
660     CMI_SET_BROADCAST_ROOT(msg, mype+1);
661     SendHyperCubeProc(size, msg);
662 #else
663     for ( i=mype+1; i<_Cmi_numpes; i++ )
664         CmiSyncSendFn(i, size, msg) ;
665     for ( i=0; i<mype; i++ )
666         CmiSyncSendFn(i, size, msg) ;
667 #endif
668
669     /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
670 }
671
672 void CmiFreeBroadcastFn(int size, char *msg) {
673     CmiSyncBroadcastFn(size,msg);
674     CmiFree(msg);
675 }
676 #endif
677
678 #if USE_COMMON_ASYNC_BCAST
679 /* FIXME: should use spanning or hypercube, but luckily async is never used */
680 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
681     /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
682     CmiAbort("CmiAsyncBroadcastFn should never be called");
683     return 0;
684 }
685 #endif
686
687 /* Functions regarding broadcat op that sends to every one */
688 void CmiSyncBroadcastAllFn(int size, char *msg) {
689     CmiSyncSendFn(CmiMyPe(), size, msg) ;
690     CmiSyncBroadcastFn(size, msg);
691 }
692
693 void CmiFreeBroadcastAllFn(int size, char *msg) {
694     CmiSendSelf(msg);
695     CmiSyncBroadcastFn(size, msg);
696 }
697
698 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
699     CmiSendSelf(CopyMsg(msg, size));
700     return CmiAsyncBroadcastFn(size, msg);
701 }
702
703 #if CMK_NODE_QUEUE_AVAILABLE
704 static void CmiSendNodeSelf(char *msg) {
705 #if CMK_IMMEDIATE_MSG
706     if (CmiIsImmediate(msg)) {
707         CmiPushImmediateMsg(msg);
708         if (!_immRunning) CmiHandleImmediate();
709         return;
710     }
711 #endif
712     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
713     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
714     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
715 }
716
717 #if USE_COMMON_ASYNC_P2P
718 void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
719     char *dupmsg = CopyMsg(msg, size);
720     CmiFreeNodeSendFn(destNode, size, dupmsg);
721 }
722
723 void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
724     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
725     CQdCreate(CpvAccess(cQdState), 1);
726     CMI_SET_BROADCAST_ROOT(msg, 0);
727     if (destNode == CmiMyNode()) {
728         CmiSendNodeSelf(msg);
729     } else {
730         CmiMachineSpecificSendFunc(destNode, size, msg, P2P_SYNC);
731     }
732 }
733 #endif
734
735 #if USE_COMMON_ASYNC_P2P
736 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
737     if (destNode == CmiMyNode()) {
738         CmiSyncNodeSendFn(destNode, size, msg);
739         return 0;
740     } else {
741         return CmiMachineSpecificSendFunc(destNode, size, msg, P2P_ASYNC);
742     }
743 }
744 #endif
745
746 #if USE_COMMON_SYNC_BCAST
747 void CmiSyncNodeBroadcastFn(int size, char *msg) {
748     int mynode = CmiMyNode();
749     int i;
750     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
751 #if CMK_BROADCAST_SPANNING_TREE
752     CMI_SET_BROADCAST_ROOT(msg, -CmiMyNode()-1);
753     SendSpanningChildrenNode(size, msg);
754 #elif CMK_BROADCAST_HYPERCUBE
755     CMI_SET_BROADCAST_ROOT(msg, -CmiMyNode()-1);
756     SendHyperCubeNode(size, msg);
757 #else
758     for (i=mynode+1; i<CmiNumNodes(); i++)
759         CmiSyncNodeSendFn(i, size, msg);
760     for (i=0; i<mynode; i++)
761         CmiSyncNodeSendFn(i, size, msg);
762 #endif
763 }
764
765 void CmiFreeNodeBroadcastFn(int size, char *msg) {
766     CmiSyncNodeBroadcastFn(size, msg);
767     CmiFree(msg);
768 }
769 #endif
770
771 #if USE_COMMON_ASYNC_BCAST
772 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg) {
773     CmiSyncNodeBroadcastFn(size, msg);
774     return 0;
775 }
776 #endif
777
778 void CmiSyncNodeBroadcastAllFn(int size, char *msg) {
779     CmiSyncNodeSendFn(CmiMyNode(), size, msg);
780     CmiSyncNodeBroadcastFn(size, msg);
781 }
782
783 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg) {
784     CmiSendNodeSelf(CopyMsg(msg, size));
785     return CmiAsyncNodeBroadcastFn(size, msg);
786 }
787
788 void CmiFreeNodeBroadcastAllFn(int size, char *msg) {
789     CmiSyncNodeBroadcastFn(size, msg);
790     /* Since it's a node-level msg, the msg could be executed on any other
791      * procs on the same node. This means, the push of this msg to the
792      * node-level queue could be immediately followed a pop of this msg on
793      * other cores on the same node even when this msg has not been sent to
794      * other nodes. This is the reason CmiSendNodeSelf must be called after
795      * CmiSyncNodeBroadcastFn -Chao Mei
796      */
797     CmiSendNodeSelf(msg);
798 }
799 #endif
800 /* ##### End of Functions Related with Message Sending OPs ##### */
801
802 /* ##### Beginning of Functions Related with Machine Startup ##### */
803 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
804     int tmp;
805     /* processor per node */
806     _Cmi_mynodesize = 1;
807     if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
808         CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
809 #if ! CMK_SMP
810     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
811         CmiAbort("+ppn cannot be used in non SMP version!\n");
812 #endif
813
814     /* Network progress function is used to poll the network when for
815     messages. This flushes receive buffers on some  implementations*/
816     networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
817     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
818
819     /* _Cmi_mynodesize has to be obtained before MachineSpecificInit
820      * because it may be used inside MachineSpecificInit
821      */
822     /* argv could be changed inside MachineSpecificInit */
823     /* Inside this function, the number of nodes and my node id are obtained */
824     MachineSpecificInit(argc, argv, &_Cmi_numnodes, &_Cmi_mynode);
825
826     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
827     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
828     Cmi_argvcopy = CmiCopyArgs(argv);
829     Cmi_argv = argv;
830     Cmi_startfn = fn;
831     Cmi_usrsched = usched;
832
833     /* CmiTimerInit(); */
834 #if CMK_BROADCAST_HYPERCUBE
835     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
836     tmp = CmiNumNodes()-1;
837     CmiNodesDim = 0;
838     while (tmp>0) {
839         CmiNodesDim++;
840         tmp = tmp >> 1;
841     }
842     if (CmiNumNodes()==1) CmiNodesDim=1;
843 #endif
844
845     CsvInitialize(CmiNodeState, NodeState);
846     CmiNodeStateInit(&CsvAccess(NodeState));
847 #if CMK_SMP
848     commThdExitLock = CmiCreateLock();
849 #endif
850
851 #if CMK_OFFLOAD_BCAST_PROCESS
852     /* the actual queues should be created on comm thread considering NUMA in SMP */
853     CsvInitialize(PCQueue, procBcastQ);
854 #if CMK_NODE_QUEUE_AVAILABLE
855     CsvInitialize(PCQueue, nodeBcastQ);
856 #endif
857 #endif
858
859     CmiStartThreads(argv);
860     ConverseRunPE(initret);
861 }
862
863 static void ConverseRunPE(int everReturn) {
864     CmiState cs;
865     char** CmiMyArgv;
866
867     MachineSpecificPreCommonInit(everReturn);
868
869 #if CMK_OFFLOAD_BCAST_PROCESS
870     int createQueue = 1;
871 #if CMK_SMP
872 #if CMK_SMP_NO_COMMTHD
873     /* If there's no comm thread, then the queue is created on rank 0 */
874     if (CmiMyRank()) createQueue = 0;
875 #else
876     if (CmiMyRank()<CmiMyNodeSize()) createQueue = 0;
877 #endif
878 #endif
879
880     if (createQueue) {
881         CsvAccess(procBcastQ) = PCQueueCreate();
882 #if CMK_NODE_QUEUE_AVAILABLE
883         CsvAccess(nodeBcastQ) = PCQueueCreate();
884 #endif
885     }
886 #endif
887
888     CmiNodeAllBarrier();
889
890     cs = CmiGetState();
891     CpvInitialize(void *,CmiLocalQueue);
892     CpvAccess(CmiLocalQueue) = cs->localqueue;
893
894     if (CmiMyRank())
895         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
896     else
897         CmiMyArgv=Cmi_argv;
898
899     CthInit(CmiMyArgv);
900
901     /* initialize the network progress counter*/
902     /* Network progress function is used to poll the network when for
903        messages. This flushes receive buffers on some  implementations*/
904     CpvInitialize(unsigned , networkProgressCount);
905     CpvAccess(networkProgressCount) = 0;
906
907     ConverseCommonInit(CmiMyArgv);
908
909     MachineSpecificPostCommonInit(everReturn);
910
911     /* Converse initialization finishes, immediate messages can be processed.
912        node barrier previously should take care of the node synchronization */
913     _immediateReady = 1;
914
915     /* communication thread */
916     if (CmiMyRank() == CmiMyNodeSize()) {
917         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
918         while (1) CommunicationServerThread(5);
919     } else { /* worker thread */
920         if (!everReturn) {
921             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
922             if (Cmi_usrsched==0) CsdScheduler(-1);
923             ConverseExit();
924         }
925     }
926 }
927 /* ##### End of Functions Related with Machine Startup ##### */
928
929 /* ##### Beginning of Functions Related with Machine Running ##### */
930 static INLINE_KEYWORD void AdvanceCommunication() {
931     int doProcessBcast = 1;
932
933     MachineSpecificAdvanceCommunication();
934
935 #if CMK_OFFLOAD_BCAST_PROCESS
936 #if CMK_SMP_NO_COMMTHD
937     /*FIXME: only asks rank 0 to process bcast msgs, so perf may suffer*/
938     if (CmiMyRank()) doProcessBcast = 0;
939 #endif
940     if (doProcessBcast) processBcastQs();
941 #endif
942
943 #if CMK_IMMEDIATE_MSG
944 #if !CMK_SMP
945     CmiHandleImmediate();
946 #endif
947 #if CMK_SMP && CMK_SMP_NO_COMMTHD
948     if (CmiMyRank()==0) CmiHandleImmediate();
949 #endif
950 #endif
951
952 }
953
954 static void CommunicationServer(int sleepTime) {
955 #if CMK_SMP
956     AdvanceCommunication();
957
958     if (commThdExit == CmiMyNodeSize()) {
959         MACHSTATE(2, "CommunicationServer exiting {");
960         MachineSpecificDrainResources();
961         MACHSTATE(2, "} CommunicationServer EXIT");
962
963         ConverseCommonExit();
964
965         MachineSpecificExit();
966     }
967 #endif
968 }
969
970 static void CommunicationServerThread(int sleepTime) {
971     CommunicationServer(sleepTime);
972 #if CMK_IMMEDIATE_MSG
973     CmiHandleImmediate();
974 #endif
975 }
976
977 void ConverseExit(void) {
978 #if !CMK_SMP
979     MachineSpecificDrainResources();
980 #endif
981
982     ConverseCommonExit();
983
984 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
985     if (CmiMyPe() == 0) CmiPrintf("End of program\n");
986 #endif
987
988 #if !CMK_SMP
989     MachineSpecificExit();
990 #else
991     /* In SMP, the communication thread will exit */
992     /* atomic increment */
993     CmiLock(commThdExitLock);
994     commThdExit++;
995     CmiUnlock(commThdExitLock);
996     while (1) CmiYield();
997 #endif
998 }
999 /* ##### End of Functions Related with Machine Running ##### */
1000
1001
1002 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
1003 void *CmiGetNonLocal(void) {
1004     CmiState cs = CmiGetState();
1005     void *msg = NULL;
1006
1007 #if !CMK_SMP || CMK_SMP_NO_COMMTHD
1008     /**
1009       * In SMP mode with comm thread, it's possible a normal
1010       * msg is sent from an immediate msg which is executed
1011       * on comm thread. In this case, the msg is sent to
1012       * the network queue of the work thread. Therefore,
1013       * even there's only one worker thread, the polling of
1014       * network queue is still required. -Chao Mei
1015       */
1016     if (CmiNumPes() == 1) return NULL;
1017 #endif
1018
1019     MACHSTATE2(3, "[%p] CmiGetNonLocal begin %d{", cs, CmiMyPe());
1020     CmiIdleLock_checkMessage(&cs->idle);
1021     /* ?????although it seems that lock is not needed, I found it crashes very often
1022        on mpi-smp without lock */
1023 #if !CMK_SMP
1024     AdvanceCommunication();
1025 #endif
1026
1027     msg = PCQueuePop(cs->recv);
1028
1029 #if !CMK_SMP
1030     MachineSpecificPostNonLocal();
1031 #endif
1032
1033     MACHSTATE3(3,"[%p] CmiGetNonLocal from queue %p with msg %p end }",CmiGetState(),(cs->recv), msg);
1034
1035     return msg;
1036 }
1037 #if CMK_NODE_QUEUE_AVAILABLE
1038 void *CmiGetNonLocalNodeQ(void) {
1039     CmiState cs = CmiGetState();
1040     char *result = 0;
1041     CmiIdleLock_checkMessage(&cs->idle);
1042     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1043         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1044         CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1045         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1046         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1047         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1048     }
1049
1050     return result;
1051 }
1052 #endif
1053 /* ##### End of Functions Providing Incoming Network Messages ##### */
1054
1055 /* ##### Beginning of Functions Related with Idle-state ##### */
1056 static CmiIdleState *CmiNotifyGetState(void) {
1057     CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1058     s->sleepMs=0;
1059     s->nIdles=0;
1060     s->cs=CmiGetState();
1061     return s;
1062 }
1063
1064 static void CmiNotifyBeginIdle(CmiIdleState *s) {
1065     s->sleepMs=0;
1066     s->nIdles=0;
1067 }
1068
1069 /*Number of times to spin before sleeping*/
1070 #define SPINS_BEFORE_SLEEP 20
1071 static void CmiNotifyStillIdle(CmiIdleState *s) {
1072     MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1073     s->nIdles++;
1074     if (s->nIdles>SPINS_BEFORE_SLEEP) { /*Start giving some time back to the OS*/
1075         s->sleepMs+=2;
1076         if (s->sleepMs>10) s->sleepMs=10;
1077     }
1078
1079     if (s->sleepMs>0) {
1080         MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1081         CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1082         MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1083     }
1084
1085 #if !CMK_SMP
1086     AdvanceCommunication();
1087 #endif
1088
1089     MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1090 }
1091
1092 /* usually called in non-smp mode */
1093 void CmiNotifyIdle(void) {
1094     AdvanceCommunication();
1095     CmiYield();
1096 }
1097 /* ##### End of Functions Related with Idle-state ##### */
1098
1099 /* Utiltiy functions */
1100 static char *CopyMsg(char *msg, int len) {
1101     char *copy = (char *)CmiAlloc(len);
1102 #if CMK_ERROR_CHECKING
1103     if (!copy) {
1104         CmiAbort("Error: out of memory in machine layer\n");
1105     }
1106 #endif
1107     memcpy(copy, msg, len);
1108     return copy;
1109 }
1110 /* ===== End of Common Function Definitions ===== */