fixed a msg corruption in broadcast
[charm.git] / src / arch / util / machine-broadcast.c
1 static void handleOneBcastMsg(int size, char *msg) {
2     CmiAssert(CMI_BROADCAST_ROOT(msg)!=0);
3 #if CMK_OFFLOAD_BCAST_PROCESS
4     if (CMI_BROADCAST_ROOT(msg)>0) {
5         PCQueuePush(CsvAccess(procBcastQ), msg);
6     } else {
7 #if CMK_NODE_QUEUE_AVAILABLE
8         PCQueuePush(CsvAccess(nodeBcastQ), msg);
9 #endif
10     }
11 #else
12     if (CMI_BROADCAST_ROOT(msg)>0) {
13         processProcBcastMsg(size, msg);
14     } else {
15 #if CMK_NODE_QUEUE_AVAILABLE
16         processNodeBcastMsg(size, msg);
17 #endif
18     }
19 #endif
20 }
21
22 static void processBcastQs() {
23 #if CMK_OFFLOAD_BCAST_PROCESS
24     char *msg;
25     do {
26         msg = PCQueuePop(CsvAccess(procBcastQ));
27         if (!msg) break;
28         MACHSTATE2(4, "[%d]: process a proc-level bcast msg %p begin{", CmiMyNode(), msg);
29         processProcBcastMsg(CMI_MSG_SIZE(msg), msg);
30         MACHSTATE2(4, "[%d]: process a proc-level bcast msg %p end}", CmiMyNode(), msg);
31     } while (1);
32 #if CMK_NODE_QUEUE_AVAILABLE
33     do {
34         msg = PCQueuePop(CsvAccess(nodeBcastQ));
35         if (!msg) break;
36         MACHSTATE2(4, "[%d]: process a node-level bcast msg %p begin{", CmiMyNode(), msg);
37         processNodeBcastMsg(CMI_MSG_SIZE(msg), msg);
38         MACHSTATE2(4, "[%d]: process a node-level bcast msg %p end}", CmiMyNode(), msg);
39     } while (1);
40 #endif
41 #endif
42 }
43
44 static INLINE_KEYWORD void processProcBcastMsg(int size, char *msg) {
45     /* Since this function is only called on intermediate nodes,
46      * the rank of this msg should be 0.
47      */
48     CmiAssert(CMI_DEST_RANK(msg)==0);
49     /*CmiPushPE(CMI_DEST_RANK(msg), msg);*/
50
51 #if CMK_BROADCAST_SPANNING_TREE
52     SendSpanningChildrenProc(size, msg);
53 #elif CMK_BROADCAST_HYPERCUBE
54     SendHyperCubeProc(size, msg);
55 #endif
56     CmiPushPE(0, msg);
57
58 }
59
60 static INLINE_KEYWORD void processNodeBcastMsg(int size, char *msg) {
61 #if CMK_BROADCAST_SPANNING_TREE
62     SendSpanningChildrenNode(size, msg);
63 #elif CMK_BROADCAST_HYPERCUBE
64     SendHyperCubeNode(size, msg);
65 #endif
66
67     /* In SMP mode, this push operation needs to be executed
68      * after forwarding broadcast messages. If it is executed
69      * earlier, then during the bcast msg forwarding period,
70      * the msg could be already freed on the worker thread.
71      * As a result, the forwarded message could be wrong!
72      * 
73      */
74     CmiPushNode(msg);
75 }
76
77 static void SendSpanningChildren(int size, char *msg, int rankToAssign, int startNode) {
78 #if CMK_BROADCAST_SPANNING_TREE
79     int i, oldRank;
80     char *newmsg;
81
82     oldRank = CMI_DEST_RANK(msg);
83     /* doing this is to avoid the multiple assignment in the following for loop */
84     CMI_DEST_RANK(msg) = rankToAssign;
85     /* first send msgs to other nodes */
86     CmiAssert(startNode >=0 &&  startNode<CmiNumNodes());
87     for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
88         int nd = CmiMyNode()-startNode;
89         if (nd<0) nd+=CmiNumNodes();
90         nd = BROADCAST_SPANNING_FACTOR*nd + i;
91         if (nd > CmiNumNodes() - 1) break;
92         nd += startNode;
93         nd = nd%CmiNumNodes();
94         CmiAssert(nd>=0 && nd!=CmiMyNode());
95 #if CMK_BROADCAST_USE_CMIREFERENCE
96         CmiReference(msg);
97         LrtsSendFunc(nd, size, msg, BCAST_SYNC);
98 #else
99         newmsg = CopyMsg(msg, size);
100         LrtsSendFunc(nd, size, newmsg, BCAST_SYNC);
101 #endif
102     }
103     CMI_DEST_RANK(msg) = oldRank;
104 #endif
105 }
106
107 static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode) {
108 #if CMK_BROADCAST_HYPERCUBE
109     int i, cnt, tmp, relDist, oldRank;
110     const int dims=CmiNodesDim;
111
112     oldRank = CMI_DEST_RANK(msg);
113     /* doing this is to avoid the multiple assignment in the following for loop */
114     CMI_DEST_RANK(msg) = rankToAssign;
115
116     /* first send msgs to other nodes */
117     relDist = CmiMyNode()-startNode;
118     if (relDist < 0) relDist += CmiNumNodes();
119
120     /* Sending scheme example: say we have 9 nodes, and the msg is sent from 0
121      * The overall sending steps will be as follows:
122      * 0-->8, 0-->4, 0-->2, 0-->1
123      *               4-->6, 4-->5
124      *                      2-->3
125      *                      6-->7
126      * So for node id as N=A+2^B, it will forward the broadcast (B-1) msg to in
127      * the order as: N+2^(B-1), N+2^(B-2),..., N+1 except node 0, where B is
128      * the first position of bit 1 in the binary format of the number of N
129      * counting from the right with count starting from 0.
130      * On node 0, the value "B" should be CmiNodesDim
131      */
132     /* Calculate 2^B */
133     if(relDist==0) cnt = 1<<dims;
134     else cnt = relDist & ((~relDist)+1);
135     /*CmiPrintf("ND[%d]: send bcast msg with cnt=%d\n", CmiMyNode(), cnt);*/
136     /* Begin to send msgs */
137     for(cnt>>=1; cnt>0; cnt>>=1){
138         int nd = relDist + cnt;
139         if (nd >= CmiNumNodes()) continue;
140         nd = (nd+startNode)%CmiNumNodes();
141         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
142         CmiAssert(nd>=0 && nd!=CmiMyNode());
143 #if CMK_BROADCAST_USE_CMIREFERENCE
144         CmiReference(msg);
145         LrtsSendFunc(nd, size, msg, BCAST_SYNC);
146 #else
147         char *newmsg = CopyMsg(msg, size);
148         LrtsSendFunc(nd, size, newmsg, BCAST_SYNC);
149 #endif
150     }
151     CMI_DEST_RANK(msg) = oldRank;
152 #endif
153 }
154
155 static void SendSpanningChildrenProc(int size, char *msg) {
156     int startpe = CMI_BROADCAST_ROOT(msg)-1;
157     int startnode = CmiNodeOf(startpe);
158 #if CMK_SMP
159     if (startpe > CmiNumPes()) startnode = startpe - CmiNumPes();
160 #endif
161     SendSpanningChildren(size, msg, 0, startnode);
162 #if CMK_SMP
163     /* second send msgs to my peers on this node */
164     SendToPeers(size, msg);
165 #endif
166 }
167
168 /* send msg along the hypercube in broadcast. (Sameer) */
169 static void SendHyperCubeProc(int size, char *msg) {
170     int startpe = CMI_BROADCAST_ROOT(msg)-1;
171     int startnode = CmiNodeOf(startpe);
172 #if CMK_SMP
173     if (startpe > CmiNumPes()) startnode = startpe - CmiNumPes();
174 #endif
175     SendHyperCube(size, msg, 0, startnode);
176 #if CMK_SMP
177     /* second send msgs to my peers on this node */
178     SendToPeers(size, msg);
179 #endif
180 }
181
182 #if CMK_NODE_QUEUE_AVAILABLE
183 static void SendSpanningChildrenNode(int size, char *msg) {
184     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
185     SendSpanningChildren(size, msg, DGRAM_NODEMESSAGE, startnode);
186 }
187 static void SendHyperCubeNode(int size, char *msg) {
188     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
189     SendHyperCube(size, msg, DGRAM_NODEMESSAGE, startnode);
190 }
191 #endif
192
193 #if USE_COMMON_SYNC_BCAST
194 /* Functions regarding broadcat op that sends to every one else except me */
195 void CmiSyncBroadcastFn(int size, char *msg) {
196     int mype = CmiMyPe();
197     int i;
198
199     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
200 #if CMK_SMP
201     /*record the rank to avoid re-sending the msg in  spanning tree or hypercube*/
202     CMI_DEST_RANK(msg) = CmiMyRank();
203 #endif
204
205 #if CMK_BROADCAST_SPANNING_TREE
206     CMI_SET_BROADCAST_ROOT(msg, mype+1);
207     SendSpanningChildrenProc(size, msg);
208 #elif CMK_BROADCAST_HYPERCUBE
209     CMI_SET_BROADCAST_ROOT(msg, mype+1);
210     SendHyperCubeProc(size, msg);
211 #else
212     for ( i=mype+1; i<_Cmi_numpes; i++ )
213         CmiSyncSendFn(i, size, msg) ;
214     for ( i=0; i<mype; i++ )
215         CmiSyncSendFn(i, size, msg) ;
216 #endif
217
218     /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
219 }
220
221 void CmiFreeBroadcastFn(int size, char *msg) {
222     CmiSyncBroadcastFn(size,msg);
223     CmiFree(msg);
224 }
225 #endif
226
227 #if USE_COMMON_ASYNC_BCAST
228 /* FIXME: should use spanning or hypercube, but luckily async is never used */
229 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
230     /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
231     CmiAbort("CmiAsyncBroadcastFn should never be called");
232     return 0;
233 }
234 #endif
235
236 /* Functions regarding broadcat op that sends to every one */
237 void CmiSyncBroadcastAllFn(int size, char *msg) {
238     CmiSyncSendFn(CmiMyPe(), size, msg) ;
239     CmiSyncBroadcastFn(size, msg);
240 }
241
242 void CmiFreeBroadcastAllFn(int size, char *msg) {
243     CmiSendSelf(msg);
244     CmiSyncBroadcastFn(size, msg);
245 }
246
247 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
248     CmiSendSelf(CopyMsg(msg, size));
249     return CmiAsyncBroadcastFn(size, msg);
250 }
251
252 #if CMK_NODE_QUEUE_AVAILABLE
253 #if USE_COMMON_SYNC_BCAST
254 void CmiSyncNodeBroadcastFn(int size, char *msg) {
255     int mynode = CmiMyNode();
256     int i;
257     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
258 #if CMK_BROADCAST_SPANNING_TREE
259     CMI_SET_BROADCAST_ROOT(msg, -CmiMyNode()-1);
260     SendSpanningChildrenNode(size, msg);
261 #elif CMK_BROADCAST_HYPERCUBE
262     CMI_SET_BROADCAST_ROOT(msg, -CmiMyNode()-1);
263     SendHyperCubeNode(size, msg);
264 #else
265     for (i=mynode+1; i<CmiNumNodes(); i++)
266         CmiSyncNodeSendFn(i, size, msg);
267     for (i=0; i<mynode; i++)
268         CmiSyncNodeSendFn(i, size, msg);
269 #endif
270 }
271
272 void CmiFreeNodeBroadcastFn(int size, char *msg) {
273     CmiSyncNodeBroadcastFn(size, msg);
274     CmiFree(msg);
275 }
276 #endif
277
278 #if USE_COMMON_ASYNC_BCAST
279 CmiCommHandle CmiAsyncNodeBroadcastFn(int size, char *msg) {
280     CmiSyncNodeBroadcastFn(size, msg);
281     return 0;
282 }
283 #endif
284
285 void CmiSyncNodeBroadcastAllFn(int size, char *msg) {
286     CmiSyncNodeSendFn(CmiMyNode(), size, msg);
287     CmiSyncNodeBroadcastFn(size, msg);
288 }
289
290 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int size, char *msg) {
291     CmiSendNodeSelf(CopyMsg(msg, size));
292     return CmiAsyncNodeBroadcastFn(size, msg);
293 }
294
295 void CmiFreeNodeBroadcastAllFn(int size, char *msg) {
296     CmiSyncNodeBroadcastFn(size, msg);
297     /* Since it's a node-level msg, the msg could be executed on any other
298      * procs on the same node. This means, the push of this msg to the
299      * node-level queue could be immediately followed a pop of this msg on
300      * other cores on the same node even when this msg has not been sent to
301      * other nodes. This is the reason CmiSendNodeSelf must be called after
302      * CmiSyncNodeBroadcastFn 
303      */
304     CmiSendNodeSelf(msg);
305 }
306 #endif
307 /* ##### End of Functions Related with Message Sending OPs ##### */