Updates for xl compiler and V1R3.
[charm.git] / src / arch / bluegenep / machine.c
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include "assert.h"
12 #include "malloc.h"
13
14 #include <bpcore/ppc450_inlines.h>
15
16 #include "dcmf.h"
17 #include "dcmf_multisend.h"
18
19 char *ALIGN_16(char *p) {
20     return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
21 }
22
23 #define PROGRESS_PERIOD 1024
24
25 //There are two roles of comm thread:
26 // 1. polling other cores' bcast msg queue
27 // 2. bcast msg is handled only by comm thd
28 #define BCASTMSG_ONLY_TO_COMMTHD 0
29
30 CpvDeclare(PCQueue, broadcast_q);                 //queue to send broadcast messages
31 #if CMK_NODE_QUEUE_AVAILABLE
32 CsvDeclare(PCQueue, node_bcastq);
33 CsvDeclare(CmiNodeLock, node_bcastLock);
34 #endif
35
36 /*
37     To reduce the buffer used in broadcast and distribute the load from
38   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
39   spanning tree broadcast algorithm.
40     This will use the fourth short in message as an indicator of spanning tree
41   root.
42 */
43 #if CMK_SMP
44 #define CMK_BROADCAST_SPANNING_TREE   1
45 #else
46 #define CMK_BROADCAST_SPANNING_TREE    1
47 #define CMK_BROADCAST_HYPERCUBE        0
48 #endif /* CMK_SMP */
49
50 #define BROADCAST_SPANNING_FACTOR     4
51
52 //The root of the message infers the type of the message
53 // 1. root is 0, then it is a normal point-to-point message
54 // 2. root is larger than 0 (>=1), then it is a broadcast message across all processors (cores)
55 // 3. root is less than 0 (<=-1), then it is a broadcast message across all nodes
56 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
57 #define CMI_IS_BCAST_ON_CORES(msg) (CMI_BROADCAST_ROOT(msg) > 0)
58 #define CMI_IS_BCAST_ON_NODES(msg) (CMI_BROADCAST_ROOT(msg) < 0)
59 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
60
61 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
62 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
63
64 /* FIXME: need a random number that everyone agrees ! */
65 #define CHARM_MAGIC_NUMBER               126
66
67 #if !CMK_OPTIMIZE
68 static int checksum_flag = 0;
69 extern unsigned char computeCheckSum(unsigned char *data, int len);
70
71 #define CMI_SET_CHECKSUM(msg, len)      \
72         if (checksum_flag)  {   \
73           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
74           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
75         }
76
77 #define CMI_CHECK_CHECKSUM(msg, len)    \
78         if (checksum_flag)      \
79           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
80             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
81             for(count = 0; count < len; count++) { \
82                 printf("%2x", msg[count]);                 \
83             }                                             \
84             printf("------------------------------\n\n"); \
85             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
86           }
87 #else
88 #define CMI_SET_CHECKSUM(msg, len)
89 #define CMI_CHECK_CHECKSUM(msg, len)
90 #endif
91
92 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
93
94 #if CMK_BROADCAST_HYPERCUBE
95 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
96 #else
97 #  define CMI_SET_CYCLE(msg, cycle)
98 #endif
99
100 int               _Cmi_numpes;
101 int               _Cmi_mynode;    /* Which address space am I */
102 int               _Cmi_mynodesize;/* Number of processors in my address space */
103 int               _Cmi_numnodes;  /* Total number of address spaces */
104 int                Cmi_nodestart; /* First processor in this address space */
105 CpvDeclare(void*, CmiLocalQueue);
106
107
108 #if CMK_NODE_QUEUE_AVAILABLE
109 #define SMP_NODEMESSAGE   (0xFB) // rank of the node message when node queue
110 // is available
111 #define NODE_BROADCAST_OTHERS (-1)
112 #define NODE_BROADCAST_ALL    (-2)
113 #endif
114
115
116 typedef struct ProcState {
117     /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
118     CmiNodeLock  recvLock;              /* for cs->recv */
119     CmiNodeLock bcastLock;
120 } ProcState;
121
122 static ProcState  *procState;
123
124 #if CMK_SMP && !CMK_MULTICORE
125 static volatile int commThdExit = 0;
126 static CmiNodeLock commThdExitLock = 0;
127 #endif
128
129 void ConverseRunPE(int everReturn);
130 static void CommunicationServer(int sleepTime);
131 static void CommunicationServerThread(int sleepTime);
132
133 //So far we dont define any comm threads
134 int Cmi_commthread = 0;
135
136 #include "machine-smp.c"
137 CsvDeclare(CmiNodeState, NodeState);
138 #include "immediate.c"
139
140 void AdvanceCommunications();
141
142
143 #if !CMK_SMP
144 /************ non SMP **************/
145 static struct CmiStateStruct Cmi_state;
146 int _Cmi_mype;
147 int _Cmi_myrank;
148
149 void CmiMemLock(void) {}
150 void CmiMemUnlock(void) {}
151
152 #define CmiGetState() (&Cmi_state)
153 #define CmiGetStateN(n) (&Cmi_state)
154
155 //void CmiYield(void) { sleep(0); }
156
157 static void CmiStartThreads(char **argv) {
158     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
159     _Cmi_mype = Cmi_nodestart;
160     _Cmi_myrank = 0;
161 }
162 #endif  /* !CMK_SMP */
163
164 //int received_immediate;
165 //int received_broadcast;
166
167 /*Add a message to this processor's receive queue, pe is a rank */
168 static void CmiPushPE(int pe,void *msg) {
169     CmiState cs = CmiGetStateN(pe);
170     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
171 #if CMK_IMMEDIATE_MSG
172     if (CmiIsImmediate(msg)) {
173         /**(CmiUInt2 *)msg = pe;*/
174         //received_immediate = 1;
175         //printf("PushPE: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
176         //CMI_DEST_RANK(msg) = pe;
177         CmiPushImmediateMsg(msg);
178         return;
179     }
180 #endif
181 #if CMK_SMP
182     CmiLock(procState[pe].recvLock);
183 #endif
184
185     PCQueuePush(cs->recv,(char *)msg);
186     //printf("%d: PCQueue length = %d, msg = %x\n", CmiMyPe(), PCQueueLength(cs->recv), msg);
187
188 #if CMK_SMP
189     CmiUnlock(procState[pe].recvLock);
190 #endif
191     CmiIdleLock_addMessage(&cs->idle);
192     MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
193 }
194
195 #if CMK_NODE_QUEUE_AVAILABLE
196 /*Add a message to this processor's receive queue */
197 static void CmiPushNode(void *msg) {
198     MACHSTATE(3,"Pushing message into NodeRecv queue");
199 #if CMK_IMMEDIATE_MSG
200     if (CmiIsImmediate(msg)) {
201         //printf("PushNode: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
202         //CMI_DEST_RANK(msg) = 0;
203         CmiPushImmediateMsg(msg);
204         return;
205     }
206 #endif
207     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
208     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
209     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
210     {
211         CmiState cs=CmiGetStateN(0);
212         CmiIdleLock_addMessage(&cs->idle);
213     }
214 }
215 #endif /* CMK_NODE_QUEUE_AVAILABLE */
216
217 volatile int msgQueueLen;
218 volatile int outstanding_recvs;
219
220 static int Cmi_dim;     /* hypercube dim of network */
221
222 static char     **Cmi_argv;
223 static char     **Cmi_argvcopy;
224 static CmiStartFn Cmi_startfn;   /* The start function */
225 static int        Cmi_usrsched;  /* Continue after start function finishes? */
226
227 extern void ConverseCommonInit(char **argv);
228 extern void ConverseCommonExit(void);
229 extern void CthInit(char **argv);
230
231 static void SendMsgsUntil(int);
232
233
234 void SendSpanningChildren(int size, char *msg);
235 #if CMK_NODE_QUEUE_AVAILABLE
236 void SendSpanningChildrenNode(int size, char *msg);
237 #endif
238 void SendHypercube(int size, char *msg);
239
240 DCMF_Protocol_t  cmi_dcmf_short_registration __attribute__((__aligned__(16)));
241 DCMF_Protocol_t  cmi_dcmf_eager_registration __attribute__((__aligned__(16)));
242 DCMF_Protocol_t  cmi_dcmf_rzv_registration   __attribute__((__aligned__(16)));
243 DCMF_Protocol_t  cmi_dcmf_multicast_registration   __attribute__((__aligned__(16)));
244
245 #define BGP_USE_RDMA 1
246 /*#define CMI_DIRECT_DEBUG 1*/
247 #ifdef BGP_USE_RDMA
248
249
250 DCMF_Protocol_t  cmi_dcmf_direct_registration __attribute__((__aligned__(16)));
251 /** The receive side of a put implemented in DCMF_Send */
252
253
254 typedef struct {
255     void *recverBuf;
256   void (*callbackFnPtr)(void *);
257     void *callbackData;
258     DCMF_Request_t *DCMF_rq_t;
259 } dcmfDirectMsgHeader;
260
261 /* nothing for us to do here */
262 #if (DCMF_VERSION_MAJOR >= 2)
263 void direct_send_done_cb(void*nothing, DCMF_Error_t *err) 
264 #else 
265   void direct_send_done_cb(void*nothing) 
266 #endif
267 {
268 #if CMI_DIRECT_DEBUG
269   CmiPrintf("[%d] RDMA send_done_cb\n", CmiMyPe());
270 #endif
271 }
272
273 DCMF_Callback_t  directcb;
274
275 void     direct_short_pkt_recv (void             * clientdata,
276                                 const DCQuad     * info,
277                                 unsigned           count,
278                                 unsigned           senderrank,
279                                 const char       * buffer,
280                                 const unsigned     sndlen) {
281 #if CMI_DIRECT_DEBUG
282     CmiPrintf("[%d] RDMA direct_short_pkt_recv\n", CmiMyPe());
283 #endif
284     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
285     CmiMemcpy(msgHead->recverBuf, buffer, sndlen);
286     (*(msgHead->callbackFnPtr))(msgHead->callbackData);
287 }
288
289
290 #if (DCMF_VERSION_MAJOR >= 2)
291 typedef void (*cbhdlr) (void *, DCMF_Error_t *);
292 #else
293 typedef void (*cbhdlr) (void *);
294 #endif
295
296 DCMF_Request_t * direct_first_pkt_recv_done (void              * clientdata,
297         const DCQuad      * info,
298         unsigned            count,
299         unsigned            senderrank,
300         const unsigned      sndlen,
301         unsigned          * rcvlen,
302         char             ** buffer,
303         DCMF_Callback_t   * cb
304                                             ) {
305 #if CMI_DIRECT_DEBUG
306     CmiPrintf("[%d] RDMA direct_first_pkt_recv_done\n", CmiMyPe());
307 #endif
308     /* pull the data we need out of the header */
309     *rcvlen=sndlen;
310     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
311     cb->function= (cbhdlr)msgHead->callbackFnPtr;
312     cb->clientdata=msgHead->callbackData;
313     *buffer=msgHead->recverBuf;
314     return msgHead->DCMF_rq_t;
315 }
316
317
318 #endif
319
320 typedef struct msg_list {
321     char              * msg;
322     int                 size;
323     int                 destpe;
324     int               * pelist;
325     DCMF_Callback_t     cb;
326     DCQuad              info __attribute__((__aligned__(16)));
327     DCMF_Request_t      send __attribute__((__aligned__(16)));
328 } SMSG_LIST __attribute__((__aligned__(16)));
329
330 #define MAX_NUM_SMSGS   64
331 CpvDeclare(PCQueue, smsg_list_q);
332
333 static inline SMSG_LIST * smsg_allocate() {
334     SMSG_LIST *smsg = (SMSG_LIST *)PCQueuePop(CpvAccess(smsg_list_q));
335     if (smsg != NULL)
336         return smsg;
337
338     void * buf = malloc(sizeof(SMSG_LIST)); 
339     assert(buf!=NULL);
340     assert (((unsigned)buf & 0x0f) == 0);
341
342     return (SMSG_LIST *) buf;
343 }
344
345 static inline void smsg_free (SMSG_LIST *smsg) {
346     int size = PCQueueLength (CpvAccess(smsg_list_q));
347     if (size < MAX_NUM_SMSGS)
348         PCQueuePush (CpvAccess(smsg_list_q), (char *) smsg);
349     else
350         free (smsg);
351 }
352
353 typedef struct {
354     int sleepMs; /*Milliseconds to sleep while idle*/
355     int nIdles; /*Number of times we've been idle in a row*/
356     CmiState cs; /*Machine state*/
357 } CmiIdleState;
358
359 static CmiIdleState *CmiNotifyGetState(void) {
360     CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
361     s->sleepMs=0;
362     s->nIdles=0;
363     s->cs=CmiGetState();
364     return s;
365 }
366
367
368 #if (DCMF_VERSION_MAJOR >= 2)
369 static void send_done(void *data, DCMF_Error_t *err) 
370 #else 
371 static void send_done(void *data) 
372 #endif
373 /* send done callback: sets the smsg entry to done */
374 {
375     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
376     CmiFree(msg_tmp->msg);
377     //free(data);
378     smsg_free (msg_tmp);
379
380     msgQueueLen--;
381 }
382
383 #if (DCMF_VERSION_MAJOR >= 2)
384 static void send_multi_done(void *data, DCMF_Error_t *err) 
385 #else 
386 static void send_multi_done(void *data) 
387 #endif
388 /* send done callback: sets the smsg entry to done */
389 {
390     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
391     CmiFree(msg_tmp->msg);
392     free(msg_tmp->pelist);
393
394     smsg_free(msg_tmp);
395
396     msgQueueLen--;
397 }
398
399
400 #if (DCMF_VERSION_MAJOR >= 2)
401 static void recv_done(void *clientdata, DCMF_Error_t * err) 
402 #else 
403 static void recv_done(void *clientdata) 
404 #endif
405 /* recv done callback: push the recved msg to recv queue */
406 {
407
408     char *msg = (char *) clientdata;
409     int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
410
411     //fprintf (stderr, "%d Recv message done \n", CmiMyPe());
412
413     /* then we do what PumpMsgs used to do:
414      * push msg to recv queue */
415     int count=0;
416     CMI_CHECK_CHECKSUM(msg, sndlen);
417     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
418         CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
419         return;
420     }
421
422 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
423     if (CMI_IS_BCAST_ON_CORES(msg) ) {
424         int pe = CMI_DEST_RANK(msg);
425
426         //printf ("%d: Receiving bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, pe);
427
428         char *copymsg;
429         copymsg = (char *)CmiAlloc(sndlen);
430         CmiMemcpy(copymsg,msg,sndlen);
431
432         //received_broadcast = 1;
433 #if CMK_SMP
434         CmiLock(procState[pe].bcastLock);
435         PCQueuePush(CpvAccessOther(broadcast_q, pe), copymsg);
436         CmiUnlock(procState[pe].bcastLock);
437 #else
438         PCQueuePush(CpvAccess(broadcast_q), copymsg);
439 #endif
440     }
441 #endif
442
443 #if CMK_NODE_QUEUE_AVAILABLE
444 #if CMK_BROADCAST_SPANNING_TREE
445     if (CMI_IS_BCAST_ON_NODES(msg)) {
446         //printf ("%d: Receiving node bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, CMI_DEST_RANK(msg));
447         char *copymsg = (char *)CmiAlloc(sndlen);
448         CmiMemcpy(copymsg,msg,sndlen);
449         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
450         CmiLock(CsvAccess(node_bcastLock));
451         PCQueuePush(CsvAccess(node_bcastq), copymsg);
452         CmiUnlock(CsvAccess(node_bcastLock));
453         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
454     }
455 #endif
456     if (CMI_DEST_RANK(msg) == SMP_NODEMESSAGE)
457         CmiPushNode(msg);
458     else
459 #endif
460
461 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
462         if (CMI_DEST_RANK(msg)<_Cmi_mynodesize) { //not the comm thd
463             CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
464         }
465 #else
466         CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
467 #endif
468
469     outstanding_recvs --;
470 }
471
472
473 void     short_pkt_recv (void             * clientdata,
474                          const DCQuad     * info,
475                          unsigned           count,
476                          unsigned           senderrank,
477                          const char       * buffer,
478                          const unsigned     sndlen) {
479     outstanding_recvs ++;
480     int alloc_size = sndlen;
481
482     char * new_buffer = (char *)CmiAlloc(alloc_size);
483     CmiMemcpy (new_buffer, buffer, sndlen);
484     
485 #if (DCMF_VERSION_MAJOR >= 2)
486     recv_done (new_buffer, NULL);
487 #else
488     recv_done (new_buffer);
489 #endif
490 }
491
492
493 DCMF_Request_t * first_multi_pkt_recv_done (const DCQuad      * info,
494                                             unsigned            count,
495                                             unsigned            senderrank,                                    
496                                             const unsigned      sndlen,
497                                             unsigned            connid,
498                                             void              * clientdata,
499                                             unsigned          * rcvlen,
500                                             char             ** buffer,
501                                             unsigned          * pw,
502                                             DCMF_Callback_t   * cb
503                                             ) {
504     outstanding_recvs ++;
505     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
506
507     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
508
509     /* printf ("Receiving %d bytes\n", sndlen); */
510     *rcvlen = sndlen;  /* to avoid malloc(0) which might
511                                    return NULL */
512
513     *buffer = (char *)CmiAlloc(alloc_size);
514     cb->function = recv_done;
515     cb->clientdata = *buffer;
516
517     *pw  = 0x7fffffff;
518     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
519 }
520
521
522 DCMF_Request_t * first_pkt_recv_done (void              * clientdata,
523                                       const DCQuad      * info,
524                                       unsigned            count,
525                                       unsigned            senderrank,
526                                       const unsigned      sndlen,
527                                       unsigned          * rcvlen,
528                                       char             ** buffer,
529                                       DCMF_Callback_t   * cb
530                                      ) {
531     outstanding_recvs ++;
532     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
533
534     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
535
536     /* printf ("Receiving %d bytes\n", sndlen); */
537     *rcvlen = sndlen;  /* to avoid malloc(0) which might
538                                    return NULL */
539
540     *buffer = (char *)CmiAlloc(alloc_size);
541     cb->function = recv_done;
542     cb->clientdata = *buffer;
543
544     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
545 }
546
547
548 #if CMK_NODE_QUEUE_AVAILABLE
549 void sendBroadcastMessagesNode() {
550     if (PCQueueLength(CsvAccess(node_bcastq))==0) return;
551     //node broadcast message could be always handled by any cores (including
552     //comm thd) on this node
553     //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
554     CmiLock(CsvAccess(node_bcastLock));
555     char *msg = PCQueuePop(CsvAccess(node_bcastq));
556     CmiUnlock(CsvAccess(node_bcastLock));
557     //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
558     while (msg) {
559 #if CMK_BROADCAST_SPANNING_TREE
560         //printf("sendBroadcastMessagesNode: node %d rank %d with msg root %d\n", CmiMyNode(), CmiMyRank(), CMI_BROADCAST_ROOT(msg));
561         SendSpanningChildrenNode(((CmiMsgHeaderBasic *) msg)->size, msg);
562 #endif
563         CmiFree(msg);
564         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
565         CmiLock(CsvAccess(node_bcastLock));
566         msg = PCQueuePop(CsvAccess(node_bcastq));
567         CmiUnlock(CsvAccess(node_bcastLock));
568         //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
569     }
570 }
571 #endif
572
573 void sendBroadcastMessages() {
574 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
575 //in the presence of comm thd, and it is not responsible for broadcasting msg,
576 //the comm thd will help to pull the msg from rank 0 which is predefined as the
577 //core on a smp node to receive bcast msg.
578     int toPullRank = CmiMyRank();
579     if (CmiMyRank()==_Cmi_mynodesize) toPullRank = 0; //comm thd only pulls msg from rank 0
580 #else
581     int toPullRank = CmiMyRank();
582 #endif
583     PCQueue toPullQ;
584
585     /*
586     if(CmiMyRank()==_Cmi_mynodesize)
587       printf("Comm thd on node [%d] is pulling bcast msg\n", CmiMyNode());
588     else
589       printf("Work thd [%d] on node [%d] is pulling bcast msg\n", CmiMyRank(), CmiMyNode());
590     */
591 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
592     toPullQ = CpvAccessOther(broadcast_q, toPullRank);
593 #else
594     toPullQ = CpvAccess(broadcast_q);
595 #endif
596
597     if (PCQueueLength(toPullQ)==0) return;
598 #if CMK_SMP
599     CmiLock(procState[toPullRank].bcastLock);
600 #endif
601
602     char *msg = (char *) PCQueuePop(toPullQ);
603
604 #if CMK_SMP
605     CmiUnlock(procState[toPullRank].bcastLock);
606 #endif
607
608     while (msg) {
609
610 #if CMK_BROADCAST_SPANNING_TREE
611         SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
612 #elif CMK_BROADCAST_HYPERCUBE
613         SendHypercube(((CmiMsgHeaderBasic *) msg)->size, msg);
614 #endif
615
616         CmiFree (msg);
617
618 #if CMK_SMP
619         CmiLock(procState[toPullRank].bcastLock);
620 #endif
621
622 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
623         toPullQ = CpvAccessOther(broadcast_q, toPullRank);
624 #else
625         toPullQ = CpvAccess(broadcast_q);
626 #endif
627         msg = (char *) PCQueuePop(toPullQ);
628
629 #if CMK_SMP
630         CmiUnlock(procState[toPullRank].bcastLock);
631 #endif
632     }
633 }
634
635 CpvDeclare(unsigned, networkProgressCount);
636 int  networkProgressPeriod;
637
638 #if 0
639 unsigned int *ranklist;
640
641 BGTsC_t        barrier;
642
643 // -----------------------------------------
644 // Rectangular broadcast implementation
645 // -----------------------------------------
646
647 #define MAX_COMM  256
648 static void * comm_table [MAX_COMM];
649
650 typedef struct rectbcast_msg {
651     BGTsRC_t           request;
652     DCMF_Callback_t    cb;
653     char              *msg;
654 } RectBcastInfo;
655
656
657 static void bcast_done (void *data) {
658     RectBcastInfo *rinfo = (RectBcastInfo *) data;
659     CmiFree (rinfo->msg);
660     free (rinfo);
661 }
662
663 static  void *   getRectBcastRequest (unsigned comm) {
664     return comm_table [comm];
665 }
666
667
668 static  void *  bcast_recv     (unsigned               root,
669                                 unsigned               comm,
670                                 const unsigned         sndlen,
671                                 unsigned             * rcvlen,
672                                 char                ** rcvbuf,
673                                 DCMF_Callback_t      * const cb) {
674
675     int alloc_size = sndlen + sizeof(BGTsRC_t) + 16;
676
677     *rcvlen = sndlen;  /* to avoid malloc(0) which might
678                                    return NULL */
679
680     *rcvbuf       =  (char *)CmiAlloc(alloc_size);
681     cb->function  =   recv_done;
682     cb->clientdata = *rcvbuf;
683
684     return (BGTsRC_t *) ALIGN_16 (*rcvbuf + sndlen);
685
686 }
687
688
689 extern void bgl_machine_RectBcast (unsigned                 commid,
690                                        const char             * sndbuf,
691                                        unsigned                 sndlen) {
692     RectBcastInfo *rinfo  =   (RectBcastInfo *) malloc (sizeof(RectBcastInfo));
693     rinfo->cb.function    =   bcast_done;
694     rinfo->cb.clientdata  =   rinfo;
695
696     BGTsRC_AsyncBcast_start (commid, &rinfo->request, &rinfo->cb, sndbuf, sndlen);
697
698 }
699
700 extern void        bgl_machine_RectBcastInit  (unsigned               commID,
701             const BGTsRC_Geometry_t* geometry) {
702
703     CmiAssert (commID < 256);
704     CmiAssert (comm_table [commID] == NULL);
705
706     BGTsRC_t *request =  (BGTsRC_t *) malloc (sizeof (BGTsRC_t));
707     comm_table [commID] = request;
708
709     BGTsRC_AsyncBcast_init  (request, commID,  geometry);
710 }
711
712
713
714
715 //--------------------------------------------------------------
716 //----- End Rectangular Broadcast Implementation ---------------
717 //--------------------------------------------------------------
718 #endif
719
720 //approx sleep command
721 void mysleep (int cycles) {
722     unsigned long long start = DCMF_Timebase();
723     unsigned long long end = start + cycles;
724
725     while (start < end)
726         start = DCMF_Timebase();
727
728     return;
729 }
730
731 static void * test_buf;
732
733 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
734     int n, i, count;
735
736     //fprintf(stderr, "Initializing Converse Blue Gene/P machine Layer\n");
737
738     DCMF_Messager_initialize();
739
740 #if CMK_SMP
741     DCMF_Configure_t  config_in, config_out;
742     config_in.thread_level= DCMF_THREAD_MULTIPLE;
743     config_in.interrupts  = DCMF_INTERRUPTS_OFF;
744
745     DCMF_Messager_configure(&config_in, &config_out);
746     //assert (config_out.thread_level == DCMF_THREAD_MULTIPLE); //not supported in vn mode
747 #endif
748
749     DCMF_Send_Configuration_t short_config, eager_config, rzv_config;
750
751
752     short_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
753     short_config.cb_recv_short = short_pkt_recv;
754     short_config.cb_recv       = first_pkt_recv_done;
755 #if (DCMF_VERSION_MAJOR >= 2)
756     short_config.network  = DCMF_DefaultNetwork;
757 #endif
758
759     eager_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
760     eager_config.cb_recv_short = short_pkt_recv;
761     eager_config.cb_recv       = first_pkt_recv_done;
762 #if (DCMF_VERSION_MAJOR >= 2)
763     eager_config.network  = DCMF_DefaultNetwork;
764 #endif
765
766 #ifdef  OPT_RZV
767 #warning "Enabling Optimize Rzv"
768     rzv_config.protocol        = DCMF_RZV_SEND_PROTOCOL;
769 #else
770     rzv_config.protocol        = DCMF_DEFAULT_SEND_PROTOCOL;
771 #endif
772     rzv_config.cb_recv_short   = short_pkt_recv;
773     rzv_config.cb_recv         = first_pkt_recv_done;
774 #if (DCMF_VERSION_MAJOR >= 2)
775     rzv_config.network  = DCMF_DefaultNetwork;
776 #endif
777
778     DCMF_Send_register (&cmi_dcmf_short_registration, &short_config);
779     DCMF_Send_register (&cmi_dcmf_eager_registration, &eager_config);
780     DCMF_Send_register (&cmi_dcmf_rzv_registration,   &rzv_config);
781
782 #ifdef BGP_USE_RDMA
783     DCMF_Send_Configuration_t direct_config;
784     direct_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
785     direct_config.cb_recv_short = direct_short_pkt_recv;
786     direct_config.cb_recv       = direct_first_pkt_recv_done;
787     DCMF_Send_register (&cmi_dcmf_direct_registration,   &direct_config);
788     directcb.function=direct_send_done_cb;
789     directcb.clientdata=NULL;
790 #endif
791
792     //fprintf(stderr, "Initializing Eager Protocol\n");
793
794     _Cmi_numnodes = DCMF_Messager_size();
795     _Cmi_mynode = DCMF_Messager_rank();
796
797     unsigned rank = DCMF_Messager_rank();
798     unsigned size = DCMF_Messager_size();
799
800     CmiBarrier();
801     CmiBarrier();
802     CmiBarrier();
803
804     /* processor per node */
805     _Cmi_mynodesize = 1;
806     CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
807 #if ! CMK_SMP
808     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
809         CmiAbort("+ppn cannot be used in non SMP version!\n");
810 #endif
811
812     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
813     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
814     Cmi_argvcopy = CmiCopyArgs(argv);
815     Cmi_argv = argv;
816     Cmi_startfn = fn;
817     Cmi_usrsched = usched;
818
819     //printf ("Starting Charm with %d nodes and %d processors\n", CmiNumNodes(), CmiNumPes());
820
821     DCMF_Multicast_Configuration_t mconfig;
822     mconfig.protocol = DCMF_MEMFIFO_DMA_MSEND_PROTOCOL;
823     mconfig.cb_recv  = first_multi_pkt_recv_done;
824     mconfig.clientdata = NULL;
825     mconfig.connectionlist = (void **) malloc (CmiNumPes() * sizeof(unsigned long));
826     mconfig.nconnections = CmiNumPes();  
827     DCMF_Multicast_register(&cmi_dcmf_multicast_registration, &mconfig);
828
829
830     /* find dim = log2(numpes), to pretend we are a hypercube */
831     for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
832         Cmi_dim++ ;
833
834
835     /* checksum flag */
836     if (CmiGetArgFlag(argv,"+checksum")) {
837 #if !CMK_OPTIMIZE
838         checksum_flag = 1;
839         if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
840 #else
841         if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
842 #endif
843     }
844
845     CsvInitialize(CmiNodeState, NodeState);
846     CmiNodeStateInit(&CsvAccess(NodeState));
847
848 #if CMK_NODE_QUEUE_AVAILABLE
849     CsvInitialize(PCQueue, node_bcastq);
850     CsvAccess(node_bcastq) = PCQueueCreate();
851     CsvInitialize(CmiNodeLock, node_bcastLock);
852     CsvAccess(node_bcastLock) = CmiCreateLock();
853 #endif
854
855     int actualNodeSize = _Cmi_mynodesize;
856 #if !CMK_MULTICORE
857     actualNodeSize++; //considering the extra comm thread
858 #endif
859
860     procState = (ProcState *)CmiAlloc((actualNodeSize) * sizeof(ProcState));
861     for (i=0; i<actualNodeSize; i++) {
862         /*    procState[i].sendMsgBuf = PCQueueCreate();   */
863         procState[i].recvLock = CmiCreateLock();
864         procState[i].bcastLock = CmiCreateLock();
865     }
866
867 #if CMK_SMP && !CMK_MULTICORE
868     commThdExitLock = CmiCreateLock();
869 #endif
870
871     /* Network progress function is used to poll the network when for
872        messages. This flushes receive buffers on some  implementations*/
873     networkProgressPeriod = PROGRESS_PERIOD;
874     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
875
876     //printf ("Starting Threads\n");
877
878     CmiStartThreads(argv);
879
880     ConverseRunPE(initret);
881 }
882
883
884 int PerrorExit (char *err) {
885     fprintf (stderr, "err\n\n");
886     exit (-1);
887     return -1;
888 }
889
890
891 void ConverseRunPE(int everReturn) {
892     //printf ("ConverseRunPE on rank %d\n", CmiMyPe());
893
894     CmiIdleState *s=CmiNotifyGetState();
895     CmiState cs;
896     char** CmiMyArgv;
897     CmiNodeAllBarrier();
898
899     cs = CmiGetState();
900     CpvInitialize(void *,CmiLocalQueue);
901     CpvAccess(CmiLocalQueue) = cs->localqueue;
902
903     if (CmiMyRank())
904         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
905     else
906         CmiMyArgv=Cmi_argv;
907
908     CthInit(CmiMyArgv);
909
910     //printf ("Before Converse Common Init\n");
911     ConverseCommonInit(CmiMyArgv);
912
913     /* initialize the network progress counter*/
914     /* Network progress function is used to poll the network when for
915        messages. This flushes receive buffers on some  implementations*/
916     CpvInitialize(int , networkProgressCount);
917     CpvAccess(networkProgressCount) = 0;
918
919     CpvInitialize(PCQueue, broadcast_q);
920     CpvAccess(broadcast_q) = PCQueueCreate();
921
922     CpvInitialize(PCQueue, smsg_list_q);
923     CpvAccess(smsg_list_q) = PCQueueCreate();
924
925     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
926
927     CmiBarrier();
928
929     /* Converse initialization finishes, immediate messages can be processed.
930        node barrier previously should take care of the node synchronization */
931     _immediateReady = 1;
932
933     /* communication thread */
934     if (CmiMyRank() == CmiMyNodeSize()) {
935         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
936         while (1) CommunicationServer(5);
937     } else {
938         //printf ("Calling Start Fn and the scheduler \n");
939
940         if (!everReturn) {
941             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
942             if (Cmi_usrsched==0) CsdScheduler(-1);
943             ConverseExit();
944         }
945     }
946 }
947
948 #if CMK_SMP
949 static int inexit = 0;
950
951 /* test if all processors recv queues are empty */
952 static int RecvQueueEmpty() {
953     int i;
954     for (i=0; i<_Cmi_mynodesize; i++) {
955         CmiState cs=CmiGetStateN(i);
956         if (!PCQueueEmpty(cs->recv)) return 0;
957     }
958     return 1;
959 }
960
961 #endif
962
963
964 //extern void DCMF_Messager_dumpTimers();
965
966 void ConverseExit(void) {
967
968     while (msgQueueLen > 0 || outstanding_recvs > 0) {
969         AdvanceCommunications();
970     }
971
972     CmiNodeBarrier();
973     ConverseCommonExit();
974
975     //  if(CmiMyPe()%101 == 0)
976     //DCMF_Messager_dumpTimers();
977
978     if (CmiMyPe() == 0) {
979         printf("End of program\n");
980     }
981
982     CmiNodeBarrier();
983 //  CmiNodeAllBarrier ();
984
985 #if CMK_SMP && !CMK_MULTICORE
986     //CmiLock(commThdExitLock);
987     commThdExit = 1;
988     //CmiUnlock(commThdExitLock);
989
990     _bgp_msync();
991 #endif
992
993     int rank0 = 0;
994
995     if (CmiMyRank() == 0) {
996         rank0 = 1;
997         //CmiFree(procState);
998         DCMF_Messager_finalize();
999     }
1000
1001     CmiNodeBarrier();
1002 //  CmiNodeAllBarrier ();
1003
1004     if (rank0)
1005         exit(0);
1006     else
1007         pthread_exit(NULL);
1008 }
1009
1010 /* exit() called on any node would abort the whole program */
1011 void CmiAbort(const char * message) {
1012     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1013              "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),
1014              msgQueueLen, outstanding_recvs, message);
1015     //CmiPrintStackTrace(0);
1016
1017     while (msgQueueLen > 0 || outstanding_recvs > 0) {
1018         AdvanceCommunications();
1019     }
1020
1021     CmiBarrier();
1022     assert (0);
1023 }
1024
1025 static void CommunicationServer(int sleepTime) {
1026 #if CMK_SMP && !CMK_MULTICORE
1027     //CmiLock(commThdExitLock);
1028     if (commThdExit) {
1029         while (msgQueueLen > 0 || outstanding_recvs > 0) {
1030             AdvanceCommunications();
1031         }
1032         CmiUnlock(commThdExitLock);
1033         pthread_exit(NULL);
1034         return;
1035     }
1036     //CmiUnlock(commThdExitLock);
1037 #endif
1038     AdvanceCommunications();
1039
1040 #if CMK_IMMEDIATE_MSG && CMK_SMP && !CMK_MULTICORE
1041     CmiHandleImmediate();
1042 #endif
1043
1044     //mysleep(sleepTime);
1045 }
1046
1047 static void CommunicationServerThread(int sleepTime) {
1048 #if CMK_SMP
1049     CommunicationServer(sleepTime);
1050 #endif
1051 //immediate msgs are handled in AdvanceCommunications
1052 }
1053
1054 #if CMK_NODE_QUEUE_AVAILABLE
1055 char *CmiGetNonLocalNodeQ(void) {
1056     CmiState cs = CmiGetState();
1057     char *result = 0;
1058     CmiIdleLock_checkMessage(&cs->idle);
1059     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1060         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1061
1062         if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
1063             //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1064             result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1065             CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1066         }
1067
1068         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1069     }
1070     return result;
1071 }
1072 #endif
1073
1074
1075 void *CmiGetNonLocal() {
1076
1077     CmiState cs = CmiGetState();
1078
1079     void *msg = NULL;
1080     CmiIdleLock_checkMessage(&cs->idle);
1081     /* although it seems that lock is not needed, I found it crashes very often
1082        on mpi-smp without lock */
1083
1084 #if !CMK_SMP || CMK_MULTICORE  /*|| !BCASTMSG_ONLY_TO_COMMTHD*/
1085 //ChaoMei changes
1086     AdvanceCommunications();
1087 #endif
1088
1089     /*if(CmiMyRank()==0) printf("Got stuck here on proc[%d] node[%d]\n", CmiMyPe(), CmiMyNode());*/
1090
1091     if (PCQueueLength(cs->recv)==0) return NULL;
1092
1093 #if CMK_SMP
1094     CmiLock(procState[cs->rank].recvLock);
1095 #endif
1096
1097     msg =  PCQueuePop(cs->recv);
1098
1099 #if CMK_SMP
1100     CmiUnlock(procState[cs->rank].recvLock);
1101 #endif
1102
1103     return msg;
1104 }
1105
1106 static void CmiSendSelf(char *msg) {
1107 #if CMK_IMMEDIATE_MSG
1108     if (CmiIsImmediate(msg)) {
1109         /* CmiBecomeNonImmediate(msg); */
1110         //printf("In SendSelf, N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1111         CmiPushImmediateMsg(msg);
1112 #if CMK_MULTICORE
1113         CmiHandleImmediate();
1114 #endif
1115         return;
1116     }
1117 #endif
1118     
1119     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1120 }
1121
1122 #if CMK_SMP
1123 static void CmiSendPeer (int rank, int size, char *msg) {
1124 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
1125     if (CMI_BROADCAST_ROOT(msg) != 0) {
1126         char *copymsg;
1127         copymsg = (char *)CmiAlloc(size);
1128         CmiMemcpy(copymsg,msg,size);
1129
1130         CmiLock(procState[rank].bcastLock);
1131         PCQueuePush(CpvAccessOther(broadcast_q, rank), copymsg);
1132         CmiUnlock(procState[rank].bcastLock);
1133     }
1134 #endif
1135     
1136     CmiPushPE (rank, msg);
1137 }
1138 #endif
1139
1140
1141 void machineSend(SMSG_LIST *msg_tmp) {    
1142
1143     if (msg_tmp->destpe == CmiMyNode())
1144         CmiAbort("Sending to self\n");
1145
1146     CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumNodes());
1147     msg_tmp->cb.function     =   send_done;
1148     msg_tmp->cb.clientdata   =   msg_tmp;
1149
1150     DCMF_Protocol_t *protocol = NULL;
1151
1152     if (msg_tmp->size < 224)
1153         protocol = &cmi_dcmf_short_registration;
1154     else if (msg_tmp->size < 2048)
1155         protocol = &cmi_dcmf_eager_registration;
1156     else
1157         protocol = &cmi_dcmf_rzv_registration;
1158
1159
1160 #if CMK_SMP
1161     DCMF_CriticalSection_enter (0);
1162 #endif
1163
1164     msgQueueLen ++;
1165     DCMF_Send (protocol, &msg_tmp->send, msg_tmp->cb,
1166                DCMF_MATCH_CONSISTENCY, msg_tmp->destpe,
1167                msg_tmp->size, msg_tmp->msg, &msg_tmp->info, 1);
1168
1169 /*    
1170     #if CMK_SMP && !CMK_MULTICORE
1171     //Adding this advance call here improves the SMP performance
1172     //a little bit although it is possible that some more bugs are
1173     //introduced
1174     DCMF_Messager_advance();
1175     #endif
1176 */
1177
1178 #if CMK_SMP
1179     DCMF_CriticalSection_exit (0);
1180 #endif
1181 }
1182
1183 #define MAX_MULTICAST 128
1184 DCMF_Opcode_t  CmiOpcodeList [MAX_MULTICAST];
1185
1186 void  machineMulticast(int npes, int *pelist, int size, char* msg){  
1187   CQdCreate(CpvAccess(cQdState), npes);
1188   
1189   CmiAssert (npes < MAX_MULTICAST);
1190
1191   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1192   ((CmiMsgHeaderBasic *)msg)->size = size;  
1193   CMI_SET_BROADCAST_ROOT(msg,0);
1194   CMI_SET_CHECKSUM(msg, size);
1195   
1196   SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1197   
1198   msg_tmp->destpe    = -1;      //multicast operation
1199   msg_tmp->size      = size * npes; //keep track of #bytes outstanding
1200   msg_tmp->msg       = msg;
1201   msg_tmp->pelist    = pelist;
1202   
1203   msgQueueLen ++;
1204   
1205   DCMF_Multicast_t  mcast_info __attribute__((__aligned__(16)));
1206   
1207   mcast_info.registration   = & cmi_dcmf_multicast_registration;
1208   mcast_info.request        = & msg_tmp->send;
1209   mcast_info.cb_done.function    =   send_multi_done;
1210   mcast_info.cb_done.clientdata  =   msg_tmp;
1211   mcast_info.consistency    =   DCMF_MATCH_CONSISTENCY;
1212   mcast_info.connection_id  =   CmiMyPe();
1213   mcast_info.bytes          =   size;
1214   mcast_info.src            =   msg;
1215   mcast_info.nranks         =   npes;
1216   mcast_info.ranks          =   (unsigned *)pelist;
1217   mcast_info.opcodes        =   CmiOpcodeList;   //static list of MAX_MULTICAST entires with 0 in them
1218   mcast_info.flags          =   0;
1219   mcast_info.msginfo        =   &msg_tmp->info;
1220   mcast_info.count          =   1;
1221
1222   DCMF_Multicast (&mcast_info);
1223 }
1224
1225
1226
1227 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg);
1228
1229 /* The general free send function
1230  * Send is synchronous, and free msg after posted
1231  */
1232 void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
1233
1234   if (destPE < 0 || destPE > CmiNumPes ())
1235     printf ("Sending to %d\n", destPE);
1236
1237   CmiAssert (destPE >= 0 && destPE < CmiNumPes());
1238
1239     CmiState cs = CmiGetState();
1240
1241     if (destPE==cs->pe) {
1242         CmiSendSelf(msg);
1243         return;
1244     }
1245
1246 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1247     //In the presence of comm thd which is responsible for sending bcast msgs, then
1248     //CmiNodeOf and CmiRankOf may return incorrect information if the destPE is considered
1249     //as a comm thd.
1250     if (destPE >= _Cmi_numpes) { //destination is a comm thd
1251         int nid = destPE - _Cmi_numpes;
1252         int rid = _Cmi_mynodesize;
1253         CmiGeneralFreeSendN (nid, rid, size, msg);
1254     } else {
1255         CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1256     }
1257 #else
1258     //printf ("%d: Sending Message to %d \n", CmiMyPe(), destPE);
1259     CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg);
1260 #endif
1261 }
1262
1263 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg) {
1264
1265     //printf ("%d, %d: Sending Message to node %d rank %d \n", CmiMyPe(),
1266     //  CmiMyNode(), node, rank);
1267
1268 #if CMK_SMP
1269     CMI_DEST_RANK(msg) = rank;
1270     //CMI_SET_CHECKSUM(msg, size);
1271
1272     if (node == CmiMyNode()) {
1273         CmiSendPeer (rank, size, msg);
1274         return;
1275     }
1276 #endif
1277
1278     SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1279     msg_tmp->destpe = node; //destPE;
1280     msg_tmp->size = size;
1281     msg_tmp->msg = msg;
1282
1283     machineSend(msg_tmp);
1284 }
1285
1286 void CmiSyncSendFn(int destPE, int size, char *msg) {
1287     char *copymsg;
1288     copymsg = (char *)CmiAlloc(size);
1289     CmiMemcpy(copymsg,msg,size);
1290     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncSendFn on comm thd on node %d\n", CmiMyNode());
1291     CmiFreeSendFn(destPE,size,copymsg);
1292 }
1293
1294 void CmiFreeSendFn(int destPE, int size, char *msg) {    
1295     CQdCreate(CpvAccess(cQdState), 1);
1296     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeSendFn on comm thd on node %d\n", CmiMyNode());
1297
1298     CMI_SET_BROADCAST_ROOT(msg,0);
1299     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1300     ((CmiMsgHeaderBasic *)msg)->size = size;
1301     CMI_SET_CHECKSUM(msg, size);
1302
1303     CmiGeneralFreeSend(destPE,size,msg);
1304 }
1305
1306 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1307 void CmiSyncSendFn1(int destPE, int size, char *msg) {
1308     char *copymsg;
1309     copymsg = (char *)CmiAlloc(size);
1310     CmiMemcpy(copymsg, msg, size);
1311
1312     //  asm volatile("sync" ::: "memory");
1313
1314     CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
1315     ((CmiMsgHeaderBasic *)copymsg)->size = size;
1316     CMI_SET_CHECKSUM(copymsg, size);
1317
1318     CmiGeneralFreeSend(destPE,size,copymsg);
1319 }
1320
1321 #define NODE_LEVEL_ST_IMPLEMENTATION 1
1322 #if NODE_LEVEL_ST_IMPLEMENTATION
1323 //send msgs to other ranks except the rank specified in the argument
1324 static void CmiSendChildrenPeers(int rank, int size, char *msg) {
1325     //printf ("%d [%d]: Send children peers except rank %d\n",  CmiMyPe(), CmiMyNode(), CmiMyRank());
1326     int r=0;
1327
1328 //With comm thd, broadcast msg will be pulled from bcast queue from the comm thd
1329 //And the msg would be finally pushed into other cores on the same node by the
1330 //comm thd. But it's possible a msg has already been pushed into rank 0 in
1331 //recv_done func call. So there's no need to push the bcast msg into the recv
1332 //queue again in the case BCASTMSG_ONLY_TO_COMMTHD is not set
1333
1334 #if !CMK_MULTICORE && !BCASTMSG_ONLY_TO_COMMTHD
1335     //indicate this is called from comm thread.
1336     if (rank == _Cmi_mynodesize) r = 1;
1337 #endif
1338
1339     for (;r<rank; r++) {
1340         char *copymsg;
1341         copymsg = (char *)CmiAlloc(size);
1342         CmiMemcpy(copymsg,msg,size);        
1343         CmiPushPE (r, copymsg);
1344     }
1345
1346     for (r=rank+1; r<_Cmi_mynodesize; r++) {
1347         char *copymsg;
1348         copymsg = (char *)CmiAlloc(size);
1349         CmiMemcpy(copymsg,msg,size);        
1350         CmiPushPE (r, copymsg);
1351     }
1352 }
1353
1354 //In this implementation, msgs are first sent out to the comm thd or rank 0 of other nodes
1355 //then send msgs to the other cores on the same node
1356 void SendSpanningChildren(int size, char *msg) {
1357     CmiState cs = CmiGetState();
1358     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1359     int i;
1360
1361     //printf ("%d [%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), startpe);
1362
1363     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1364
1365     int startNid = CmiNodeOf(startpe);
1366     int thisNid, thisRid;
1367     thisNid = CmiMyNode();
1368     thisRid = CmiMyRank();
1369
1370     //printf ("%d [%d/%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), thisRid, startpe);
1371
1372     //Step1: send to cores that has comm thd on other nodes
1373     int dist = thisNid - startNid;
1374     if (dist<0) dist += _Cmi_numnodes;
1375     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1376         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1377         if (nid > _Cmi_numnodes - 1) break;
1378         nid += startNid;
1379         nid = nid%_Cmi_numnodes;
1380         CmiAssert(nid>=0 && nid<_Cmi_numnodes && nid!=thisNid);
1381 #if CMK_SMP && !CMK_MULTICORE && BCASTMSG_ONLY_TO_COMMTHD
1382         int p = nid + _Cmi_numpes;
1383 #else
1384         int p = CmiNodeFirst(nid);
1385 #endif
1386         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1387         CmiSyncSendFn1(p, size, msg);
1388     }
1389
1390     //Step2: send to other cores (i.e. excluding myself cs->pe) on the same nodes (just a flat send)
1391     CmiSendChildrenPeers(thisRid, size, msg);
1392
1393 }
1394 #else
1395 /* send msg to its spanning children in broadcast. G. Zheng */
1396 void SendSpanningChildren(int size, char *msg) {
1397     CmiState cs = CmiGetState();
1398     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1399     int i;
1400
1401     //printf ("%d [%d]: In Send Spanning Tree\n",  CmiMyPe(), CmiMyNode());
1402
1403     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1404     int dist = cs->pe-startpe;
1405     if (dist<0) dist+=_Cmi_numpes;
1406     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1407         int p = BROADCAST_SPANNING_FACTOR*dist + i;
1408         if (p > _Cmi_numpes - 1) break;
1409         p += startpe;
1410         p = p%_Cmi_numpes;
1411         CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1412
1413         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1414         CmiSyncSendFn1(p, size, msg);
1415     }
1416 }
1417 #endif
1418
1419 /* send msg along the hypercube in broadcast. (Sameer) */
1420 void SendHypercube(int size, char *msg) {
1421     CmiState cs = CmiGetState();
1422     int curcycle = CMI_GET_CYCLE(msg);
1423     int i;
1424
1425     double logp = CmiNumPes();
1426     logp = log(logp)/log(2.0);
1427     logp = ceil(logp);
1428
1429     /*  CmiPrintf("In hypercube\n"); */
1430     /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1431
1432     for (i = curcycle; i < logp; i++) {
1433         int p = cs->pe ^ (1 << i);
1434         /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1435         if (p < CmiNumPes()) {
1436             CMI_SET_CYCLE(msg, i + 1);
1437
1438             CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1439             CmiSyncSendFn1(p, size, msg);
1440         }
1441     }
1442 }
1443
1444 void CmiSyncBroadcastFn(int size, char *msg) {
1445     char *copymsg;
1446     copymsg = (char *)CmiAlloc(size);
1447     CmiMemcpy(copymsg,msg,size);
1448     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastFn on comm thd on node %d\n", CmiMyNode());
1449     CmiFreeBroadcastFn(size,copymsg);
1450 }
1451
1452 void CmiFreeBroadcastFn(int size, char *msg) {
1453
1454     //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1455
1456     CmiState cs = CmiGetState();
1457 #if CMK_BROADCAST_SPANNING_TREE    
1458     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1459     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastFn on comm thd on node %d\n", CmiMyNode());
1460
1461     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1462
1463     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1464     SendSpanningChildren(size, msg);
1465     CmiFree(msg);
1466 #elif CMK_BROADCAST_HYPERCUBE    
1467     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1468
1469     CMI_SET_CYCLE(msg, 0);
1470     SendHypercube(size, msg);
1471     CmiFree(msg);
1472 #else
1473     int i;
1474
1475     for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1476         CmiSyncSendFn(i,size,msg);
1477
1478     for ( i=0; i<cs->pe; i++ )
1479         CmiSyncSendFn(i,size,msg);
1480
1481     CmiFree(msg);
1482 #endif
1483 }
1484
1485 void CmiSyncBroadcastAllFn(int size, char *msg) {
1486     char *copymsg;
1487     copymsg = (char *)CmiAlloc(size);
1488     CmiMemcpy(copymsg,msg,size);
1489     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1490     CmiFreeBroadcastAllFn(size,copymsg);
1491 }
1492
1493 void CmiFreeBroadcastAllFn(int size, char *msg) {
1494
1495     //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1496
1497     CmiState cs = CmiGetState();
1498 #if CMK_BROADCAST_SPANNING_TREE
1499
1500     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1501     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1502
1503     CmiSyncSendFn(cs->pe,size,msg);
1504     
1505     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1506
1507     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1508     SendSpanningChildren(size, msg);
1509     CmiFree(msg);
1510
1511 #elif CMK_BROADCAST_HYPERCUBE
1512     CmiSyncSendFn(cs->pe,size,msg);
1513     
1514     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1515
1516     CMI_SET_CYCLE(msg, 0);
1517     SendHypercube(size, msg);
1518     CmiFree(msg);
1519 #else
1520     int i ;
1521
1522     DCMF_CriticalSection_enter (0);
1523
1524     for ( i=0; i<_Cmi_numpes; i++ ) {
1525         CmiSyncSendFn(i,size,msg);
1526
1527         if ( (i % 32) == 0 )
1528             SendMsgsUntil (0);
1529     }
1530
1531     DCMF_CriticalSection_exit (0);
1532
1533     CmiFree(msg);
1534 #endif
1535 }
1536
1537 void AdvanceCommunications() {
1538
1539 #if CMK_SMP
1540     DCMF_CriticalSection_enter (0);
1541 #endif
1542
1543     while(DCMF_Messager_advance()>0);
1544     //DCMF_Messager_advance();
1545
1546 #if CMK_SMP
1547     DCMF_CriticalSection_exit (0);
1548 #endif
1549
1550     sendBroadcastMessages();
1551 #if CMK_NODE_QUEUE_AVAILABLE
1552     sendBroadcastMessagesNode();
1553 #endif
1554
1555 #if CMK_IMMEDIATE_MSG && CMK_MULTICORE
1556     CmiHandleImmediate();
1557 #endif
1558 }
1559
1560
1561 static void SendMsgsUntil(int targetm) {
1562
1563     while (msgQueueLen>targetm) {
1564         AdvanceCommunications ();
1565     }
1566 }
1567
1568 void CmiNotifyIdle() {
1569 #if !CMK_SMP || CMK_MULTICORE
1570     AdvanceCommunications();
1571 #endif
1572 }
1573
1574
1575 /*==========================================================*/
1576 /*==========================================================*/
1577 /*==========================================================*/
1578
1579 /************ Recommended routines ***********************/
1580 /************ You dont have to implement these but they are supported
1581  in the converse syntax and some rare programs may crash. But most
1582  programs dont need them. *************/
1583
1584 CmiCommHandle CmiAsyncSendFn(int dest, int size, char *msg) {
1585     CmiAbort("CmiAsyncSendFn not implemented.");
1586     return (CmiCommHandle) 0;
1587 }
1588
1589 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1590     CmiAbort("CmiAsyncBroadcastFn not implemented.");
1591     return (CmiCommHandle) 0;
1592 }
1593
1594 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1595     CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1596     return (CmiCommHandle) 0;
1597 }
1598
1599 int           CmiAsyncMsgSent(CmiCommHandle handle) {
1600     CmiAbort("CmiAsyncMsgSent not implemented.");
1601     return 0;
1602 }
1603 void          CmiReleaseCommHandle(CmiCommHandle handle) {
1604     CmiAbort("CmiReleaseCommHandle not implemented.");
1605 }
1606
1607
1608 /*==========================================================*/
1609 /*==========================================================*/
1610 /*==========================================================*/
1611
1612 /* Optional routines which could use common code which is shared with
1613    other machine layer implementations. */
1614
1615 /* MULTICAST/VECTOR SENDING FUNCTIONS
1616
1617  * In relations to some flags, some other delivery functions may be needed.
1618  */
1619
1620 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1621
1622 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg) {
1623     char *copymsg;
1624     copymsg = (char *)CmiAlloc(size);
1625     CmiMemcpy(copymsg,msg,size);
1626     CmiFreeListSendFn(npes, pes, size, msg);
1627 }
1628
1629 //#define OPTIMIZED_MULTICAST  0
1630
1631 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1632 #if CMK_SMP && !CMK_MULTICORE
1633     //DCMF_CriticalSection_enter (0);
1634 #endif
1635     CMI_SET_BROADCAST_ROOT(msg,0);
1636     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1637     ((CmiMsgHeaderBasic *)msg)->size = size;
1638     CMI_SET_CHECKSUM(msg, size);
1639
1640     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeListSendFn on comm thd on node %d\n", CmiMyNode());
1641
1642     //printf("%d: In Free List Send Fn\n", CmiMyPe());
1643     int new_npes = 0;
1644
1645     int i, count = 0, my_loc = -1;
1646     for (i=0; i<npes; i++) {
1647         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode()) {
1648         if (CmiNodeOf(pes[i]) == CmiMyNode()) {
1649             CmiSyncSend(pes[i], size, msg);
1650             //my_loc = i;
1651         }
1652     }
1653
1654 #if OPTIMIZED_MULTICAST
1655 #warning "Using Optimized Multicast"
1656     if (npes > 1) {    
1657       int *newpelist = (int *) malloc (sizeof(int) * npes);
1658       int new_npes = 0;
1659     
1660       for(i=0; i<npes; i++) {
1661         if(CmiNodeOf(pes[i]) == CmiMyNode()) 
1662           continue;
1663         else
1664           newpelist[new_npes++] = pes[i];
1665       }
1666
1667       if (new_npes >= 1)
1668         machineMulticast (new_npes, newpelist, size, msg);
1669       else
1670         CmiFree (msg);
1671       return;
1672     }
1673 #endif
1674
1675     for (i=0;i<npes;i++) {
1676         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode());
1677         if (CmiNodeOf(pes[i]) == CmiMyNode());
1678         else if (i < npes - 1) {
1679 #if !CMK_SMP /*|| (CMK_SMP && !CMK_MULTICORE)*/
1680             CmiReference(msg);
1681             CmiGeneralFreeSend(pes[i], size, msg);
1682 #else
1683             CmiSyncSend(pes[i], size, msg);
1684 #endif
1685         }
1686     }
1687
1688     //if (npes  && (pes[npes-1] != CmiMyPe() && CmiNodeOf(pes[i]) != CmiMyNode()))
1689     if (npes  && CmiNodeOf(pes[npes-1]) != CmiMyNode())
1690         CmiSyncSendAndFree(pes[npes-1], size, msg); //Sameto CmiFreeSendFn
1691     else
1692         CmiFree(msg);
1693
1694     //AdvanceCommunications();
1695 #if CMK_SMP && !CMK_MULTICORE
1696     //DCMF_CriticalSection_exit (0);
1697 #endif
1698 }
1699
1700 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg) {
1701     CmiAbort("CmiAsyncListSendFn not implemented.");
1702     return (CmiCommHandle) 0;
1703 }
1704 #endif
1705
1706 /** NODE SENDING FUNCTIONS
1707
1708  * If there is a node queue, and we consider also nodes as entity (tipically in
1709  * SMP versions), these functions are needed.
1710  */
1711
1712 #if CMK_NODE_QUEUE_AVAILABLE
1713
1714 void          CmiSyncNodeSendFn(int, int, char *);
1715 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1716 void          CmiFreeNodeSendFn(int, int, char *);
1717
1718 void          CmiSyncNodeBroadcastFn(int, char *);
1719 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1720 void          CmiFreeNodeBroadcastFn(int, char *);
1721
1722 void          CmiSyncNodeBroadcastAllFn(int, char *);
1723 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1724 void          CmiFreeNodeBroadcastAllFn(int, char *);
1725
1726 #endif
1727
1728
1729 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1730
1731 int CmiMyPe();
1732 int CmiMyRank();
1733 int CmiNodeFirst(int node);
1734 int CmiNodeSize(int node);
1735 int CmiNodeOf(int pe);
1736 int CmiRankOf(int pe);
1737
1738 int CmiMyPe(void) {
1739     return CmiGetState()->pe;
1740 }
1741
1742 int CmiMyRank(void) {
1743     return CmiGetState()->rank;
1744 }
1745
1746 int CmiNodeFirst(int node) {
1747     return node*_Cmi_mynodesize;
1748 }
1749 int CmiNodeSize(int node)  {
1750     return _Cmi_mynodesize;
1751 }
1752
1753 int CmiNodeOf(int pe)      {
1754     return (pe/_Cmi_mynodesize);
1755 }
1756 int CmiRankOf(int pe)      {
1757     return pe%_Cmi_mynodesize;
1758 }
1759
1760
1761 /* optional, these functions are implemented in "machine-smp.c", so including
1762    this file avoid the necessity to reimplement them.
1763  */
1764 void CmiNodeBarrier(void);
1765 void CmiNodeAllBarrier(void);
1766 CmiNodeLock CmiCreateLock();
1767 void CmiDestroyLock(CmiNodeLock lock);
1768
1769 #endif
1770
1771 /** IMMEDIATE MESSAGES
1772
1773  * If immediate messages are supported, the following function is needed. There
1774  * is an exeption if the machine progress is also defined (see later for this).
1775
1776  * Moreover, the file "immediate.c" should be included, otherwise all its
1777  * functions and variables have to be redefined.
1778 */
1779
1780 #if CMK_CCS_AVAILABLE
1781
1782 #include "immediate.c"
1783
1784 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1785 void CmiProbeImmediateMsg();
1786 #endif
1787
1788 #endif
1789
1790
1791 /** MACHINE PROGRESS DEFINED
1792
1793  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1794  * to be pulled out of the network manually. For this reason the following
1795  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1796  * not be defined anymore.
1797  */
1798
1799 #if CMK_MACHINE_PROGRESS_DEFINED
1800
1801
1802
1803 void CmiMachineProgressImpl() {
1804
1805 #if !CMK_SMP
1806     AdvanceCommunications();
1807 #else
1808     /*Not implemented yet. Communication server does not seem to be
1809       thread safe */
1810 #endif
1811 }
1812
1813 #endif
1814
1815 /* Dummy implementation */
1816 extern int CmiBarrier() {
1817     //Use DCMF barrier later
1818 }
1819
1820 #if CMK_NODE_QUEUE_AVAILABLE
1821 static void CmiSendNodeSelf(char *msg) {
1822 #if CMK_IMMEDIATE_MSG
1823     if (CmiIsImmediate(msg)) {
1824         //printf("SendNodeSelf: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1825         CmiPushImmediateMsg(msg);
1826 #if CMK_MULTICORE
1827         CmiHandleImmediate();
1828 #endif
1829         return;
1830     }
1831 #endif    
1832     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1833     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1834     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1835 }
1836
1837 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg) {
1838     CmiAbort ("Async Node Send not supported\n");
1839 }
1840
1841 void CmiFreeNodeSendFn(int node, int size, char *msg) {
1842
1843     CMI_SET_BROADCAST_ROOT(msg,0);
1844     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1845     ((CmiMsgHeaderBasic *)msg)->size = size;
1846     CMI_SET_CHECKSUM(msg, size);
1847     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeSendFn on comm thd on node %d\n", CmiMyNode());
1848     
1849     CQdCreate(CpvAccess(cQdState), 1);
1850
1851     if (node == _Cmi_mynode) {
1852         CmiSendNodeSelf(msg);
1853     } else {
1854         CmiGeneralFreeSendN(node, SMP_NODEMESSAGE, size, msg);
1855     }
1856 }
1857
1858 void CmiSyncNodeSendFn(int p, int s, char *m) {
1859     char *dupmsg;
1860     dupmsg = (char *)CmiAlloc(s);
1861     CmiMemcpy(dupmsg,m,s);
1862     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeSendFn on comm thd on node %d\n", CmiMyNode());
1863     CmiFreeNodeSendFn(p, s, dupmsg);
1864 }
1865
1866 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m) {
1867     return NULL;
1868 }
1869
1870 void SendSpanningChildrenNode(int size, char *msg) {
1871     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1872     //printf("on node %d rank %d, send node spanning children with root %d\n", CmiMyNode(), CmiMyRank(), startnode);
1873     assert(startnode>=0 && startnode<CmiNumNodes());
1874
1875     int dist = CmiMyNode()-startnode;
1876     if (dist<0) dist += CmiNumNodes();
1877     int i;
1878     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1879         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1880         if (nid > CmiNumNodes() - 1) break;
1881         nid += startnode;
1882         nid = nid%CmiNumNodes();
1883         assert(nid>=0 && nid<CmiNumNodes() && nid!=CmiMyNode());
1884         char *dupmsg = (char *)CmiAlloc(size);
1885         CmiMemcpy(dupmsg,msg,size);
1886         //printf("In SendSpanningChildrenNode, sending bcast msg (root %d) from node %d to node %d\n", startnode, CmiMyNode(), nid);
1887         CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg);
1888     }
1889 }
1890
1891 /* need */
1892 void CmiFreeNodeBroadcastFn(int s, char *m) {
1893 #if CMK_BROADCAST_SPANNING_TREE
1894     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1895     
1896     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1897
1898     int mynode = CmiMyNode();
1899     CMI_SET_BROADCAST_ROOT(m, -mynode-1);
1900     CMI_MAGIC(m) = CHARM_MAGIC_NUMBER;
1901     ((CmiMsgHeaderBasic *)m)->size = s;
1902     CMI_SET_CHECKSUM(m, s);
1903     //printf("In CmiFreeNodeBroadcastFn, sending bcast msg from root node %d\n", CMI_BROADCAST_ROOT(m));
1904
1905     SendSpanningChildrenNode(s, m);
1906 #else
1907     int i;
1908     for (i=0; i<CmiNumNodes(); i++) {
1909         if (i==CmiMyNode()) continue;
1910         char *dupmsg = (char *)CmiAlloc(s);
1911         CmiMemcpy(dupmsg,m,s);
1912         CmiFreeNodeSendFn(i, s, dupmsg);
1913     }
1914 #endif
1915     CmiFree(m);    
1916 }
1917
1918 void CmiSyncNodeBroadcastFn(int s, char *m) {
1919     char *dupmsg;
1920     dupmsg = (char *)CmiAlloc(s);
1921     CmiMemcpy(dupmsg,m,s);
1922     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1923     CmiFreeNodeBroadcastFn(s, dupmsg);
1924 }
1925
1926 /* need */
1927 void CmiFreeNodeBroadcastAllFn(int s, char *m) {
1928     char *dupmsg = (char *)CmiAlloc(s);
1929     CmiMemcpy(dupmsg,m,s);
1930     CMI_MAGIC(dupmsg) = CHARM_MAGIC_NUMBER;
1931     ((CmiMsgHeaderBasic *)dupmsg)->size = s;
1932     CMI_SET_CHECKSUM(dupmsg, s);
1933
1934     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1935     
1936     CQdCreate(CpvAccess(cQdState), 1);
1937     CmiSendNodeSelf(dupmsg);
1938
1939     CmiFreeNodeBroadcastFn(s, m);
1940 }
1941
1942 void CmiSyncNodeBroadcastAllFn(int s, char *m) {
1943     char *dupmsg;
1944     dupmsg = (char *)CmiAlloc(s);
1945     CmiMemcpy(dupmsg,m,s);
1946     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1947     CmiFreeNodeBroadcastAllFn(s, dupmsg);
1948 }
1949
1950
1951 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m) {
1952     return NULL;
1953 }
1954 #endif //end of CMK_NODE_QUEUE_AVAILABLE
1955
1956 #include "manytomany.c"
1957
1958
1959 /*********************************************************************************************
1960 This section is for CmiDirect. This is a variant of the  persistent communication in which
1961 the user can transfer data between processors without using Charm++ messages. This lets the user
1962 send and receive data from the middle of his arrays without any copying on either send or receive
1963 side
1964 *********************************************************************************************/
1965
1966
1967
1968
1969 #ifdef BGP_USE_RDMA
1970
1971 #include "cmidirect.h"
1972
1973 /* We can avoid a receiver side lookup by just sending the whole shebang.
1974    DCMF header is in units of quad words (16 bytes), so we'd need less than a
1975    quad word for the handle if we just sent that and did a lookup. Or exactly
1976    2 quad words for the buffer pointer, callback pointer, callback
1977    data pointer, and DCMF_Request_t pointer with no lookup.
1978
1979    Since CmiDirect is generally going to be used for messages which aren't
1980    tiny, the extra 16 bytes is not likely to impact performance noticably and
1981    not having to lookup handles in tables simplifies the code enormously.
1982
1983    EJB   2008/4/2
1984 */
1985
1986
1987 /**
1988  To be called on the receiver to create a handle and return its number
1989 **/
1990 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue) {
1991     /* one-sided primitives would require registration of memory */
1992
1993     /* with two-sided primitives we just bundle the buffer and callback info into the handle so the sender can remind us about it later. */
1994     struct infiDirectUserHandle userHandle;
1995     userHandle.handle=1; /* doesn't matter on BG/P*/
1996     userHandle.senderNode=senderNode;
1997     userHandle.recverNode=_Cmi_mynode;
1998     userHandle.recverBufSize=recvBufSize;
1999     userHandle.recverBuf=recvBuf;
2000     userHandle.initialValue=initialValue;
2001     userHandle.callbackFnPtr=callbackFnPtr;
2002     userHandle.callbackData=callbackData;
2003     userHandle.DCMF_rq_trecv=ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2004 #if CMI_DIRECT_DEBUG
2005     CmiPrintf("[%d] RDMA create addr %p %d callback %p callbackdata %p\n",CmiMyPe(),userHandle.recverBuf,userHandle.recverBufSize, userHandle.callbackFnPtr, userHandle.callbackData);
2006 #endif
2007     return userHandle;
2008 }
2009
2010 /****
2011  To be called on the sender to attach the sender's buffer to this handle
2012 ******/
2013
2014 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize) {
2015
2016     /* one-sided primitives would require registration of memory */
2017
2018     /* with two-sided primitives we just record the sender buf in the handle */
2019     userHandle->senderBuf=sendBuf;
2020     CmiAssert(sendBufSize==userHandle->recverBufSize);
2021     userHandle->DCMF_rq_tsend =ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2022 #if CMI_DIRECT_DEBUG
2023     CmiPrintf("[%d] RDMA assoc addr %p %d to receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,sendBufSize, userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2024 #endif
2025
2026 }
2027
2028 /****
2029 To be called on the sender to do the actual data transfer
2030 ******/
2031 void CmiDirect_put(struct infiDirectUserHandle *userHandle) {
2032     /** invoke a DCMF_Send with the direct callback */
2033     DCMF_Protocol_t *protocol = NULL;
2034     protocol = &cmi_dcmf_direct_registration;
2035     /* local copy */
2036     CmiAssert(userHandle->recverBuf!=NULL);
2037     CmiAssert(userHandle->senderBuf!=NULL);
2038     CmiAssert(userHandle->recverBufSize>0);
2039     if (userHandle->recverNode== _Cmi_mynode) {
2040 #if CMI_DIRECT_DEBUG
2041         CmiPrintf("[%d] RDMA local put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2042 #endif
2043
2044         memcpy(userHandle->recverBuf,userHandle->senderBuf,userHandle->recverBufSize);
2045         (*(userHandle->callbackFnPtr))(userHandle->callbackData);
2046     } else {
2047         dcmfDirectMsgHeader msgHead;
2048         msgHead.recverBuf=userHandle->recverBuf;
2049         msgHead.callbackFnPtr=userHandle->callbackFnPtr;
2050         msgHead.callbackData=userHandle->callbackData;
2051         msgHead.DCMF_rq_t=(DCMF_Request_t *) userHandle->DCMF_rq_trecv;
2052 #if CMK_SMP
2053         DCMF_CriticalSection_enter (0);
2054 #endif
2055 #if CMI_DIRECT_DEBUG
2056         CmiPrintf("[%d] RDMA put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2057 #endif
2058         DCMF_Send (protocol,
2059                    (DCMF_Request_t *) userHandle->DCMF_rq_tsend,
2060                    directcb, DCMF_MATCH_CONSISTENCY, userHandle->recverNode,
2061                    userHandle->recverBufSize, userHandle->senderBuf,
2062                    (struct DCQuad *) &(msgHead), 2);
2063
2064 #if CMK_SMP
2065         DCMF_CriticalSection_exit (0);
2066 #endif
2067     }
2068 }
2069
2070 /**** Should not be called the first time *********/
2071 void CmiDirect_ready(struct infiDirectUserHandle *userHandle) {
2072     /* no op on BGP */
2073 }
2074
2075 /**** Should not be called the first time *********/
2076 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle) {
2077     /* no op on BGP */
2078 }
2079
2080 /**** Should not be called the first time *********/
2081 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle) {
2082     /* no op on BGP */
2083 }
2084
2085 #endif /* BGP_USE_RDMA*/
2086