Adding a flag to throttle broadcast.
[charm.git] / src / arch / bluegenep / machine.c
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include "assert.h"
12 #include "malloc.h"
13
14 #include <bpcore/ppc450_inlines.h>
15
16 #include "dcmf.h"
17 #include "dcmf_multisend.h"
18
19 char *ALIGN_16(char *p) {
20     return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
21 }
22
23 #define PROGRESS_PERIOD 1024
24
25 //There are two roles of comm thread:
26 // 1. polling other cores' bcast msg queue
27 // 2. bcast msg is handled only by comm thd
28 #define BCASTMSG_ONLY_TO_COMMTHD 0
29
30 CpvDeclare(PCQueue, broadcast_q);                 //queue to send broadcast messages
31 #if CMK_NODE_QUEUE_AVAILABLE
32 CsvDeclare(PCQueue, node_bcastq);
33 CsvDeclare(CmiNodeLock, node_bcastLock);
34 #endif
35
36 /*
37     To reduce the buffer used in broadcast and distribute the load from
38   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
39   spanning tree broadcast algorithm.
40     This will use the fourth short in message as an indicator of spanning tree
41   root.
42 */
43 #if CMK_SMP
44 #define CMK_BROADCAST_SPANNING_TREE   1
45 #else
46 #define CMK_BROADCAST_SPANNING_TREE    1
47 #define CMK_BROADCAST_HYPERCUBE        0
48 #endif /* CMK_SMP */
49
50 #define BROADCAST_SPANNING_FACTOR     2
51
52 //The root of the message infers the type of the message
53 // 1. root is 0, then it is a normal point-to-point message
54 // 2. root is larger than 0 (>=1), then it is a broadcast message across all processors (cores)
55 // 3. root is less than 0 (<=-1), then it is a broadcast message across all nodes
56 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
57 #define CMI_IS_BCAST_ON_CORES(msg) (CMI_BROADCAST_ROOT(msg) > 0)
58 #define CMI_IS_BCAST_ON_NODES(msg) (CMI_BROADCAST_ROOT(msg) < 0)
59 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
60
61 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
62 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
63
64 /* FIXME: need a random number that everyone agrees ! */
65 #define CHARM_MAGIC_NUMBER               126
66
67 #if !CMK_OPTIMIZE
68 static int checksum_flag = 0;
69 extern unsigned char computeCheckSum(unsigned char *data, int len);
70
71 #define CMI_SET_CHECKSUM(msg, len)      \
72         if (checksum_flag)  {   \
73           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
74           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
75         }
76
77 #define CMI_CHECK_CHECKSUM(msg, len)    \
78         if (checksum_flag)      \
79           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
80             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
81             for(count = 0; count < len; count++) { \
82                 printf("%2x", msg[count]);                 \
83             }                                             \
84             printf("------------------------------\n\n"); \
85             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
86           }
87 #else
88 #define CMI_SET_CHECKSUM(msg, len)
89 #define CMI_CHECK_CHECKSUM(msg, len)
90 #endif
91
92 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
93
94 #if CMK_BROADCAST_HYPERCUBE
95 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
96 #else
97 #  define CMI_SET_CYCLE(msg, cycle)
98 #endif
99
100 int               _Cmi_numpes;
101 int               _Cmi_mynode;    /* Which address space am I */
102 int               _Cmi_mynodesize;/* Number of processors in my address space */
103 int               _Cmi_numnodes;  /* Total number of address spaces */
104 int                Cmi_nodestart; /* First processor in this address space */
105 CpvDeclare(void*, CmiLocalQueue);
106
107
108 #if CMK_NODE_QUEUE_AVAILABLE
109 #define SMP_NODEMESSAGE   (0xFB) // rank of the node message when node queue
110 // is available
111 #define NODE_BROADCAST_OTHERS (-1)
112 #define NODE_BROADCAST_ALL    (-2)
113 #endif
114
115
116 typedef struct ProcState {
117     /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
118     CmiNodeLock  recvLock;              /* for cs->recv */
119     CmiNodeLock bcastLock;
120 } ProcState;
121
122 static ProcState  *procState;
123
124 #if CMK_SMP && !CMK_MULTICORE
125 static volatile int commThdExit = 0;
126 static CmiNodeLock commThdExitLock = 0;
127 #endif
128
129 void ConverseRunPE(int everReturn);
130 static void CommunicationServer(int sleepTime);
131 static void CommunicationServerThread(int sleepTime);
132
133 //So far we dont define any comm threads
134 int Cmi_commthread = 0;
135
136 #include "machine-smp.c"
137 CsvDeclare(CmiNodeState, NodeState);
138 #include "immediate.c"
139
140 void AdvanceCommunications();
141
142
143 #if !CMK_SMP
144 /************ non SMP **************/
145 static struct CmiStateStruct Cmi_state;
146 int _Cmi_mype;
147 int _Cmi_myrank;
148
149 void CmiMemLock(void) {}
150 void CmiMemUnlock(void) {}
151
152 #define CmiGetState() (&Cmi_state)
153 #define CmiGetStateN(n) (&Cmi_state)
154
155 //void CmiYield(void) { sleep(0); }
156
157 static void CmiStartThreads(char **argv) {
158     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
159     _Cmi_mype = Cmi_nodestart;
160     _Cmi_myrank = 0;
161 }
162 #endif  /* !CMK_SMP */
163
164 //int received_immediate;
165 //int received_broadcast;
166
167 /*Add a message to this processor's receive queue, pe is a rank */
168 static void CmiPushPE(int pe,void *msg) {
169     CmiState cs = CmiGetStateN(pe);
170     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
171 #if CMK_IMMEDIATE_MSG
172     if (CmiIsImmediate(msg)) {
173         /**(CmiUInt2 *)msg = pe;*/
174         //received_immediate = 1;
175         //printf("PushPE: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
176         //CMI_DEST_RANK(msg) = pe;
177         CmiPushImmediateMsg(msg);
178         return;
179     }
180 #endif
181 #if CMK_SMP
182     CmiLock(procState[pe].recvLock);
183 #endif
184
185     PCQueuePush(cs->recv,(char *)msg);
186     //printf("%d: PCQueue length = %d, msg = %x\n", CmiMyPe(), PCQueueLength(cs->recv), msg);
187
188 #if CMK_SMP
189     CmiUnlock(procState[pe].recvLock);
190 #endif
191     CmiIdleLock_addMessage(&cs->idle);
192     MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
193 }
194
195 #if CMK_NODE_QUEUE_AVAILABLE
196 /*Add a message to this processor's receive queue */
197 static void CmiPushNode(void *msg) {
198     MACHSTATE(3,"Pushing message into NodeRecv queue");
199 #if CMK_IMMEDIATE_MSG
200     if (CmiIsImmediate(msg)) {
201         //printf("PushNode: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
202         //CMI_DEST_RANK(msg) = 0;
203         CmiPushImmediateMsg(msg);
204         return;
205     }
206 #endif
207     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
208     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
209     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
210     {
211         CmiState cs=CmiGetStateN(0);
212         CmiIdleLock_addMessage(&cs->idle);
213     }
214 }
215 #endif /* CMK_NODE_QUEUE_AVAILABLE */
216
217 volatile int msgQueueLen;
218 volatile int outstanding_recvs;
219
220 static int Cmi_dim;     /* hypercube dim of network */
221
222 static char     **Cmi_argv;
223 static char     **Cmi_argvcopy;
224 static CmiStartFn Cmi_startfn;   /* The start function */
225 static int        Cmi_usrsched;  /* Continue after start function finishes? */
226
227 extern void ConverseCommonInit(char **argv);
228 extern void ConverseCommonExit(void);
229 extern void CthInit(char **argv);
230
231 static void SendMsgsUntil(int);
232
233
234 void SendSpanningChildren(int size, char *msg);
235 #if CMK_NODE_QUEUE_AVAILABLE
236 void SendSpanningChildrenNode(int size, char *msg);
237 #endif
238 void SendHypercube(int size, char *msg);
239
240 DCMF_Protocol_t  cmi_dcmf_short_registration __attribute__((__aligned__(16)));
241 DCMF_Protocol_t  cmi_dcmf_eager_registration __attribute__((__aligned__(16)));
242 DCMF_Protocol_t  cmi_dcmf_rzv_registration   __attribute__((__aligned__(16)));
243 DCMF_Protocol_t  cmi_dcmf_multicast_registration   __attribute__((__aligned__(16)));
244
245 #define BGP_USE_RDMA 1
246 /*#define CMI_DIRECT_DEBUG 1*/
247 #ifdef BGP_USE_RDMA
248
249
250 DCMF_Protocol_t  cmi_dcmf_direct_registration __attribute__((__aligned__(16)));
251 /** The receive side of a put implemented in DCMF_Send */
252
253
254 typedef struct {
255     void *recverBuf;
256   void (*callbackFnPtr)(void *);
257     void *callbackData;
258     DCMF_Request_t *DCMF_rq_t;
259 } dcmfDirectMsgHeader;
260
261 /* nothing for us to do here */
262 #if (DCMF_VERSION_MAJOR >= 2)
263 void direct_send_done_cb(void*nothing, DCMF_Error_t *err) 
264 #else 
265   void direct_send_done_cb(void*nothing) 
266 #endif
267 {
268 #if CMI_DIRECT_DEBUG
269   CmiPrintf("[%d] RDMA send_done_cb\n", CmiMyPe());
270 #endif
271 }
272
273 DCMF_Callback_t  directcb;
274
275 void     direct_short_pkt_recv (void             * clientdata,
276                                 const DCQuad     * info,
277                                 unsigned           count,
278                                 unsigned           senderrank,
279                                 const char       * buffer,
280                                 const unsigned     sndlen) {
281 #if CMI_DIRECT_DEBUG
282     CmiPrintf("[%d] RDMA direct_short_pkt_recv\n", CmiMyPe());
283 #endif
284     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
285     CmiMemcpy(msgHead->recverBuf, buffer, sndlen);
286     (*(msgHead->callbackFnPtr))(msgHead->callbackData);
287 }
288
289
290 #if (DCMF_VERSION_MAJOR >= 2)
291 typedef void (*cbhdlr) (void *, DCMF_Error_t *);
292 #else
293 typedef void (*cbhdlr) (void *);
294 #endif
295
296 DCMF_Request_t * direct_first_pkt_recv_done (void              * clientdata,
297         const DCQuad      * info,
298         unsigned            count,
299         unsigned            senderrank,
300         const unsigned      sndlen,
301         unsigned          * rcvlen,
302         char             ** buffer,
303         DCMF_Callback_t   * cb
304                                             ) {
305 #if CMI_DIRECT_DEBUG
306     CmiPrintf("[%d] RDMA direct_first_pkt_recv_done\n", CmiMyPe());
307 #endif
308     /* pull the data we need out of the header */
309     *rcvlen=sndlen;
310     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
311     cb->function= (cbhdlr)msgHead->callbackFnPtr;
312     cb->clientdata=msgHead->callbackData;
313     *buffer=msgHead->recverBuf;
314     return msgHead->DCMF_rq_t;
315 }
316
317
318 #endif
319
320 typedef struct msg_list {
321     char              * msg;
322     int                 size;
323     int                 destpe;
324     int               * pelist;
325     DCMF_Callback_t     cb;
326     DCQuad              info __attribute__((__aligned__(16)));
327     DCMF_Request_t      send __attribute__((__aligned__(16)));
328 } SMSG_LIST __attribute__((__aligned__(16)));
329
330 #define MAX_NUM_SMSGS   64
331 CpvDeclare(PCQueue, smsg_list_q);
332
333 static inline SMSG_LIST * smsg_allocate() {
334     SMSG_LIST *smsg = (SMSG_LIST *)PCQueuePop(CpvAccess(smsg_list_q));
335     if (smsg != NULL)
336         return smsg;
337
338     void * buf = malloc(sizeof(SMSG_LIST)); 
339     assert(buf!=NULL);
340     assert (((unsigned)buf & 0x0f) == 0);
341
342     return (SMSG_LIST *) buf;
343 }
344
345 static inline void smsg_free (SMSG_LIST *smsg) {
346     int size = PCQueueLength (CpvAccess(smsg_list_q));
347     if (size < MAX_NUM_SMSGS)
348         PCQueuePush (CpvAccess(smsg_list_q), (char *) smsg);
349     else
350         free (smsg);
351 }
352
353 typedef struct {
354     int sleepMs; /*Milliseconds to sleep while idle*/
355     int nIdles; /*Number of times we've been idle in a row*/
356     CmiState cs; /*Machine state*/
357 } CmiIdleState;
358
359 static CmiIdleState *CmiNotifyGetState(void) {
360     CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
361     s->sleepMs=0;
362     s->nIdles=0;
363     s->cs=CmiGetState();
364     return s;
365 }
366
367
368 #if (DCMF_VERSION_MAJOR >= 2)
369 static void send_done(void *data, DCMF_Error_t *err) 
370 #else 
371 static void send_done(void *data) 
372 #endif
373 /* send done callback: sets the smsg entry to done */
374 {
375     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
376     CmiFree(msg_tmp->msg);
377     //free(data);
378     smsg_free (msg_tmp);
379
380     msgQueueLen--;
381 }
382
383 #if (DCMF_VERSION_MAJOR >= 2)
384 static void send_multi_done(void *data, DCMF_Error_t *err) 
385 #else 
386 static void send_multi_done(void *data) 
387 #endif
388 /* send done callback: sets the smsg entry to done */
389 {
390     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
391     CmiFree(msg_tmp->msg);
392     free(msg_tmp->pelist);
393
394     smsg_free(msg_tmp);
395
396     msgQueueLen--;
397 }
398
399
400 #if (DCMF_VERSION_MAJOR >= 2)
401 static void recv_done(void *clientdata, DCMF_Error_t * err) 
402 #else 
403 static void recv_done(void *clientdata) 
404 #endif
405 /* recv done callback: push the recved msg to recv queue */
406 {
407
408     char *msg = (char *) clientdata;
409     int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
410
411     //fprintf (stderr, "%d Recv message done \n", CmiMyPe());
412
413     /* then we do what PumpMsgs used to do:
414      * push msg to recv queue */
415     int count=0;
416     CMI_CHECK_CHECKSUM(msg, sndlen);
417     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
418         CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
419         return;
420     }
421
422 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
423     if (CMI_IS_BCAST_ON_CORES(msg) ) {
424         int pe = CMI_DEST_RANK(msg);
425
426         //printf ("%d: Receiving bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, pe);
427
428         char *copymsg;
429         copymsg = (char *)CmiAlloc(sndlen);
430         CmiMemcpy(copymsg,msg,sndlen);
431
432         //received_broadcast = 1;
433 #if CMK_SMP
434         CmiLock(procState[pe].bcastLock);
435         PCQueuePush(CpvAccessOther(broadcast_q, pe), copymsg);
436         CmiUnlock(procState[pe].bcastLock);
437 #else
438         PCQueuePush(CpvAccess(broadcast_q), copymsg);
439 #endif
440     }
441 #endif
442
443 #if CMK_NODE_QUEUE_AVAILABLE
444 #if CMK_BROADCAST_SPANNING_TREE
445     if (CMI_IS_BCAST_ON_NODES(msg)) {
446         //printf ("%d: Receiving node bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, CMI_DEST_RANK(msg));
447         char *copymsg = (char *)CmiAlloc(sndlen);
448         CmiMemcpy(copymsg,msg,sndlen);
449         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
450         CmiLock(CsvAccess(node_bcastLock));
451         PCQueuePush(CsvAccess(node_bcastq), copymsg);
452         CmiUnlock(CsvAccess(node_bcastLock));
453         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
454     }
455 #endif
456     if (CMI_DEST_RANK(msg) == SMP_NODEMESSAGE)
457         CmiPushNode(msg);
458     else
459 #endif
460
461 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
462         if (CMI_DEST_RANK(msg)<_Cmi_mynodesize) { //not the comm thd
463             CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
464         }
465 #else
466         CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
467 #endif
468
469     outstanding_recvs --;
470 }
471
472
473 void     short_pkt_recv (void             * clientdata,
474                          const DCQuad     * info,
475                          unsigned           count,
476                          unsigned           senderrank,
477                          const char       * buffer,
478                          const unsigned     sndlen) {
479     outstanding_recvs ++;
480     int alloc_size = sndlen;
481
482     char * new_buffer = (char *)CmiAlloc(alloc_size);
483     CmiMemcpy (new_buffer, buffer, sndlen);
484     
485 #if (DCMF_VERSION_MAJOR >= 2)
486     recv_done (new_buffer, NULL);
487 #else
488     recv_done (new_buffer);
489 #endif
490 }
491
492
493 DCMF_Request_t * first_multi_pkt_recv_done (const DCQuad      * info,
494                                             unsigned            count,
495                                             unsigned            senderrank,                                    
496                                             const unsigned      sndlen,
497                                             unsigned            connid,
498                                             void              * clientdata,
499                                             unsigned          * rcvlen,
500                                             char             ** buffer,
501                                             unsigned          * pw,
502                                             DCMF_Callback_t   * cb
503                                             ) {
504     outstanding_recvs ++;
505     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
506
507     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
508
509     /* printf ("Receiving %d bytes\n", sndlen); */
510     *rcvlen = sndlen;  /* to avoid malloc(0) which might
511                                    return NULL */
512
513     *buffer = (char *)CmiAlloc(alloc_size);
514     cb->function = recv_done;
515     cb->clientdata = *buffer;
516
517     *pw  = 0x7fffffff;
518     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
519 }
520
521
522 DCMF_Request_t * first_pkt_recv_done (void              * clientdata,
523                                       const DCQuad      * info,
524                                       unsigned            count,
525                                       unsigned            senderrank,
526                                       const unsigned      sndlen,
527                                       unsigned          * rcvlen,
528                                       char             ** buffer,
529                                       DCMF_Callback_t   * cb
530                                      ) {
531     outstanding_recvs ++;
532     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
533
534     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
535
536     /* printf ("Receiving %d bytes\n", sndlen); */
537     *rcvlen = sndlen;  /* to avoid malloc(0) which might
538                                    return NULL */
539
540     *buffer = (char *)CmiAlloc(alloc_size);
541     cb->function = recv_done;
542     cb->clientdata = *buffer;
543
544     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
545 }
546
547
548 #if CMK_NODE_QUEUE_AVAILABLE
549 void sendBroadcastMessagesNode() {
550     if (PCQueueLength(CsvAccess(node_bcastq))==0) return;
551     //node broadcast message could be always handled by any cores (including
552     //comm thd) on this node
553     //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
554     CmiLock(CsvAccess(node_bcastLock));
555     char *msg = PCQueuePop(CsvAccess(node_bcastq));
556     CmiUnlock(CsvAccess(node_bcastLock));
557     //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
558     while (msg) {
559 #if CMK_BROADCAST_SPANNING_TREE
560         //printf("sendBroadcastMessagesNode: node %d rank %d with msg root %d\n", CmiMyNode(), CmiMyRank(), CMI_BROADCAST_ROOT(msg));
561         SendSpanningChildrenNode(((CmiMsgHeaderBasic *) msg)->size, msg);
562 #endif
563         CmiFree(msg);
564         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
565         CmiLock(CsvAccess(node_bcastLock));
566         msg = PCQueuePop(CsvAccess(node_bcastq));
567         CmiUnlock(CsvAccess(node_bcastLock));
568         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
569     }
570 }
571 #endif
572
573 void sendBroadcastMessages() {
574 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
575 //in the presence of comm thd, and it is not responsible for broadcasting msg,
576 //the comm thd will help to pull the msg from rank 0 which is predefined as the
577 //core on a smp node to receive bcast msg.
578     int toPullRank = CmiMyRank();
579     if (CmiMyRank()==_Cmi_mynodesize) toPullRank = 0; //comm thd only pulls msg from rank 0
580 #else
581     int toPullRank = CmiMyRank();
582 #endif
583     PCQueue toPullQ;
584
585     /*
586     if(CmiMyRank()==_Cmi_mynodesize)
587       printf("Comm thd on node [%d] is pulling bcast msg\n", CmiMyNode());
588     else
589       printf("Work thd [%d] on node [%d] is pulling bcast msg\n", CmiMyRank(), CmiMyNode());
590     */
591 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
592     toPullQ = CpvAccessOther(broadcast_q, toPullRank);
593 #else
594     toPullQ = CpvAccess(broadcast_q);
595 #endif
596
597     if (PCQueueLength(toPullQ)==0) return;
598 #if CMK_SMP
599     CmiLock(procState[toPullRank].bcastLock);
600 #endif
601
602     char *msg = (char *) PCQueuePop(toPullQ);
603
604 #if CMK_SMP
605     CmiUnlock(procState[toPullRank].bcastLock);
606 #endif
607
608     while (msg) {
609
610 #if CMK_BROADCAST_SPANNING_TREE
611         SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
612 #elif CMK_BROADCAST_HYPERCUBE
613         SendHypercube(((CmiMsgHeaderBasic *) msg)->size, msg);
614 #endif
615
616         CmiFree (msg);
617
618 #if CMK_SMP
619         CmiLock(procState[toPullRank].bcastLock);
620 #endif
621
622 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
623         toPullQ = CpvAccessOther(broadcast_q, toPullRank);
624 #else
625         toPullQ = CpvAccess(broadcast_q);
626 #endif
627         msg = (char *) PCQueuePop(toPullQ);
628
629 #if CMK_SMP
630         CmiUnlock(procState[toPullRank].bcastLock);
631 #endif
632     }
633 }
634
635 CpvDeclare(unsigned, networkProgressCount);
636 int  networkProgressPeriod;
637
638 #if 0
639 unsigned int *ranklist;
640
641 BGTsC_t        barrier;
642
643 // -----------------------------------------
644 // Rectangular broadcast implementation
645 // -----------------------------------------
646
647 #define MAX_COMM  256
648 static void * comm_table [MAX_COMM];
649
650 typedef struct rectbcast_msg {
651     BGTsRC_t           request;
652     DCMF_Callback_t    cb;
653     char              *msg;
654 } RectBcastInfo;
655
656
657 static void bcast_done (void *data) {
658     RectBcastInfo *rinfo = (RectBcastInfo *) data;
659     CmiFree (rinfo->msg);
660     free (rinfo);
661 }
662
663 static  void *   getRectBcastRequest (unsigned comm) {
664     return comm_table [comm];
665 }
666
667
668 static  void *  bcast_recv     (unsigned               root,
669                                 unsigned               comm,
670                                 const unsigned         sndlen,
671                                 unsigned             * rcvlen,
672                                 char                ** rcvbuf,
673                                 DCMF_Callback_t      * const cb) {
674
675     int alloc_size = sndlen + sizeof(BGTsRC_t) + 16;
676
677     *rcvlen = sndlen;  /* to avoid malloc(0) which might
678                                    return NULL */
679
680     *rcvbuf       =  (char *)CmiAlloc(alloc_size);
681     cb->function  =   recv_done;
682     cb->clientdata = *rcvbuf;
683
684     return (BGTsRC_t *) ALIGN_16 (*rcvbuf + sndlen);
685
686 }
687
688
689 extern void bgl_machine_RectBcast (unsigned                 commid,
690                                        const char             * sndbuf,
691                                        unsigned                 sndlen) {
692     RectBcastInfo *rinfo  =   (RectBcastInfo *) malloc (sizeof(RectBcastInfo));
693     rinfo->cb.function    =   bcast_done;
694     rinfo->cb.clientdata  =   rinfo;
695
696     BGTsRC_AsyncBcast_start (commid, &rinfo->request, &rinfo->cb, sndbuf, sndlen);
697
698 }
699
700 extern void        bgl_machine_RectBcastInit  (unsigned               commID,
701             const BGTsRC_Geometry_t* geometry) {
702
703     CmiAssert (commID < 256);
704     CmiAssert (comm_table [commID] == NULL);
705
706     BGTsRC_t *request =  (BGTsRC_t *) malloc (sizeof (BGTsRC_t));
707     comm_table [commID] = request;
708
709     BGTsRC_AsyncBcast_init  (request, commID,  geometry);
710 }
711
712
713
714
715 //--------------------------------------------------------------
716 //----- End Rectangular Broadcast Implementation ---------------
717 //--------------------------------------------------------------
718 #endif
719
720 //approx sleep command
721 void mysleep (int cycles) {
722     unsigned long long start = DCMF_Timebase();
723     unsigned long long end = start + cycles;
724
725     while (start < end)
726         start = DCMF_Timebase();
727
728     return;
729 }
730
731 static void * test_buf;
732
733 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
734     int n, i, count;
735
736     //fprintf(stderr, "Initializing Converse Blue Gene/P machine Layer\n");
737
738     DCMF_Messager_initialize();
739
740 #if CMK_SMP
741     DCMF_Configure_t  config_in, config_out;
742     config_in.thread_level= DCMF_THREAD_MULTIPLE;
743     config_in.interrupts  = DCMF_INTERRUPTS_OFF;
744
745     DCMF_Messager_configure(&config_in, &config_out);
746     //assert (config_out.thread_level == DCMF_THREAD_MULTIPLE); //not supported in vn mode
747 #endif
748
749     DCMF_Send_Configuration_t short_config, eager_config, rzv_config;
750
751
752     short_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
753     short_config.cb_recv_short = short_pkt_recv;
754     short_config.cb_recv       = first_pkt_recv_done;
755 #if (DCMF_VERSION_MAJOR >= 2)
756     short_config.network  = DCMF_DefaultNetwork;
757 #endif
758
759     eager_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
760     eager_config.cb_recv_short = short_pkt_recv;
761     eager_config.cb_recv       = first_pkt_recv_done;
762 #if (DCMF_VERSION_MAJOR >= 2)
763     eager_config.network  = DCMF_DefaultNetwork;
764 #endif
765
766 #ifdef  OPT_RZV
767 #warning "Enabling Optimize Rzv"
768     rzv_config.protocol        = DCMF_RZV_SEND_PROTOCOL;
769 #else
770     rzv_config.protocol        = DCMF_DEFAULT_SEND_PROTOCOL;
771 #endif
772     rzv_config.cb_recv_short   = short_pkt_recv;
773     rzv_config.cb_recv         = first_pkt_recv_done;
774 #if (DCMF_VERSION_MAJOR >= 2)
775     rzv_config.network  = DCMF_DefaultNetwork;
776 #endif
777
778     DCMF_Send_register (&cmi_dcmf_short_registration, &short_config);
779     DCMF_Send_register (&cmi_dcmf_eager_registration, &eager_config);
780     DCMF_Send_register (&cmi_dcmf_rzv_registration,   &rzv_config);
781
782 #ifdef BGP_USE_RDMA
783     DCMF_Send_Configuration_t direct_config;
784     direct_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
785     direct_config.cb_recv_short = direct_short_pkt_recv;
786     direct_config.cb_recv       = direct_first_pkt_recv_done;
787     DCMF_Send_register (&cmi_dcmf_direct_registration,   &direct_config);
788     directcb.function=direct_send_done_cb;
789     directcb.clientdata=NULL;
790 #endif
791
792     //fprintf(stderr, "Initializing Eager Protocol\n");
793
794     _Cmi_numnodes = DCMF_Messager_size();
795     _Cmi_mynode = DCMF_Messager_rank();
796
797     unsigned rank = DCMF_Messager_rank();
798     unsigned size = DCMF_Messager_size();
799
800     CmiBarrier();
801     CmiBarrier();
802     CmiBarrier();
803
804     /* processor per node */
805     _Cmi_mynodesize = 1;
806     CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
807 #if ! CMK_SMP
808     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
809         CmiAbort("+ppn cannot be used in non SMP version!\n");
810 #endif
811
812     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
813     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
814     Cmi_argvcopy = CmiCopyArgs(argv);
815     Cmi_argv = argv;
816     Cmi_startfn = fn;
817     Cmi_usrsched = usched;
818
819     //printf ("Starting Charm with %d nodes and %d processors\n", CmiNumNodes(), CmiNumPes());
820
821     DCMF_Multicast_Configuration_t mconfig;
822     mconfig.protocol = DCMF_MEMFIFO_DMA_MSEND_PROTOCOL;
823     mconfig.cb_recv  = first_multi_pkt_recv_done;
824     mconfig.clientdata = NULL;
825     mconfig.connectionlist = (void **) malloc (CmiNumPes() * sizeof(unsigned long));
826     mconfig.nconnections = CmiNumPes();  
827     DCMF_Multicast_register(&cmi_dcmf_multicast_registration, &mconfig);
828
829
830     /* find dim = log2(numpes), to pretend we are a hypercube */
831     for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
832         Cmi_dim++ ;
833
834
835     /* checksum flag */
836     if (CmiGetArgFlag(argv,"+checksum")) {
837 #if !CMK_OPTIMIZE
838         checksum_flag = 1;
839         if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
840 #else
841         if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
842 #endif
843     }
844
845     CsvInitialize(CmiNodeState, NodeState);
846     CmiNodeStateInit(&CsvAccess(NodeState));
847
848 #if CMK_NODE_QUEUE_AVAILABLE
849     CsvInitialize(PCQueue, node_bcastq);
850     CsvAccess(node_bcastq) = PCQueueCreate();
851     CsvInitialize(CmiNodeLock, node_bcastLock);
852     CsvAccess(node_bcastLock) = CmiCreateLock();
853 #endif
854
855     int actualNodeSize = _Cmi_mynodesize;
856 #if !CMK_MULTICORE
857     actualNodeSize++; //considering the extra comm thread
858 #endif
859
860     procState = (ProcState *)CmiAlloc((actualNodeSize) * sizeof(ProcState));
861     for (i=0; i<actualNodeSize; i++) {
862         /*    procState[i].sendMsgBuf = PCQueueCreate();   */
863         procState[i].recvLock = CmiCreateLock();
864         procState[i].bcastLock = CmiCreateLock();
865     }
866
867 #if CMK_SMP && !CMK_MULTICORE
868     commThdExitLock = CmiCreateLock();
869 #endif
870
871     /* Network progress function is used to poll the network when for
872        messages. This flushes receive buffers on some  implementations*/
873     networkProgressPeriod = PROGRESS_PERIOD;
874     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
875
876     //printf ("Starting Threads\n");
877
878     CmiStartThreads(argv);
879
880     ConverseRunPE(initret);
881 }
882
883
884 int PerrorExit (char *err) {
885     fprintf (stderr, "err\n\n");
886     exit (-1);
887     return -1;
888 }
889
890
891 void ConverseRunPE(int everReturn) {
892     //printf ("ConverseRunPE on rank %d\n", CmiMyPe());
893
894     CmiIdleState *s=CmiNotifyGetState();
895     CmiState cs;
896     char** CmiMyArgv;
897     CmiNodeAllBarrier();
898
899     cs = CmiGetState();
900     CpvInitialize(void *,CmiLocalQueue);
901     CpvAccess(CmiLocalQueue) = cs->localqueue;
902
903     if (CmiMyRank())
904         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
905     else
906         CmiMyArgv=Cmi_argv;
907
908     CthInit(CmiMyArgv);
909
910     //printf ("Before Converse Common Init\n");
911     ConverseCommonInit(CmiMyArgv);
912
913     /* initialize the network progress counter*/
914     /* Network progress function is used to poll the network when for
915        messages. This flushes receive buffers on some  implementations*/
916     CpvInitialize(int , networkProgressCount);
917     CpvAccess(networkProgressCount) = 0;
918
919     CpvInitialize(PCQueue, broadcast_q);
920     CpvAccess(broadcast_q) = PCQueueCreate();
921
922     CpvInitialize(PCQueue, smsg_list_q);
923     CpvAccess(smsg_list_q) = PCQueueCreate();
924
925     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
926
927     CmiBarrier();
928
929     /* Converse initialization finishes, immediate messages can be processed.
930        node barrier previously should take care of the node synchronization */
931     _immediateReady = 1;
932
933     /* communication thread */
934     if (CmiMyRank() == CmiMyNodeSize()) {
935         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
936         while (1) CommunicationServer(5);
937     } else {
938         //printf ("Calling Start Fn and the scheduler \n");
939
940         if (!everReturn) {
941             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
942             if (Cmi_usrsched==0) CsdScheduler(-1);
943             ConverseExit();
944         }
945     }
946 }
947
948 #if CMK_SMP
949 static int inexit = 0;
950
951 /* test if all processors recv queues are empty */
952 static int RecvQueueEmpty() {
953     int i;
954     for (i=0; i<_Cmi_mynodesize; i++) {
955         CmiState cs=CmiGetStateN(i);
956         if (!PCQueueEmpty(cs->recv)) return 0;
957     }
958     return 1;
959 }
960
961 #endif
962
963
964 //extern void DCMF_Messager_dumpTimers();
965
966 void ConverseExit(void) {
967
968     while (msgQueueLen > 0 || outstanding_recvs > 0) {
969         AdvanceCommunications();
970     }
971
972     CmiNodeBarrier();
973     ConverseCommonExit();
974
975     //  if(CmiMyPe()%101 == 0)
976     //DCMF_Messager_dumpTimers();
977
978     if (CmiMyPe() == 0) {
979         printf("End of program\n");
980     }
981
982     CmiNodeBarrier();
983 //  CmiNodeAllBarrier ();
984
985 #if CMK_SMP && !CMK_MULTICORE
986     //CmiLock(commThdExitLock);
987     commThdExit = 1;
988     //CmiUnlock(commThdExitLock);
989
990     _bgp_msync();
991 #endif
992
993     int rank0 = 0;
994
995     if (CmiMyRank() == 0) {
996         rank0 = 1;
997         //CmiFree(procState);
998         DCMF_Messager_finalize();
999     }
1000
1001     CmiNodeBarrier();
1002 //  CmiNodeAllBarrier ();
1003
1004     if (rank0)
1005         exit(0);
1006     else
1007         pthread_exit(NULL);
1008 }
1009
1010 /* exit() called on any node would abort the whole program */
1011 void CmiAbort(const char * message) {
1012     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1013              "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),
1014              msgQueueLen, outstanding_recvs, message);
1015     //CmiPrintStackTrace(0);
1016
1017     while (msgQueueLen > 0 || outstanding_recvs > 0) {
1018         AdvanceCommunications();
1019     }
1020
1021     CmiBarrier();
1022     assert (0);
1023 }
1024
1025 static void CommunicationServer(int sleepTime) {
1026 #if CMK_SMP && !CMK_MULTICORE
1027     //CmiLock(commThdExitLock);
1028     if (commThdExit) {
1029         while (msgQueueLen > 0 || outstanding_recvs > 0) {
1030             AdvanceCommunications();
1031         }
1032         CmiUnlock(commThdExitLock);
1033         pthread_exit(NULL);
1034         return;
1035     }
1036     //CmiUnlock(commThdExitLock);
1037 #endif
1038     AdvanceCommunications();
1039
1040 #if CMK_IMMEDIATE_MSG && CMK_SMP && !CMK_MULTICORE
1041     CmiHandleImmediate();
1042 #endif
1043
1044     //mysleep(sleepTime);
1045 }
1046
1047 static void CommunicationServerThread(int sleepTime) {
1048 #if CMK_SMP
1049     CommunicationServer(sleepTime);
1050 #endif
1051 //immediate msgs are handled in AdvanceCommunications
1052 }
1053
1054 #if CMK_NODE_QUEUE_AVAILABLE
1055 char *CmiGetNonLocalNodeQ(void) {
1056     CmiState cs = CmiGetState();
1057     char *result = 0;
1058     CmiIdleLock_checkMessage(&cs->idle);
1059     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1060         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1061
1062         if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
1063             //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1064             result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1065             CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1066         }
1067
1068         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1069     }
1070     return result;
1071 }
1072 #endif
1073
1074
1075 void *CmiGetNonLocal() {
1076
1077     CmiState cs = CmiGetState();
1078
1079     void *msg = NULL;
1080     CmiIdleLock_checkMessage(&cs->idle);
1081     /* although it seems that lock is not needed, I found it crashes very often
1082        on mpi-smp without lock */
1083
1084 #if !CMK_SMP || CMK_MULTICORE  /*|| !BCASTMSG_ONLY_TO_COMMTHD*/
1085 //ChaoMei changes
1086     AdvanceCommunications();
1087 #endif
1088
1089     /*if(CmiMyRank()==0) printf("Got stuck here on proc[%d] node[%d]\n", CmiMyPe(), CmiMyNode());*/
1090
1091     if (PCQueueLength(cs->recv)==0) return NULL;
1092
1093 #if CMK_SMP
1094     CmiLock(procState[cs->rank].recvLock);
1095 #endif
1096
1097     msg =  PCQueuePop(cs->recv);
1098
1099 #if CMK_SMP
1100     CmiUnlock(procState[cs->rank].recvLock);
1101 #endif
1102
1103     return msg;
1104 }
1105
1106 static void CmiSendSelf(char *msg) {
1107 #if CMK_IMMEDIATE_MSG
1108     if (CmiIsImmediate(msg)) {
1109         /* CmiBecomeNonImmediate(msg); */
1110         //printf("In SendSelf, N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1111         CmiPushImmediateMsg(msg);
1112 #if CMK_MULTICORE
1113         CmiHandleImmediate();
1114 #endif
1115         return;
1116     }
1117 #endif
1118     
1119     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1120 }
1121
1122 #if CMK_SMP
1123 static void CmiSendPeer (int rank, int size, char *msg) {
1124 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
1125     if (CMI_BROADCAST_ROOT(msg) != 0) {
1126         char *copymsg;
1127         copymsg = (char *)CmiAlloc(size);
1128         CmiMemcpy(copymsg,msg,size);
1129
1130         CmiLock(procState[rank].bcastLock);
1131         PCQueuePush(CpvAccessOther(broadcast_q, rank), copymsg);
1132         CmiUnlock(procState[rank].bcastLock);
1133     }
1134 #endif
1135     
1136     CmiPushPE (rank, msg);
1137 }
1138 #endif
1139
1140
1141 void machineSend(SMSG_LIST *msg_tmp) {    
1142
1143     if (msg_tmp->destpe == CmiMyNode())
1144         CmiAbort("Sending to self\n");
1145
1146     CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumNodes());
1147     msg_tmp->cb.function     =   send_done;
1148     msg_tmp->cb.clientdata   =   msg_tmp;
1149
1150     DCMF_Protocol_t *protocol = NULL;
1151
1152     if (msg_tmp->size < 224)
1153         protocol = &cmi_dcmf_short_registration;
1154     else if (msg_tmp->size < 2048)
1155         protocol = &cmi_dcmf_eager_registration;
1156     else
1157         protocol = &cmi_dcmf_rzv_registration;
1158
1159
1160 #if CMK_SMP
1161     DCMF_CriticalSection_enter (0);
1162 #endif
1163
1164     msgQueueLen ++;
1165     DCMF_Send (protocol, &msg_tmp->send, msg_tmp->cb,
1166                DCMF_MATCH_CONSISTENCY, msg_tmp->destpe,
1167                msg_tmp->size, msg_tmp->msg, &msg_tmp->info, 1);
1168
1169 /*    
1170     #if CMK_SMP && !CMK_MULTICORE
1171     //Adding this advance call here improves the SMP performance
1172     //a little bit although it is possible that some more bugs are
1173     //introduced
1174     DCMF_Messager_advance();
1175     #endif
1176 */
1177
1178 #if CMK_SMP
1179     DCMF_CriticalSection_exit (0);
1180 #endif
1181 }
1182
1183 #define MAX_MULTICAST 128
1184 DCMF_Opcode_t  CmiOpcodeList [MAX_MULTICAST];
1185
1186 void  machineMulticast(int npes, int *pelist, int size, char* msg){  
1187   CQdCreate(CpvAccess(cQdState), npes);
1188   
1189   CmiAssert (npes < MAX_MULTICAST);
1190
1191   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1192   ((CmiMsgHeaderBasic *)msg)->size = size;  
1193   CMI_SET_BROADCAST_ROOT(msg,0);
1194   CMI_SET_CHECKSUM(msg, size);
1195   
1196   SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1197   
1198   msg_tmp->destpe    = -1;      //multicast operation
1199   msg_tmp->size      = size * npes; //keep track of #bytes outstanding
1200   msg_tmp->msg       = msg;
1201   msg_tmp->pelist    = pelist;
1202   
1203   msgQueueLen ++;
1204   
1205   DCMF_Multicast_t  mcast_info __attribute__((__aligned__(16)));
1206   
1207   mcast_info.registration   = & cmi_dcmf_multicast_registration;
1208   mcast_info.request        = & msg_tmp->send;
1209   mcast_info.cb_done.function    =   send_multi_done;
1210   mcast_info.cb_done.clientdata  =   msg_tmp;
1211   mcast_info.consistency    =   DCMF_MATCH_CONSISTENCY;
1212   mcast_info.connection_id  =   CmiMyPe();
1213   mcast_info.bytes          =   size;
1214   mcast_info.src            =   msg;
1215   mcast_info.nranks         =   npes;
1216   mcast_info.ranks          =   (unsigned *)pelist;
1217   mcast_info.opcodes        =   CmiOpcodeList;   //static list of MAX_MULTICAST entires with 0 in them
1218   mcast_info.flags          =   0;
1219   mcast_info.msginfo        =   &msg_tmp->info;
1220   mcast_info.count          =   1;
1221
1222   DCMF_Multicast (&mcast_info);
1223 }
1224
1225
1226
1227 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg);
1228
1229 /* The general free send function
1230  * Send is synchronous, and free msg after posted
1231  */
1232 void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
1233
1234   if (destPE < 0 || destPE > CmiNumPes ())
1235     printf ("Sending to %d\n", destPE);
1236
1237   CmiAssert (destPE >= 0 && destPE < CmiNumPes());
1238
1239     CmiState cs = CmiGetState();
1240
1241     if (destPE==cs->pe) {
1242         CmiSendSelf(msg);
1243         return;
1244     }
1245
1246 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1247     //In the presence of comm thd which is responsible for sending bcast msgs, then
1248     //CmiNodeOf and CmiRankOf may return incorrect information if the destPE is considered
1249     //as a comm thd.
1250     if (destPE >= _Cmi_numpes) { //destination is a comm thd
1251         int nid = destPE - _Cmi_numpes;
1252         int rid = _Cmi_mynodesize;
1253         CmiGeneralFreeSendN (nid, rid, size, msg);
1254     } else {
1255         CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1256     }
1257 #else
1258     //printf ("%d: Sending Message to %d \n", CmiMyPe(), destPE);
1259     CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1260 #endif
1261 }
1262
1263 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg) {
1264
1265     //printf ("%d, %d: Sending Message to node %d rank %d \n", CmiMyPe(),
1266     //  CmiMyNode(), node, rank);
1267
1268 #if CMK_SMP
1269     CMI_DEST_RANK(msg) = rank;
1270     //CMI_SET_CHECKSUM(msg, size);
1271
1272     if (node == CmiMyNode()) {
1273         CmiSendPeer (rank, size, msg);
1274         return;
1275     }
1276 #endif
1277
1278     SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1279     msg_tmp->destpe = node; //destPE;
1280     msg_tmp->size = size;
1281     msg_tmp->msg = msg;
1282
1283     machineSend(msg_tmp);
1284 }
1285
1286 void CmiSyncSendFn(int destPE, int size, char *msg) {
1287     char *copymsg;
1288     copymsg = (char *)CmiAlloc(size);
1289     CmiMemcpy(copymsg,msg,size);
1290     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncSendFn on comm thd on node %d\n", CmiMyNode());
1291     CmiFreeSendFn(destPE,size,copymsg);
1292 }
1293
1294 void CmiFreeSendFn(int destPE, int size, char *msg) {    
1295     CQdCreate(CpvAccess(cQdState), 1);
1296     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeSendFn on comm thd on node %d\n", CmiMyNode());
1297
1298     CMI_SET_BROADCAST_ROOT(msg,0);
1299     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1300     ((CmiMsgHeaderBasic *)msg)->size = size;
1301     CMI_SET_CHECKSUM(msg, size);
1302
1303     CmiGeneralFreeSend(destPE,size,msg);
1304 }
1305
1306 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1307 void CmiSyncSendFn1(int destPE, int size, char *msg) {
1308     char *copymsg;
1309     copymsg = (char *)CmiAlloc(size);
1310     CmiMemcpy(copymsg, msg, size);
1311
1312     //  asm volatile("sync" ::: "memory");
1313
1314     CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
1315     ((CmiMsgHeaderBasic *)copymsg)->size = size;
1316     CMI_SET_CHECKSUM(copymsg, size);
1317
1318     CmiGeneralFreeSend(destPE,size,copymsg);
1319 }
1320
1321 #define NODE_LEVEL_ST_IMPLEMENTATION 1
1322 #if NODE_LEVEL_ST_IMPLEMENTATION
1323 //send msgs to other ranks except the rank specified in the argument
1324 static void CmiSendChildrenPeers(int rank, int size, char *msg) {
1325     //printf ("%d [%d]: Send children peers except rank %d\n",  CmiMyPe(), CmiMyNode(), CmiMyRank());
1326     int r=0;
1327
1328 //With comm thd, broadcast msg will be pulled from bcast queue from the comm thd
1329 //And the msg would be finally pushed into other cores on the same node by the
1330 //comm thd. But it's possible a msg has already been pushed into rank 0 in
1331 //recv_done func call. So there's no need to push the bcast msg into the recv
1332 //queue again in the case BCASTMSG_ONLY_TO_COMMTHD is not set
1333
1334 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
1335     //indicate this is called from comm thread.
1336     if (rank == _Cmi_mynodesize) r = 1;
1337 #endif
1338
1339     for (;r<rank; r++) {
1340         char *copymsg;
1341         copymsg = (char *)CmiAlloc(size);
1342         CmiMemcpy(copymsg,msg,size);        
1343         CmiPushPE (r, copymsg);
1344     }
1345
1346     for (r=rank+1; r<_Cmi_mynodesize; r++) {
1347         char *copymsg;
1348         copymsg = (char *)CmiAlloc(size);
1349         CmiMemcpy(copymsg,msg,size);        
1350         CmiPushPE (r, copymsg);
1351     }
1352 }
1353
1354 //In this implementation, msgs are first sent out to the comm thd or rank 0 of other nodes
1355 //then send msgs to the other cores on the same node
1356 void SendSpanningChildren(int size, char *msg) {
1357     CmiState cs = CmiGetState();
1358     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1359     int i;
1360
1361     //printf ("%d [%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), startpe);
1362
1363     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1364
1365     int startNid = CmiNodeOf(startpe);
1366     int thisNid, thisRid;
1367     thisNid = CmiMyNode();
1368     thisRid = CmiMyRank();
1369
1370     //printf ("%d [%d/%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), thisRid, startpe);
1371
1372     //Step1: send to cores that has comm thd on other nodes
1373     int dist = thisNid - startNid;
1374     if (dist<0) dist += _Cmi_numnodes;
1375     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1376         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1377         if (nid > _Cmi_numnodes - 1) break;
1378         nid += startNid;
1379         nid = nid%_Cmi_numnodes;
1380         CmiAssert(nid>=0 && nid<_Cmi_numnodes && nid!=thisNid);
1381 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1382         int p = nid + _Cmi_numpes;
1383 #else
1384         int p = CmiNodeFirst(nid);
1385 #endif
1386         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1387         CmiSyncSendFn1(p, size, msg);
1388     }
1389
1390     //Step2: send to other cores (i.e. excluding myself cs->pe) on the same nodes (just a flat send)
1391     CmiSendChildrenPeers(thisRid, size, msg);
1392
1393 #if !CMK_SMP
1394 #if ENABLE_BROADCAST_THROTTLE
1395     SendMsgsUntil (0);
1396 #endif
1397 #endif
1398 }
1399 #else
1400 /* send msg to its spanning children in broadcast. G. Zheng */
1401 void SendSpanningChildren(int size, char *msg) {
1402     CmiState cs = CmiGetState();
1403     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1404     int i;
1405
1406     //printf ("%d [%d]: In Send Spanning Tree\n",  CmiMyPe(), CmiMyNode());
1407
1408     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1409     int dist = cs->pe-startpe;
1410     if (dist<0) dist+=_Cmi_numpes;
1411     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1412         int p = BROADCAST_SPANNING_FACTOR*dist + i;
1413         if (p > _Cmi_numpes - 1) break;
1414         p += startpe;
1415         p = p%_Cmi_numpes;
1416         CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1417
1418         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1419         CmiSyncSendFn1(p, size, msg);
1420     }    
1421 }
1422 #endif
1423
1424 /* send msg along the hypercube in broadcast. (Sameer) */
1425 void SendHypercube(int size, char *msg) {
1426     CmiState cs = CmiGetState();
1427     int curcycle = CMI_GET_CYCLE(msg);
1428     int i;
1429
1430     double logp = CmiNumPes();
1431     logp = log(logp)/log(2.0);
1432     logp = ceil(logp);
1433
1434     /*  CmiPrintf("In hypercube\n"); */
1435     /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1436
1437     for (i = curcycle; i < logp; i++) {
1438         int p = cs->pe ^ (1 << i);
1439         /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1440         if (p < CmiNumPes()) {
1441             CMI_SET_CYCLE(msg, i + 1);
1442
1443             CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1444             CmiSyncSendFn1(p, size, msg);
1445         }
1446     }
1447 }
1448
1449 void CmiSyncBroadcastFn(int size, char *msg) {
1450     char *copymsg;
1451     copymsg = (char *)CmiAlloc(size);
1452     CmiMemcpy(copymsg,msg,size);
1453     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastFn on comm thd on node %d\n", CmiMyNode());
1454     CmiFreeBroadcastFn(size,copymsg);
1455 }
1456
1457 void CmiFreeBroadcastFn(int size, char *msg) {
1458
1459     //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1460
1461     CmiState cs = CmiGetState();
1462 #if CMK_BROADCAST_SPANNING_TREE    
1463     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1464     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastFn on comm thd on node %d\n", CmiMyNode());
1465
1466     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1467
1468     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1469     SendSpanningChildren(size, msg);
1470     CmiFree(msg);
1471 #elif CMK_BROADCAST_HYPERCUBE    
1472     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1473
1474     CMI_SET_CYCLE(msg, 0);
1475     SendHypercube(size, msg);
1476     CmiFree(msg);
1477 #else
1478     int i;
1479
1480     for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1481         CmiSyncSendFn(i,size,msg);
1482
1483     for ( i=0; i<cs->pe; i++ )
1484         CmiSyncSendFn(i,size,msg);
1485
1486     CmiFree(msg);
1487 #endif
1488 }
1489
1490 void CmiSyncBroadcastAllFn(int size, char *msg) {
1491     char *copymsg;
1492     copymsg = (char *)CmiAlloc(size);
1493     CmiMemcpy(copymsg,msg,size);
1494     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1495     CmiFreeBroadcastAllFn(size,copymsg);
1496 }
1497
1498 void CmiFreeBroadcastAllFn(int size, char *msg) {
1499
1500     //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1501
1502     CmiState cs = CmiGetState();
1503 #if CMK_BROADCAST_SPANNING_TREE
1504
1505     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1506     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1507
1508     CmiSyncSendFn(cs->pe,size,msg);
1509     
1510     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1511
1512     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1513     SendSpanningChildren(size, msg);
1514     CmiFree(msg);
1515
1516 #elif CMK_BROADCAST_HYPERCUBE
1517     CmiSyncSendFn(cs->pe,size,msg);
1518     
1519     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1520
1521     CMI_SET_CYCLE(msg, 0);
1522     SendHypercube(size, msg);
1523     CmiFree(msg);
1524 #else
1525     int i ;
1526
1527     DCMF_CriticalSection_enter (0);
1528
1529     for ( i=0; i<_Cmi_numpes; i++ ) {
1530         CmiSyncSendFn(i,size,msg);
1531
1532         if ( (i % 32) == 0 )
1533             SendMsgsUntil (0);
1534     }
1535
1536     DCMF_CriticalSection_exit (0);
1537
1538     CmiFree(msg);
1539 #endif
1540 }
1541
1542 void AdvanceCommunications() {
1543
1544 #if CMK_SMP
1545     DCMF_CriticalSection_enter (0);
1546 #endif
1547
1548     while(DCMF_Messager_advance()>0);
1549     //DCMF_Messager_advance();
1550
1551 #if CMK_SMP
1552     DCMF_CriticalSection_exit (0);
1553 #endif
1554
1555     sendBroadcastMessages();
1556 #if CMK_NODE_QUEUE_AVAILABLE
1557     sendBroadcastMessagesNode();
1558 #endif
1559
1560 #if CMK_IMMEDIATE_MSG && CMK_MULTICORE
1561     CmiHandleImmediate();
1562 #endif
1563 }
1564
1565
1566 static void SendMsgsUntil(int targetm) {
1567
1568     while (msgQueueLen>targetm) {
1569       //AdvanceCommunications ();
1570 #if CMK_SMP
1571       DCMF_CriticalSection_enter (0);
1572 #endif
1573       
1574       while(DCMF_Messager_advance()>0);
1575       //DCMF_Messager_advance();
1576       
1577 #if CMK_SMP
1578       DCMF_CriticalSection_exit (0);
1579 #endif
1580     }
1581 }
1582
1583 void CmiNotifyIdle() {
1584 #if !CMK_SMP || CMK_MULTICORE
1585     AdvanceCommunications();
1586 #endif
1587 }
1588
1589
1590 /*==========================================================*/
1591 /*==========================================================*/
1592 /*==========================================================*/
1593
1594 /************ Recommended routines ***********************/
1595 /************ You dont have to implement these but they are supported
1596  in the converse syntax and some rare programs may crash. But most
1597  programs dont need them. *************/
1598
1599 CmiCommHandle CmiAsyncSendFn(int dest, int size, char *msg) {
1600     CmiAbort("CmiAsyncSendFn not implemented.");
1601     return (CmiCommHandle) 0;
1602 }
1603
1604 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1605     CmiAbort("CmiAsyncBroadcastFn not implemented.");
1606     return (CmiCommHandle) 0;
1607 }
1608
1609 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1610     CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1611     return (CmiCommHandle) 0;
1612 }
1613
1614 int           CmiAsyncMsgSent(CmiCommHandle handle) {
1615     CmiAbort("CmiAsyncMsgSent not implemented.");
1616     return 0;
1617 }
1618 void          CmiReleaseCommHandle(CmiCommHandle handle) {
1619     CmiAbort("CmiReleaseCommHandle not implemented.");
1620 }
1621
1622
1623 /*==========================================================*/
1624 /*==========================================================*/
1625 /*==========================================================*/
1626
1627 /* Optional routines which could use common code which is shared with
1628    other machine layer implementations. */
1629
1630 /* MULTICAST/VECTOR SENDING FUNCTIONS
1631
1632  * In relations to some flags, some other delivery functions may be needed.
1633  */
1634
1635 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1636
1637 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg) {
1638     char *copymsg;
1639     copymsg = (char *)CmiAlloc(size);
1640     CmiMemcpy(copymsg,msg,size);
1641     CmiFreeListSendFn(npes, pes, size, msg);
1642 }
1643
1644 //#define OPTIMIZED_MULTICAST  0
1645
1646 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1647 #if CMK_SMP && !CMK_MULTICORE
1648     //DCMF_CriticalSection_enter (0);
1649 #endif
1650     CMI_SET_BROADCAST_ROOT(msg,0);
1651     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1652     ((CmiMsgHeaderBasic *)msg)->size = size;
1653     CMI_SET_CHECKSUM(msg, size);
1654
1655     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeListSendFn on comm thd on node %d\n", CmiMyNode());
1656
1657     //printf("%d: In Free List Send Fn\n", CmiMyPe());
1658     int new_npes = 0;
1659
1660     int i, count = 0, my_loc = -1;
1661     for (i=0; i<npes; i++) {
1662         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode()) {
1663         if (CmiNodeOf(pes[i]) == CmiMyNode()) {
1664             CmiSyncSend(pes[i], size, msg);
1665             //my_loc = i;
1666         }
1667     }
1668
1669 #if OPTIMIZED_MULTICAST
1670 #warning "Using Optimized Multicast"
1671     if (npes > 1) {    
1672       int *newpelist = (int *) malloc (sizeof(int) * npes);
1673       int new_npes = 0;
1674     
1675       for(i=0; i<npes; i++) {
1676         if(CmiNodeOf(pes[i]) == CmiMyNode()) 
1677           continue;
1678         else
1679           newpelist[new_npes++] = pes[i];
1680       }
1681
1682       if (new_npes >= 1)
1683         machineMulticast (new_npes, newpelist, size, msg);
1684       else
1685         CmiFree (msg);
1686       return;
1687     }
1688 #endif
1689
1690     for (i=0;i<npes;i++) {
1691         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode());
1692         if (CmiNodeOf(pes[i]) == CmiMyNode());
1693         else if (i < npes - 1) {
1694 #if !CMK_SMP /*|| (CMK_SMP && !CMK_MULTICORE)*/
1695             CmiReference(msg);
1696             CmiGeneralFreeSend(pes[i], size, msg);
1697 #else
1698             CmiSyncSend(pes[i], size, msg);
1699 #endif
1700         }
1701     }
1702
1703     //if (npes  && (pes[npes-1] != CmiMyPe() && CmiNodeOf(pes[i]) != CmiMyNode()))
1704     if (npes  && CmiNodeOf(pes[npes-1]) != CmiMyNode())
1705         CmiSyncSendAndFree(pes[npes-1], size, msg); //Sameto CmiFreeSendFn
1706     else
1707         CmiFree(msg);
1708
1709     //AdvanceCommunications();
1710 #if CMK_SMP && !CMK_MULTICORE
1711     //DCMF_CriticalSection_exit (0);
1712 #endif
1713 }
1714
1715 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg) {
1716     CmiAbort("CmiAsyncListSendFn not implemented.");
1717     return (CmiCommHandle) 0;
1718 }
1719 #endif
1720
1721 /** NODE SENDING FUNCTIONS
1722
1723  * If there is a node queue, and we consider also nodes as entity (tipically in
1724  * SMP versions), these functions are needed.
1725  */
1726
1727 #if CMK_NODE_QUEUE_AVAILABLE
1728
1729 void          CmiSyncNodeSendFn(int, int, char *);
1730 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1731 void          CmiFreeNodeSendFn(int, int, char *);
1732
1733 void          CmiSyncNodeBroadcastFn(int, char *);
1734 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1735 void          CmiFreeNodeBroadcastFn(int, char *);
1736
1737 void          CmiSyncNodeBroadcastAllFn(int, char *);
1738 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1739 void          CmiFreeNodeBroadcastAllFn(int, char *);
1740
1741 #endif
1742
1743
1744 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1745
1746 int CmiMyPe();
1747 int CmiMyRank();
1748 int CmiNodeFirst(int node);
1749 int CmiNodeSize(int node);
1750 int CmiNodeOf(int pe);
1751 int CmiRankOf(int pe);
1752
1753 int CmiMyPe(void) {
1754     return CmiGetState()->pe;
1755 }
1756
1757 int CmiMyRank(void) {
1758     return CmiGetState()->rank;
1759 }
1760
1761 int CmiNodeFirst(int node) {
1762     return node*_Cmi_mynodesize;
1763 }
1764 int CmiNodeSize(int node)  {
1765     return _Cmi_mynodesize;
1766 }
1767
1768 int CmiNodeOf(int pe)      {
1769     return (pe/_Cmi_mynodesize);
1770 }
1771 int CmiRankOf(int pe)      {
1772     return pe%_Cmi_mynodesize;
1773 }
1774
1775
1776 /* optional, these functions are implemented in "machine-smp.c", so including
1777    this file avoid the necessity to reimplement them.
1778  */
1779 void CmiNodeBarrier(void);
1780 void CmiNodeAllBarrier(void);
1781 CmiNodeLock CmiCreateLock();
1782 void CmiDestroyLock(CmiNodeLock lock);
1783
1784 #endif
1785
1786 /** IMMEDIATE MESSAGES
1787
1788  * If immediate messages are supported, the following function is needed. There
1789  * is an exeption if the machine progress is also defined (see later for this).
1790
1791  * Moreover, the file "immediate.c" should be included, otherwise all its
1792  * functions and variables have to be redefined.
1793 */
1794
1795 #if CMK_CCS_AVAILABLE
1796
1797 #include "immediate.c"
1798
1799 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1800 void CmiProbeImmediateMsg();
1801 #endif
1802
1803 #endif
1804
1805
1806 /** MACHINE PROGRESS DEFINED
1807
1808  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1809  * to be pulled out of the network manually. For this reason the following
1810  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1811  * not be defined anymore.
1812  */
1813
1814 #if CMK_MACHINE_PROGRESS_DEFINED
1815
1816
1817
1818 void CmiMachineProgressImpl() {
1819
1820 #if !CMK_SMP
1821     AdvanceCommunications();
1822 #else
1823     /*Not implemented yet. Communication server does not seem to be
1824       thread safe */
1825 #endif
1826 }
1827
1828 #endif
1829
1830 /* Dummy implementation */
1831 extern int CmiBarrier() {
1832     //Use DCMF barrier later
1833 }
1834
1835 #if CMK_NODE_QUEUE_AVAILABLE
1836 static void CmiSendNodeSelf(char *msg) {
1837 #if CMK_IMMEDIATE_MSG
1838     if (CmiIsImmediate(msg)) {
1839         //printf("SendNodeSelf: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1840         CmiPushImmediateMsg(msg);
1841 #if CMK_MULTICORE
1842         CmiHandleImmediate();
1843 #endif
1844         return;
1845     }
1846 #endif    
1847     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1848     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1849     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1850 }
1851
1852 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg) {
1853     CmiAbort ("Async Node Send not supported\n");
1854 }
1855
1856 void CmiFreeNodeSendFn(int node, int size, char *msg) {
1857
1858     CMI_SET_BROADCAST_ROOT(msg,0);
1859     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1860     ((CmiMsgHeaderBasic *)msg)->size = size;
1861     CMI_SET_CHECKSUM(msg, size);
1862     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeSendFn on comm thd on node %d\n", CmiMyNode());
1863     
1864     CQdCreate(CpvAccess(cQdState), 1);
1865
1866     if (node == _Cmi_mynode) {
1867         CmiSendNodeSelf(msg);
1868     } else {
1869         CmiGeneralFreeSendN(node, SMP_NODEMESSAGE, size, msg);
1870     }
1871 }
1872
1873 void CmiSyncNodeSendFn(int p, int s, char *m) {
1874     char *dupmsg;
1875     dupmsg = (char *)CmiAlloc(s);
1876     CmiMemcpy(dupmsg,m,s);
1877     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeSendFn on comm thd on node %d\n", CmiMyNode());
1878     CmiFreeNodeSendFn(p, s, dupmsg);
1879 }
1880
1881 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m) {
1882     return NULL;
1883 }
1884
1885 void SendSpanningChildrenNode(int size, char *msg) {
1886     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1887     //printf("on node %d rank %d, send node spanning children with root %d\n", CmiMyNode(), CmiMyRank(), startnode);
1888     assert(startnode>=0 && startnode<CmiNumNodes());
1889
1890     int dist = CmiMyNode()-startnode;
1891     if (dist<0) dist += CmiNumNodes();
1892     int i;
1893     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1894         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1895         if (nid > CmiNumNodes() - 1) break;
1896         nid += startnode;
1897         nid = nid%CmiNumNodes();
1898         assert(nid>=0 && nid<CmiNumNodes() && nid!=CmiMyNode());
1899         char *dupmsg = (char *)CmiAlloc(size);
1900         CmiMemcpy(dupmsg,msg,size);
1901         //printf("In SendSpanningChildrenNode, sending bcast msg (root %d) from node %d to node %d\n", startnode, CmiMyNode(), nid);
1902         CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg);
1903     }
1904 }
1905
1906 /* need */
1907 void CmiFreeNodeBroadcastFn(int s, char *m) {
1908 #if CMK_BROADCAST_SPANNING_TREE
1909     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1910     
1911     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1912
1913     int mynode = CmiMyNode();
1914     CMI_SET_BROADCAST_ROOT(m, -mynode-1);
1915     CMI_MAGIC(m) = CHARM_MAGIC_NUMBER;
1916     ((CmiMsgHeaderBasic *)m)->size = s;
1917     CMI_SET_CHECKSUM(m, s);
1918     //printf("In CmiFreeNodeBroadcastFn, sending bcast msg from root node %d\n", CMI_BROADCAST_ROOT(m));
1919
1920     SendSpanningChildrenNode(s, m);
1921 #else
1922     int i;
1923     for (i=0; i<CmiNumNodes(); i++) {
1924         if (i==CmiMyNode()) continue;
1925         char *dupmsg = (char *)CmiAlloc(s);
1926         CmiMemcpy(dupmsg,m,s);
1927         CmiFreeNodeSendFn(i, s, dupmsg);
1928     }
1929 #endif
1930     CmiFree(m);    
1931 }
1932
1933 void CmiSyncNodeBroadcastFn(int s, char *m) {
1934     char *dupmsg;
1935     dupmsg = (char *)CmiAlloc(s);
1936     CmiMemcpy(dupmsg,m,s);
1937     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1938     CmiFreeNodeBroadcastFn(s, dupmsg);
1939 }
1940
1941 /* need */
1942 void CmiFreeNodeBroadcastAllFn(int s, char *m) {
1943     char *dupmsg = (char *)CmiAlloc(s);
1944     CmiMemcpy(dupmsg,m,s);
1945     CMI_MAGIC(dupmsg) = CHARM_MAGIC_NUMBER;
1946     ((CmiMsgHeaderBasic *)dupmsg)->size = s;
1947     CMI_SET_CHECKSUM(dupmsg, s);
1948
1949     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1950     
1951     CQdCreate(CpvAccess(cQdState), 1);
1952     CmiSendNodeSelf(dupmsg);
1953
1954     CmiFreeNodeBroadcastFn(s, m);
1955 }
1956
1957 void CmiSyncNodeBroadcastAllFn(int s, char *m) {
1958     char *dupmsg;
1959     dupmsg = (char *)CmiAlloc(s);
1960     CmiMemcpy(dupmsg,m,s);
1961     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1962     CmiFreeNodeBroadcastAllFn(s, dupmsg);
1963 }
1964
1965
1966 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m) {
1967     return NULL;
1968 }
1969 #endif //end of CMK_NODE_QUEUE_AVAILABLE
1970
1971 #include "manytomany.c"
1972
1973
1974 /*********************************************************************************************
1975 This section is for CmiDirect. This is a variant of the  persistent communication in which
1976 the user can transfer data between processors without using Charm++ messages. This lets the user
1977 send and receive data from the middle of his arrays without any copying on either send or receive
1978 side
1979 *********************************************************************************************/
1980
1981
1982
1983
1984 #ifdef BGP_USE_RDMA
1985
1986 #include "cmidirect.h"
1987
1988 /* We can avoid a receiver side lookup by just sending the whole shebang.
1989    DCMF header is in units of quad words (16 bytes), so we'd need less than a
1990    quad word for the handle if we just sent that and did a lookup. Or exactly
1991    2 quad words for the buffer pointer, callback pointer, callback
1992    data pointer, and DCMF_Request_t pointer with no lookup.
1993
1994    Since CmiDirect is generally going to be used for messages which aren't
1995    tiny, the extra 16 bytes is not likely to impact performance noticably and
1996    not having to lookup handles in tables simplifies the code enormously.
1997
1998    EJB   2008/4/2
1999 */
2000
2001
2002 /**
2003  To be called on the receiver to create a handle and return its number
2004 **/
2005 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue) {
2006     /* one-sided primitives would require registration of memory */
2007
2008     /* with two-sided primitives we just bundle the buffer and callback info into the handle so the sender can remind us about it later. */
2009     struct infiDirectUserHandle userHandle;
2010     userHandle.handle=1; /* doesn't matter on BG/P*/
2011     userHandle.senderNode=senderNode;
2012     userHandle.recverNode=_Cmi_mynode;
2013     userHandle.recverBufSize=recvBufSize;
2014     userHandle.recverBuf=recvBuf;
2015     userHandle.initialValue=initialValue;
2016     userHandle.callbackFnPtr=callbackFnPtr;
2017     userHandle.callbackData=callbackData;
2018     userHandle.DCMF_rq_trecv=ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2019 #if CMI_DIRECT_DEBUG
2020     CmiPrintf("[%d] RDMA create addr %p %d callback %p callbackdata %p\n",CmiMyPe(),userHandle.recverBuf,userHandle.recverBufSize, userHandle.callbackFnPtr, userHandle.callbackData);
2021 #endif
2022     return userHandle;
2023 }
2024
2025 /****
2026  To be called on the sender to attach the sender's buffer to this handle
2027 ******/
2028
2029 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize) {
2030
2031     /* one-sided primitives would require registration of memory */
2032
2033     /* with two-sided primitives we just record the sender buf in the handle */
2034     userHandle->senderBuf=sendBuf;
2035     CmiAssert(sendBufSize==userHandle->recverBufSize);
2036     userHandle->DCMF_rq_tsend =ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2037 #if CMI_DIRECT_DEBUG
2038     CmiPrintf("[%d] RDMA assoc addr %p %d to receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,sendBufSize, userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2039 #endif
2040
2041 }
2042
2043 /****
2044 To be called on the sender to do the actual data transfer
2045 ******/
2046 void CmiDirect_put(struct infiDirectUserHandle *userHandle) {
2047     /** invoke a DCMF_Send with the direct callback */
2048     DCMF_Protocol_t *protocol = NULL;
2049     protocol = &cmi_dcmf_direct_registration;
2050     /* local copy */
2051     CmiAssert(userHandle->recverBuf!=NULL);
2052     CmiAssert(userHandle->senderBuf!=NULL);
2053     CmiAssert(userHandle->recverBufSize>0);
2054     if (userHandle->recverNode== _Cmi_mynode) {
2055 #if CMI_DIRECT_DEBUG
2056         CmiPrintf("[%d] RDMA local put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2057 #endif
2058
2059         memcpy(userHandle->recverBuf,userHandle->senderBuf,userHandle->recverBufSize);
2060         (*(userHandle->callbackFnPtr))(userHandle->callbackData);
2061     } else {
2062         dcmfDirectMsgHeader msgHead;
2063         msgHead.recverBuf=userHandle->recverBuf;
2064         msgHead.callbackFnPtr=userHandle->callbackFnPtr;
2065         msgHead.callbackData=userHandle->callbackData;
2066         msgHead.DCMF_rq_t=(DCMF_Request_t *) userHandle->DCMF_rq_trecv;
2067 #if CMK_SMP
2068         DCMF_CriticalSection_enter (0);
2069 #endif
2070 #if CMI_DIRECT_DEBUG
2071         CmiPrintf("[%d] RDMA put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2072 #endif
2073         DCMF_Send (protocol,
2074                    (DCMF_Request_t *) userHandle->DCMF_rq_tsend,
2075                    directcb, DCMF_MATCH_CONSISTENCY, userHandle->recverNode,
2076                    userHandle->recverBufSize, userHandle->senderBuf,
2077                    (struct DCQuad *) &(msgHead), 2);
2078
2079 #if CMK_SMP
2080         DCMF_CriticalSection_exit (0);
2081 #endif
2082     }
2083 }
2084
2085 /**** Should not be called the first time *********/
2086 void CmiDirect_ready(struct infiDirectUserHandle *userHandle) {
2087     /* no op on BGP */
2088 }
2089
2090 /**** Should not be called the first time *********/
2091 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle) {
2092     /* no op on BGP */
2093 }
2094
2095 /**** Should not be called the first time *********/
2096 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle) {
2097     /* no op on BGP */
2098 }
2099
2100 #endif /* BGP_USE_RDMA*/
2101