0e6753056d6227b3def44a803ba34f2e73698316
[charm.git] / src / arch / bluegenep / machine.c
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include "assert.h"
12 #include "malloc.h"
13
14 #include <bpcore/ppc450_inlines.h>
15
16 #include "dcmf.h"
17 #include "dcmf_multisend.h"
18
19 char *ALIGN_16(char *p) {
20     return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
21 }
22
23 #define PROGRESS_PERIOD 1024
24
25 //There are two roles of comm thread:
26 // 1. polling other cores' bcast msg queue
27 // 2. bcast msg is handled only by comm thd
28 #define BCASTMSG_ONLY_TO_COMMTHD 0
29
30 CpvDeclare(PCQueue, broadcast_q);                 //queue to send broadcast messages
31 #if CMK_NODE_QUEUE_AVAILABLE
32 CsvDeclare(PCQueue, node_bcastq);
33 CsvDeclare(CmiNodeLock, node_bcastLock);
34 #endif
35
36 /*
37     To reduce the buffer used in broadcast and distribute the load from
38   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
39   spanning tree broadcast algorithm.
40     This will use the fourth short in message as an indicator of spanning tree
41   root.
42 */
43 #if CMK_SMP
44 #define CMK_BROADCAST_SPANNING_TREE   1
45 #else
46 #define CMK_BROADCAST_SPANNING_TREE    1
47 #define CMK_BROADCAST_HYPERCUBE        0
48 #endif /* CMK_SMP */
49
50 #define BROADCAST_SPANNING_FACTOR     4
51
52 //The root of the message infers the type of the message
53 // 1. root is 0, then it is a normal point-to-point message
54 // 2. root is larger than 0 (>=1), then it is a broadcast message across all processors (cores)
55 // 3. root is less than 0 (<=-1), then it is a broadcast message across all nodes
56 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
57 #define CMI_IS_BCAST_ON_CORES(msg) (CMI_BROADCAST_ROOT(msg) > 0)
58 #define CMI_IS_BCAST_ON_NODES(msg) (CMI_BROADCAST_ROOT(msg) < 0)
59 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
60
61 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
62 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
63
64 /* FIXME: need a random number that everyone agrees ! */
65 #define CHARM_MAGIC_NUMBER               126
66
67 #if !CMK_OPTIMIZE
68 static int checksum_flag = 0;
69 extern unsigned char computeCheckSum(unsigned char *data, int len);
70
71 #define CMI_SET_CHECKSUM(msg, len)      \
72         if (checksum_flag)  {   \
73           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
74           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
75         }
76
77 #define CMI_CHECK_CHECKSUM(msg, len)    \
78         if (checksum_flag)      \
79           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
80             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
81             for(count = 0; count < len; count++) { \
82                 printf("%2x", msg[count]);                 \
83             }                                             \
84             printf("------------------------------\n\n"); \
85             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
86           }
87 #else
88 #define CMI_SET_CHECKSUM(msg, len)
89 #define CMI_CHECK_CHECKSUM(msg, len)
90 #endif
91
92 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
93
94 #if CMK_BROADCAST_HYPERCUBE
95 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
96 #else
97 #  define CMI_SET_CYCLE(msg, cycle)
98 #endif
99
100 int               _Cmi_numpes;
101 int               _Cmi_mynode;    /* Which address space am I */
102 int               _Cmi_mynodesize;/* Number of processors in my address space */
103 int               _Cmi_numnodes;  /* Total number of address spaces */
104 int                Cmi_nodestart; /* First processor in this address space */
105 CpvDeclare(void*, CmiLocalQueue);
106
107
108 #if CMK_NODE_QUEUE_AVAILABLE
109 #define SMP_NODEMESSAGE   (0xFB) // rank of the node message when node queue
110 // is available
111 #define NODE_BROADCAST_OTHERS (-1)
112 #define NODE_BROADCAST_ALL    (-2)
113 #endif
114
115
116 typedef struct ProcState {
117     /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
118     CmiNodeLock  recvLock;              /* for cs->recv */
119     CmiNodeLock bcastLock;
120 } ProcState;
121
122 static ProcState  *procState;
123
124 #if CMK_SMP && !CMK_MULTICORE
125 static volatile int commThdExit = 0;
126 static CmiNodeLock commThdExitLock = 0;
127 #endif
128
129 void ConverseRunPE(int everReturn);
130 static void CommunicationServer(int sleepTime);
131 static void CommunicationServerThread(int sleepTime);
132
133 //So far we dont define any comm threads
134 int Cmi_commthread = 0;
135
136 #include "machine-smp.c"
137 CsvDeclare(CmiNodeState, NodeState);
138 #include "immediate.c"
139
140 void AdvanceCommunications();
141
142
143 #if !CMK_SMP
144 /************ non SMP **************/
145 static struct CmiStateStruct Cmi_state;
146 int _Cmi_mype;
147 int _Cmi_myrank;
148
149 void CmiMemLock(void) {}
150 void CmiMemUnlock(void) {}
151
152 #define CmiGetState() (&Cmi_state)
153 #define CmiGetStateN(n) (&Cmi_state)
154
155 //void CmiYield(void) { sleep(0); }
156
157 static void CmiStartThreads(char **argv) {
158     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
159     _Cmi_mype = Cmi_nodestart;
160     _Cmi_myrank = 0;
161 }
162 #endif  /* !CMK_SMP */
163
164 //int received_immediate;
165 //int received_broadcast;
166
167 /*Add a message to this processor's receive queue, pe is a rank */
168 static void CmiPushPE(int pe,void *msg) {
169     CmiState cs = CmiGetStateN(pe);
170     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
171 #if CMK_IMMEDIATE_MSG
172     if (CmiIsImmediate(msg)) {
173         /**(CmiUInt2 *)msg = pe;*/
174         //received_immediate = 1;
175         //printf("PushPE: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
176         //CMI_DEST_RANK(msg) = pe;
177         CmiPushImmediateMsg(msg);
178         return;
179     }
180 #endif
181 #if CMK_SMP
182     CmiLock(procState[pe].recvLock);
183 #endif
184
185     PCQueuePush(cs->recv,(char *)msg);
186     //printf("%d: PCQueue length = %d, msg = %x\n", CmiMyPe(), PCQueueLength(cs->recv), msg);
187
188 #if CMK_SMP
189     CmiUnlock(procState[pe].recvLock);
190 #endif
191     CmiIdleLock_addMessage(&cs->idle);
192     MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
193 }
194
195 #if CMK_NODE_QUEUE_AVAILABLE
196 /*Add a message to this processor's receive queue */
197 static void CmiPushNode(void *msg) {
198     MACHSTATE(3,"Pushing message into NodeRecv queue");
199 #if CMK_IMMEDIATE_MSG
200     if (CmiIsImmediate(msg)) {
201         //printf("PushNode: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
202         //CMI_DEST_RANK(msg) = 0;
203         CmiPushImmediateMsg(msg);
204         return;
205     }
206 #endif
207     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
208     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
209     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
210     {
211         CmiState cs=CmiGetStateN(0);
212         CmiIdleLock_addMessage(&cs->idle);
213     }
214 }
215 #endif /* CMK_NODE_QUEUE_AVAILABLE */
216
217 volatile int msgQueueLen;
218 volatile int outstanding_recvs;
219
220 static int Cmi_dim;     /* hypercube dim of network */
221
222 static char     **Cmi_argv;
223 static char     **Cmi_argvcopy;
224 static CmiStartFn Cmi_startfn;   /* The start function */
225 static int        Cmi_usrsched;  /* Continue after start function finishes? */
226
227 extern void ConverseCommonInit(char **argv);
228 extern void ConverseCommonExit(void);
229 extern void CthInit(char **argv);
230
231 static void SendMsgsUntil(int);
232
233
234 void SendSpanningChildren(int size, char *msg);
235 #if CMK_NODE_QUEUE_AVAILABLE
236 void SendSpanningChildrenNode(int size, char *msg);
237 #endif
238 void SendHypercube(int size, char *msg);
239
240 DCMF_Protocol_t  cmi_dcmf_short_registration __attribute__((__aligned__(16)));
241 DCMF_Protocol_t  cmi_dcmf_eager_registration __attribute__((__aligned__(16)));
242 DCMF_Protocol_t  cmi_dcmf_rzv_registration   __attribute__((__aligned__(16)));
243 DCMF_Protocol_t  cmi_dcmf_multicast_registration   __attribute__((__aligned__(16)));
244
245 #define BGP_USE_RDMA 1
246 /*#define CMI_DIRECT_DEBUG 1*/
247 #ifdef BGP_USE_RDMA
248
249
250 DCMF_Protocol_t  cmi_dcmf_direct_registration __attribute__((__aligned__(16)));
251 /** The receive side of a put implemented in DCMF_Send */
252
253
254 typedef struct {
255     void *recverBuf;
256   void (*callbackFnPtr)(void *);
257     void *callbackData;
258     DCMF_Request_t *DCMF_rq_t;
259 } dcmfDirectMsgHeader;
260
261 /* nothing for us to do here */
262 #if (DCMF_VERSION_MAJOR >= 2)
263 void direct_send_done_cb(void*nothing, DCMF_Error_t *err) 
264 #else 
265   void direct_send_done_cb(void*nothing) 
266 #endif
267 {
268 #if CMI_DIRECT_DEBUG
269   CmiPrintf("[%d] RDMA send_done_cb\n", CmiMyPe());
270 #endif
271 }
272
273 DCMF_Callback_t  directcb;
274
275 void     direct_short_pkt_recv (void             * clientdata,
276                                 const DCQuad     * info,
277                                 unsigned           count,
278                                 unsigned           senderrank,
279                                 const char       * buffer,
280                                 const unsigned     sndlen) {
281 #if CMI_DIRECT_DEBUG
282     CmiPrintf("[%d] RDMA direct_short_pkt_recv\n", CmiMyPe());
283 #endif
284     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
285     CmiMemcpy(msgHead->recverBuf, buffer, sndlen);
286     (*(msgHead->callbackFnPtr))(msgHead->callbackData);
287 }
288
289
290 #if (DCMF_VERSION_MAJOR >= 2)
291 typedef void (*cbhdlr) (void *, DCMF_Error_t *);
292 #else
293 typedef void (*cbhdlr) (void *);
294 #endif
295
296 DCMF_Request_t * direct_first_pkt_recv_done (void              * clientdata,
297         const DCQuad      * info,
298         unsigned            count,
299         unsigned            senderrank,
300         const unsigned      sndlen,
301         unsigned          * rcvlen,
302         char             ** buffer,
303         DCMF_Callback_t   * cb
304                                             ) {
305 #if CMI_DIRECT_DEBUG
306     CmiPrintf("[%d] RDMA direct_first_pkt_recv_done\n", CmiMyPe());
307 #endif
308     /* pull the data we need out of the header */
309     *rcvlen=sndlen;
310     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
311     cb->function= (cbhdlr)msgHead->callbackFnPtr;
312     cb->clientdata=msgHead->callbackData;
313     *buffer=msgHead->recverBuf;
314     return msgHead->DCMF_rq_t;
315 }
316
317
318 #endif
319
320 typedef struct msg_list {
321     char              * msg;
322     int                 size;
323     int                 destpe;
324     int               * pelist;
325     DCMF_Callback_t     cb;
326     DCQuad              info __attribute__((__aligned__(16)));
327     DCMF_Request_t      send __attribute__((__aligned__(16)));
328 } SMSG_LIST __attribute__((__aligned__(16)));
329
330 #define MAX_NUM_SMSGS   64
331 CpvDeclare(PCQueue, smsg_list_q);
332
333 inline SMSG_LIST * smsg_allocate() {
334     SMSG_LIST *smsg = (SMSG_LIST *)PCQueuePop(CpvAccess(smsg_list_q));
335     if (smsg != NULL)
336         return smsg;
337
338     void * buf = malloc(sizeof(SMSG_LIST)); 
339     assert(buf!=NULL);
340     assert (((unsigned)buf & 0x0f) == 0);
341
342     return (SMSG_LIST *) buf;
343 }
344
345 inline void smsg_free (SMSG_LIST *smsg) {
346     int size = PCQueueLength (CpvAccess(smsg_list_q));
347     if (size < MAX_NUM_SMSGS)
348         PCQueuePush (CpvAccess(smsg_list_q), (char *) smsg);
349     else
350         free (smsg);
351 }
352
353 typedef struct {
354     int sleepMs; /*Milliseconds to sleep while idle*/
355     int nIdles; /*Number of times we've been idle in a row*/
356     CmiState cs; /*Machine state*/
357 } CmiIdleState;
358
359 static CmiIdleState *CmiNotifyGetState(void) {
360     CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
361     s->sleepMs=0;
362     s->nIdles=0;
363     s->cs=CmiGetState();
364     return s;
365 }
366
367
368 #if (DCMF_VERSION_MAJOR >= 2)
369 static void send_done(void *data, DCMF_Error_t *err) 
370 #else 
371 static void send_done(void *data) 
372 #endif
373 /* send done callback: sets the smsg entry to done */
374 {
375     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
376     CmiFree(msg_tmp->msg);
377     //free(data);
378     smsg_free (msg_tmp);
379
380     msgQueueLen--;
381 }
382
383 #if (DCMF_VERSION_MAJOR >= 2)
384 static void send_multi_done(void *data, DCMF_Error_t *err) 
385 #else 
386 static void send_multi_done(void *data) 
387 #endif
388 /* send done callback: sets the smsg entry to done */
389 {
390     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
391     CmiFree(msg_tmp->msg);
392     free(msg_tmp->pelist);
393
394     smsg_free(msg_tmp);
395
396     msgQueueLen--;
397 }
398
399
400 #if (DCMF_VERSION_MAJOR >= 2)
401 static void recv_done(void *clientdata, DCMF_Error_t * err) 
402 #else 
403 static void recv_done(void *clientdata) 
404 #endif
405 /* recv done callback: push the recved msg to recv queue */
406 {
407
408     char *msg = (char *) clientdata;
409     int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
410
411     //fprintf (stderr, "%d Recv message done \n", CmiMyPe());
412
413     /* then we do what PumpMsgs used to do:
414      * push msg to recv queue */
415     int count=0;
416     CMI_CHECK_CHECKSUM(msg, sndlen);
417     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
418         CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
419         return;
420     }
421
422 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
423     if (CMI_IS_BCAST_ON_CORES(msg) ) {
424         int pe = CMI_DEST_RANK(msg);
425
426         //printf ("%d: Receiving bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, pe);
427
428         char *copymsg;
429         copymsg = (char *)CmiAlloc(sndlen);
430         CmiMemcpy(copymsg,msg,sndlen);
431
432         //received_broadcast = 1;
433 #if CMK_SMP
434         CmiLock(procState[pe].bcastLock);
435         PCQueuePush(CpvAccessOther(broadcast_q, pe), copymsg);
436         CmiUnlock(procState[pe].bcastLock);
437 #else
438         PCQueuePush(CpvAccess(broadcast_q), copymsg);
439 #endif
440     }
441 #endif
442
443 #if CMK_NODE_QUEUE_AVAILABLE
444 #if CMK_BROADCAST_SPANNING_TREE
445     if (CMI_IS_BCAST_ON_NODES(msg)) {
446         //printf ("%d: Receiving node bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, CMI_DEST_RANK(msg));
447         char *copymsg = (char *)CmiAlloc(sndlen);
448         CmiMemcpy(copymsg,msg,sndlen);
449         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
450         CmiLock(CsvAccess(node_bcastLock));
451         PCQueuePush(CsvAccess(node_bcastq), copymsg);
452         CmiUnlock(CsvAccess(node_bcastLock));
453         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
454     }
455 #endif
456     if (CMI_DEST_RANK(msg) == SMP_NODEMESSAGE)
457         CmiPushNode(msg);
458     else
459 #endif
460
461 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
462         if (CMI_DEST_RANK(msg)<_Cmi_mynodesize) { //not the comm thd
463             CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
464         }
465 #else
466         CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
467 #endif
468
469     outstanding_recvs --;
470 }
471
472
473 void     short_pkt_recv (void             * clientdata,
474                          const DCQuad     * info,
475                          unsigned           count,
476                          unsigned           senderrank,
477                          const char       * buffer,
478                          const unsigned     sndlen) {
479     outstanding_recvs ++;
480     int alloc_size = sndlen;
481
482     char * new_buffer = (char *)CmiAlloc(alloc_size);
483     CmiMemcpy (new_buffer, buffer, sndlen);
484     
485 #if (DCMF_VERSION_MAJOR >= 2)
486     recv_done (new_buffer, NULL);
487 #else
488     recv_done (new_buffer);
489 #endif
490 }
491
492
493 DCMF_Request_t * first_multi_pkt_recv_done (const DCQuad      * info,
494                                             unsigned            count,
495                                             unsigned            senderrank,                                    
496                                             const unsigned      sndlen,
497                                             unsigned            connid,
498                                             void              * clientdata,
499                                             unsigned          * rcvlen,
500                                             char             ** buffer,
501                                             unsigned          * pw,
502                                             DCMF_Callback_t   * cb
503                                             ) {
504     outstanding_recvs ++;
505     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
506
507     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
508
509     /* printf ("Receiving %d bytes\n", sndlen); */
510     *rcvlen = sndlen;  /* to avoid malloc(0) which might
511                                    return NULL */
512
513     *buffer = (char *)CmiAlloc(alloc_size);
514     cb->function = recv_done;
515     cb->clientdata = *buffer;
516
517     *pw  = 0x7fffffff;
518     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
519 }
520
521
522 DCMF_Request_t * first_pkt_recv_done (void              * clientdata,
523                                       const DCQuad      * info,
524                                       unsigned            count,
525                                       unsigned            senderrank,
526                                       const unsigned      sndlen,
527                                       unsigned          * rcvlen,
528                                       char             ** buffer,
529                                       DCMF_Callback_t   * cb
530                                      ) {
531     outstanding_recvs ++;
532     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
533
534     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
535
536     /* printf ("Receiving %d bytes\n", sndlen); */
537     *rcvlen = sndlen;  /* to avoid malloc(0) which might
538                                    return NULL */
539
540     *buffer = (char *)CmiAlloc(alloc_size);
541     cb->function = recv_done;
542     cb->clientdata = *buffer;
543
544     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
545 }
546
547
548 #if CMK_NODE_QUEUE_AVAILABLE
549 void sendBroadcastMessagesNode() {
550     if (PCQueueLength(CsvAccess(node_bcastq))==0) return;
551     //node broadcast message could be always handled by any cores (including
552     //comm thd) on this node
553     //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
554     CmiLock(CsvAccess(node_bcastLock));
555     char *msg = PCQueuePop(CsvAccess(node_bcastq));
556     CmiUnlock(CsvAccess(node_bcastLock));
557     //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
558     while (msg) {
559 #if CMK_BROADCAST_SPANNING_TREE
560         //printf("sendBroadcastMessagesNode: node %d rank %d with msg root %d\n", CmiMyNode(), CmiMyRank(), CMI_BROADCAST_ROOT(msg));
561         SendSpanningChildrenNode(((CmiMsgHeaderBasic *) msg)->size, msg);
562 #endif
563         CmiFree(msg);
564         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
565         CmiLock(CsvAccess(node_bcastLock));
566         msg = PCQueuePop(CsvAccess(node_bcastq));
567         CmiUnlock(CsvAccess(node_bcastLock));
568         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
569     }
570 }
571 #endif
572
573 void sendBroadcastMessages() {
574 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
575 //in the presence of comm thd, and it is not responsible for broadcasting msg,
576 //the comm thd will help to pull the msg from rank 0 which is predefined as the
577 //core on a smp node to receive bcast msg.
578     int toPullRank = CmiMyRank();
579     if (CmiMyRank()==_Cmi_mynodesize) toPullRank = 0; //comm thd only pulls msg from rank 0
580 #else
581     int toPullRank = CmiMyRank();
582 #endif
583     PCQueue toPullQ;
584
585     /*
586     if(CmiMyRank()==_Cmi_mynodesize)
587       printf("Comm thd on node [%d] is pulling bcast msg\n", CmiMyNode());
588     else
589       printf("Work thd [%d] on node [%d] is pulling bcast msg\n", CmiMyRank(), CmiMyNode());
590     */
591 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
592     toPullQ = CpvAccessOther(broadcast_q, toPullRank);
593 #else
594     toPullQ = CpvAccess(broadcast_q);
595 #endif
596
597     if (PCQueueLength(toPullQ)==0) return;
598 #if CMK_SMP
599     CmiLock(procState[toPullRank].bcastLock);
600 #endif
601
602     char *msg = (char *) PCQueuePop(toPullQ);
603
604 #if CMK_SMP
605     CmiUnlock(procState[toPullRank].bcastLock);
606 #endif
607
608     while (msg) {
609
610 #if CMK_BROADCAST_SPANNING_TREE
611         SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
612 #elif CMK_BROADCAST_HYPERCUBE
613         SendHypercube(((CmiMsgHeaderBasic *) msg)->size, msg);
614 #endif
615
616         CmiFree (msg);
617
618 #if CMK_SMP
619         CmiLock(procState[toPullRank].bcastLock);
620 #endif
621
622 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
623         toPullQ = CpvAccessOther(broadcast_q, toPullRank);
624 #else
625         toPullQ = CpvAccess(broadcast_q);
626 #endif
627         msg = (char *) PCQueuePop(toPullQ);
628
629 #if CMK_SMP
630         CmiUnlock(procState[toPullRank].bcastLock);
631 #endif
632     }
633 }
634
635 CpvDeclare(unsigned, networkProgressCount);
636 int  networkProgressPeriod;
637
638 #if 0
639 unsigned int *ranklist;
640
641 BGTsC_t        barrier;
642
643 // -----------------------------------------
644 // Rectangular broadcast implementation
645 // -----------------------------------------
646
647 #define MAX_COMM  256
648 static void * comm_table [MAX_COMM];
649
650 typedef struct rectbcast_msg {
651     BGTsRC_t           request;
652     DCMF_Callback_t    cb;
653     char              *msg;
654 } RectBcastInfo;
655
656
657 static void bcast_done (void *data) {
658     RectBcastInfo *rinfo = (RectBcastInfo *) data;
659     CmiFree (rinfo->msg);
660     free (rinfo);
661 }
662
663 static  void *   getRectBcastRequest (unsigned comm) {
664     return comm_table [comm];
665 }
666
667
668 static  void *  bcast_recv     (unsigned               root,
669                                 unsigned               comm,
670                                 const unsigned         sndlen,
671                                 unsigned             * rcvlen,
672                                 char                ** rcvbuf,
673                                 DCMF_Callback_t      * const cb) {
674
675     int alloc_size = sndlen + sizeof(BGTsRC_t) + 16;
676
677     *rcvlen = sndlen;  /* to avoid malloc(0) which might
678                                    return NULL */
679
680     *rcvbuf       =  (char *)CmiAlloc(alloc_size);
681     cb->function  =   recv_done;
682     cb->clientdata = *rcvbuf;
683
684     return (BGTsRC_t *) ALIGN_16 (*rcvbuf + sndlen);
685
686 }
687
688
689 extern void bgl_machine_RectBcast (unsigned                 commid,
690                                        const char             * sndbuf,
691                                        unsigned                 sndlen) {
692     RectBcastInfo *rinfo  =   (RectBcastInfo *) malloc (sizeof(RectBcastInfo));
693     rinfo->cb.function    =   bcast_done;
694     rinfo->cb.clientdata  =   rinfo;
695
696     BGTsRC_AsyncBcast_start (commid, &rinfo->request, &rinfo->cb, sndbuf, sndlen);
697
698 }
699
700 extern void        bgl_machine_RectBcastInit  (unsigned               commID,
701             const BGTsRC_Geometry_t* geometry) {
702
703     CmiAssert (commID < 256);
704     CmiAssert (comm_table [commID] == NULL);
705
706     BGTsRC_t *request =  (BGTsRC_t *) malloc (sizeof (BGTsRC_t));
707     comm_table [commID] = request;
708
709     BGTsRC_AsyncBcast_init  (request, commID,  geometry);
710 }
711
712
713
714
715 //--------------------------------------------------------------
716 //----- End Rectangular Broadcast Implementation ---------------
717 //--------------------------------------------------------------
718 #endif
719
720 //approx sleep command
721 void mysleep (int cycles) {
722     unsigned long long start = DCMF_Timebase();
723     unsigned long long end = start + cycles;
724
725     while (start < end)
726         start = DCMF_Timebase();
727
728     return;
729 }
730
731 static void * test_buf;
732
733 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
734     int n, i, count;
735
736     //fprintf(stderr, "Initializing Converse Blue Gene/P machine Layer\n");
737
738     DCMF_Messager_initialize();
739
740 #if CMK_SMP
741     DCMF_Configure_t  config_in, config_out;
742     config_in.thread_level= DCMF_THREAD_MULTIPLE;
743     config_in.interrupts  = DCMF_INTERRUPTS_OFF;
744
745     DCMF_Messager_configure(&config_in, &config_out);
746     //assert (config_out.thread_level == DCMF_THREAD_MULTIPLE); //not supported in vn mode
747 #endif
748
749     DCMF_Send_Configuration_t short_config, eager_config, rzv_config;
750
751
752     short_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
753     short_config.cb_recv_short = short_pkt_recv;
754     short_config.cb_recv       = first_pkt_recv_done;
755
756     eager_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
757     eager_config.cb_recv_short = short_pkt_recv;
758     eager_config.cb_recv       = first_pkt_recv_done;
759
760 #ifdef  OPT_RZV
761 #warning "Enabling Optimize Rzv"
762     rzv_config.protocol        = DCMF_RZV_SEND_PROTOCOL;
763 #else
764     rzv_config.protocol        = DCMF_DEFAULT_SEND_PROTOCOL;
765 #endif
766     rzv_config.cb_recv_short   = short_pkt_recv;
767     rzv_config.cb_recv         = first_pkt_recv_done;
768
769     DCMF_Send_register (&cmi_dcmf_short_registration, &short_config);
770     DCMF_Send_register (&cmi_dcmf_eager_registration, &eager_config);
771     DCMF_Send_register (&cmi_dcmf_rzv_registration,   &rzv_config);
772
773 #ifdef BGP_USE_RDMA
774     DCMF_Send_Configuration_t direct_config;
775     direct_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
776     direct_config.cb_recv_short = direct_short_pkt_recv;
777     direct_config.cb_recv       = direct_first_pkt_recv_done;
778     DCMF_Send_register (&cmi_dcmf_direct_registration,   &direct_config);
779     directcb.function=direct_send_done_cb;
780     directcb.clientdata=NULL;
781 #endif
782
783     //fprintf(stderr, "Initializing Eager Protocol\n");
784
785     _Cmi_numnodes = DCMF_Messager_size();
786     _Cmi_mynode = DCMF_Messager_rank();
787
788     unsigned rank = DCMF_Messager_rank();
789     unsigned size = DCMF_Messager_size();
790
791     CmiBarrier();
792     CmiBarrier();
793     CmiBarrier();
794
795     /* processor per node */
796     _Cmi_mynodesize = 1;
797     CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
798 #if ! CMK_SMP
799     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
800         CmiAbort("+ppn cannot be used in non SMP version!\n");
801 #endif
802
803     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
804     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
805     Cmi_argvcopy = CmiCopyArgs(argv);
806     Cmi_argv = argv;
807     Cmi_startfn = fn;
808     Cmi_usrsched = usched;
809
810     //printf ("Starting Charm with %d nodes and %d processors\n", CmiNumNodes(), CmiNumPes());
811
812     DCMF_Multicast_Configuration_t mconfig;
813     mconfig.protocol = DCMF_MEMFIFO_DMA_MSEND_PROTOCOL;
814     mconfig.cb_recv  = first_multi_pkt_recv_done;
815     mconfig.clientdata = NULL;
816     mconfig.connectionlist = (void **) malloc (CmiNumPes() * sizeof(unsigned long));
817     mconfig.nconnections = CmiNumPes();  
818     DCMF_Multicast_register(&cmi_dcmf_multicast_registration, &mconfig);
819
820
821     /* find dim = log2(numpes), to pretend we are a hypercube */
822     for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
823         Cmi_dim++ ;
824
825
826     /* checksum flag */
827     if (CmiGetArgFlag(argv,"+checksum")) {
828 #if !CMK_OPTIMIZE
829         checksum_flag = 1;
830         if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
831 #else
832         if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
833 #endif
834     }
835
836     CsvInitialize(CmiNodeState, NodeState);
837     CmiNodeStateInit(&CsvAccess(NodeState));
838
839 #if CMK_NODE_QUEUE_AVAILABLE
840     CsvInitialize(PCQueue, node_bcastq);
841     CsvAccess(node_bcastq) = PCQueueCreate();
842     CsvInitialize(CmiNodeLock, node_bcastLock);
843     CsvAccess(node_bcastLock) = CmiCreateLock();
844 #endif
845
846     int actualNodeSize = _Cmi_mynodesize;
847 #if !CMK_MULTICORE
848     actualNodeSize++; //considering the extra comm thread
849 #endif
850
851     procState = (ProcState *)CmiAlloc((actualNodeSize) * sizeof(ProcState));
852     for (i=0; i<actualNodeSize; i++) {
853         /*    procState[i].sendMsgBuf = PCQueueCreate();   */
854         procState[i].recvLock = CmiCreateLock();
855         procState[i].bcastLock = CmiCreateLock();
856     }
857
858 #if CMK_SMP && !CMK_MULTICORE
859     commThdExitLock = CmiCreateLock();
860 #endif
861
862     /* Network progress function is used to poll the network when for
863        messages. This flushes receive buffers on some  implementations*/
864     networkProgressPeriod = PROGRESS_PERIOD;
865     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
866
867     //printf ("Starting Threads\n");
868
869     CmiStartThreads(argv);
870
871     ConverseRunPE(initret);
872 }
873
874
875 int PerrorExit (char *err) {
876     fprintf (stderr, "err\n\n");
877     exit (-1);
878     return -1;
879 }
880
881
882 void ConverseRunPE(int everReturn) {
883     //printf ("ConverseRunPE on rank %d\n", CmiMyPe());
884
885     CmiIdleState *s=CmiNotifyGetState();
886     CmiState cs;
887     char** CmiMyArgv;
888     CmiNodeAllBarrier();
889
890     cs = CmiGetState();
891     CpvInitialize(void *,CmiLocalQueue);
892     CpvAccess(CmiLocalQueue) = cs->localqueue;
893
894     if (CmiMyRank())
895         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
896     else
897         CmiMyArgv=Cmi_argv;
898
899     CthInit(CmiMyArgv);
900
901     //printf ("Before Converse Common Init\n");
902     ConverseCommonInit(CmiMyArgv);
903
904     /* initialize the network progress counter*/
905     /* Network progress function is used to poll the network when for
906        messages. This flushes receive buffers on some  implementations*/
907     CpvInitialize(int , networkProgressCount);
908     CpvAccess(networkProgressCount) = 0;
909
910     CpvInitialize(PCQueue, broadcast_q);
911     CpvAccess(broadcast_q) = PCQueueCreate();
912
913     CpvInitialize(PCQueue, smsg_list_q);
914     CpvAccess(smsg_list_q) = PCQueueCreate();
915
916     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
917
918     CmiBarrier();
919
920     /* Converse initialization finishes, immediate messages can be processed.
921        node barrier previously should take care of the node synchronization */
922     _immediateReady = 1;
923
924     /* communication thread */
925     if (CmiMyRank() == CmiMyNodeSize()) {
926         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
927         while (1) CommunicationServer(5);
928     } else {
929         //printf ("Calling Start Fn and the scheduler \n");
930
931         if (!everReturn) {
932             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
933             if (Cmi_usrsched==0) CsdScheduler(-1);
934             ConverseExit();
935         }
936     }
937 }
938
939 #if CMK_SMP
940 static int inexit = 0;
941
942 /* test if all processors recv queues are empty */
943 static int RecvQueueEmpty() {
944     int i;
945     for (i=0; i<_Cmi_mynodesize; i++) {
946         CmiState cs=CmiGetStateN(i);
947         if (!PCQueueEmpty(cs->recv)) return 0;
948     }
949     return 1;
950 }
951
952 #endif
953
954
955 //extern void DCMF_Messager_dumpTimers();
956
957 void ConverseExit(void) {
958
959     while (msgQueueLen > 0 || outstanding_recvs > 0) {
960         AdvanceCommunications();
961     }
962
963     CmiNodeBarrier();
964     ConverseCommonExit();
965
966     //  if(CmiMyPe()%101 == 0)
967     //DCMF_Messager_dumpTimers();
968
969     if (CmiMyPe() == 0) {
970         printf("End of program\n");
971     }
972
973     CmiNodeBarrier();
974 //  CmiNodeAllBarrier ();
975
976 #if CMK_SMP && !CMK_MULTICORE
977     //CmiLock(commThdExitLock);
978     commThdExit = 1;
979     //CmiUnlock(commThdExitLock);
980
981     _bgp_msync();
982 #endif
983
984     int rank0 = 0;
985
986     if (CmiMyRank() == 0) {
987         rank0 = 1;
988         //CmiFree(procState);
989         DCMF_Messager_finalize();
990     }
991
992     CmiNodeBarrier();
993 //  CmiNodeAllBarrier ();
994
995     if (rank0)
996         exit(0);
997     else
998         pthread_exit(NULL);
999 }
1000
1001 /* exit() called on any node would abort the whole program */
1002 void CmiAbort(const char * message) {
1003     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1004              "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),
1005              msgQueueLen, outstanding_recvs, message);
1006     //CmiPrintStackTrace(0);
1007
1008     while (msgQueueLen > 0 || outstanding_recvs > 0) {
1009         AdvanceCommunications();
1010     }
1011
1012     CmiBarrier();
1013     assert (0);
1014 }
1015
1016 static void CommunicationServer(int sleepTime) {
1017 #if CMK_SMP && !CMK_MULTICORE
1018     //CmiLock(commThdExitLock);
1019     if (commThdExit) {
1020         while (msgQueueLen > 0 || outstanding_recvs > 0) {
1021             AdvanceCommunications();
1022         }
1023         CmiUnlock(commThdExitLock);
1024         pthread_exit(NULL);
1025         return;
1026     }
1027     //CmiUnlock(commThdExitLock);
1028 #endif
1029     AdvanceCommunications();
1030
1031 #if CMK_IMMEDIATE_MSG && CMK_SMP && !CMK_MULTICORE
1032     CmiHandleImmediate();
1033 #endif
1034
1035     //mysleep(sleepTime);
1036 }
1037
1038 static void CommunicationServerThread(int sleepTime) {
1039 #if CMK_SMP
1040     CommunicationServer(sleepTime);
1041 #endif
1042 //immediate msgs are handled in AdvanceCommunications
1043 }
1044
1045 #if CMK_NODE_QUEUE_AVAILABLE
1046 char *CmiGetNonLocalNodeQ(void) {
1047     CmiState cs = CmiGetState();
1048     char *result = 0;
1049     CmiIdleLock_checkMessage(&cs->idle);
1050     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1051         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1052
1053         if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
1054             //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1055             result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1056             CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1057         }
1058
1059         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1060     }
1061     return result;
1062 }
1063 #endif
1064
1065
1066 void *CmiGetNonLocal() {
1067
1068     CmiState cs = CmiGetState();
1069
1070     void *msg = NULL;
1071     CmiIdleLock_checkMessage(&cs->idle);
1072     /* although it seems that lock is not needed, I found it crashes very often
1073        on mpi-smp without lock */
1074
1075 #if !CMK_SMP || CMK_MULTICORE  /*|| !BCASTMSG_ONLY_TO_COMMTHD*/
1076 //ChaoMei changes
1077     AdvanceCommunications();
1078 #endif
1079
1080     /*if(CmiMyRank()==0) printf("Got stuck here on proc[%d] node[%d]\n", CmiMyPe(), CmiMyNode());*/
1081
1082     if (PCQueueLength(cs->recv)==0) return NULL;
1083
1084 #if CMK_SMP
1085     CmiLock(procState[cs->rank].recvLock);
1086 #endif
1087
1088     msg =  PCQueuePop(cs->recv);
1089
1090 #if CMK_SMP
1091     CmiUnlock(procState[cs->rank].recvLock);
1092 #endif
1093
1094     return msg;
1095 }
1096
1097 static void CmiSendSelf(char *msg) {
1098 #if CMK_IMMEDIATE_MSG
1099     if (CmiIsImmediate(msg)) {
1100         /* CmiBecomeNonImmediate(msg); */
1101         //printf("In SendSelf, N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1102         CmiPushImmediateMsg(msg);
1103 #if CMK_MULTICORE
1104         CmiHandleImmediate();
1105 #endif
1106         return;
1107     }
1108 #endif
1109     
1110     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1111 }
1112
1113 #if CMK_SMP
1114 static void CmiSendPeer (int rank, int size, char *msg) {
1115 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
1116     if (CMI_BROADCAST_ROOT(msg) != 0) {
1117         char *copymsg;
1118         copymsg = (char *)CmiAlloc(size);
1119         CmiMemcpy(copymsg,msg,size);
1120
1121         CmiLock(procState[rank].bcastLock);
1122         PCQueuePush(CpvAccessOther(broadcast_q, rank), copymsg);
1123         CmiUnlock(procState[rank].bcastLock);
1124     }
1125 #endif
1126     
1127     CmiPushPE (rank, msg);
1128 }
1129 #endif
1130
1131
1132 void machineSend(SMSG_LIST *msg_tmp) {    
1133
1134     if (msg_tmp->destpe == CmiMyNode())
1135         CmiAbort("Sending to self\n");
1136
1137     CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumNodes());
1138     msg_tmp->cb.function     =   send_done;
1139     msg_tmp->cb.clientdata   =   msg_tmp;
1140
1141     DCMF_Protocol_t *protocol = NULL;
1142
1143     if (msg_tmp->size < 224)
1144         protocol = &cmi_dcmf_short_registration;
1145     else if (msg_tmp->size < 2048)
1146         protocol = &cmi_dcmf_eager_registration;
1147     else
1148         protocol = &cmi_dcmf_rzv_registration;
1149
1150
1151 #if CMK_SMP
1152     DCMF_CriticalSection_enter (0);
1153 #endif
1154
1155     msgQueueLen ++;
1156     DCMF_Send (protocol, &msg_tmp->send, msg_tmp->cb,
1157                DCMF_MATCH_CONSISTENCY, msg_tmp->destpe,
1158                msg_tmp->size, msg_tmp->msg, &msg_tmp->info, 1);
1159
1160 /*    
1161     #if CMK_SMP && !CMK_MULTICORE
1162     //Adding this advance call here improves the SMP performance
1163     //a little bit although it is possible that some more bugs are
1164     //introduced
1165     DCMF_Messager_advance();
1166     #endif
1167 */
1168
1169 #if CMK_SMP
1170     DCMF_CriticalSection_exit (0);
1171 #endif
1172 }
1173
1174 #define MAX_MULTICAST 128
1175 DCMF_Opcode_t  CmiOpcodeList [MAX_MULTICAST];
1176
1177 void  machineMulticast(int npes, int *pelist, int size, char* msg){  
1178   CQdCreate(CpvAccess(cQdState), npes);
1179   
1180   CmiAssert (npes < MAX_MULTICAST);
1181
1182   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1183   ((CmiMsgHeaderBasic *)msg)->size = size;  
1184   CMI_SET_BROADCAST_ROOT(msg,0);
1185   CMI_SET_CHECKSUM(msg, size);
1186   
1187   SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1188   
1189   msg_tmp->destpe    = -1;      //multicast operation
1190   msg_tmp->size      = size * npes; //keep track of #bytes outstanding
1191   msg_tmp->msg       = msg;
1192   msg_tmp->pelist    = pelist;
1193   
1194   msgQueueLen ++;
1195   
1196   DCMF_Multicast_t  mcast_info __attribute__((__aligned__(16)));
1197   
1198   mcast_info.registration   = & cmi_dcmf_multicast_registration;
1199   mcast_info.request        = & msg_tmp->send;
1200   mcast_info.cb_done.function    =   send_multi_done;
1201   mcast_info.cb_done.clientdata  =   msg_tmp;
1202   mcast_info.consistency    =   DCMF_MATCH_CONSISTENCY;
1203   mcast_info.connection_id  =   CmiMyPe();
1204   mcast_info.bytes          =   size;
1205   mcast_info.src            =   msg;
1206   mcast_info.nranks         =   npes;
1207   mcast_info.ranks          =   (unsigned *)pelist;
1208   mcast_info.opcodes        =   CmiOpcodeList;   //static list of MAX_MULTICAST entires with 0 in them
1209   mcast_info.flags          =   0;
1210   mcast_info.msginfo        =   &msg_tmp->info;
1211   mcast_info.count          =   1;
1212
1213   DCMF_Multicast (&mcast_info);
1214 }
1215
1216
1217
1218 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg);
1219
1220 /* The general free send function
1221  * Send is synchronous, and free msg after posted
1222  */
1223 void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
1224
1225   if (destPE < 0 || destPE > CmiNumPes ())
1226     printf ("Sending to %d\n", destPE);
1227
1228   CmiAssert (destPE >= 0 && destPE < CmiNumPes());
1229
1230     CmiState cs = CmiGetState();
1231
1232     if (destPE==cs->pe) {
1233         CmiSendSelf(msg);
1234         return;
1235     }
1236
1237 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1238     //In the presence of comm thd which is responsible for sending bcast msgs, then
1239     //CmiNodeOf and CmiRankOf may return incorrect information if the destPE is considered
1240     //as a comm thd.
1241     if (destPE >= _Cmi_numpes) { //destination is a comm thd
1242         int nid = destPE - _Cmi_numpes;
1243         int rid = _Cmi_mynodesize;
1244         CmiGeneralFreeSendN (nid, rid, size, msg);
1245     } else {
1246         CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1247     }
1248 #else
1249     //printf ("%d: Sending Message to %d \n", CmiMyPe(), destPE);
1250     CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1251 #endif
1252 }
1253
1254 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg) {
1255
1256     //printf ("%d, %d: Sending Message to node %d rank %d \n", CmiMyPe(),
1257     //  CmiMyNode(), node, rank);
1258
1259 #if CMK_SMP
1260     CMI_DEST_RANK(msg) = rank;
1261     //CMI_SET_CHECKSUM(msg, size);
1262
1263     if (node == CmiMyNode()) {
1264         CmiSendPeer (rank, size, msg);
1265         return;
1266     }
1267 #endif
1268
1269     SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1270     msg_tmp->destpe = node; //destPE;
1271     msg_tmp->size = size;
1272     msg_tmp->msg = msg;
1273
1274     machineSend(msg_tmp);
1275 }
1276
1277 void CmiSyncSendFn(int destPE, int size, char *msg) {
1278     char *copymsg;
1279     copymsg = (char *)CmiAlloc(size);
1280     CmiMemcpy(copymsg,msg,size);
1281     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncSendFn on comm thd on node %d\n", CmiMyNode());
1282     CmiFreeSendFn(destPE,size,copymsg);
1283 }
1284
1285 void CmiFreeSendFn(int destPE, int size, char *msg) {    
1286     CQdCreate(CpvAccess(cQdState), 1);
1287     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeSendFn on comm thd on node %d\n", CmiMyNode());
1288
1289     CMI_SET_BROADCAST_ROOT(msg,0);
1290     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1291     ((CmiMsgHeaderBasic *)msg)->size = size;
1292     CMI_SET_CHECKSUM(msg, size);
1293
1294     CmiGeneralFreeSend(destPE,size,msg);
1295 }
1296
1297 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1298 void CmiSyncSendFn1(int destPE, int size, char *msg) {
1299     char *copymsg;
1300     copymsg = (char *)CmiAlloc(size);
1301     CmiMemcpy(copymsg, msg, size);
1302
1303     //  asm volatile("sync" ::: "memory");
1304
1305     CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
1306     ((CmiMsgHeaderBasic *)copymsg)->size = size;
1307     CMI_SET_CHECKSUM(copymsg, size);
1308
1309     CmiGeneralFreeSend(destPE,size,copymsg);
1310 }
1311
1312 #define NODE_LEVEL_ST_IMPLEMENTATION 1
1313 #if NODE_LEVEL_ST_IMPLEMENTATION
1314 //send msgs to other ranks except the rank specified in the argument
1315 static void CmiSendChildrenPeers(int rank, int size, char *msg) {
1316     //printf ("%d [%d]: Send children peers except rank %d\n",  CmiMyPe(), CmiMyNode(), CmiMyRank());
1317     int r=0;
1318
1319 //With comm thd, broadcast msg will be pulled from bcast queue from the comm thd
1320 //And the msg would be finally pushed into other cores on the same node by the
1321 //comm thd. But it's possible a msg has already been pushed into rank 0 in
1322 //recv_done func call. So there's no need to push the bcast msg into the recv
1323 //queue again in the case BCASTMSG_ONLY_TO_COMMTHD is not set
1324
1325 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
1326     //indicate this is called from comm thread.
1327     if (rank == _Cmi_mynodesize) r = 1;
1328 #endif
1329
1330     for (;r<rank; r++) {
1331         char *copymsg;
1332         copymsg = (char *)CmiAlloc(size);
1333         CmiMemcpy(copymsg,msg,size);        
1334         CmiPushPE (r, copymsg);
1335     }
1336
1337     for (r=rank+1; r<_Cmi_mynodesize; r++) {
1338         char *copymsg;
1339         copymsg = (char *)CmiAlloc(size);
1340         CmiMemcpy(copymsg,msg,size);        
1341         CmiPushPE (r, copymsg);
1342     }
1343 }
1344
1345 //In this implementation, msgs are first sent out to the comm thd or rank 0 of other nodes
1346 //then send msgs to the other cores on the same node
1347 void SendSpanningChildren(int size, char *msg) {
1348     CmiState cs = CmiGetState();
1349     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1350     int i;
1351
1352     //printf ("%d [%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), startpe);
1353
1354     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1355
1356     int startNid = CmiNodeOf(startpe);
1357     int thisNid, thisRid;
1358     thisNid = CmiMyNode();
1359     thisRid = CmiMyRank();
1360
1361     //printf ("%d [%d/%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), thisRid, startpe);
1362
1363     //Step1: send to cores that has comm thd on other nodes
1364     int dist = thisNid - startNid;
1365     if (dist<0) dist += _Cmi_numnodes;
1366     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1367         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1368         if (nid > _Cmi_numnodes - 1) break;
1369         nid += startNid;
1370         nid = nid%_Cmi_numnodes;
1371         CmiAssert(nid>=0 && nid<_Cmi_numnodes && nid!=thisNid);
1372 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1373         int p = nid + _Cmi_numpes;
1374 #else
1375         int p = CmiNodeFirst(nid);
1376 #endif
1377         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1378         CmiSyncSendFn1(p, size, msg);
1379     }
1380
1381     //Step2: send to other cores (i.e. excluding myself cs->pe) on the same nodes (just a flat send)
1382     CmiSendChildrenPeers(thisRid, size, msg);
1383
1384 }
1385 #else
1386 /* send msg to its spanning children in broadcast. G. Zheng */
1387 void SendSpanningChildren(int size, char *msg) {
1388     CmiState cs = CmiGetState();
1389     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1390     int i;
1391
1392     //printf ("%d [%d]: In Send Spanning Tree\n",  CmiMyPe(), CmiMyNode());
1393
1394     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1395     int dist = cs->pe-startpe;
1396     if (dist<0) dist+=_Cmi_numpes;
1397     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1398         int p = BROADCAST_SPANNING_FACTOR*dist + i;
1399         if (p > _Cmi_numpes - 1) break;
1400         p += startpe;
1401         p = p%_Cmi_numpes;
1402         CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1403
1404         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1405         CmiSyncSendFn1(p, size, msg);
1406     }
1407 }
1408 #endif
1409
1410 /* send msg along the hypercube in broadcast. (Sameer) */
1411 void SendHypercube(int size, char *msg) {
1412     CmiState cs = CmiGetState();
1413     int curcycle = CMI_GET_CYCLE(msg);
1414     int i;
1415
1416     double logp = CmiNumPes();
1417     logp = log(logp)/log(2.0);
1418     logp = ceil(logp);
1419
1420     /*  CmiPrintf("In hypercube\n"); */
1421     /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1422
1423     for (i = curcycle; i < logp; i++) {
1424         int p = cs->pe ^ (1 << i);
1425         /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1426         if (p < CmiNumPes()) {
1427             CMI_SET_CYCLE(msg, i + 1);
1428
1429             CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1430             CmiSyncSendFn1(p, size, msg);
1431         }
1432     }
1433 }
1434
1435 void CmiSyncBroadcastFn(int size, char *msg) {
1436     char *copymsg;
1437     copymsg = (char *)CmiAlloc(size);
1438     CmiMemcpy(copymsg,msg,size);
1439     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastFn on comm thd on node %d\n", CmiMyNode());
1440     CmiFreeBroadcastFn(size,copymsg);
1441 }
1442
1443 void CmiFreeBroadcastFn(int size, char *msg) {
1444
1445     //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1446
1447     CmiState cs = CmiGetState();
1448 #if CMK_BROADCAST_SPANNING_TREE    
1449     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1450     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastFn on comm thd on node %d\n", CmiMyNode());
1451
1452     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1453
1454     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1455     SendSpanningChildren(size, msg);
1456     CmiFree(msg);
1457 #elif CMK_BROADCAST_HYPERCUBE    
1458     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1459
1460     CMI_SET_CYCLE(msg, 0);
1461     SendHypercube(size, msg);
1462     CmiFree(msg);
1463 #else
1464     int i;
1465
1466     for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1467         CmiSyncSendFn(i,size,msg);
1468
1469     for ( i=0; i<cs->pe; i++ )
1470         CmiSyncSendFn(i,size,msg);
1471
1472     CmiFree(msg);
1473 #endif
1474 }
1475
1476 void CmiSyncBroadcastAllFn(int size, char *msg) {
1477     char *copymsg;
1478     copymsg = (char *)CmiAlloc(size);
1479     CmiMemcpy(copymsg,msg,size);
1480     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1481     CmiFreeBroadcastAllFn(size,copymsg);
1482 }
1483
1484 void CmiFreeBroadcastAllFn(int size, char *msg) {
1485
1486     //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1487
1488     CmiState cs = CmiGetState();
1489 #if CMK_BROADCAST_SPANNING_TREE
1490
1491     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1492     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1493
1494     CmiSyncSendFn(cs->pe,size,msg);
1495     
1496     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1497
1498     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1499     SendSpanningChildren(size, msg);
1500     CmiFree(msg);
1501
1502 #elif CMK_BROADCAST_HYPERCUBE
1503     CmiSyncSendFn(cs->pe,size,msg);
1504     
1505     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1506
1507     CMI_SET_CYCLE(msg, 0);
1508     SendHypercube(size, msg);
1509     CmiFree(msg);
1510 #else
1511     int i ;
1512
1513     DCMF_CriticalSection_enter (0);
1514
1515     for ( i=0; i<_Cmi_numpes; i++ ) {
1516         CmiSyncSendFn(i,size,msg);
1517
1518         if ( (i % 32) == 0 )
1519             SendMsgsUntil (0);
1520     }
1521
1522     DCMF_CriticalSection_exit (0);
1523
1524     CmiFree(msg);
1525 #endif
1526 }
1527
1528 void AdvanceCommunications() {
1529
1530 #if CMK_SMP
1531     DCMF_CriticalSection_enter (0);
1532 #endif
1533
1534     while(DCMF_Messager_advance()>0);
1535     //DCMF_Messager_advance();
1536
1537 #if CMK_SMP
1538     DCMF_CriticalSection_exit (0);
1539 #endif
1540
1541     sendBroadcastMessages();
1542 #if CMK_NODE_QUEUE_AVAILABLE
1543     sendBroadcastMessagesNode();
1544 #endif
1545
1546 #if CMK_IMMEDIATE_MSG && CMK_MULTICORE
1547     CmiHandleImmediate();
1548 #endif
1549 }
1550
1551
1552 static void SendMsgsUntil(int targetm) {
1553
1554     while (msgQueueLen>targetm) {
1555         AdvanceCommunications ();
1556     }
1557 }
1558
1559 void CmiNotifyIdle() {
1560 #if !CMK_SMP || CMK_MULTICORE
1561     AdvanceCommunications();
1562 #endif
1563 }
1564
1565
1566 /*==========================================================*/
1567 /*==========================================================*/
1568 /*==========================================================*/
1569
1570 /************ Recommended routines ***********************/
1571 /************ You dont have to implement these but they are supported
1572  in the converse syntax and some rare programs may crash. But most
1573  programs dont need them. *************/
1574
1575 CmiCommHandle CmiAsyncSendFn(int dest, int size, char *msg) {
1576     CmiAbort("CmiAsyncSendFn not implemented.");
1577     return (CmiCommHandle) 0;
1578 }
1579
1580 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1581     CmiAbort("CmiAsyncBroadcastFn not implemented.");
1582     return (CmiCommHandle) 0;
1583 }
1584
1585 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1586     CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1587     return (CmiCommHandle) 0;
1588 }
1589
1590 int           CmiAsyncMsgSent(CmiCommHandle handle) {
1591     CmiAbort("CmiAsyncMsgSent not implemented.");
1592     return 0;
1593 }
1594 void          CmiReleaseCommHandle(CmiCommHandle handle) {
1595     CmiAbort("CmiReleaseCommHandle not implemented.");
1596 }
1597
1598
1599 /*==========================================================*/
1600 /*==========================================================*/
1601 /*==========================================================*/
1602
1603 /* Optional routines which could use common code which is shared with
1604    other machine layer implementations. */
1605
1606 /* MULTICAST/VECTOR SENDING FUNCTIONS
1607
1608  * In relations to some flags, some other delivery functions may be needed.
1609  */
1610
1611 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1612
1613 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg) {
1614     char *copymsg;
1615     copymsg = (char *)CmiAlloc(size);
1616     CmiMemcpy(copymsg,msg,size);
1617     CmiFreeListSendFn(npes, pes, size, msg);
1618 }
1619
1620 //#define OPTIMIZED_MULTICAST  0
1621
1622 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1623 #if CMK_SMP && !CMK_MULTICORE
1624     //DCMF_CriticalSection_enter (0);
1625 #endif
1626     CMI_SET_BROADCAST_ROOT(msg,0);
1627     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1628     ((CmiMsgHeaderBasic *)msg)->size = size;
1629     CMI_SET_CHECKSUM(msg, size);
1630
1631     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeListSendFn on comm thd on node %d\n", CmiMyNode());
1632
1633     //printf("%d: In Free List Send Fn\n", CmiMyPe());
1634     int new_npes = 0;
1635
1636     int i, count = 0, my_loc = -1;
1637     for (i=0; i<npes; i++) {
1638         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode()) {
1639         if (CmiNodeOf(pes[i]) == CmiMyNode()) {
1640             CmiSyncSend(pes[i], size, msg);
1641             //my_loc = i;
1642         }
1643     }
1644
1645 #if OPTIMIZED_MULTICAST
1646 #warning "Using Optimized Multicast"
1647     if (npes > 1) {    
1648       int *newpelist = (int *) malloc (sizeof(int) * npes);
1649       int new_npes = 0;
1650     
1651       for(i=0; i<npes; i++) {
1652         if(CmiNodeOf(pes[i]) == CmiMyNode()) 
1653           continue;
1654         else
1655           newpelist[new_npes++] = pes[i];
1656       }
1657
1658       if (new_npes >= 1)
1659         machineMulticast (new_npes, newpelist, size, msg);
1660       else
1661         CmiFree (msg);
1662       return;
1663     }
1664 #endif
1665
1666     for (i=0;i<npes;i++) {
1667         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode());
1668         if (CmiNodeOf(pes[i]) == CmiMyNode());
1669         else if (i < npes - 1) {
1670 #if !CMK_SMP /*|| (CMK_SMP && !CMK_MULTICORE)*/
1671             CmiReference(msg);
1672             CmiGeneralFreeSend(pes[i], size, msg);
1673 #else
1674             CmiSyncSend(pes[i], size, msg);
1675 #endif
1676         }
1677     }
1678
1679     //if (npes  && (pes[npes-1] != CmiMyPe() && CmiNodeOf(pes[i]) != CmiMyNode()))
1680     if (npes  && CmiNodeOf(pes[npes-1]) != CmiMyNode())
1681         CmiSyncSendAndFree(pes[npes-1], size, msg); //Sameto CmiFreeSendFn
1682     else
1683         CmiFree(msg);
1684
1685     //AdvanceCommunications();
1686 #if CMK_SMP && !CMK_MULTICORE
1687     //DCMF_CriticalSection_exit (0);
1688 #endif
1689 }
1690
1691 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg) {
1692     CmiAbort("CmiAsyncListSendFn not implemented.");
1693     return (CmiCommHandle) 0;
1694 }
1695 #endif
1696
1697 /** NODE SENDING FUNCTIONS
1698
1699  * If there is a node queue, and we consider also nodes as entity (tipically in
1700  * SMP versions), these functions are needed.
1701  */
1702
1703 #if CMK_NODE_QUEUE_AVAILABLE
1704
1705 void          CmiSyncNodeSendFn(int, int, char *);
1706 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1707 void          CmiFreeNodeSendFn(int, int, char *);
1708
1709 void          CmiSyncNodeBroadcastFn(int, char *);
1710 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1711 void          CmiFreeNodeBroadcastFn(int, char *);
1712
1713 void          CmiSyncNodeBroadcastAllFn(int, char *);
1714 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1715 void          CmiFreeNodeBroadcastAllFn(int, char *);
1716
1717 #endif
1718
1719
1720 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1721
1722 int CmiMyPe();
1723 int CmiMyRank();
1724 int CmiNodeFirst(int node);
1725 int CmiNodeSize(int node);
1726 int CmiNodeOf(int pe);
1727 int CmiRankOf(int pe);
1728
1729 int CmiMyPe(void) {
1730     return CmiGetState()->pe;
1731 }
1732
1733 int CmiMyRank(void) {
1734     return CmiGetState()->rank;
1735 }
1736
1737 int CmiNodeFirst(int node) {
1738     return node*_Cmi_mynodesize;
1739 }
1740 int CmiNodeSize(int node)  {
1741     return _Cmi_mynodesize;
1742 }
1743
1744 int CmiNodeOf(int pe)      {
1745     return (pe/_Cmi_mynodesize);
1746 }
1747 int CmiRankOf(int pe)      {
1748     return pe%_Cmi_mynodesize;
1749 }
1750
1751
1752 /* optional, these functions are implemented in "machine-smp.c", so including
1753    this file avoid the necessity to reimplement them.
1754  */
1755 void CmiNodeBarrier(void);
1756 void CmiNodeAllBarrier(void);
1757 CmiNodeLock CmiCreateLock();
1758 void CmiDestroyLock(CmiNodeLock lock);
1759
1760 #endif
1761
1762 /** IMMEDIATE MESSAGES
1763
1764  * If immediate messages are supported, the following function is needed. There
1765  * is an exeption if the machine progress is also defined (see later for this).
1766
1767  * Moreover, the file "immediate.c" should be included, otherwise all its
1768  * functions and variables have to be redefined.
1769 */
1770
1771 #if CMK_CCS_AVAILABLE
1772
1773 #include "immediate.c"
1774
1775 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1776 void CmiProbeImmediateMsg();
1777 #endif
1778
1779 #endif
1780
1781
1782 /** MACHINE PROGRESS DEFINED
1783
1784  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1785  * to be pulled out of the network manually. For this reason the following
1786  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1787  * not be defined anymore.
1788  */
1789
1790 #if CMK_MACHINE_PROGRESS_DEFINED
1791
1792
1793
1794 void CmiMachineProgressImpl() {
1795
1796 #if !CMK_SMP
1797     AdvanceCommunications();
1798 #else
1799     /*Not implemented yet. Communication server does not seem to be
1800       thread safe */
1801 #endif
1802 }
1803
1804 #endif
1805
1806 /* Dummy implementation */
1807 extern int CmiBarrier() {
1808     //Use DCMF barrier later
1809 }
1810
1811 #if CMK_NODE_QUEUE_AVAILABLE
1812 static void CmiSendNodeSelf(char *msg) {
1813 #if CMK_IMMEDIATE_MSG
1814     if (CmiIsImmediate(msg)) {
1815         //printf("SendNodeSelf: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1816         CmiPushImmediateMsg(msg);
1817 #if CMK_MULTICORE
1818         CmiHandleImmediate();
1819 #endif
1820         return;
1821     }
1822 #endif    
1823     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1824     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1825     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1826 }
1827
1828 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg) {
1829     CmiAbort ("Async Node Send not supported\n");
1830 }
1831
1832 void CmiFreeNodeSendFn(int node, int size, char *msg) {
1833
1834     CMI_SET_BROADCAST_ROOT(msg,0);
1835     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1836     ((CmiMsgHeaderBasic *)msg)->size = size;
1837     CMI_SET_CHECKSUM(msg, size);
1838     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeSendFn on comm thd on node %d\n", CmiMyNode());
1839     
1840     CQdCreate(CpvAccess(cQdState), 1);
1841
1842     if (node == _Cmi_mynode) {
1843         CmiSendNodeSelf(msg);
1844     } else {
1845         CmiGeneralFreeSendN(node, SMP_NODEMESSAGE, size, msg);
1846     }
1847 }
1848
1849 void CmiSyncNodeSendFn(int p, int s, char *m) {
1850     char *dupmsg;
1851     dupmsg = (char *)CmiAlloc(s);
1852     CmiMemcpy(dupmsg,m,s);
1853     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeSendFn on comm thd on node %d\n", CmiMyNode());
1854     CmiFreeNodeSendFn(p, s, dupmsg);
1855 }
1856
1857 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m) {
1858     return NULL;
1859 }
1860
1861 void SendSpanningChildrenNode(int size, char *msg) {
1862     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1863     //printf("on node %d rank %d, send node spanning children with root %d\n", CmiMyNode(), CmiMyRank(), startnode);
1864     assert(startnode>=0 && startnode<CmiNumNodes());
1865
1866     int dist = CmiMyNode()-startnode;
1867     if (dist<0) dist += CmiNumNodes();
1868     int i;
1869     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1870         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1871         if (nid > CmiNumNodes() - 1) break;
1872         nid += startnode;
1873         nid = nid%CmiNumNodes();
1874         assert(nid>=0 && nid<CmiNumNodes() && nid!=CmiMyNode());
1875         char *dupmsg = (char *)CmiAlloc(size);
1876         CmiMemcpy(dupmsg,msg,size);
1877         //printf("In SendSpanningChildrenNode, sending bcast msg (root %d) from node %d to node %d\n", startnode, CmiMyNode(), nid);
1878         CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg);
1879     }
1880 }
1881
1882 /* need */
1883 void CmiFreeNodeBroadcastFn(int s, char *m) {
1884 #if CMK_BROADCAST_SPANNING_TREE
1885     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1886     
1887     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1888
1889     int mynode = CmiMyNode();
1890     CMI_SET_BROADCAST_ROOT(m, -mynode-1);
1891     CMI_MAGIC(m) = CHARM_MAGIC_NUMBER;
1892     ((CmiMsgHeaderBasic *)m)->size = s;
1893     CMI_SET_CHECKSUM(m, s);
1894     //printf("In CmiFreeNodeBroadcastFn, sending bcast msg from root node %d\n", CMI_BROADCAST_ROOT(m));
1895
1896     SendSpanningChildrenNode(s, m);
1897 #else
1898     int i;
1899     for (i=0; i<CmiNumNodes(); i++) {
1900         if (i==CmiMyNode()) continue;
1901         char *dupmsg = (char *)CmiAlloc(s);
1902         CmiMemcpy(dupmsg,m,s);
1903         CmiFreeNodeSendFn(i, s, dupmsg);
1904     }
1905 #endif
1906     CmiFree(m);    
1907 }
1908
1909 void CmiSyncNodeBroadcastFn(int s, char *m) {
1910     char *dupmsg;
1911     dupmsg = (char *)CmiAlloc(s);
1912     CmiMemcpy(dupmsg,m,s);
1913     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1914     CmiFreeNodeBroadcastFn(s, dupmsg);
1915 }
1916
1917 /* need */
1918 void CmiFreeNodeBroadcastAllFn(int s, char *m) {
1919     char *dupmsg = (char *)CmiAlloc(s);
1920     CmiMemcpy(dupmsg,m,s);
1921     CMI_MAGIC(dupmsg) = CHARM_MAGIC_NUMBER;
1922     ((CmiMsgHeaderBasic *)dupmsg)->size = s;
1923     CMI_SET_CHECKSUM(dupmsg, s);
1924
1925     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1926     
1927     CQdCreate(CpvAccess(cQdState), 1);
1928     CmiSendNodeSelf(dupmsg);
1929
1930     CmiFreeNodeBroadcastFn(s, m);
1931 }
1932
1933 void CmiSyncNodeBroadcastAllFn(int s, char *m) {
1934     char *dupmsg;
1935     dupmsg = (char *)CmiAlloc(s);
1936     CmiMemcpy(dupmsg,m,s);
1937     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1938     CmiFreeNodeBroadcastAllFn(s, dupmsg);
1939 }
1940
1941
1942 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m) {
1943     return NULL;
1944 }
1945 #endif //end of CMK_NODE_QUEUE_AVAILABLE
1946
1947 #include "manytomany.c"
1948
1949
1950 /*********************************************************************************************
1951 This section is for CmiDirect. This is a variant of the  persistent communication in which
1952 the user can transfer data between processors without using Charm++ messages. This lets the user
1953 send and receive data from the middle of his arrays without any copying on either send or receive
1954 side
1955 *********************************************************************************************/
1956
1957
1958
1959
1960 #ifdef BGP_USE_RDMA
1961
1962 #include "cmidirect.h"
1963
1964 /* We can avoid a receiver side lookup by just sending the whole shebang.
1965    DCMF header is in units of quad words (16 bytes), so we'd need less than a
1966    quad word for the handle if we just sent that and did a lookup. Or exactly
1967    2 quad words for the buffer pointer, callback pointer, callback
1968    data pointer, and DCMF_Request_t pointer with no lookup.
1969
1970    Since CmiDirect is generally going to be used for messages which aren't
1971    tiny, the extra 16 bytes is not likely to impact performance noticably and
1972    not having to lookup handles in tables simplifies the code enormously.
1973
1974    EJB   2008/4/2
1975 */
1976
1977
1978 /**
1979  To be called on the receiver to create a handle and return its number
1980 **/
1981 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue) {
1982     /* one-sided primitives would require registration of memory */
1983
1984     /* with two-sided primitives we just bundle the buffer and callback info into the handle so the sender can remind us about it later. */
1985     struct infiDirectUserHandle userHandle;
1986     userHandle.handle=1; /* doesn't matter on BG/P*/
1987     userHandle.senderNode=senderNode;
1988     userHandle.recverNode=_Cmi_mynode;
1989     userHandle.recverBufSize=recvBufSize;
1990     userHandle.recverBuf=recvBuf;
1991     userHandle.initialValue=initialValue;
1992     userHandle.callbackFnPtr=callbackFnPtr;
1993     userHandle.callbackData=callbackData;
1994     userHandle.DCMF_rq_trecv=ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
1995 #if CMI_DIRECT_DEBUG
1996     CmiPrintf("[%d] RDMA create addr %p %d callback %p callbackdata %p\n",CmiMyPe(),userHandle.recverBuf,userHandle.recverBufSize, userHandle.callbackFnPtr, userHandle.callbackData);
1997 #endif
1998     return userHandle;
1999 }
2000
2001 /****
2002  To be called on the sender to attach the sender's buffer to this handle
2003 ******/
2004
2005 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize) {
2006
2007     /* one-sided primitives would require registration of memory */
2008
2009     /* with two-sided primitives we just record the sender buf in the handle */
2010     userHandle->senderBuf=sendBuf;
2011     CmiAssert(sendBufSize==userHandle->recverBufSize);
2012     userHandle->DCMF_rq_tsend =ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2013 #if CMI_DIRECT_DEBUG
2014     CmiPrintf("[%d] RDMA assoc addr %p %d to receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,sendBufSize, userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2015 #endif
2016
2017 }
2018
2019 /****
2020 To be called on the sender to do the actual data transfer
2021 ******/
2022 void CmiDirect_put(struct infiDirectUserHandle *userHandle) {
2023     /** invoke a DCMF_Send with the direct callback */
2024     DCMF_Protocol_t *protocol = NULL;
2025     protocol = &cmi_dcmf_direct_registration;
2026     /* local copy */
2027     CmiAssert(userHandle->recverBuf!=NULL);
2028     CmiAssert(userHandle->senderBuf!=NULL);
2029     CmiAssert(userHandle->recverBufSize>0);
2030     if (userHandle->recverNode== _Cmi_mynode) {
2031 #if CMI_DIRECT_DEBUG
2032         CmiPrintf("[%d] RDMA local put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2033 #endif
2034
2035         memcpy(userHandle->recverBuf,userHandle->senderBuf,userHandle->recverBufSize);
2036         (*(userHandle->callbackFnPtr))(userHandle->callbackData);
2037     } else {
2038         dcmfDirectMsgHeader msgHead;
2039         msgHead.recverBuf=userHandle->recverBuf;
2040         msgHead.callbackFnPtr=userHandle->callbackFnPtr;
2041         msgHead.callbackData=userHandle->callbackData;
2042         msgHead.DCMF_rq_t=(DCMF_Request_t *) userHandle->DCMF_rq_trecv;
2043 #if CMK_SMP
2044         DCMF_CriticalSection_enter (0);
2045 #endif
2046 #if CMI_DIRECT_DEBUG
2047         CmiPrintf("[%d] RDMA put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2048 #endif
2049         DCMF_Send (protocol,
2050                    (DCMF_Request_t *) userHandle->DCMF_rq_tsend,
2051                    directcb, DCMF_MATCH_CONSISTENCY, userHandle->recverNode,
2052                    userHandle->recverBufSize, userHandle->senderBuf,
2053                    (struct DCQuad *) &(msgHead), 2);
2054
2055 #if CMK_SMP
2056         DCMF_CriticalSection_exit (0);
2057 #endif
2058     }
2059 }
2060
2061 /**** Should not be called the first time *********/
2062 void CmiDirect_ready(struct infiDirectUserHandle *userHandle) {
2063     /* no op on BGP */
2064 }
2065
2066 /**** Should not be called the first time *********/
2067 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle) {
2068     /* no op on BGP */
2069 }
2070
2071 /**** Should not be called the first time *********/
2072 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle) {
2073     /* no op on BGP */
2074 }
2075
2076 #endif /* BGP_USE_RDMA*/
2077