d02b227960d48d7d7d8051da6c5796092d1f154f
[charm.git] / src / arch / bluegenep / machine.c
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include "assert.h"
12 #include "malloc.h"
13
14 #include <bpcore/ppc450_inlines.h>
15
16 #include "dcmf.h"
17 #include "dcmf_multisend.h"
18
19 char *ALIGN_16(char *p) {
20     return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
21 }
22
23 #define PROGRESS_PERIOD 1024
24
25 //There are two roles of comm thread:
26 // 1. polling other cores' bcast msg queue
27 // 2. bcast msg is handled only by comm thd
28 #define BCASTMSG_ONLY_TO_COMMTHD 0
29
30 CpvDeclare(PCQueue, broadcast_q);                 //queue to send broadcast messages
31 #if CMK_NODE_QUEUE_AVAILABLE
32 CsvDeclare(PCQueue, node_bcastq);
33 CsvDeclare(CmiNodeLock, node_bcastLock);
34 #endif
35
36 /*
37     To reduce the buffer used in broadcast and distribute the load from
38   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
39   spanning tree broadcast algorithm.
40     This will use the fourth short in message as an indicator of spanning tree
41   root.
42 */
43 #if CMK_SMP
44 #define CMK_BROADCAST_SPANNING_TREE   1
45 #else
46 #define CMK_BROADCAST_SPANNING_TREE    1
47 #define CMK_BROADCAST_HYPERCUBE        0
48 #endif /* CMK_SMP */
49
50 #define BROADCAST_SPANNING_FACTOR     2
51
52 //The root of the message infers the type of the message
53 // 1. root is 0, then it is a normal point-to-point message
54 // 2. root is larger than 0 (>=1), then it is a broadcast message across all processors (cores)
55 // 3. root is less than 0 (<=-1), then it is a broadcast message across all nodes
56 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
57 #define CMI_IS_BCAST_ON_CORES(msg) (CMI_BROADCAST_ROOT(msg) > 0)
58 #define CMI_IS_BCAST_ON_NODES(msg) (CMI_BROADCAST_ROOT(msg) < 0)
59 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
60
61 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
62 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
63
64 /* FIXME: need a random number that everyone agrees ! */
65 #define CHARM_MAGIC_NUMBER               126
66
67 #if !CMK_OPTIMIZE
68 static int checksum_flag = 0;
69 extern unsigned char computeCheckSum(unsigned char *data, int len);
70
71 #define CMI_SET_CHECKSUM(msg, len)      \
72         if (checksum_flag)  {   \
73           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
74           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
75         }
76
77 #define CMI_CHECK_CHECKSUM(msg, len)    \
78         if (checksum_flag)      \
79           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
80             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
81             for(count = 0; count < len; count++) { \
82                 printf("%2x", msg[count]);                 \
83             }                                             \
84             printf("------------------------------\n\n"); \
85             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
86           }
87 #else
88 #define CMI_SET_CHECKSUM(msg, len)
89 #define CMI_CHECK_CHECKSUM(msg, len)
90 #endif
91
92 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
93
94 #if CMK_BROADCAST_HYPERCUBE
95 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
96 #else
97 #  define CMI_SET_CYCLE(msg, cycle)
98 #endif
99
100 int               _Cmi_numpes;
101 int               _Cmi_mynode;    /* Which address space am I */
102 int               _Cmi_mynodesize;/* Number of processors in my address space */
103 int               _Cmi_numnodes;  /* Total number of address spaces */
104 int                Cmi_nodestart; /* First processor in this address space */
105 CpvDeclare(void*, CmiLocalQueue);
106
107
108 #if CMK_NODE_QUEUE_AVAILABLE
109 #define SMP_NODEMESSAGE   (0xFB) // rank of the node message when node queue
110 // is available
111 #define NODE_BROADCAST_OTHERS (-1)
112 #define NODE_BROADCAST_ALL    (-2)
113 #endif
114
115
116 typedef struct ProcState {
117     /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
118     CmiNodeLock  recvLock;              /* for cs->recv */
119     CmiNodeLock bcastLock;
120 } ProcState;
121
122 static ProcState  *procState;
123
124 #if CMK_SMP && !CMK_MULTICORE
125 static volatile int commThdExit = 0;
126 static CmiNodeLock commThdExitLock = 0;
127 #endif
128
129 void ConverseRunPE(int everReturn);
130 static void CommunicationServer(int sleepTime);
131 static void CommunicationServerThread(int sleepTime);
132
133 //So far we dont define any comm threads
134 int Cmi_commthread = 0;
135
136 #include "machine-smp.c"
137 CsvDeclare(CmiNodeState, NodeState);
138 #include "immediate.c"
139
140 void AdvanceCommunications();
141
142
143 #if !CMK_SMP
144 /************ non SMP **************/
145 static struct CmiStateStruct Cmi_state;
146 int _Cmi_mype;
147 int _Cmi_myrank;
148
149 void CmiMemLock(void) {}
150 void CmiMemUnlock(void) {}
151
152 #define CmiGetState() (&Cmi_state)
153 #define CmiGetStateN(n) (&Cmi_state)
154
155 //void CmiYield(void) { sleep(0); }
156
157 static void CmiStartThreads(char **argv) {
158     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
159     _Cmi_mype = Cmi_nodestart;
160     _Cmi_myrank = 0;
161 }
162 #endif  /* !CMK_SMP */
163
164 //int received_immediate;
165 //int received_broadcast;
166
167 /*Add a message to this processor's receive queue, pe is a rank */
168 void CmiPushPE(int pe,void *msg) {
169     CmiState cs = CmiGetStateN(pe);
170     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
171 #if CMK_IMMEDIATE_MSG
172     if (CmiIsImmediate(msg)) {
173         /**(CmiUInt2 *)msg = pe;*/
174         //received_immediate = 1;
175         //printf("PushPE: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
176         //CMI_DEST_RANK(msg) = pe;
177         CmiPushImmediateMsg(msg);
178         return;
179     }
180 #endif
181 #if CMK_SMP
182     CmiLock(procState[pe].recvLock);
183 #endif
184
185     PCQueuePush(cs->recv,(char *)msg);
186     //printf("%d: PCQueue length = %d, msg = %x\n", CmiMyPe(), PCQueueLength(cs->recv), msg);
187
188 #if CMK_SMP
189     CmiUnlock(procState[pe].recvLock);
190 #endif
191     CmiIdleLock_addMessage(&cs->idle);
192     MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
193 }
194
195 #if CMK_NODE_QUEUE_AVAILABLE
196 /*Add a message to this processor's receive queue */
197 static void CmiPushNode(void *msg) {
198     MACHSTATE(3,"Pushing message into NodeRecv queue");
199 #if CMK_IMMEDIATE_MSG
200     if (CmiIsImmediate(msg)) {
201         //printf("PushNode: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
202         //CMI_DEST_RANK(msg) = 0;
203         CmiPushImmediateMsg(msg);
204         return;
205     }
206 #endif
207     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
208     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
209     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
210     {
211         CmiState cs=CmiGetStateN(0);
212         CmiIdleLock_addMessage(&cs->idle);
213     }
214 }
215 #endif /* CMK_NODE_QUEUE_AVAILABLE */
216
217 volatile int msgQueueLen;
218 volatile int outstanding_recvs;
219
220 static int Cmi_dim;     /* hypercube dim of network */
221
222 static char     **Cmi_argv;
223 static char     **Cmi_argvcopy;
224 static CmiStartFn Cmi_startfn;   /* The start function */
225 static int        Cmi_usrsched;  /* Continue after start function finishes? */
226
227 extern void ConverseCommonInit(char **argv);
228 extern void ConverseCommonExit(void);
229 extern void CthInit(char **argv);
230
231 static void SendMsgsUntil(int);
232
233
234 void SendSpanningChildren(int size, char *msg);
235 #if CMK_NODE_QUEUE_AVAILABLE
236 void SendSpanningChildrenNode(int size, char *msg);
237 #endif
238 void SendHypercube(int size, char *msg);
239
240 DCMF_Protocol_t  cmi_dcmf_short_registration __attribute__((__aligned__(16)));
241 DCMF_Protocol_t  cmi_dcmf_eager_registration __attribute__((__aligned__(16)));
242 DCMF_Protocol_t  cmi_dcmf_rzv_registration   __attribute__((__aligned__(16)));
243 DCMF_Protocol_t  cmi_dcmf_multicast_registration   __attribute__((__aligned__(16)));
244
245 #define BGP_USE_RDMA 1
246 /*#define CMI_DIRECT_DEBUG 1*/
247 #ifdef BGP_USE_RDMA
248
249
250 DCMF_Protocol_t  cmi_dcmf_direct_registration __attribute__((__aligned__(16)));
251 /** The receive side of a put implemented in DCMF_Send */
252
253
254 typedef struct {
255     void *recverBuf;
256   void (*callbackFnPtr)(void *);
257     void *callbackData;
258     DCMF_Request_t *DCMF_rq_t;
259 } dcmfDirectMsgHeader;
260
261 /* nothing for us to do here */
262 #if (DCMF_VERSION_MAJOR >= 2)
263 void direct_send_done_cb(void*nothing, DCMF_Error_t *err) 
264 #else 
265   void direct_send_done_cb(void*nothing) 
266 #endif
267 {
268 #if CMI_DIRECT_DEBUG
269   CmiPrintf("[%d] RDMA send_done_cb\n", CmiMyPe());
270 #endif
271 }
272
273 DCMF_Callback_t  directcb;
274
275 void     direct_short_pkt_recv (void             * clientdata,
276                                 const DCQuad     * info,
277                                 unsigned           count,
278                                 unsigned           senderrank,
279                                 const char       * buffer,
280                                 const unsigned     sndlen) {
281 #if CMI_DIRECT_DEBUG
282     CmiPrintf("[%d] RDMA direct_short_pkt_recv\n", CmiMyPe());
283 #endif
284     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
285     CmiMemcpy(msgHead->recverBuf, buffer, sndlen);
286     (*(msgHead->callbackFnPtr))(msgHead->callbackData);
287 }
288
289
290 #if (DCMF_VERSION_MAJOR >= 2)
291 typedef void (*cbhdlr) (void *, DCMF_Error_t *);
292 #else
293 typedef void (*cbhdlr) (void *);
294 #endif
295
296 DCMF_Request_t * direct_first_pkt_recv_done (void              * clientdata,
297         const DCQuad      * info,
298         unsigned            count,
299         unsigned            senderrank,
300         const unsigned      sndlen,
301         unsigned          * rcvlen,
302         char             ** buffer,
303         DCMF_Callback_t   * cb
304                                             ) {
305 #if CMI_DIRECT_DEBUG
306     CmiPrintf("[%d] RDMA direct_first_pkt_recv_done\n", CmiMyPe());
307 #endif
308     /* pull the data we need out of the header */
309     *rcvlen=sndlen;
310     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
311     cb->function= (cbhdlr)msgHead->callbackFnPtr;
312     cb->clientdata=msgHead->callbackData;
313     *buffer=msgHead->recverBuf;
314     return msgHead->DCMF_rq_t;
315 }
316
317
318 #endif
319
320 typedef struct msg_list {
321     char              * msg;
322     int                 size;
323     int                 destpe;
324     int               * pelist;
325     DCMF_Callback_t     cb;
326     DCQuad              info __attribute__((__aligned__(16)));
327     DCMF_Request_t      send __attribute__((__aligned__(16)));
328 } SMSG_LIST __attribute__((__aligned__(16)));
329
330 #define MAX_NUM_SMSGS   64
331 CpvDeclare(PCQueue, smsg_list_q);
332
333 static inline SMSG_LIST * smsg_allocate() {
334     SMSG_LIST *smsg = (SMSG_LIST *)PCQueuePop(CpvAccess(smsg_list_q));
335     if (smsg != NULL)
336         return smsg;
337
338     void * buf = malloc(sizeof(SMSG_LIST)); 
339     assert(buf!=NULL);
340     assert (((unsigned)buf & 0x0f) == 0);
341
342     return (SMSG_LIST *) buf;
343 }
344
345 static inline void smsg_free (SMSG_LIST *smsg) {
346     int size = PCQueueLength (CpvAccess(smsg_list_q));
347     if (size < MAX_NUM_SMSGS)
348         PCQueuePush (CpvAccess(smsg_list_q), (char *) smsg);
349     else
350         free (smsg);
351 }
352
353 typedef struct {
354     int sleepMs; /*Milliseconds to sleep while idle*/
355     int nIdles; /*Number of times we've been idle in a row*/
356     CmiState cs; /*Machine state*/
357 } CmiIdleState;
358
359 static CmiIdleState *CmiNotifyGetState(void) {
360     CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
361     s->sleepMs=0;
362     s->nIdles=0;
363     s->cs=CmiGetState();
364     return s;
365 }
366
367
368 #if (DCMF_VERSION_MAJOR >= 2)
369 static void send_done(void *data, DCMF_Error_t *err) 
370 #else 
371 static void send_done(void *data) 
372 #endif
373 /* send done callback: sets the smsg entry to done */
374 {
375     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
376     CmiFree(msg_tmp->msg);
377     //free(data);
378     smsg_free (msg_tmp);
379
380     msgQueueLen--;
381 }
382
383 #if (DCMF_VERSION_MAJOR >= 2)
384 static void send_multi_done(void *data, DCMF_Error_t *err) 
385 #else 
386 static void send_multi_done(void *data) 
387 #endif
388 /* send done callback: sets the smsg entry to done */
389 {
390     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
391     CmiFree(msg_tmp->msg);
392     free(msg_tmp->pelist);
393
394     smsg_free(msg_tmp);
395
396     msgQueueLen--;
397 }
398
399
400 #if (DCMF_VERSION_MAJOR >= 2)
401 static void recv_done(void *clientdata, DCMF_Error_t * err) 
402 #else 
403 static void recv_done(void *clientdata) 
404 #endif
405 /* recv done callback: push the recved msg to recv queue */
406 {
407
408     char *msg = (char *) clientdata;
409     int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
410
411     //fprintf (stderr, "%d Recv message done \n", CmiMyPe());
412
413     /* then we do what PumpMsgs used to do:
414      * push msg to recv queue */
415     int count=0;
416     CMI_CHECK_CHECKSUM(msg, sndlen);
417     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
418         CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
419         return;
420     }
421
422 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
423     if (CMI_IS_BCAST_ON_CORES(msg) ) {
424         int pe = CMI_DEST_RANK(msg);
425
426         //printf ("%d: Receiving bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, pe);
427
428         char *copymsg;
429         copymsg = (char *)CmiAlloc(sndlen);
430         CmiMemcpy(copymsg,msg,sndlen);
431
432         //received_broadcast = 1;
433 #if CMK_SMP
434         CmiLock(procState[pe].bcastLock);
435         PCQueuePush(CpvAccessOther(broadcast_q, pe), copymsg);
436         CmiUnlock(procState[pe].bcastLock);
437 #else
438         PCQueuePush(CpvAccess(broadcast_q), copymsg);
439 #endif
440     }
441 #endif
442
443 #if CMK_NODE_QUEUE_AVAILABLE
444 #if CMK_BROADCAST_SPANNING_TREE
445     if (CMI_IS_BCAST_ON_NODES(msg)) {
446         //printf ("%d: Receiving node bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, CMI_DEST_RANK(msg));
447         char *copymsg = (char *)CmiAlloc(sndlen);
448         CmiMemcpy(copymsg,msg,sndlen);
449         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
450         CmiLock(CsvAccess(node_bcastLock));
451         PCQueuePush(CsvAccess(node_bcastq), copymsg);
452         CmiUnlock(CsvAccess(node_bcastLock));
453         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
454     }
455 #endif
456     if (CMI_DEST_RANK(msg) == SMP_NODEMESSAGE)
457         CmiPushNode(msg);
458     else
459 #endif
460
461 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
462         if (CMI_DEST_RANK(msg)<_Cmi_mynodesize) { //not the comm thd
463             CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
464         }
465 #else
466         CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
467 #endif
468
469     outstanding_recvs --;
470 }
471
472
473 void     short_pkt_recv (void             * clientdata,
474                          const DCQuad     * info,
475                          unsigned           count,
476                          unsigned           senderrank,
477                          const char       * buffer,
478                          const unsigned     sndlen) {
479     outstanding_recvs ++;
480     int alloc_size = sndlen;
481
482     char * new_buffer = (char *)CmiAlloc(alloc_size);
483     CmiMemcpy (new_buffer, buffer, sndlen);
484     
485 #if (DCMF_VERSION_MAJOR >= 2)
486     recv_done (new_buffer, NULL);
487 #else
488     recv_done (new_buffer);
489 #endif
490 }
491
492
493 DCMF_Request_t * first_multi_pkt_recv_done (const DCQuad      * info,
494                                             unsigned            count,
495                                             unsigned            senderrank,                                    
496                                             const unsigned      sndlen,
497                                             unsigned            connid,
498                                             void              * clientdata,
499                                             unsigned          * rcvlen,
500                                             char             ** buffer,
501                                             unsigned          * pw,
502                                             DCMF_Callback_t   * cb
503                                             ) {
504     outstanding_recvs ++;
505     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
506
507     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
508
509     /* printf ("Receiving %d bytes\n", sndlen); */
510     *rcvlen = sndlen;  /* to avoid malloc(0) which might
511                                    return NULL */
512
513     *buffer = (char *)CmiAlloc(alloc_size);
514     cb->function = recv_done;
515     cb->clientdata = *buffer;
516
517     *pw  = 0x7fffffff;
518     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
519 }
520
521
522 DCMF_Request_t * first_pkt_recv_done (void              * clientdata,
523                                       const DCQuad      * info,
524                                       unsigned            count,
525                                       unsigned            senderrank,
526                                       const unsigned      sndlen,
527                                       unsigned          * rcvlen,
528                                       char             ** buffer,
529                                       DCMF_Callback_t   * cb
530                                      ) {
531     outstanding_recvs ++;
532     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
533
534     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
535
536     /* printf ("Receiving %d bytes\n", sndlen); */
537     *rcvlen = sndlen;  /* to avoid malloc(0) which might
538                                    return NULL */
539
540     *buffer = (char *)CmiAlloc(alloc_size);
541     cb->function = recv_done;
542     cb->clientdata = *buffer;
543
544     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
545 }
546
547
548 #if CMK_NODE_QUEUE_AVAILABLE
549 void sendBroadcastMessagesNode() {
550     if (PCQueueLength(CsvAccess(node_bcastq))==0) return;
551     //node broadcast message could be always handled by any cores (including
552     //comm thd) on this node
553     //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
554     CmiLock(CsvAccess(node_bcastLock));
555     char *msg = PCQueuePop(CsvAccess(node_bcastq));
556     CmiUnlock(CsvAccess(node_bcastLock));
557     //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
558     while (msg) {
559 #if CMK_BROADCAST_SPANNING_TREE
560         //printf("sendBroadcastMessagesNode: node %d rank %d with msg root %d\n", CmiMyNode(), CmiMyRank(), CMI_BROADCAST_ROOT(msg));
561         SendSpanningChildrenNode(((CmiMsgHeaderBasic *) msg)->size, msg);
562 #endif
563         CmiFree(msg);
564         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
565         CmiLock(CsvAccess(node_bcastLock));
566         msg = PCQueuePop(CsvAccess(node_bcastq));
567         CmiUnlock(CsvAccess(node_bcastLock));
568         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
569     }
570 }
571 #endif
572
573 void sendBroadcastMessages() {
574 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
575 //in the presence of comm thd, and it is not responsible for broadcasting msg,
576 //the comm thd will help to pull the msg from rank 0 which is predefined as the
577 //core on a smp node to receive bcast msg.
578     int toPullRank = CmiMyRank();
579     if (CmiMyRank()==_Cmi_mynodesize) toPullRank = 0; //comm thd only pulls msg from rank 0
580 #else
581     int toPullRank = CmiMyRank();
582 #endif
583     PCQueue toPullQ;
584
585     /*
586     if(CmiMyRank()==_Cmi_mynodesize)
587       printf("Comm thd on node [%d] is pulling bcast msg\n", CmiMyNode());
588     else
589       printf("Work thd [%d] on node [%d] is pulling bcast msg\n", CmiMyRank(), CmiMyNode());
590     */
591 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
592     toPullQ = CpvAccessOther(broadcast_q, toPullRank);
593 #else
594     toPullQ = CpvAccess(broadcast_q);
595 #endif
596
597     if (PCQueueLength(toPullQ)==0) return;
598 #if CMK_SMP
599     CmiLock(procState[toPullRank].bcastLock);
600 #endif
601
602     char *msg = (char *) PCQueuePop(toPullQ);
603
604 #if CMK_SMP
605     CmiUnlock(procState[toPullRank].bcastLock);
606 #endif
607
608     while (msg) {
609
610 #if CMK_BROADCAST_SPANNING_TREE
611         SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
612 #elif CMK_BROADCAST_HYPERCUBE
613         SendHypercube(((CmiMsgHeaderBasic *) msg)->size, msg);
614 #endif
615
616         CmiFree (msg);
617
618 #if CMK_SMP
619         CmiLock(procState[toPullRank].bcastLock);
620 #endif
621
622 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
623         toPullQ = CpvAccessOther(broadcast_q, toPullRank);
624 #else
625         toPullQ = CpvAccess(broadcast_q);
626 #endif
627         msg = (char *) PCQueuePop(toPullQ);
628
629 #if CMK_SMP
630         CmiUnlock(procState[toPullRank].bcastLock);
631 #endif
632     }
633 }
634
635 CpvDeclare(unsigned, networkProgressCount);
636 int  networkProgressPeriod;
637
638 #if 0
639 unsigned int *ranklist;
640
641 BGTsC_t        barrier;
642
643 // -----------------------------------------
644 // Rectangular broadcast implementation
645 // -----------------------------------------
646
647 #define MAX_COMM  256
648 static void * comm_table [MAX_COMM];
649
650 typedef struct rectbcast_msg {
651     BGTsRC_t           request;
652     DCMF_Callback_t    cb;
653     char              *msg;
654 } RectBcastInfo;
655
656
657 static void bcast_done (void *data) {
658     RectBcastInfo *rinfo = (RectBcastInfo *) data;
659     CmiFree (rinfo->msg);
660     free (rinfo);
661 }
662
663 static  void *   getRectBcastRequest (unsigned comm) {
664     return comm_table [comm];
665 }
666
667
668 static  void *  bcast_recv     (unsigned               root,
669                                 unsigned               comm,
670                                 const unsigned         sndlen,
671                                 unsigned             * rcvlen,
672                                 char                ** rcvbuf,
673                                 DCMF_Callback_t      * const cb) {
674
675     int alloc_size = sndlen + sizeof(BGTsRC_t) + 16;
676
677     *rcvlen = sndlen;  /* to avoid malloc(0) which might
678                                    return NULL */
679
680     *rcvbuf       =  (char *)CmiAlloc(alloc_size);
681     cb->function  =   recv_done;
682     cb->clientdata = *rcvbuf;
683
684     return (BGTsRC_t *) ALIGN_16 (*rcvbuf + sndlen);
685
686 }
687
688
689 extern void bgl_machine_RectBcast (unsigned                 commid,
690                                        const char             * sndbuf,
691                                        unsigned                 sndlen) {
692     RectBcastInfo *rinfo  =   (RectBcastInfo *) malloc (sizeof(RectBcastInfo));
693     rinfo->cb.function    =   bcast_done;
694     rinfo->cb.clientdata  =   rinfo;
695
696     BGTsRC_AsyncBcast_start (commid, &rinfo->request, &rinfo->cb, sndbuf, sndlen);
697
698 }
699
700 extern void        bgl_machine_RectBcastInit  (unsigned               commID,
701             const BGTsRC_Geometry_t* geometry) {
702
703     CmiAssert (commID < 256);
704     CmiAssert (comm_table [commID] == NULL);
705
706     BGTsRC_t *request =  (BGTsRC_t *) malloc (sizeof (BGTsRC_t));
707     comm_table [commID] = request;
708
709     BGTsRC_AsyncBcast_init  (request, commID,  geometry);
710 }
711
712
713
714
715 //--------------------------------------------------------------
716 //----- End Rectangular Broadcast Implementation ---------------
717 //--------------------------------------------------------------
718 #endif
719
720 //approx sleep command
721 void mysleep (int cycles) {
722     unsigned long long start = DCMF_Timebase();
723     unsigned long long end = start + cycles;
724
725     while (start < end)
726         start = DCMF_Timebase();
727
728     return;
729 }
730
731 static void * test_buf;
732
733 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
734     int n, i, count;
735
736     //fprintf(stderr, "Initializing Converse Blue Gene/P machine Layer\n");
737
738     DCMF_Messager_initialize();
739
740 #if CMK_SMP
741     DCMF_Configure_t  config_in, config_out;
742     config_in.thread_level= DCMF_THREAD_MULTIPLE;
743     config_in.interrupts  = DCMF_INTERRUPTS_OFF;
744
745     DCMF_Messager_configure(&config_in, &config_out);
746     //assert (config_out.thread_level == DCMF_THREAD_MULTIPLE); //not supported in vn mode
747 #endif
748
749     DCMF_Send_Configuration_t short_config, eager_config, rzv_config;
750
751
752     short_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
753     short_config.cb_recv_short = short_pkt_recv;
754     short_config.cb_recv       = first_pkt_recv_done;
755
756 #if (DCMF_VERSION_MAJOR >= 3)
757     short_config.network  = DCMF_DEFAULT_NETWORK;
758 #elif (DCMF_VERSION_MAJOR == 2)
759     short_config.network  = DCMF_DefaultNetwork;
760 #endif
761
762     eager_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
763     eager_config.cb_recv_short = short_pkt_recv;
764     eager_config.cb_recv       = first_pkt_recv_done;
765 #if (DCMF_VERSION_MAJOR >= 3)
766     eager_config.network  = DCMF_DEFAULT_NETWORK;
767 #elif (DCMF_VERSION_MAJOR == 2)
768     eager_config.network  = DCMF_DefaultNetwork;
769 #endif
770
771 #ifdef  OPT_RZV
772 #warning "Enabling Optimize Rzv"
773     rzv_config.protocol        = DCMF_RZV_SEND_PROTOCOL;
774 #else
775     rzv_config.protocol        = DCMF_DEFAULT_SEND_PROTOCOL;
776 #endif
777     rzv_config.cb_recv_short   = short_pkt_recv;
778     rzv_config.cb_recv         = first_pkt_recv_done;
779 #if (DCMF_VERSION_MAJOR >= 3)
780     rzv_config.network  = DCMF_DEFAULT_NETWORK;
781 #elif (DCMF_VERSION_MAJOR == 2)
782     rzv_config.network  = DCMF_DefaultNetwork;
783 #endif
784
785     DCMF_Send_register (&cmi_dcmf_short_registration, &short_config);
786     DCMF_Send_register (&cmi_dcmf_eager_registration, &eager_config);
787     DCMF_Send_register (&cmi_dcmf_rzv_registration,   &rzv_config);
788
789 #ifdef BGP_USE_RDMA
790     DCMF_Send_Configuration_t direct_config;
791     direct_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
792     direct_config.cb_recv_short = direct_short_pkt_recv;
793     direct_config.cb_recv       = direct_first_pkt_recv_done;
794 #if (DCMF_VERSION_MAJOR >= 3)
795     direct_config.network  = DCMF_DEFAULT_NETWORK;
796 #elif (DCMF_VERSION_MAJOR == 2)
797     direct_config.network  = DCMF_DefaultNetwork;
798 #endif
799     DCMF_Send_register (&cmi_dcmf_direct_registration,   &direct_config);
800     directcb.function=direct_send_done_cb;
801     directcb.clientdata=NULL;
802 #endif
803
804     //fprintf(stderr, "Initializing Eager Protocol\n");
805
806     _Cmi_numnodes = DCMF_Messager_size();
807     _Cmi_mynode = DCMF_Messager_rank();
808
809     unsigned rank = DCMF_Messager_rank();
810     unsigned size = DCMF_Messager_size();
811
812     CmiBarrier();
813     CmiBarrier();
814     CmiBarrier();
815
816     /* processor per node */
817     _Cmi_mynodesize = 1;
818     CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
819 #if ! CMK_SMP
820     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
821         CmiAbort("+ppn cannot be used in non SMP version!\n");
822 #endif
823
824     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
825     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
826     Cmi_argvcopy = CmiCopyArgs(argv);
827     Cmi_argv = argv;
828     Cmi_startfn = fn;
829     Cmi_usrsched = usched;
830
831     //printf ("Starting Charm with %d nodes and %d processors\n", CmiNumNodes(), CmiNumPes());
832
833     DCMF_Multicast_Configuration_t mconfig;
834     mconfig.protocol = DCMF_MEMFIFO_DMA_MSEND_PROTOCOL;
835     mconfig.cb_recv  = first_multi_pkt_recv_done;
836     mconfig.clientdata = NULL;
837     mconfig.connectionlist = (void **) malloc (CmiNumPes() * sizeof(unsigned long));
838     mconfig.nconnections = CmiNumPes();  
839     DCMF_Multicast_register(&cmi_dcmf_multicast_registration, &mconfig);
840
841
842     /* find dim = log2(numpes), to pretend we are a hypercube */
843     for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
844         Cmi_dim++ ;
845
846
847     /* checksum flag */
848     if (CmiGetArgFlag(argv,"+checksum")) {
849 #if !CMK_OPTIMIZE
850         checksum_flag = 1;
851         if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
852 #else
853         if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
854 #endif
855     }
856
857     CsvInitialize(CmiNodeState, NodeState);
858     CmiNodeStateInit(&CsvAccess(NodeState));
859
860 #if CMK_NODE_QUEUE_AVAILABLE
861     CsvInitialize(PCQueue, node_bcastq);
862     CsvAccess(node_bcastq) = PCQueueCreate();
863     CsvInitialize(CmiNodeLock, node_bcastLock);
864     CsvAccess(node_bcastLock) = CmiCreateLock();
865 #endif
866
867     int actualNodeSize = _Cmi_mynodesize;
868 #if !CMK_MULTICORE
869     actualNodeSize++; //considering the extra comm thread
870 #endif
871
872     procState = (ProcState *)CmiAlloc((actualNodeSize) * sizeof(ProcState));
873     for (i=0; i<actualNodeSize; i++) {
874         /*    procState[i].sendMsgBuf = PCQueueCreate();   */
875         procState[i].recvLock = CmiCreateLock();
876         procState[i].bcastLock = CmiCreateLock();
877     }
878
879 #if CMK_SMP && !CMK_MULTICORE
880     commThdExitLock = CmiCreateLock();
881 #endif
882
883     /* Network progress function is used to poll the network when for
884        messages. This flushes receive buffers on some  implementations*/
885     networkProgressPeriod = PROGRESS_PERIOD;
886     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
887
888     //printf ("Starting Threads\n");
889
890     CmiStartThreads(argv);
891
892     ConverseRunPE(initret);
893 }
894
895
896 int PerrorExit (char *err) {
897     fprintf (stderr, "err\n\n");
898     exit (-1);
899     return -1;
900 }
901
902
903 void ConverseRunPE(int everReturn) {
904     //printf ("ConverseRunPE on rank %d\n", CmiMyPe());
905
906     CmiIdleState *s=CmiNotifyGetState();
907     CmiState cs;
908     char** CmiMyArgv;
909     CmiNodeAllBarrier();
910
911     cs = CmiGetState();
912     CpvInitialize(void *,CmiLocalQueue);
913     CpvAccess(CmiLocalQueue) = cs->localqueue;
914
915     if (CmiMyRank())
916         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
917     else
918         CmiMyArgv=Cmi_argv;
919
920     CthInit(CmiMyArgv);
921
922     //printf ("Before Converse Common Init\n");
923     ConverseCommonInit(CmiMyArgv);
924
925     /* initialize the network progress counter*/
926     /* Network progress function is used to poll the network when for
927        messages. This flushes receive buffers on some  implementations*/
928     CpvInitialize(int , networkProgressCount);
929     CpvAccess(networkProgressCount) = 0;
930
931     CpvInitialize(PCQueue, broadcast_q);
932     CpvAccess(broadcast_q) = PCQueueCreate();
933
934     CpvInitialize(PCQueue, smsg_list_q);
935     CpvAccess(smsg_list_q) = PCQueueCreate();
936
937     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
938
939     CmiBarrier();
940
941     /* Converse initialization finishes, immediate messages can be processed.
942        node barrier previously should take care of the node synchronization */
943     _immediateReady = 1;
944
945     /* communication thread */
946     if (CmiMyRank() == CmiMyNodeSize()) {
947         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
948         while (1) CommunicationServer(5);
949     } else {
950         //printf ("Calling Start Fn and the scheduler \n");
951
952         if (!everReturn) {
953             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
954             if (Cmi_usrsched==0) CsdScheduler(-1);
955             ConverseExit();
956         }
957     }
958 }
959
960 #if CMK_SMP
961 static int inexit = 0;
962
963 /* test if all processors recv queues are empty */
964 static int RecvQueueEmpty() {
965     int i;
966     for (i=0; i<_Cmi_mynodesize; i++) {
967         CmiState cs=CmiGetStateN(i);
968         if (!PCQueueEmpty(cs->recv)) return 0;
969     }
970     return 1;
971 }
972
973 #endif
974
975
976 //extern void DCMF_Messager_dumpTimers();
977
978 void ConverseExit(void) {
979
980     while (msgQueueLen > 0 || outstanding_recvs > 0) {
981         AdvanceCommunications();
982     }
983
984     CmiNodeBarrier();
985     ConverseCommonExit();
986
987     //  if(CmiMyPe()%101 == 0)
988     //DCMF_Messager_dumpTimers();
989
990     if (CmiMyPe() == 0) {
991         printf("End of program\n");
992     }
993
994     CmiNodeBarrier();
995 //  CmiNodeAllBarrier ();
996
997 #if CMK_SMP && !CMK_MULTICORE
998     //CmiLock(commThdExitLock);
999     commThdExit = 1;
1000     //CmiUnlock(commThdExitLock);
1001
1002     _bgp_msync();
1003 #endif
1004
1005     int rank0 = 0;
1006
1007     if (CmiMyRank() == 0) {
1008         rank0 = 1;
1009         //CmiFree(procState);
1010         DCMF_Messager_finalize();
1011     }
1012
1013     CmiNodeBarrier();
1014 //  CmiNodeAllBarrier ();
1015
1016     if (rank0)
1017         exit(0);
1018     else
1019         pthread_exit(NULL);
1020 }
1021
1022 /* exit() called on any node would abort the whole program */
1023 void CmiAbort(const char * message) {
1024     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1025              "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),
1026              msgQueueLen, outstanding_recvs, message);
1027     //CmiPrintStackTrace(0);
1028
1029     while (msgQueueLen > 0 || outstanding_recvs > 0) {
1030         AdvanceCommunications();
1031     }
1032
1033     CmiBarrier();
1034     assert (0);
1035 }
1036
1037 static void CommunicationServer(int sleepTime) {
1038 #if CMK_SMP && !CMK_MULTICORE
1039     //CmiLock(commThdExitLock);
1040     if (commThdExit) {
1041         while (msgQueueLen > 0 || outstanding_recvs > 0) {
1042             AdvanceCommunications();
1043         }
1044         CmiUnlock(commThdExitLock);
1045         pthread_exit(NULL);
1046         return;
1047     }
1048     //CmiUnlock(commThdExitLock);
1049 #endif
1050     AdvanceCommunications();
1051
1052 #if CMK_IMMEDIATE_MSG && CMK_SMP && !CMK_MULTICORE
1053     CmiHandleImmediate();
1054 #endif
1055
1056     //mysleep(sleepTime);
1057 }
1058
1059 static void CommunicationServerThread(int sleepTime) {
1060 #if CMK_SMP
1061     CommunicationServer(sleepTime);
1062 #endif
1063 //immediate msgs are handled in AdvanceCommunications
1064 }
1065
1066 #if CMK_NODE_QUEUE_AVAILABLE
1067 char *CmiGetNonLocalNodeQ(void) {
1068     CmiState cs = CmiGetState();
1069     char *result = 0;
1070     CmiIdleLock_checkMessage(&cs->idle);
1071     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1072         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1073
1074         if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
1075             //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1076             result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1077             CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1078         }
1079
1080         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1081     }
1082     return result;
1083 }
1084 #endif
1085
1086
1087 void *CmiGetNonLocal() {
1088
1089     CmiState cs = CmiGetState();
1090
1091     void *msg = NULL;
1092     CmiIdleLock_checkMessage(&cs->idle);
1093     /* although it seems that lock is not needed, I found it crashes very often
1094        on mpi-smp without lock */
1095
1096 #if !CMK_SMP || CMK_MULTICORE  /*|| !BCASTMSG_ONLY_TO_COMMTHD*/
1097 //ChaoMei changes
1098     AdvanceCommunications();
1099 #endif
1100
1101     /*if(CmiMyRank()==0) printf("Got stuck here on proc[%d] node[%d]\n", CmiMyPe(), CmiMyNode());*/
1102
1103     if (PCQueueLength(cs->recv)==0) return NULL;
1104
1105 #if CMK_SMP
1106     CmiLock(procState[cs->rank].recvLock);
1107 #endif
1108
1109     msg =  PCQueuePop(cs->recv);
1110
1111 #if CMK_SMP
1112     CmiUnlock(procState[cs->rank].recvLock);
1113 #endif
1114
1115     return msg;
1116 }
1117
1118 static void CmiSendSelf(char *msg) {
1119 #if CMK_IMMEDIATE_MSG
1120     if (CmiIsImmediate(msg)) {
1121         /* CmiBecomeNonImmediate(msg); */
1122         //printf("In SendSelf, N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1123         CmiPushImmediateMsg(msg);
1124 #if CMK_MULTICORE
1125         CmiHandleImmediate();
1126 #endif
1127         return;
1128     }
1129 #endif
1130     
1131     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1132 }
1133
1134 #if CMK_SMP
1135 static void CmiSendPeer (int rank, int size, char *msg) {
1136 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
1137     if (CMI_BROADCAST_ROOT(msg) != 0) {
1138         char *copymsg;
1139         copymsg = (char *)CmiAlloc(size);
1140         CmiMemcpy(copymsg,msg,size);
1141
1142         CmiLock(procState[rank].bcastLock);
1143         PCQueuePush(CpvAccessOther(broadcast_q, rank), copymsg);
1144         CmiUnlock(procState[rank].bcastLock);
1145     }
1146 #endif
1147     
1148     CmiPushPE (rank, msg);
1149 }
1150 #endif
1151
1152
1153 void machineSend(SMSG_LIST *msg_tmp) {    
1154
1155     if (msg_tmp->destpe == CmiMyNode())
1156         CmiAbort("Sending to self\n");
1157
1158     CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumNodes());
1159     msg_tmp->cb.function     =   send_done;
1160     msg_tmp->cb.clientdata   =   msg_tmp;
1161
1162     DCMF_Protocol_t *protocol = NULL;
1163
1164     if (msg_tmp->size < 224)
1165         protocol = &cmi_dcmf_short_registration;
1166     else if (msg_tmp->size < 2048)
1167         protocol = &cmi_dcmf_eager_registration;
1168     else
1169         protocol = &cmi_dcmf_rzv_registration;
1170
1171
1172 #if CMK_SMP
1173     DCMF_CriticalSection_enter (0);
1174 #endif
1175
1176     msgQueueLen ++;
1177     DCMF_Send (protocol, &msg_tmp->send, msg_tmp->cb,
1178                DCMF_MATCH_CONSISTENCY, msg_tmp->destpe,
1179                msg_tmp->size, msg_tmp->msg, &msg_tmp->info, 1);
1180
1181 /*    
1182     #if CMK_SMP && !CMK_MULTICORE
1183     //Adding this advance call here improves the SMP performance
1184     //a little bit although it is possible that some more bugs are
1185     //introduced
1186     DCMF_Messager_advance();
1187     #endif
1188 */
1189
1190 #if CMK_SMP
1191     DCMF_CriticalSection_exit (0);
1192 #endif
1193 }
1194
1195 #define MAX_MULTICAST 128
1196 DCMF_Opcode_t  CmiOpcodeList [MAX_MULTICAST];
1197
1198 void  machineMulticast(int npes, int *pelist, int size, char* msg){  
1199   CQdCreate(CpvAccess(cQdState), npes);
1200   
1201   CmiAssert (npes < MAX_MULTICAST);
1202
1203   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1204   ((CmiMsgHeaderBasic *)msg)->size = size;  
1205   CMI_SET_BROADCAST_ROOT(msg,0);
1206   CMI_SET_CHECKSUM(msg, size);
1207   
1208   SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1209   
1210   msg_tmp->destpe    = -1;      //multicast operation
1211   msg_tmp->size      = size * npes; //keep track of #bytes outstanding
1212   msg_tmp->msg       = msg;
1213   msg_tmp->pelist    = pelist;
1214   
1215   msgQueueLen ++;
1216   
1217   DCMF_Multicast_t  mcast_info __attribute__((__aligned__(16)));
1218   
1219   mcast_info.registration   = & cmi_dcmf_multicast_registration;
1220   mcast_info.request        = & msg_tmp->send;
1221   mcast_info.cb_done.function    =   send_multi_done;
1222   mcast_info.cb_done.clientdata  =   msg_tmp;
1223   mcast_info.consistency    =   DCMF_MATCH_CONSISTENCY;
1224   mcast_info.connection_id  =   CmiMyPe();
1225   mcast_info.bytes          =   size;
1226   mcast_info.src            =   msg;
1227   mcast_info.nranks         =   npes;
1228   mcast_info.ranks          =   (unsigned *)pelist;
1229   mcast_info.opcodes        =   CmiOpcodeList;   //static list of MAX_MULTICAST entires with 0 in them
1230   mcast_info.flags          =   0;
1231   mcast_info.msginfo        =   &msg_tmp->info;
1232   mcast_info.count          =   1;
1233
1234   DCMF_Multicast (&mcast_info);
1235 }
1236
1237
1238
1239 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg);
1240
1241 /* The general free send function
1242  * Send is synchronous, and free msg after posted
1243  */
1244 void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
1245
1246   if (destPE < 0 || destPE > CmiNumPes ())
1247     printf ("Sending to %d\n", destPE);
1248
1249   CmiAssert (destPE >= 0 && destPE < CmiNumPes());
1250
1251     CmiState cs = CmiGetState();
1252
1253     if (destPE==cs->pe) {
1254         CmiSendSelf(msg);
1255         return;
1256     }
1257
1258 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1259     //In the presence of comm thd which is responsible for sending bcast msgs, then
1260     //CmiNodeOf and CmiRankOf may return incorrect information if the destPE is considered
1261     //as a comm thd.
1262     if (destPE >= _Cmi_numpes) { //destination is a comm thd
1263         int nid = destPE - _Cmi_numpes;
1264         int rid = _Cmi_mynodesize;
1265         CmiGeneralFreeSendN (nid, rid, size, msg);
1266     } else {
1267         CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1268     }
1269 #else
1270     //printf ("%d: Sending Message to %d \n", CmiMyPe(), destPE);
1271     CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1272 #endif
1273 }
1274
1275 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg) {
1276
1277     //printf ("%d, %d: Sending Message to node %d rank %d \n", CmiMyPe(),
1278     //  CmiMyNode(), node, rank);
1279
1280 #if CMK_SMP
1281     CMI_DEST_RANK(msg) = rank;
1282     //CMI_SET_CHECKSUM(msg, size);
1283
1284     if (node == CmiMyNode()) {
1285         CmiSendPeer (rank, size, msg);
1286         return;
1287     }
1288 #endif
1289
1290     SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1291     msg_tmp->destpe = node; //destPE;
1292     msg_tmp->size = size;
1293     msg_tmp->msg = msg;
1294
1295     machineSend(msg_tmp);
1296 }
1297
1298 void CmiSyncSendFn(int destPE, int size, char *msg) {
1299     char *copymsg;
1300     copymsg = (char *)CmiAlloc(size);
1301     CmiMemcpy(copymsg,msg,size);
1302     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncSendFn on comm thd on node %d\n", CmiMyNode());
1303     CmiFreeSendFn(destPE,size,copymsg);
1304 }
1305
1306 void CmiFreeSendFn(int destPE, int size, char *msg) {    
1307     CQdCreate(CpvAccess(cQdState), 1);
1308     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeSendFn on comm thd on node %d\n", CmiMyNode());
1309
1310     CMI_SET_BROADCAST_ROOT(msg,0);
1311     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1312     ((CmiMsgHeaderBasic *)msg)->size = size;
1313     CMI_SET_CHECKSUM(msg, size);
1314
1315     CmiGeneralFreeSend(destPE,size,msg);
1316 }
1317
1318 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1319 void CmiSyncSendFn1(int destPE, int size, char *msg) {
1320     char *copymsg;
1321     copymsg = (char *)CmiAlloc(size);
1322     CmiMemcpy(copymsg, msg, size);
1323
1324     //  asm volatile("sync" ::: "memory");
1325
1326     CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
1327     ((CmiMsgHeaderBasic *)copymsg)->size = size;
1328     CMI_SET_CHECKSUM(copymsg, size);
1329
1330     CmiGeneralFreeSend(destPE,size,copymsg);
1331 }
1332
1333 #define NODE_LEVEL_ST_IMPLEMENTATION 1
1334 #if NODE_LEVEL_ST_IMPLEMENTATION
1335 //send msgs to other ranks except the rank specified in the argument
1336 static void CmiSendChildrenPeers(int rank, int size, char *msg) {
1337     //printf ("%d [%d]: Send children peers except rank %d\n",  CmiMyPe(), CmiMyNode(), CmiMyRank());
1338     int r=0;
1339
1340 //With comm thd, broadcast msg will be pulled from bcast queue from the comm thd
1341 //And the msg would be finally pushed into other cores on the same node by the
1342 //comm thd. But it's possible a msg has already been pushed into rank 0 in
1343 //recv_done func call. So there's no need to push the bcast msg into the recv
1344 //queue again in the case BCASTMSG_ONLY_TO_COMMTHD is not set
1345
1346 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
1347     //indicate this is called from comm thread.
1348     if (rank == _Cmi_mynodesize) r = 1;
1349 #endif
1350
1351     for (;r<rank; r++) {
1352         char *copymsg;
1353         copymsg = (char *)CmiAlloc(size);
1354         CmiMemcpy(copymsg,msg,size);        
1355         CmiPushPE (r, copymsg);
1356     }
1357
1358     for (r=rank+1; r<_Cmi_mynodesize; r++) {
1359         char *copymsg;
1360         copymsg = (char *)CmiAlloc(size);
1361         CmiMemcpy(copymsg,msg,size);        
1362         CmiPushPE (r, copymsg);
1363     }
1364 }
1365
1366 //In this implementation, msgs are first sent out to the comm thd or rank 0 of other nodes
1367 //then send msgs to the other cores on the same node
1368 void SendSpanningChildren(int size, char *msg) {
1369     CmiState cs = CmiGetState();
1370     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1371     int i;
1372
1373     //printf ("%d [%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), startpe);
1374
1375     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1376
1377     int startNid = CmiNodeOf(startpe);
1378     int thisNid, thisRid;
1379     thisNid = CmiMyNode();
1380     thisRid = CmiMyRank();
1381
1382     //printf ("%d [%d/%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), thisRid, startpe);
1383
1384     //Step1: send to cores that has comm thd on other nodes
1385     int dist = thisNid - startNid;
1386     if (dist<0) dist += _Cmi_numnodes;
1387     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1388         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1389         if (nid > _Cmi_numnodes - 1) break;
1390         nid += startNid;
1391         nid = nid%_Cmi_numnodes;
1392         CmiAssert(nid>=0 && nid<_Cmi_numnodes && nid!=thisNid);
1393 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1394         int p = nid + _Cmi_numpes;
1395 #else
1396         int p = CmiNodeFirst(nid);
1397 #endif
1398         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1399         CmiSyncSendFn1(p, size, msg);
1400     }
1401
1402     //Step2: send to other cores (i.e. excluding myself cs->pe) on the same nodes (just a flat send)
1403     CmiSendChildrenPeers(thisRid, size, msg);
1404
1405 #if !CMK_SMP
1406 #if ENABLE_BROADCAST_THROTTLE
1407     SendMsgsUntil (0);
1408 #endif
1409 #endif
1410 }
1411 #else
1412 /* send msg to its spanning children in broadcast. G. Zheng */
1413 void SendSpanningChildren(int size, char *msg) {
1414     CmiState cs = CmiGetState();
1415     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1416     int i;
1417
1418     //printf ("%d [%d]: In Send Spanning Tree\n",  CmiMyPe(), CmiMyNode());
1419
1420     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1421     int dist = cs->pe-startpe;
1422     if (dist<0) dist+=_Cmi_numpes;
1423     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1424         int p = BROADCAST_SPANNING_FACTOR*dist + i;
1425         if (p > _Cmi_numpes - 1) break;
1426         p += startpe;
1427         p = p%_Cmi_numpes;
1428         CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1429
1430         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1431         CmiSyncSendFn1(p, size, msg);
1432     }    
1433 }
1434 #endif
1435
1436 /* send msg along the hypercube in broadcast. (Sameer) */
1437 void SendHypercube(int size, char *msg) {
1438     CmiState cs = CmiGetState();
1439     int curcycle = CMI_GET_CYCLE(msg);
1440     int i;
1441
1442     double logp = CmiNumPes();
1443     logp = log(logp)/log(2.0);
1444     logp = ceil(logp);
1445
1446     /*  CmiPrintf("In hypercube\n"); */
1447     /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1448
1449     for (i = curcycle; i < logp; i++) {
1450         int p = cs->pe ^ (1 << i);
1451         /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1452         if (p < CmiNumPes()) {
1453             CMI_SET_CYCLE(msg, i + 1);
1454
1455             CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1456             CmiSyncSendFn1(p, size, msg);
1457         }
1458     }
1459 }
1460
1461 void CmiSyncBroadcastFn(int size, char *msg) {
1462     char *copymsg;
1463     copymsg = (char *)CmiAlloc(size);
1464     CmiMemcpy(copymsg,msg,size);
1465     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastFn on comm thd on node %d\n", CmiMyNode());
1466     CmiFreeBroadcastFn(size,copymsg);
1467 }
1468
1469 void CmiFreeBroadcastFn(int size, char *msg) {
1470
1471     //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1472
1473     CmiState cs = CmiGetState();
1474 #if CMK_BROADCAST_SPANNING_TREE    
1475     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1476     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastFn on comm thd on node %d\n", CmiMyNode());
1477
1478     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1479
1480     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1481     SendSpanningChildren(size, msg);
1482     CmiFree(msg);
1483 #elif CMK_BROADCAST_HYPERCUBE    
1484     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1485
1486     CMI_SET_CYCLE(msg, 0);
1487     SendHypercube(size, msg);
1488     CmiFree(msg);
1489 #else
1490     int i;
1491
1492     for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1493         CmiSyncSendFn(i,size,msg);
1494
1495     for ( i=0; i<cs->pe; i++ )
1496         CmiSyncSendFn(i,size,msg);
1497
1498     CmiFree(msg);
1499 #endif
1500 }
1501
1502 void CmiSyncBroadcastAllFn(int size, char *msg) {
1503     char *copymsg;
1504     copymsg = (char *)CmiAlloc(size);
1505     CmiMemcpy(copymsg,msg,size);
1506     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1507     CmiFreeBroadcastAllFn(size,copymsg);
1508 }
1509
1510 void CmiFreeBroadcastAllFn(int size, char *msg) {
1511
1512     //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1513
1514     CmiState cs = CmiGetState();
1515 #if CMK_BROADCAST_SPANNING_TREE
1516
1517     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1518     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1519
1520     CmiSyncSendFn(cs->pe,size,msg);
1521     
1522     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1523
1524     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1525     SendSpanningChildren(size, msg);
1526     CmiFree(msg);
1527
1528 #elif CMK_BROADCAST_HYPERCUBE
1529     CmiSyncSendFn(cs->pe,size,msg);
1530     
1531     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1532
1533     CMI_SET_CYCLE(msg, 0);
1534     SendHypercube(size, msg);
1535     CmiFree(msg);
1536 #else
1537     int i ;
1538
1539     DCMF_CriticalSection_enter (0);
1540
1541     for ( i=0; i<_Cmi_numpes; i++ ) {
1542         CmiSyncSendFn(i,size,msg);
1543
1544         if ( (i % 32) == 0 )
1545             SendMsgsUntil (0);
1546     }
1547
1548     DCMF_CriticalSection_exit (0);
1549
1550     CmiFree(msg);
1551 #endif
1552 }
1553
1554 void AdvanceCommunications() {
1555
1556 #if CMK_SMP
1557     DCMF_CriticalSection_enter (0);
1558 #endif
1559
1560     while(DCMF_Messager_advance()>0);
1561     //DCMF_Messager_advance();
1562
1563 #if CMK_SMP
1564     DCMF_CriticalSection_exit (0);
1565 #endif
1566
1567     sendBroadcastMessages();
1568 #if CMK_NODE_QUEUE_AVAILABLE
1569     sendBroadcastMessagesNode();
1570 #endif
1571
1572 #if CMK_IMMEDIATE_MSG && CMK_MULTICORE
1573     CmiHandleImmediate();
1574 #endif
1575 }
1576
1577
1578 static void SendMsgsUntil(int targetm) {
1579
1580     while (msgQueueLen>targetm) {
1581       //AdvanceCommunications ();
1582 #if CMK_SMP
1583       DCMF_CriticalSection_enter (0);
1584 #endif
1585       
1586       while(DCMF_Messager_advance()>0);
1587       //DCMF_Messager_advance();
1588       
1589 #if CMK_SMP
1590       DCMF_CriticalSection_exit (0);
1591 #endif
1592     }
1593 }
1594
1595 void CmiNotifyIdle() {
1596 #if !CMK_SMP || CMK_MULTICORE
1597     AdvanceCommunications();
1598 #endif
1599 }
1600
1601
1602 /*==========================================================*/
1603 /*==========================================================*/
1604 /*==========================================================*/
1605
1606 /************ Recommended routines ***********************/
1607 /************ You dont have to implement these but they are supported
1608  in the converse syntax and some rare programs may crash. But most
1609  programs dont need them. *************/
1610
1611 CmiCommHandle CmiAsyncSendFn(int dest, int size, char *msg) {
1612     CmiAbort("CmiAsyncSendFn not implemented.");
1613     return (CmiCommHandle) 0;
1614 }
1615
1616 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1617     CmiAbort("CmiAsyncBroadcastFn not implemented.");
1618     return (CmiCommHandle) 0;
1619 }
1620
1621 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1622     CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1623     return (CmiCommHandle) 0;
1624 }
1625
1626 int           CmiAsyncMsgSent(CmiCommHandle handle) {
1627     CmiAbort("CmiAsyncMsgSent not implemented.");
1628     return 0;
1629 }
1630 void          CmiReleaseCommHandle(CmiCommHandle handle) {
1631     CmiAbort("CmiReleaseCommHandle not implemented.");
1632 }
1633
1634
1635 /*==========================================================*/
1636 /*==========================================================*/
1637 /*==========================================================*/
1638
1639 /* Optional routines which could use common code which is shared with
1640    other machine layer implementations. */
1641
1642 /* MULTICAST/VECTOR SENDING FUNCTIONS
1643
1644  * In relations to some flags, some other delivery functions may be needed.
1645  */
1646
1647 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1648
1649 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg) {
1650     char *copymsg;
1651     copymsg = (char *)CmiAlloc(size);
1652     CmiMemcpy(copymsg,msg,size);
1653     CmiFreeListSendFn(npes, pes, size, msg);
1654 }
1655
1656 //#define OPTIMIZED_MULTICAST  0
1657
1658 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1659 #if CMK_SMP && !CMK_MULTICORE
1660     //DCMF_CriticalSection_enter (0);
1661 #endif
1662     CMI_SET_BROADCAST_ROOT(msg,0);
1663     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1664     ((CmiMsgHeaderBasic *)msg)->size = size;
1665     CMI_SET_CHECKSUM(msg, size);
1666
1667     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeListSendFn on comm thd on node %d\n", CmiMyNode());
1668
1669     //printf("%d: In Free List Send Fn\n", CmiMyPe());
1670     int new_npes = 0;
1671
1672     int i, count = 0, my_loc = -1;
1673     for (i=0; i<npes; i++) {
1674         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode()) {
1675         if (CmiNodeOf(pes[i]) == CmiMyNode()) {
1676             CmiSyncSend(pes[i], size, msg);
1677             //my_loc = i;
1678         }
1679     }
1680
1681 #if OPTIMIZED_MULTICAST
1682 #warning "Using Optimized Multicast"
1683     if (npes > 1) {    
1684       int *newpelist = (int *) malloc (sizeof(int) * npes);
1685       int new_npes = 0;
1686     
1687       for(i=0; i<npes; i++) {
1688         if(CmiNodeOf(pes[i]) == CmiMyNode()) 
1689           continue;
1690         else
1691           newpelist[new_npes++] = pes[i];
1692       }
1693
1694       if (new_npes >= 1)
1695         machineMulticast (new_npes, newpelist, size, msg);
1696       else
1697         CmiFree (msg);
1698       return;
1699     }
1700 #endif
1701
1702     for (i=0;i<npes;i++) {
1703         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode());
1704         if (CmiNodeOf(pes[i]) == CmiMyNode());
1705         else if (i < npes - 1) {
1706 #if !CMK_SMP /*|| (CMK_SMP && !CMK_MULTICORE)*/
1707             CmiReference(msg);
1708             CmiGeneralFreeSend(pes[i], size, msg);
1709 #else
1710             CmiSyncSend(pes[i], size, msg);
1711 #endif
1712         }
1713     }
1714
1715     //if (npes  && (pes[npes-1] != CmiMyPe() && CmiNodeOf(pes[i]) != CmiMyNode()))
1716     if (npes  && CmiNodeOf(pes[npes-1]) != CmiMyNode())
1717         CmiSyncSendAndFree(pes[npes-1], size, msg); //Sameto CmiFreeSendFn
1718     else
1719         CmiFree(msg);
1720
1721     //AdvanceCommunications();
1722 #if CMK_SMP && !CMK_MULTICORE
1723     //DCMF_CriticalSection_exit (0);
1724 #endif
1725 }
1726
1727 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg) {
1728     CmiAbort("CmiAsyncListSendFn not implemented.");
1729     return (CmiCommHandle) 0;
1730 }
1731 #endif
1732
1733 /** NODE SENDING FUNCTIONS
1734
1735  * If there is a node queue, and we consider also nodes as entity (tipically in
1736  * SMP versions), these functions are needed.
1737  */
1738
1739 #if CMK_NODE_QUEUE_AVAILABLE
1740
1741 void          CmiSyncNodeSendFn(int, int, char *);
1742 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1743 void          CmiFreeNodeSendFn(int, int, char *);
1744
1745 void          CmiSyncNodeBroadcastFn(int, char *);
1746 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1747 void          CmiFreeNodeBroadcastFn(int, char *);
1748
1749 void          CmiSyncNodeBroadcastAllFn(int, char *);
1750 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1751 void          CmiFreeNodeBroadcastAllFn(int, char *);
1752
1753 #endif
1754
1755
1756 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1757
1758 int CmiMyPe();
1759 int CmiMyRank();
1760 int CmiNodeFirst(int node);
1761 int CmiNodeSize(int node);
1762 int CmiNodeOf(int pe);
1763 int CmiRankOf(int pe);
1764
1765 int CmiMyPe(void) {
1766     return CmiGetState()->pe;
1767 }
1768
1769 int CmiMyRank(void) {
1770     return CmiGetState()->rank;
1771 }
1772
1773 int CmiNodeFirst(int node) {
1774     return node*_Cmi_mynodesize;
1775 }
1776 int CmiNodeSize(int node)  {
1777     return _Cmi_mynodesize;
1778 }
1779
1780 int CmiNodeOf(int pe)      {
1781     return (pe/_Cmi_mynodesize);
1782 }
1783 int CmiRankOf(int pe)      {
1784     return pe%_Cmi_mynodesize;
1785 }
1786
1787
1788 /* optional, these functions are implemented in "machine-smp.c", so including
1789    this file avoid the necessity to reimplement them.
1790  */
1791 void CmiNodeBarrier(void);
1792 void CmiNodeAllBarrier(void);
1793 CmiNodeLock CmiCreateLock();
1794 void CmiDestroyLock(CmiNodeLock lock);
1795
1796 #endif
1797
1798 /** IMMEDIATE MESSAGES
1799
1800  * If immediate messages are supported, the following function is needed. There
1801  * is an exeption if the machine progress is also defined (see later for this).
1802
1803  * Moreover, the file "immediate.c" should be included, otherwise all its
1804  * functions and variables have to be redefined.
1805 */
1806
1807 #if CMK_CCS_AVAILABLE
1808
1809 #include "immediate.c"
1810
1811 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1812 void CmiProbeImmediateMsg();
1813 #endif
1814
1815 #endif
1816
1817
1818 /** MACHINE PROGRESS DEFINED
1819
1820  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1821  * to be pulled out of the network manually. For this reason the following
1822  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1823  * not be defined anymore.
1824  */
1825
1826 #if CMK_MACHINE_PROGRESS_DEFINED
1827
1828
1829
1830 void CmiMachineProgressImpl() {
1831
1832 #if !CMK_SMP
1833     AdvanceCommunications();
1834 #else
1835     /*Not implemented yet. Communication server does not seem to be
1836       thread safe */
1837 #endif
1838 }
1839
1840 #endif
1841
1842 /* Dummy implementation */
1843 extern int CmiBarrier() {
1844     //Use DCMF barrier later
1845 }
1846
1847 #if CMK_NODE_QUEUE_AVAILABLE
1848 static void CmiSendNodeSelf(char *msg) {
1849 #if CMK_IMMEDIATE_MSG
1850     if (CmiIsImmediate(msg)) {
1851         //printf("SendNodeSelf: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1852         CmiPushImmediateMsg(msg);
1853 #if CMK_MULTICORE
1854         CmiHandleImmediate();
1855 #endif
1856         return;
1857     }
1858 #endif    
1859     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1860     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1861     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1862 }
1863
1864 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg) {
1865     CmiAbort ("Async Node Send not supported\n");
1866 }
1867
1868 void CmiFreeNodeSendFn(int node, int size, char *msg) {
1869
1870     CMI_SET_BROADCAST_ROOT(msg,0);
1871     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1872     ((CmiMsgHeaderBasic *)msg)->size = size;
1873     CMI_SET_CHECKSUM(msg, size);
1874     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeSendFn on comm thd on node %d\n", CmiMyNode());
1875     
1876     CQdCreate(CpvAccess(cQdState), 1);
1877
1878     if (node == _Cmi_mynode) {
1879         CmiSendNodeSelf(msg);
1880     } else {
1881         CmiGeneralFreeSendN(node, SMP_NODEMESSAGE, size, msg);
1882     }
1883 }
1884
1885 void CmiSyncNodeSendFn(int p, int s, char *m) {
1886     char *dupmsg;
1887     dupmsg = (char *)CmiAlloc(s);
1888     CmiMemcpy(dupmsg,m,s);
1889     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeSendFn on comm thd on node %d\n", CmiMyNode());
1890     CmiFreeNodeSendFn(p, s, dupmsg);
1891 }
1892
1893 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m) {
1894     return NULL;
1895 }
1896
1897 void SendSpanningChildrenNode(int size, char *msg) {
1898     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1899     //printf("on node %d rank %d, send node spanning children with root %d\n", CmiMyNode(), CmiMyRank(), startnode);
1900     assert(startnode>=0 && startnode<CmiNumNodes());
1901
1902     int dist = CmiMyNode()-startnode;
1903     if (dist<0) dist += CmiNumNodes();
1904     int i;
1905     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1906         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1907         if (nid > CmiNumNodes() - 1) break;
1908         nid += startnode;
1909         nid = nid%CmiNumNodes();
1910         assert(nid>=0 && nid<CmiNumNodes() && nid!=CmiMyNode());
1911         char *dupmsg = (char *)CmiAlloc(size);
1912         CmiMemcpy(dupmsg,msg,size);
1913         //printf("In SendSpanningChildrenNode, sending bcast msg (root %d) from node %d to node %d\n", startnode, CmiMyNode(), nid);
1914         CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg);
1915     }
1916 }
1917
1918 /* need */
1919 void CmiFreeNodeBroadcastFn(int s, char *m) {
1920 #if CMK_BROADCAST_SPANNING_TREE
1921     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1922     
1923     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1924
1925     int mynode = CmiMyNode();
1926     CMI_SET_BROADCAST_ROOT(m, -mynode-1);
1927     CMI_MAGIC(m) = CHARM_MAGIC_NUMBER;
1928     ((CmiMsgHeaderBasic *)m)->size = s;
1929     CMI_SET_CHECKSUM(m, s);
1930     //printf("In CmiFreeNodeBroadcastFn, sending bcast msg from root node %d\n", CMI_BROADCAST_ROOT(m));
1931
1932     SendSpanningChildrenNode(s, m);
1933 #else
1934     int i;
1935     for (i=0; i<CmiNumNodes(); i++) {
1936         if (i==CmiMyNode()) continue;
1937         char *dupmsg = (char *)CmiAlloc(s);
1938         CmiMemcpy(dupmsg,m,s);
1939         CmiFreeNodeSendFn(i, s, dupmsg);
1940     }
1941 #endif
1942     CmiFree(m);    
1943 }
1944
1945 void CmiSyncNodeBroadcastFn(int s, char *m) {
1946     char *dupmsg;
1947     dupmsg = (char *)CmiAlloc(s);
1948     CmiMemcpy(dupmsg,m,s);
1949     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1950     CmiFreeNodeBroadcastFn(s, dupmsg);
1951 }
1952
1953 /* need */
1954 void CmiFreeNodeBroadcastAllFn(int s, char *m) {
1955     char *dupmsg = (char *)CmiAlloc(s);
1956     CmiMemcpy(dupmsg,m,s);
1957     CMI_MAGIC(dupmsg) = CHARM_MAGIC_NUMBER;
1958     ((CmiMsgHeaderBasic *)dupmsg)->size = s;
1959     CMI_SET_CHECKSUM(dupmsg, s);
1960
1961     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1962     
1963     CQdCreate(CpvAccess(cQdState), 1);
1964     CmiSendNodeSelf(dupmsg);
1965
1966     CmiFreeNodeBroadcastFn(s, m);
1967 }
1968
1969 void CmiSyncNodeBroadcastAllFn(int s, char *m) {
1970     char *dupmsg;
1971     dupmsg = (char *)CmiAlloc(s);
1972     CmiMemcpy(dupmsg,m,s);
1973     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1974     CmiFreeNodeBroadcastAllFn(s, dupmsg);
1975 }
1976
1977
1978 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m) {
1979     return NULL;
1980 }
1981 #endif //end of CMK_NODE_QUEUE_AVAILABLE
1982
1983 #include "manytomany.c"
1984
1985
1986 /*********************************************************************************************
1987 This section is for CmiDirect. This is a variant of the  persistent communication in which
1988 the user can transfer data between processors without using Charm++ messages. This lets the user
1989 send and receive data from the middle of his arrays without any copying on either send or receive
1990 side
1991 *********************************************************************************************/
1992
1993
1994
1995
1996 #ifdef BGP_USE_RDMA
1997
1998 #include "cmidirect.h"
1999
2000 /* We can avoid a receiver side lookup by just sending the whole shebang.
2001    DCMF header is in units of quad words (16 bytes), so we'd need less than a
2002    quad word for the handle if we just sent that and did a lookup. Or exactly
2003    2 quad words for the buffer pointer, callback pointer, callback
2004    data pointer, and DCMF_Request_t pointer with no lookup.
2005
2006    Since CmiDirect is generally going to be used for messages which aren't
2007    tiny, the extra 16 bytes is not likely to impact performance noticably and
2008    not having to lookup handles in tables simplifies the code enormously.
2009
2010    EJB   2008/4/2
2011 */
2012
2013
2014 /**
2015  To be called on the receiver to create a handle and return its number
2016 **/
2017 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue) {
2018     /* one-sided primitives would require registration of memory */
2019
2020     /* with two-sided primitives we just bundle the buffer and callback info into the handle so the sender can remind us about it later. */
2021     struct infiDirectUserHandle userHandle;
2022     userHandle.handle=1; /* doesn't matter on BG/P*/
2023     userHandle.senderNode=senderNode;
2024     userHandle.recverNode=_Cmi_mynode;
2025     userHandle.recverBufSize=recvBufSize;
2026     userHandle.recverBuf=recvBuf;
2027     userHandle.initialValue=initialValue;
2028     userHandle.callbackFnPtr=callbackFnPtr;
2029     userHandle.callbackData=callbackData;
2030     userHandle.DCMF_rq_trecv=ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2031 #if CMI_DIRECT_DEBUG
2032     CmiPrintf("[%d] RDMA create addr %p %d callback %p callbackdata %p\n",CmiMyPe(),userHandle.recverBuf,userHandle.recverBufSize, userHandle.callbackFnPtr, userHandle.callbackData);
2033 #endif
2034     return userHandle;
2035 }
2036
2037 /****
2038  To be called on the sender to attach the sender's buffer to this handle
2039 ******/
2040
2041 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize) {
2042
2043     /* one-sided primitives would require registration of memory */
2044
2045     /* with two-sided primitives we just record the sender buf in the handle */
2046     userHandle->senderBuf=sendBuf;
2047     CmiAssert(sendBufSize==userHandle->recverBufSize);
2048     userHandle->DCMF_rq_tsend =ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2049 #if CMI_DIRECT_DEBUG
2050     CmiPrintf("[%d] RDMA assoc addr %p %d to receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,sendBufSize, userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2051 #endif
2052
2053 }
2054
2055 /****
2056 To be called on the sender to do the actual data transfer
2057 ******/
2058 void CmiDirect_put(struct infiDirectUserHandle *userHandle) {
2059     /** invoke a DCMF_Send with the direct callback */
2060     DCMF_Protocol_t *protocol = NULL;
2061     protocol = &cmi_dcmf_direct_registration;
2062     /* local copy */
2063     CmiAssert(userHandle->recverBuf!=NULL);
2064     CmiAssert(userHandle->senderBuf!=NULL);
2065     CmiAssert(userHandle->recverBufSize>0);
2066     if (userHandle->recverNode== _Cmi_mynode) {
2067 #if CMI_DIRECT_DEBUG
2068         CmiPrintf("[%d] RDMA local put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2069 #endif
2070
2071         CmiMemcpy(userHandle->recverBuf,userHandle->senderBuf,userHandle->recverBufSize);
2072         (*(userHandle->callbackFnPtr))(userHandle->callbackData);
2073     } else {
2074         dcmfDirectMsgHeader msgHead;
2075         msgHead.recverBuf=userHandle->recverBuf;
2076         msgHead.callbackFnPtr=userHandle->callbackFnPtr;
2077         msgHead.callbackData=userHandle->callbackData;
2078         msgHead.DCMF_rq_t=(DCMF_Request_t *) userHandle->DCMF_rq_trecv;
2079 #if CMK_SMP
2080         DCMF_CriticalSection_enter (0);
2081 #endif
2082 #if CMI_DIRECT_DEBUG
2083         CmiPrintf("[%d] RDMA put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2084 #endif
2085         DCMF_Send (protocol,
2086                    (DCMF_Request_t *) userHandle->DCMF_rq_tsend,
2087                    directcb, DCMF_MATCH_CONSISTENCY, userHandle->recverNode,
2088                    userHandle->recverBufSize, userHandle->senderBuf,
2089                    (struct DCQuad *) &(msgHead), 2);
2090
2091 #if CMK_SMP
2092         DCMF_CriticalSection_exit (0);
2093 #endif
2094     }
2095 }
2096
2097 /**** Should not be called the first time *********/
2098 void CmiDirect_ready(struct infiDirectUserHandle *userHandle) {
2099     /* no op on BGP */
2100 }
2101
2102 /**** Should not be called the first time *********/
2103 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle) {
2104     /* no op on BGP */
2105 }
2106
2107 /**** Should not be called the first time *********/
2108 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle) {
2109     /* no op on BGP */
2110 }
2111
2112 #endif /* BGP_USE_RDMA*/
2113