Updated the machine layer so that it is working both in non-SMP and SMP mode
[charm.git] / src / arch / bluegenep / machine.c
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include "assert.h"
12 #include "malloc.h"
13
14 #include <bpcore/ppc450_inlines.h>
15
16 #include "dcmf.h"
17 #include "dcmf_multisend.h"
18
19 char *ALIGN_16(char *p) {
20     return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
21 }
22
23 #define PROGRESS_PERIOD 1024
24
25 //There are two roles of comm thread:
26 // 1. polling other cores' bcast msg queue
27 // 2. bcast msg is handled only by comm thd
28 #define BCASTMSG_ONLY_TO_COMMTHD 1
29
30 //only one bcast queue for comm thread to handle, but
31 //may be accessed by system threads such as the handler
32 //for receiving messages
33 static PCQueue broadcast_q;                 //queue to send broadcast messages
34 #if CMK_NODE_QUEUE_AVAILABLE
35 CsvDeclare(PCQueue, node_bcastq);
36 CsvDeclare(CmiNodeLock, node_bcastLock);
37 #endif
38
39 /*
40     To reduce the buffer used in broadcast and distribute the load from
41   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
42   spanning tree broadcast algorithm.
43     This will use the fourth short in message as an indicator of spanning tree
44   root.
45 */
46 #define CMK_BROADCAST_SPANNING_TREE    1
47 /* FIXME: HYPERCUBE in SMP will not work*/
48 #define CMK_BROADCAST_HYPERCUBE        0
49
50 #define BROADCAST_SPANNING_FACTOR     4
51
52 //The root of the message infers the type of the message
53 // 1. root is 0, then it is a normal point-to-point message
54 // 2. root is larger than 0 (>=1), then it is a broadcast message across all processors (cores)
55 // 3. root is less than 0 (<=-1), then it is a broadcast message across all nodes
56 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
57 #define CMI_IS_BCAST_ON_CORES(msg) (CMI_BROADCAST_ROOT(msg) > 0)
58 #define CMI_IS_BCAST_ON_NODES(msg) (CMI_BROADCAST_ROOT(msg) < 0)
59 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
60
61 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
62 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
63
64 /* FIXME: need a random number that everyone agrees ! */
65 #define CHARM_MAGIC_NUMBER               126
66
67 #if CMK_ERROR_CHECKING
68 static int checksum_flag = 0;
69 extern unsigned char computeCheckSum(unsigned char *data, int len);
70
71 #define CMI_SET_CHECKSUM(msg, len)      \
72         if (checksum_flag)  {   \
73           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
74           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
75         }
76
77 #define CMI_CHECK_CHECKSUM(msg, len)    \
78         if (checksum_flag)      \
79           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
80             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
81             for(count = 0; count < len; count++) { \
82                 printf("%2x", msg[count]);                 \
83             }                                             \
84             printf("------------------------------\n\n"); \
85             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
86           }
87 #else
88 #define CMI_SET_CHECKSUM(msg, len)
89 #define CMI_CHECK_CHECKSUM(msg, len)
90 #endif
91
92 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
93
94 #if CMK_BROADCAST_HYPERCUBE
95 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
96 #else
97 #  define CMI_SET_CYCLE(msg, cycle)
98 #endif
99
100 int               _Cmi_numpes;
101 int               _Cmi_mynode;    /* Which address space am I */
102 int               _Cmi_mynodesize;/* Number of processors in my address space */
103 int               _Cmi_numnodes;  /* Total number of address spaces */
104 int                Cmi_nodestart; /* First processor in this address space */
105 CpvDeclare(void*, CmiLocalQueue);
106
107
108 #if CMK_NODE_QUEUE_AVAILABLE
109 #define SMP_NODEMESSAGE   (0xFB) // rank of the node message when node queue
110 // is available
111 #define NODE_BROADCAST_OTHERS (-1)
112 #define NODE_BROADCAST_ALL    (-2)
113 #endif
114
115
116 typedef struct ProcState {
117     /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
118     CmiNodeLock  recvLock;              /* for cs->recv */
119     CmiNodeLock bcastLock;
120 } ProcState;
121
122 static ProcState  *procState;
123
124 #if CMK_SMP && !CMK_MULTICORE
125 static volatile int commThdExit = 0;
126 static CmiNodeLock commThdExitLock = 0;
127 #endif
128
129 void ConverseRunPE(int everReturn);
130 static void CommunicationServer(int sleepTime);
131 static void CommunicationServerThread(int sleepTime);
132
133 //So far we dont define any comm threads
134 int Cmi_commthread = 0;
135
136 #include "machine-smp.c"
137 CsvDeclare(CmiNodeState, NodeState);
138 #include "immediate.c"
139
140 void AdvanceCommunications();
141
142
143 #if !CMK_SMP
144 /************ non SMP **************/
145 static struct CmiStateStruct Cmi_state;
146 int _Cmi_mype;
147 int _Cmi_myrank;
148
149 void CmiMemLock(void) {}
150 void CmiMemUnlock(void) {}
151
152 #define CmiGetState() (&Cmi_state)
153 #define CmiGetStateN(n) (&Cmi_state)
154
155 //void CmiYield(void) { sleep(0); }
156
157 static void CmiStartThreads(char **argv) {
158     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
159     _Cmi_mype = Cmi_nodestart;
160     _Cmi_myrank = 0;
161 }
162 #endif  /* !CMK_SMP */
163
164 //int received_immediate;
165 //int received_broadcast;
166
167 /*Add a message to this processor's receive queue, pe is a rank */
168 void CmiPushPE(int pe,void *msg) {
169     CmiState cs = CmiGetStateN(pe);
170     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
171 #if CMK_IMMEDIATE_MSG
172     if (CmiIsImmediate(msg)) {
173         /**(CmiUInt2 *)msg = pe;*/
174         //received_immediate = 1;
175         //printf("PushPE: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
176         //CMI_DEST_RANK(msg) = pe;
177         CmiPushImmediateMsg(msg);
178         return;
179     }
180 #endif
181
182     PCQueuePush(cs->recv,(char *)msg);
183     //printf("%d: PCQueue length = %d, msg = %x\n", CmiMyPe(), PCQueueLength(cs->recv), msg);
184
185     CmiIdleLock_addMessage(&cs->idle);
186     MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
187 }
188
189 #if CMK_NODE_QUEUE_AVAILABLE
190 /*Add a message to this processor's receive queue */
191 static void CmiPushNode(void *msg) {
192     MACHSTATE(3,"Pushing message into NodeRecv queue");
193 #if CMK_IMMEDIATE_MSG
194     if (CmiIsImmediate(msg)) {
195         //printf("PushNode: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
196         //CMI_DEST_RANK(msg) = 0;
197         CmiPushImmediateMsg(msg);
198         return;
199     }
200 #endif
201     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
202     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
203     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
204     {
205         CmiState cs=CmiGetStateN(0);
206         CmiIdleLock_addMessage(&cs->idle);
207     }
208 }
209 #endif /* CMK_NODE_QUEUE_AVAILABLE */
210
211 volatile int msgQueueLen;
212 volatile int outstanding_recvs;
213
214 static int Cmi_dim;     /* hypercube dim of network */
215
216 static char     **Cmi_argv;
217 static char     **Cmi_argvcopy;
218 static CmiStartFn Cmi_startfn;   /* The start function */
219 static int        Cmi_usrsched;  /* Continue after start function finishes? */
220
221 extern void ConverseCommonInit(char **argv);
222 extern void ConverseCommonExit(void);
223 extern void CthInit(char **argv);
224
225 static void SendMsgsUntil(int);
226
227
228 void SendSpanningChildren(int size, char *msg);
229 #if CMK_NODE_QUEUE_AVAILABLE
230 void SendSpanningChildrenNode(int size, char *msg);
231 #endif
232 void SendHypercube(int size, char *msg);
233
234 DCMF_Protocol_t  cmi_dcmf_short_registration __attribute__((__aligned__(16)));
235 DCMF_Protocol_t  cmi_dcmf_eager_registration __attribute__((__aligned__(16)));
236 DCMF_Protocol_t  cmi_dcmf_rzv_registration   __attribute__((__aligned__(16)));
237 DCMF_Protocol_t  cmi_dcmf_multicast_registration   __attribute__((__aligned__(16)));
238
239
240 #define BGP_USE_AM_DIRECT 1
241 //#define BGP_USE_RDMA_DIRECT 1
242 //#define CMI_DIRECT_DEBUG 1
243 #ifdef BGP_USE_AM_DIRECT
244
245
246 DCMF_Protocol_t  cmi_dcmf_direct_registration __attribute__((__aligned__(16)));
247 /** The receive side of a put implemented in DCMF_Send */
248
249
250 typedef struct {
251     void *recverBuf;
252   void (*callbackFnPtr)(void *);
253     void *callbackData;
254     DCMF_Request_t *DCMF_rq_t;
255 } dcmfDirectMsgHeader;
256
257 /* nothing for us to do here */
258 #if (DCMF_VERSION_MAJOR >= 2)
259 void direct_send_done_cb(void*nothing, DCMF_Error_t *err) 
260 #else 
261   void direct_send_done_cb(void*nothing) 
262 #endif
263 {
264 #if CMI_DIRECT_DEBUG
265   CmiPrintf("[%d] RDMA send_done_cb\n", CmiMyPe());
266 #endif
267 }
268
269 DCMF_Callback_t  directcb;
270
271 void     direct_short_pkt_recv (void             * clientdata,
272                                 const DCQuad     * info,
273                                 unsigned           count,
274                                 unsigned           senderrank,
275                                 const char       * buffer,
276                                 const unsigned     sndlen) {
277 #if CMI_DIRECT_DEBUG
278     CmiPrintf("[%d] RDMA direct_short_pkt_recv\n", CmiMyPe());
279 #endif
280     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
281     CmiMemcpy(msgHead->recverBuf, buffer, sndlen);
282     (*(msgHead->callbackFnPtr))(msgHead->callbackData);
283 }
284
285
286 #if (DCMF_VERSION_MAJOR >= 2)
287 typedef void (*cbhdlr) (void *, DCMF_Error_t *);
288 #else
289 typedef void (*cbhdlr) (void *);
290 #endif
291
292 DCMF_Request_t * direct_first_pkt_recv_done (void              * clientdata,
293         const DCQuad      * info,
294         unsigned            count,
295         unsigned            senderrank,
296         const unsigned      sndlen,
297         unsigned          * rcvlen,
298         char             ** buffer,
299         DCMF_Callback_t   * cb
300                                             ) {
301 #if CMI_DIRECT_DEBUG
302     CmiPrintf("[%d] RDMA direct_first_pkt_recv_done\n", CmiMyPe());
303 #endif
304     /* pull the data we need out of the header */
305     *rcvlen=sndlen;
306     dcmfDirectMsgHeader *msgHead=  (dcmfDirectMsgHeader *) info;
307     cb->function= (cbhdlr)msgHead->callbackFnPtr;
308     cb->clientdata=msgHead->callbackData;
309     *buffer=msgHead->recverBuf;
310     return msgHead->DCMF_rq_t;
311 }
312
313
314 #endif
315
316 #ifdef BGP_USE_RDMA_DIRECT
317 static struct DCMF_Callback_t dcmf_rdma_cb_ack;
318
319
320 DCMF_Protocol_t  cmi_dcmf_direct_put_registration __attribute__((__aligned__(16)));
321
322 DCMF_Protocol_t  cmi_dcmf_direct_get_registration __attribute__((__aligned__(16)));
323
324 DCMF_Protocol_t  cmi_dcmf_direct_rdma_registration __attribute__((__aligned__(16)));
325 /** The receive side of a DCMF_Put notification implemented in DCMF_Send */
326
327 typedef struct {
328   void (*callbackFnPtr)(void *);
329     void *callbackData;
330 } dcmfDirectRDMAMsgHeader;
331
332
333
334 #if (DCMF_VERSION_MAJOR >= 2)
335 void direct_send_rdma_done_cb(void*nothing, DCMF_Error_t *err) 
336 #else 
337   void direct_send_rdma_done_cb(void*nothing) 
338 #endif
339 {
340 #if CMI_DIRECT_DEBUG
341   CmiPrintf("[%d] RDMA send_rdma_done_cb result %d\n", CmiMyPe());
342 #endif
343
344
345 }
346
347 DCMF_Callback_t  directcb;
348
349 void     direct_short_rdma_pkt_recv (void             * clientdata,
350                                 const DCQuad     * info,
351                                 unsigned           count,
352                                 unsigned           senderrank,
353                                 const char       * buffer,
354                                 const unsigned     sndlen) {
355 #if CMI_DIRECT_DEBUG
356     CmiPrintf("[%d] RDMA direct_short_rdma_pkt_recv\n", CmiMyPe());
357 #endif
358     dcmfDirectRDMAMsgHeader *msgHead=  (dcmfDirectRDMAMsgHeader *) info;
359     (*(msgHead->callbackFnPtr))(msgHead->callbackData);
360 }
361
362
363 #if (DCMF_VERSION_MAJOR >= 2)
364 typedef void (*cbhdlr) (void *, DCMF_Error_t *);
365 #else
366 typedef void (*cbhdlr) (void *);
367 #endif
368
369 DCMF_Request_t * direct_first_rdma_pkt_recv_done (void              * clientdata,
370         const DCQuad      * info,
371         unsigned            count,
372         unsigned            senderrank,
373         const unsigned      sndlen,
374         unsigned          * rcvlen,
375         char             ** buffer,
376         DCMF_Callback_t   * cb
377                                             ) {
378     CmiAbort("direct_first_rdma_pkt_recv should not be called");
379 }
380
381
382 #endif
383
384
385 typedef struct msg_list {
386     char              * msg;
387     int                 size;
388     int                 destpe;
389     int               * pelist;
390     DCMF_Callback_t     cb;
391     DCQuad              info __attribute__((__aligned__(16)));
392     DCMF_Request_t      send __attribute__((__aligned__(16)));
393 } SMSG_LIST __attribute__((__aligned__(16)));
394
395 #define MAX_NUM_SMSGS   64
396 CpvDeclare(PCQueue, smsg_list_q);
397
398 static inline SMSG_LIST * smsg_allocate() {
399     SMSG_LIST *smsg = (SMSG_LIST *)PCQueuePop(CpvAccess(smsg_list_q));
400     if (smsg != NULL)
401         return smsg;
402
403     void * buf = malloc(sizeof(SMSG_LIST)); 
404     assert(buf!=NULL);
405     assert (((unsigned)buf & 0x0f) == 0);
406
407     return (SMSG_LIST *) buf;
408 }
409
410 static inline void smsg_free (SMSG_LIST *smsg) {
411     int size = PCQueueLength (CpvAccess(smsg_list_q));
412     if (size < MAX_NUM_SMSGS)
413         PCQueuePush (CpvAccess(smsg_list_q), (char *) smsg);
414     else
415         free (smsg);
416 }
417
418 typedef struct {
419     int sleepMs; /*Milliseconds to sleep while idle*/
420     int nIdles; /*Number of times we've been idle in a row*/
421     CmiState cs; /*Machine state*/
422 } CmiIdleState;
423
424 static CmiIdleState *CmiNotifyGetState(void) {
425     CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
426     s->sleepMs=0;
427     s->nIdles=0;
428     s->cs=CmiGetState();
429     return s;
430 }
431
432
433 #if (DCMF_VERSION_MAJOR >= 2)
434 static void send_done(void *data, DCMF_Error_t *err) 
435 #else 
436 static void send_done(void *data) 
437 #endif
438 /* send done callback: sets the smsg entry to done */
439 {
440     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
441     CmiFree(msg_tmp->msg);
442     //free(data);
443     smsg_free (msg_tmp);
444
445     msgQueueLen--;
446 }
447
448 #if (DCMF_VERSION_MAJOR >= 2)
449 static void send_multi_done(void *data, DCMF_Error_t *err) 
450 #else 
451 static void send_multi_done(void *data) 
452 #endif
453 /* send done callback: sets the smsg entry to done */
454 {
455     SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
456     CmiFree(msg_tmp->msg);
457     free(msg_tmp->pelist);
458
459     smsg_free(msg_tmp);
460
461     msgQueueLen--;
462 }
463
464
465 #if (DCMF_VERSION_MAJOR >= 2)
466 static void recv_done(void *clientdata, DCMF_Error_t * err) 
467 #else 
468 static void recv_done(void *clientdata) 
469 #endif
470 /* recv done callback: push the recved msg to recv queue */
471 {
472
473     char *msg = (char *) clientdata;
474     int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
475
476     /*printf ("NODE[%d] Recv message done with msg rank %d\n", CmiMyNode(), CMI_DEST_RANK(msg));*/
477
478     /* then we do what PumpMsgs used to do:
479      * push msg to recv queue */
480     int count=0;
481     CMI_CHECK_CHECKSUM(msg, sndlen);
482     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
483         CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
484         return;
485     }
486
487 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
488     if (CMI_IS_BCAST_ON_CORES(msg) ) {
489         int pe = CMI_DEST_RANK(msg);
490
491         //printf ("%d: Receiving bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, pe);
492
493         char *copymsg;
494         copymsg = (char *)CmiAlloc(sndlen);
495         CmiMemcpy(copymsg,msg,sndlen);
496         //received_broadcast = 1;
497         PCQueuePush(broadcast_q, copymsg);      
498     }
499 #endif
500
501 #if CMK_NODE_QUEUE_AVAILABLE
502 #if CMK_BROADCAST_SPANNING_TREE
503     if (CMI_IS_BCAST_ON_NODES(msg)) {
504         //printf ("%d: Receiving node bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, CMI_DEST_RANK(msg));
505         char *copymsg = (char *)CmiAlloc(sndlen);
506         CmiMemcpy(copymsg,msg,sndlen);
507
508         CmiLock(CsvAccess(node_bcastLock));
509         PCQueuePush(CsvAccess(node_bcastq), copymsg);
510         CmiUnlock(CsvAccess(node_bcastLock));   
511     }
512 #endif
513     if (CMI_DEST_RANK(msg) == SMP_NODEMESSAGE)
514         CmiPushNode(msg);
515     else
516 #endif
517         CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
518
519     outstanding_recvs --;
520 }
521
522
523 void     short_pkt_recv (void             * clientdata,
524                          const DCQuad     * info,
525                          unsigned           count,
526                          unsigned           senderrank,
527                          const char       * buffer,
528                          const unsigned     sndlen) {
529     outstanding_recvs ++;
530     int alloc_size = sndlen;
531
532     char * new_buffer = (char *)CmiAlloc(alloc_size);
533     CmiMemcpy (new_buffer, buffer, sndlen);
534     
535 #if (DCMF_VERSION_MAJOR >= 2)
536     recv_done (new_buffer, NULL);
537 #else
538     recv_done (new_buffer);
539 #endif
540 }
541
542
543 DCMF_Request_t * first_multi_pkt_recv_done (const DCQuad      * info,
544                                             unsigned            count,
545                                             unsigned            senderrank,                                    
546                                             const unsigned      sndlen,
547                                             unsigned            connid,
548                                             void              * clientdata,
549                                             unsigned          * rcvlen,
550                                             char             ** buffer,
551                                             unsigned          * pw,
552                                             DCMF_Callback_t   * cb
553                                             ) {
554     outstanding_recvs ++;
555     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
556
557     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
558
559     /* printf ("Receiving %d bytes\n", sndlen); */
560     *rcvlen = sndlen;  /* to avoid malloc(0) which might
561                                    return NULL */
562
563     *buffer = (char *)CmiAlloc(alloc_size);
564     cb->function = recv_done;
565     cb->clientdata = *buffer;
566
567     *pw  = 0x7fffffff;
568     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
569 }
570
571
572 DCMF_Request_t * first_pkt_recv_done (void              * clientdata,
573                                       const DCQuad      * info,
574                                       unsigned            count,
575                                       unsigned            senderrank,
576                                       const unsigned      sndlen,
577                                       unsigned          * rcvlen,
578                                       char             ** buffer,
579                                       DCMF_Callback_t   * cb
580                                      ) {
581     outstanding_recvs ++;
582     int alloc_size = sndlen + sizeof(DCMF_Request_t) + 16;
583
584     //printf ("%d: Receiving message %d bytes from %d\n", CmiMyPe(), sndlen, senderrank);
585
586     /* printf ("Receiving %d bytes\n", sndlen); */
587     *rcvlen = sndlen;  /* to avoid malloc(0) which might
588                                    return NULL */
589
590     *buffer = (char *)CmiAlloc(alloc_size);
591     cb->function = recv_done;
592     cb->clientdata = *buffer;
593
594     return (DCMF_Request_t *) ALIGN_16(*buffer + sndlen);
595 }
596
597
598 #if CMK_NODE_QUEUE_AVAILABLE
599 void sendBroadcastMessagesNode() {
600     //this should be exectued on comm thread
601     CmiAssert(CmiMyRank() == CmiMyNodeSize());
602     if (PCQueueLength(CsvAccess(node_bcastq))==0) return;
603     //node broadcast message could be always handled by any cores (including
604     //comm thd) on this node
605     CmiLock(CsvAccess(node_bcastLock));
606     char *msg = PCQueuePop(CsvAccess(node_bcastq));
607     CmiUnlock(CsvAccess(node_bcastLock));
608     while (msg) {
609 #if CMK_BROADCAST_SPANNING_TREE
610         //printf("sendBroadcastMessagesNode: node %d rank %d with msg root %d\n", CmiMyNode(), CmiMyRank(), CMI_BROADCAST_ROOT(msg));
611         SendSpanningChildrenNode(((CmiMsgHeaderBasic *) msg)->size, msg);
612 #endif
613         CmiFree(msg);
614
615         CmiLock(CsvAccess(node_bcastLock));
616         msg = PCQueuePop(CsvAccess(node_bcastq));
617         CmiUnlock(CsvAccess(node_bcastLock));
618     }
619 }
620 #endif
621
622 static void CmiSendChildrenPeers(int rank, int size, char *msg);
623 //Should be only called from comm thd
624 void sendBroadcastMessages() {
625 #if CMK_SMP
626     CmiAssert(CmiMyRank()==_Cmi_mynodesize); 
627 #endif
628
629     if (PCQueueLength(broadcast_q)==0) return;
630
631     char *msg = (char *) PCQueuePop(broadcast_q);
632
633     while (msg) {
634         int msgsize = ((CmiMsgHeaderBasic *)msg)->size;
635 #if CMK_BROADCAST_SPANNING_TREE
636         SendSpanningChildren(msgsize, msg);
637 #elif CMK_BROADCAST_HYPERCUBE
638         SendHypercube(msgsize, msg);
639 #endif
640
641 #if CMK_SMP
642         //this msg has already been pushed into CMI_DEST_RANK(msg)'s queue
643         //so we just need to send the msg to other cores, and free the msg as it 
644         //is a copy in recv_done --Chao Mei
645         CmiSendChildrenPeers(CMI_DEST_RANK(msg), msgsize, msg); 
646 #endif       
647         CmiFree(msg);
648         msg = (char *) PCQueuePop(broadcast_q);
649     }
650 }
651
652 CpvDeclare(unsigned, networkProgressCount);
653 int  networkProgressPeriod;
654
655 #if 0
656 unsigned int *ranklist;
657
658 BGTsC_t        barrier;
659
660 // -----------------------------------------
661 // Rectangular broadcast implementation
662 // -----------------------------------------
663
664 #define MAX_COMM  256
665 static void * comm_table [MAX_COMM];
666
667 typedef struct rectbcast_msg {
668     BGTsRC_t           request;
669     DCMF_Callback_t    cb;
670     char              *msg;
671 } RectBcastInfo;
672
673
674 static void bcast_done (void *data) {
675     RectBcastInfo *rinfo = (RectBcastInfo *) data;
676     CmiFree (rinfo->msg);
677     free (rinfo);
678 }
679
680 static  void *   getRectBcastRequest (unsigned comm) {
681     return comm_table [comm];
682 }
683
684
685 static  void *  bcast_recv     (unsigned               root,
686                                 unsigned               comm,
687                                 const unsigned         sndlen,
688                                 unsigned             * rcvlen,
689                                 char                ** rcvbuf,
690                                 DCMF_Callback_t      * const cb) {
691
692     int alloc_size = sndlen + sizeof(BGTsRC_t) + 16;
693
694     *rcvlen = sndlen;  /* to avoid malloc(0) which might
695                                    return NULL */
696
697     *rcvbuf       =  (char *)CmiAlloc(alloc_size);
698     cb->function  =   recv_done;
699     cb->clientdata = *rcvbuf;
700
701     return (BGTsRC_t *) ALIGN_16 (*rcvbuf + sndlen);
702
703 }
704
705
706 extern void bgl_machine_RectBcast (unsigned                 commid,
707                                        const char             * sndbuf,
708                                        unsigned                 sndlen) {
709     RectBcastInfo *rinfo  =   (RectBcastInfo *) malloc (sizeof(RectBcastInfo));
710     rinfo->cb.function    =   bcast_done;
711     rinfo->cb.clientdata  =   rinfo;
712
713     BGTsRC_AsyncBcast_start (commid, &rinfo->request, &rinfo->cb, sndbuf, sndlen);
714
715 }
716
717 extern void        bgl_machine_RectBcastInit  (unsigned               commID,
718             const BGTsRC_Geometry_t* geometry) {
719
720     CmiAssert (commID < 256);
721     CmiAssert (comm_table [commID] == NULL);
722
723     BGTsRC_t *request =  (BGTsRC_t *) malloc (sizeof (BGTsRC_t));
724     comm_table [commID] = request;
725
726     BGTsRC_AsyncBcast_init  (request, commID,  geometry);
727 }
728
729
730
731
732 //--------------------------------------------------------------
733 //----- End Rectangular Broadcast Implementation ---------------
734 //--------------------------------------------------------------
735 #endif
736
737 //approx sleep command
738 void mysleep (int cycles) {
739     unsigned long long start = DCMF_Timebase();
740     unsigned long long end = start + cycles;
741
742     while (start < end)
743         start = DCMF_Timebase();
744
745     return;
746 }
747
748 static void * test_buf;
749
750 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
751     int n, i, count;
752
753     //fprintf(stderr, "Initializing Converse Blue Gene/P machine Layer\n");
754
755     DCMF_Messager_initialize();
756
757 #if CMK_SMP
758     DCMF_Configure_t  config_in, config_out;
759     config_in.thread_level= DCMF_THREAD_MULTIPLE;
760     config_in.interrupts  = DCMF_INTERRUPTS_OFF;
761
762     DCMF_Messager_configure(&config_in, &config_out);
763     //assert (config_out.thread_level == DCMF_THREAD_MULTIPLE); //not supported in vn mode
764 #endif
765
766     DCMF_Send_Configuration_t short_config, eager_config, rzv_config;
767
768
769     short_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
770     short_config.cb_recv_short = short_pkt_recv;
771     short_config.cb_recv       = first_pkt_recv_done;
772
773 #if (DCMF_VERSION_MAJOR >= 3)
774     short_config.network  = DCMF_DEFAULT_NETWORK;
775 #elif (DCMF_VERSION_MAJOR == 2)
776     short_config.network  = DCMF_DefaultNetwork;
777 #endif
778
779     eager_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
780     eager_config.cb_recv_short = short_pkt_recv;
781     eager_config.cb_recv       = first_pkt_recv_done;
782 #if (DCMF_VERSION_MAJOR >= 3)
783     eager_config.network  = DCMF_DEFAULT_NETWORK;
784 #elif (DCMF_VERSION_MAJOR == 2)
785     eager_config.network  = DCMF_DefaultNetwork;
786 #endif
787
788 #ifdef  OPT_RZV
789 #warning "Enabling Optimize Rzv"
790     rzv_config.protocol        = DCMF_RZV_SEND_PROTOCOL;
791 #else
792     rzv_config.protocol        = DCMF_DEFAULT_SEND_PROTOCOL;
793 #endif
794     rzv_config.cb_recv_short   = short_pkt_recv;
795     rzv_config.cb_recv         = first_pkt_recv_done;
796 #if (DCMF_VERSION_MAJOR >= 3)
797     rzv_config.network  = DCMF_DEFAULT_NETWORK;
798 #elif (DCMF_VERSION_MAJOR == 2)
799     rzv_config.network  = DCMF_DefaultNetwork;
800 #endif
801
802     DCMF_Send_register (&cmi_dcmf_short_registration, &short_config);
803     DCMF_Send_register (&cmi_dcmf_eager_registration, &eager_config);
804     DCMF_Send_register (&cmi_dcmf_rzv_registration,   &rzv_config);
805
806 #ifdef BGP_USE_AM_DIRECT
807     DCMF_Send_Configuration_t direct_config;
808     direct_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
809     direct_config.cb_recv_short = direct_short_pkt_recv;
810     direct_config.cb_recv       = direct_first_pkt_recv_done;
811 #if (DCMF_VERSION_MAJOR >= 3)
812     direct_config.network  = DCMF_DEFAULT_NETWORK;
813 #elif (DCMF_VERSION_MAJOR == 2)
814     direct_config.network  = DCMF_DefaultNetwork;
815 #endif
816     DCMF_Send_register (&cmi_dcmf_direct_registration,   &direct_config);
817     directcb.function=direct_send_done_cb;
818     directcb.clientdata=NULL;
819 #endif
820
821 #ifdef BGP_USE_RDMA_DIRECT
822     /* notification protocol */
823     DCMF_Send_Configuration_t direct_rdma_config;
824     direct_rdma_config.protocol      = DCMF_DEFAULT_SEND_PROTOCOL;
825     direct_rdma_config.cb_recv_short = direct_short_rdma_pkt_recv;
826     direct_rdma_config.cb_recv       = direct_first_rdma_pkt_recv_done;
827 #if (DCMF_VERSION_MAJOR >= 3)
828     direct_rdma_config.network  = DCMF_DEFAULT_NETWORK;
829 #elif (DCMF_VERSION_MAJOR == 2)
830     direct_rdma_config.network  = DCMF_DefaultNetwork;
831 #endif
832     DCMF_Send_register (&cmi_dcmf_direct_rdma_registration,   &direct_rdma_config);
833     directcb.function=direct_send_rdma_done_cb;
834     directcb.clientdata=NULL;
835     /* put protocol */
836    DCMF_Put_Configuration_t put_configuration = { DCMF_DEFAULT_PUT_PROTOCOL };
837    DCMF_Put_register (&cmi_dcmf_direct_put_registration, &put_configuration);
838    DCMF_Get_Configuration_t get_configuration = { DCMF_DEFAULT_GET_PROTOCOL };
839    DCMF_Get_register (&cmi_dcmf_direct_get_registration, &get_configuration);
840     
841 #endif
842     //fprintf(stderr, "Initializing Eager Protocol\n");
843
844     _Cmi_numnodes = DCMF_Messager_size();
845     _Cmi_mynode = DCMF_Messager_rank();
846
847     unsigned rank = DCMF_Messager_rank();
848     unsigned size = DCMF_Messager_size();
849
850     CmiBarrier();
851     CmiBarrier();
852     CmiBarrier();
853
854     /* processor per node */
855     _Cmi_mynodesize = 1;
856     CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
857 #if ! CMK_SMP
858     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
859         CmiAbort("+ppn cannot be used in non SMP version!\n");
860 #endif
861
862     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
863     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
864     Cmi_argvcopy = CmiCopyArgs(argv);
865     Cmi_argv = argv;
866     Cmi_startfn = fn;
867     Cmi_usrsched = usched;
868
869     //printf ("Starting Charm with %d nodes and %d processors\n", CmiNumNodes(), CmiNumPes());
870
871     DCMF_Multicast_Configuration_t mconfig;
872     mconfig.protocol = DCMF_MEMFIFO_DMA_MSEND_PROTOCOL;
873     mconfig.cb_recv  = first_multi_pkt_recv_done;
874     mconfig.clientdata = NULL;
875     mconfig.connectionlist = (void **) malloc (CmiNumPes() * sizeof(unsigned long));
876     mconfig.nconnections = CmiNumPes();  
877     DCMF_Multicast_register(&cmi_dcmf_multicast_registration, &mconfig);
878
879
880     /* find dim = log2(numpes), to pretend we are a hypercube */
881     for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
882         Cmi_dim++ ;
883
884
885     /* checksum flag */
886     if (CmiGetArgFlag(argv,"+checksum")) {
887 #if CMK_ERROR_CHECKING
888         checksum_flag = 1;
889         if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
890 #else
891         if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
892 #endif
893     }
894
895     CsvInitialize(CmiNodeState, NodeState);
896     CmiNodeStateInit(&CsvAccess(NodeState));
897
898     broadcast_q = PCQueueCreate();
899
900 #if CMK_NODE_QUEUE_AVAILABLE
901     CsvInitialize(PCQueue, node_bcastq);
902     CsvAccess(node_bcastq) = PCQueueCreate();
903     CsvInitialize(CmiNodeLock, node_bcastLock);
904     CsvAccess(node_bcastLock) = CmiCreateLock();
905 #endif
906
907     int actualNodeSize = _Cmi_mynodesize;
908 #if !CMK_MULTICORE
909     actualNodeSize++; //considering the extra comm thread
910 #endif
911
912     procState = (ProcState *)CmiAlloc((actualNodeSize) * sizeof(ProcState));
913     for (i=0; i<actualNodeSize; i++) {
914         /*    procState[i].sendMsgBuf = PCQueueCreate();   */
915         procState[i].recvLock = CmiCreateLock();
916         procState[i].bcastLock = CmiCreateLock();
917     }
918
919 #if CMK_SMP && !CMK_MULTICORE
920     commThdExitLock = CmiCreateLock();
921 #endif
922
923     /* Network progress function is used to poll the network when for
924        messages. This flushes receive buffers on some  implementations*/
925     networkProgressPeriod = PROGRESS_PERIOD;
926     CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
927
928     //printf ("Starting Threads\n");
929
930     CmiStartThreads(argv);
931
932     ConverseRunPE(initret);
933 }
934
935
936 int PerrorExit (char *err) {
937     fprintf (stderr, "err\n\n");
938     exit (-1);
939     return -1;
940 }
941
942
943 void ConverseRunPE(int everReturn) {
944     //printf ("ConverseRunPE on rank %d\n", CmiMyPe());
945
946     CmiIdleState *s=CmiNotifyGetState();
947     CmiState cs;
948     char** CmiMyArgv;
949     CmiNodeAllBarrier();
950
951     cs = CmiGetState();
952     CpvInitialize(void *,CmiLocalQueue);
953     CpvAccess(CmiLocalQueue) = cs->localqueue;
954
955     if (CmiMyRank())
956         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
957     else
958         CmiMyArgv=Cmi_argv;
959
960     CthInit(CmiMyArgv);
961
962     /* initialize the network progress counter*/
963     /* Network progress function is used to poll the network when for
964        messages. This flushes receive buffers on some  implementations*/
965     CpvInitialize(int , networkProgressCount);
966     CpvAccess(networkProgressCount) = 0;
967
968     
969
970     CpvInitialize(PCQueue, smsg_list_q);
971     CpvAccess(smsg_list_q) = PCQueueCreate();
972
973     //printf ("Before Converse Common Init\n");
974     ConverseCommonInit(CmiMyArgv);
975
976     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
977
978     CmiBarrier();
979
980     /* Converse initialization finishes, immediate messages can be processed.
981        node barrier previously should take care of the node synchronization */
982     _immediateReady = 1;
983
984     /* communication thread */
985     if (CmiMyRank() == CmiMyNodeSize()) {
986         Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
987         while (1) CommunicationServer(5);
988     } else {
989         //printf ("Calling Start Fn and the scheduler \n");
990
991         if (!everReturn) {
992             Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
993             if (Cmi_usrsched==0) CsdScheduler(-1);
994             ConverseExit();
995         }
996     }
997 }
998
999 #if CMK_SMP
1000 static int inexit = 0;
1001
1002 /* test if all processors recv queues are empty */
1003 static int RecvQueueEmpty() {
1004     int i;
1005     for (i=0; i<_Cmi_mynodesize; i++) {
1006         CmiState cs=CmiGetStateN(i);
1007         if (!PCQueueEmpty(cs->recv)) return 0;
1008     }
1009     return 1;
1010 }
1011
1012 #endif
1013
1014
1015 //extern void DCMF_Messager_dumpTimers();
1016
1017 void ConverseExit(void) {
1018
1019 #if !CMK_SMP
1020     /* in SMP mode, the comm thread is advancing the communication */
1021     while (msgQueueLen > 0 || outstanding_recvs > 0) {
1022         AdvanceCommunications();
1023     }
1024 #endif
1025
1026     CmiNodeBarrier();
1027     ConverseCommonExit();
1028
1029     //  if(CmiMyPe()%101 == 0)
1030     //DCMF_Messager_dumpTimers();
1031
1032     if (CmiMyPe() == 0) {
1033         printf("End of program\n");
1034     }
1035
1036     CmiNodeBarrier();
1037 //  CmiNodeAllBarrier ();
1038
1039 #if CMK_SMP && !CMK_MULTICORE
1040     //CmiLock(commThdExitLock);
1041     commThdExit = 1;
1042     //CmiUnlock(commThdExitLock);
1043
1044     _bgp_msync();
1045 #endif
1046
1047     int rank0 = 0;
1048
1049     if (CmiMyRank() == 0) {
1050         rank0 = 1;
1051         //CmiFree(procState);
1052         DCMF_Messager_finalize();
1053     }
1054
1055     CmiNodeBarrier();
1056 //  CmiNodeAllBarrier ();
1057
1058     if (rank0)
1059         exit(0);
1060     else
1061         pthread_exit(NULL);
1062 }
1063
1064 /* exit() called on any node would abort the whole program */
1065 void CmiAbort(const char * message) {
1066     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1067              "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),
1068              msgQueueLen, outstanding_recvs, message);
1069     //CmiPrintStackTrace(0);
1070 #if 0
1071     /* Since it's abortion, why need to advance the comm? The system should
1072      * clean itself.
1073      */
1074     while (msgQueueLen > 0 || outstanding_recvs > 0) {
1075         AdvanceCommunications();
1076     }
1077
1078     CmiBarrier();
1079 #endif
1080     assert (0);
1081 }
1082
1083 static void CommunicationServer(int sleepTime) {
1084 #if CMK_SMP
1085     if (commThdExit) {
1086         while (msgQueueLen > 0 || outstanding_recvs > 0) {
1087             AdvanceCommunications();
1088         }
1089         pthread_exit(NULL);
1090         return;
1091     }
1092 #endif
1093     AdvanceCommunications();
1094
1095 #if CMK_IMMEDIATE_MSG && CMK_SMP
1096     CmiHandleImmediate();
1097 #endif
1098
1099     //mysleep(sleepTime);
1100 }
1101
1102 static void CommunicationServerThread(int sleepTime) {
1103 #if CMK_SMP
1104     CommunicationServer(sleepTime);
1105 #endif
1106 //immediate msgs are handled in AdvanceCommunications
1107 }
1108
1109 #if CMK_NODE_QUEUE_AVAILABLE
1110 char *CmiGetNonLocalNodeQ(void) {
1111     CmiState cs = CmiGetState();
1112     char *result = 0;
1113     CmiIdleLock_checkMessage(&cs->idle);
1114     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1115         MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1116
1117         if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
1118             //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1119             result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1120             CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1121         }
1122
1123         MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1124     }
1125     return result;
1126 }
1127 #endif
1128
1129
1130 void *CmiGetNonLocal() {
1131
1132     CmiState cs = CmiGetState();
1133
1134     void *msg = NULL;
1135     CmiIdleLock_checkMessage(&cs->idle);
1136     /* although it seems that lock is not needed, I found it crashes very often
1137        on mpi-smp without lock */
1138
1139 #if !CMK_SMP 
1140 //ChaoMei changes
1141     AdvanceCommunications();
1142 #endif
1143
1144     /*if(CmiMyRank()==0) printf("Got stuck here on proc[%d] node[%d]\n", CmiMyPe(), CmiMyNode());*/
1145
1146     if (PCQueueLength(cs->recv)==0) return NULL;
1147
1148     msg =  PCQueuePop(cs->recv);
1149
1150     return msg;
1151 }
1152
1153 static void CmiSendSelf(char *msg) {
1154 #if CMK_IMMEDIATE_MSG
1155     if (CmiIsImmediate(msg)) {
1156         /* CmiBecomeNonImmediate(msg); */
1157         //printf("In SendSelf, N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1158         CmiPushImmediateMsg(msg);
1159         CmiHandleImmediate();
1160         return;
1161     }
1162 #endif
1163     
1164     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1165 }
1166
1167 void machineSend(SMSG_LIST *msg_tmp) {    
1168
1169     if (msg_tmp->destpe == CmiMyNode())
1170         CmiAbort("Sending to self\n");
1171
1172     CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumNodes());
1173     msg_tmp->cb.function     =   send_done;
1174     msg_tmp->cb.clientdata   =   msg_tmp;
1175
1176     DCMF_Protocol_t *protocol = NULL;
1177
1178     if (msg_tmp->size < 224)
1179         protocol = &cmi_dcmf_short_registration;
1180     else if (msg_tmp->size < 2048)
1181         protocol = &cmi_dcmf_eager_registration;
1182     else
1183         protocol = &cmi_dcmf_rzv_registration;
1184
1185
1186 #if CMK_SMP
1187     DCMF_CriticalSection_enter (0);
1188 #endif
1189
1190     msgQueueLen ++;
1191     DCMF_Send (protocol, &msg_tmp->send, msg_tmp->cb,
1192                DCMF_MATCH_CONSISTENCY, msg_tmp->destpe,
1193                msg_tmp->size, msg_tmp->msg, &msg_tmp->info, 1);
1194
1195 /*    
1196     #if CMK_SMP && !CMK_MULTICORE
1197     //Adding this advance call here improves the SMP performance
1198     //a little bit although it is possible that some more bugs are
1199     //introduced
1200     DCMF_Messager_advance();
1201     #endif
1202 */
1203
1204 #if CMK_SMP
1205     DCMF_CriticalSection_exit (0);
1206 #endif
1207 }
1208
1209 #define MAX_MULTICAST 128
1210 DCMF_Opcode_t  CmiOpcodeList [MAX_MULTICAST];
1211
1212 void  machineMulticast(int npes, int *pelist, int size, char* msg){  
1213   CQdCreate(CpvAccess(cQdState), npes);
1214   
1215   CmiAssert (npes < MAX_MULTICAST);
1216
1217   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1218   ((CmiMsgHeaderBasic *)msg)->size = size;  
1219   CMI_SET_BROADCAST_ROOT(msg,0);
1220   CMI_SET_CHECKSUM(msg, size);
1221   
1222   SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1223   
1224   msg_tmp->destpe    = -1;      //multicast operation
1225   msg_tmp->size      = size * npes; //keep track of #bytes outstanding
1226   msg_tmp->msg       = msg;
1227   msg_tmp->pelist    = pelist;
1228   
1229   msgQueueLen ++;
1230   
1231   DCMF_Multicast_t  mcast_info __attribute__((__aligned__(16)));
1232   
1233   mcast_info.registration   = & cmi_dcmf_multicast_registration;
1234   mcast_info.request        = & msg_tmp->send;
1235   mcast_info.cb_done.function    =   send_multi_done;
1236   mcast_info.cb_done.clientdata  =   msg_tmp;
1237   mcast_info.consistency    =   DCMF_MATCH_CONSISTENCY;
1238   mcast_info.connection_id  =   CmiMyPe();
1239   mcast_info.bytes          =   size;
1240   mcast_info.src            =   msg;
1241   mcast_info.nranks         =   npes;
1242   mcast_info.ranks          =   (unsigned *)pelist;
1243   mcast_info.opcodes        =   CmiOpcodeList;   //static list of MAX_MULTICAST entires with 0 in them
1244   mcast_info.flags          =   0;
1245   mcast_info.msginfo        =   &msg_tmp->info;
1246   mcast_info.count          =   1;
1247
1248   DCMF_Multicast (&mcast_info);
1249 }
1250
1251
1252
1253 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg);
1254
1255 /* The general free send function
1256  * Send is synchronous, and free msg after posted
1257  */
1258 void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
1259
1260   if (destPE < 0 || destPE > CmiNumPes ())
1261     printf ("Sending to %d\n", destPE);
1262
1263   CmiAssert (destPE >= 0 && destPE < CmiNumPes());
1264
1265     CmiState cs = CmiGetState();
1266
1267     if (destPE==cs->pe) {
1268         CmiSendSelf(msg);
1269         return;
1270     }
1271     //printf ("%d: Sending Message to %d \n", CmiMyPe(), destPE);
1272     CmiGeneralFreeSendN (CmiNodeOf(destPE), CmiRankOf(destPE), size, msg);
1273 }
1274
1275 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg) {
1276
1277     //printf ("%d, %d: Sending Message to node %d rank %d \n", CmiMyPe(),
1278     //  CmiMyNode(), node, rank);
1279
1280 #if CMK_SMP
1281     CMI_DEST_RANK(msg) = rank;
1282     //CMI_SET_CHECKSUM(msg, size);
1283
1284     if (node == CmiMyNode()) {
1285         CmiPushPE(rank,msg);
1286         return;
1287     }
1288 #endif
1289
1290     SMSG_LIST *msg_tmp = smsg_allocate(); //(SMSG_LIST *) malloc(sizeof(SMSG_LIST));
1291     msg_tmp->destpe = node; //destPE;
1292     msg_tmp->size = size;
1293     msg_tmp->msg = msg;
1294
1295     machineSend(msg_tmp);
1296 }
1297
1298 void CmiSyncSendFn(int destPE, int size, char *msg) {
1299     char *copymsg;
1300     copymsg = (char *)CmiAlloc(size);
1301     CmiMemcpy(copymsg,msg,size);
1302     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncSendFn on comm thd on node %d\n", CmiMyNode());
1303     CmiFreeSendFn(destPE,size,copymsg);
1304 }
1305
1306 void CmiFreeSendFn(int destPE, int size, char *msg) {    
1307     CQdCreate(CpvAccess(cQdState), 1);
1308     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeSendFn on comm thd on node %d\n", CmiMyNode());
1309
1310     CMI_SET_BROADCAST_ROOT(msg,0);
1311     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1312     ((CmiMsgHeaderBasic *)msg)->size = size;
1313     CMI_SET_CHECKSUM(msg, size);
1314
1315     CmiGeneralFreeSend(destPE,size,msg);
1316 }
1317
1318 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1319 void CmiSyncSendFn1(int destPE, int size, char *msg) {
1320     char *copymsg;
1321     copymsg = (char *)CmiAlloc(size);
1322     CmiMemcpy(copymsg, msg, size);
1323
1324     //  asm volatile("sync" ::: "memory");
1325
1326     CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
1327     ((CmiMsgHeaderBasic *)copymsg)->size = size;
1328     CMI_SET_CHECKSUM(copymsg, size);
1329
1330     CmiGeneralFreeSend(destPE,size,copymsg);
1331 }
1332
1333 //send msgs to other ranks except the rank specified in the argument
1334 //if rank==-1, then it means the msg should be sent to all cores on the node
1335 static void CmiSendChildrenPeers(int rank, int size, char *msg) {
1336     //printf ("%d [%d]: Send children peers except rank %d\n",  CmiMyPe(), CmiMyNode(), CmiMyRank());
1337     int r=0;
1338
1339 //With comm thd, broadcast msg will be pulled from bcast queue from the comm thd
1340 //And the msg would be finally pushed into other cores on the same node by the
1341 //comm thd. But it's possible a msg has already been pushed into rank 0 in
1342 //recv_done func call. So there's no need to push the bcast msg into the recv
1343 //queue again in the case BCASTMSG_ONLY_TO_COMMTHD is not set
1344
1345     for (;r<rank; r++) {
1346         char *copymsg;
1347         copymsg = (char *)CmiAlloc(size);
1348         CmiMemcpy(copymsg,msg,size);        
1349         CmiPushPE (r, copymsg);
1350     }
1351
1352     for (r=rank+1; r<_Cmi_mynodesize; r++) {
1353         char *copymsg;
1354         copymsg = (char *)CmiAlloc(size);
1355         CmiMemcpy(copymsg,msg,size);        
1356         CmiPushPE (r, copymsg);
1357     }
1358 }
1359
1360 //In this implementation, msgs are first sent out to the comm thd or rank 0 of other nodes
1361 //then send msgs to the other cores on the same node
1362 //Routine is responsible to send outgoing inter-node msgs
1363 void SendSpanningChildren(int size, char *msg) {
1364     CmiState cs = CmiGetState();
1365     int startpe = CMI_BROADCAST_ROOT(msg)-1;
1366     int i;
1367
1368     //printf ("%d [%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), startpe);
1369
1370     CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1371
1372     int startNid = CmiNodeOf(startpe);
1373     int thisNid;
1374     thisNid = CmiMyNode();
1375
1376     //printf ("%d [%d/%d]: In Send Spanning Tree with startpe %d\n",  CmiMyPe(), CmiMyNode(), CmiMyRank(), startpe);
1377     //send to other nodes
1378     int dist = thisNid - startNid;
1379     if (dist<0) dist += _Cmi_numnodes;
1380     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1381         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1382         if (nid > _Cmi_numnodes - 1) break;
1383         nid += startNid;
1384         nid = nid%_Cmi_numnodes;
1385         CmiAssert(nid>=0 && nid<_Cmi_numnodes && nid!=thisNid);
1386         int p = CmiNodeFirst(nid);
1387         //printf ("%d [%d]: Sending Spanning Tree Msg to %d\n",  CmiMyPe(), CmiMyNode(), p);
1388         CmiSyncSendFn1(p, size, msg);
1389     }
1390
1391 #if !CMK_SMP
1392 #if ENABLE_BROADCAST_THROTTLE
1393     SendMsgsUntil (0);
1394 #endif
1395 #endif
1396 }
1397
1398 /* send msg along the hypercube in broadcast. (Sameer) */
1399 void SendHypercube(int size, char *msg) {
1400     CmiState cs = CmiGetState();
1401     int curcycle = CMI_GET_CYCLE(msg);
1402     int i;
1403
1404     double logp = CmiNumPes();
1405     logp = log(logp)/log(2.0);
1406     logp = ceil(logp);
1407
1408     /*  CmiPrintf("In hypercube\n"); */
1409     /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1410
1411     for (i = curcycle; i < logp; i++) {
1412         int p = cs->pe ^ (1 << i);
1413         /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1414         if (p < CmiNumPes()) {
1415             CMI_SET_CYCLE(msg, i + 1);
1416
1417             CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1418             CmiSyncSendFn1(p, size, msg);
1419         }
1420     }
1421 }
1422
1423 void CmiSyncBroadcastFn(int size, char *msg) {
1424     char *copymsg;
1425     copymsg = (char *)CmiAlloc(size);
1426     CmiMemcpy(copymsg,msg,size);
1427     CmiFreeBroadcastFn(size,copymsg);
1428 }
1429
1430 void CmiFreeBroadcastFn(int size, char *msg) {
1431     //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1432     CmiState cs = CmiGetState();
1433 #if CMK_BROADCAST_SPANNING_TREE    
1434     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1435     CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1436     SendSpanningChildren(size, msg);
1437     CmiSendChildrenPeers(CmiMyRank(), size, msg);
1438 #elif CMK_BROADCAST_HYPERCUBE    
1439     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1440     CMI_SET_CYCLE(msg, 0);
1441     SendHypercube(size, msg);
1442     CmiSendChildrenPeers(CmiMyRank(), size, msg);
1443 #else
1444     int i;
1445     for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1446         CmiSyncSendFn(i,size,msg);
1447     for ( i=0; i<cs->pe; i++ )
1448         CmiSyncSendFn(i,size,msg);
1449 #endif
1450     CmiFree(msg);
1451 }
1452
1453 void CmiSyncBroadcastAllFn(int size, char *msg) {
1454     char *copymsg;
1455     copymsg = (char *)CmiAlloc(size);
1456     CmiMemcpy(copymsg,msg,size);
1457     CmiState cs = CmiGetState();
1458     CmiSyncSendFn(cs->pe,size,copymsg);
1459     CmiFreeBroadcastFn(size, copymsg);
1460 }
1461
1462 void CmiFreeBroadcastAllFn(int size, char *msg) {
1463     CmiState cs = CmiGetState();
1464     CmiSyncSendFn(cs->pe,size,msg);
1465     CmiFreeBroadcastFn(size, msg);
1466 }
1467
1468 void AdvanceCommunications() {
1469
1470 #if CMK_SMP
1471     DCMF_CriticalSection_enter (0);
1472 #endif
1473
1474     while(DCMF_Messager_advance()>0);
1475     //DCMF_Messager_advance();
1476
1477 #if CMK_SMP
1478     DCMF_CriticalSection_exit (0);
1479 #endif
1480
1481     sendBroadcastMessages();
1482 #if CMK_NODE_QUEUE_AVAILABLE
1483     sendBroadcastMessagesNode();
1484 #endif
1485
1486 #if CMK_IMMEDIATE_MSG && !CMK_SMP
1487     CmiHandleImmediate();
1488 #endif
1489 }
1490
1491
1492 static void SendMsgsUntil(int targetm) {
1493
1494     while (msgQueueLen>targetm) {
1495       //AdvanceCommunications ();
1496 #if CMK_SMP
1497       DCMF_CriticalSection_enter (0);
1498 #endif
1499       
1500       while(DCMF_Messager_advance()>0);
1501       //DCMF_Messager_advance();
1502       
1503 #if CMK_SMP
1504       DCMF_CriticalSection_exit (0);
1505 #endif
1506     }
1507 }
1508
1509 void CmiNotifyIdle() {
1510 #if !CMK_SMP || CMK_MULTICORE
1511     AdvanceCommunications();
1512 #endif
1513 }
1514
1515
1516 /*==========================================================*/
1517 /*==========================================================*/
1518 /*==========================================================*/
1519
1520 /************ Recommended routines ***********************/
1521 /************ You dont have to implement these but they are supported
1522  in the converse syntax and some rare programs may crash. But most
1523  programs dont need them. *************/
1524
1525 CmiCommHandle CmiAsyncSendFn(int dest, int size, char *msg) {
1526     CmiAbort("CmiAsyncSendFn not implemented.");
1527     return (CmiCommHandle) 0;
1528 }
1529
1530 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1531     CmiAbort("CmiAsyncBroadcastFn not implemented.");
1532     return (CmiCommHandle) 0;
1533 }
1534
1535 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1536     CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1537     return (CmiCommHandle) 0;
1538 }
1539
1540 int           CmiAsyncMsgSent(CmiCommHandle handle) {
1541     CmiAbort("CmiAsyncMsgSent not implemented.");
1542     return 0;
1543 }
1544 void          CmiReleaseCommHandle(CmiCommHandle handle) {
1545     CmiAbort("CmiReleaseCommHandle not implemented.");
1546 }
1547
1548
1549 /*==========================================================*/
1550 /*==========================================================*/
1551 /*==========================================================*/
1552
1553 /* Optional routines which could use common code which is shared with
1554    other machine layer implementations. */
1555
1556 /* MULTICAST/VECTOR SENDING FUNCTIONS
1557
1558  * In relations to some flags, some other delivery functions may be needed.
1559  */
1560
1561 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1562
1563 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg) {
1564     char *copymsg;
1565     copymsg = (char *)CmiAlloc(size);
1566     CmiMemcpy(copymsg,msg,size);
1567     CmiFreeListSendFn(npes, pes, size, msg);
1568 }
1569
1570 //#define OPTIMIZED_MULTICAST  0
1571
1572 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1573 #if CMK_SMP && !CMK_MULTICORE
1574     //DCMF_CriticalSection_enter (0);
1575 #endif
1576     CMI_SET_BROADCAST_ROOT(msg,0);
1577     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1578     ((CmiMsgHeaderBasic *)msg)->size = size;
1579     CMI_SET_CHECKSUM(msg, size);
1580
1581     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeListSendFn on comm thd on node %d\n", CmiMyNode());
1582
1583     //printf("%d: In Free List Send Fn\n", CmiMyPe());
1584     int new_npes = 0;
1585
1586     int i, count = 0, my_loc = -1;
1587     for (i=0; i<npes; i++) {
1588         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode()) {
1589         if (CmiNodeOf(pes[i]) == CmiMyNode()) {
1590             CmiSyncSend(pes[i], size, msg);
1591             //my_loc = i;
1592         }
1593     }
1594
1595 #if OPTIMIZED_MULTICAST && CMK_SMP
1596 #warning "Using Optimized Multicast"
1597     if (npes > 1) {    
1598       int *newpelist = (int *) malloc (sizeof(int) * npes);
1599       int new_npes = 0;
1600     
1601       for(i=0; i<npes; i++) {
1602         if(CmiNodeOf(pes[i]) == CmiMyNode()) 
1603           continue;
1604         else
1605           newpelist[new_npes++] = pes[i];
1606       }
1607
1608       if (new_npes >= 1)
1609         machineMulticast (new_npes, newpelist, size, msg);
1610       else
1611         CmiFree (msg);
1612       return;
1613     }
1614 #endif
1615
1616     for (i=0;i<npes;i++) {
1617         //if (pes[i] == CmiMyPe() || CmiNodeOf(pes[i]) == CmiMyNode());
1618         if (CmiNodeOf(pes[i]) == CmiMyNode());
1619         else if (i < npes - 1) {
1620 #if !CMK_SMP /*|| (CMK_SMP && !CMK_MULTICORE)*/
1621             CmiReference(msg);
1622             CmiGeneralFreeSend(pes[i], size, msg);
1623 #else
1624             CmiSyncSend(pes[i], size, msg);
1625 #endif
1626         }
1627     }
1628
1629     //if (npes  && (pes[npes-1] != CmiMyPe() && CmiNodeOf(pes[i]) != CmiMyNode()))
1630     if (npes  && CmiNodeOf(pes[npes-1]) != CmiMyNode())
1631         CmiSyncSendAndFree(pes[npes-1], size, msg); //Sameto CmiFreeSendFn
1632     else
1633         CmiFree(msg);
1634
1635     //AdvanceCommunications();
1636 #if CMK_SMP && !CMK_MULTICORE
1637     //DCMF_CriticalSection_exit (0);
1638 #endif
1639 }
1640
1641 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg) {
1642     CmiAbort("CmiAsyncListSendFn not implemented.");
1643     return (CmiCommHandle) 0;
1644 }
1645 #endif
1646
1647 /** NODE SENDING FUNCTIONS
1648
1649  * If there is a node queue, and we consider also nodes as entity (tipically in
1650  * SMP versions), these functions are needed.
1651  */
1652
1653 #if CMK_NODE_QUEUE_AVAILABLE
1654
1655 void          CmiSyncNodeSendFn(int, int, char *);
1656 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1657 void          CmiFreeNodeSendFn(int, int, char *);
1658
1659 void          CmiSyncNodeBroadcastFn(int, char *);
1660 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1661 void          CmiFreeNodeBroadcastFn(int, char *);
1662
1663 void          CmiSyncNodeBroadcastAllFn(int, char *);
1664 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1665 void          CmiFreeNodeBroadcastAllFn(int, char *);
1666
1667 #endif
1668
1669
1670 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1671
1672 int CmiMyPe();
1673 int CmiMyRank();
1674 int CmiNodeFirst(int node);
1675 int CmiNodeSize(int node);
1676 int CmiNodeOf(int pe);
1677 int CmiRankOf(int pe);
1678
1679 int CmiMyPe(void) {
1680     return CmiGetState()->pe;
1681 }
1682
1683 int CmiMyRank(void) {
1684     return CmiGetState()->rank;
1685 }
1686
1687 int CmiNodeFirst(int node) {
1688     return node*_Cmi_mynodesize;
1689 }
1690 int CmiNodeSize(int node)  {
1691     return _Cmi_mynodesize;
1692 }
1693
1694 int CmiNodeOf(int pe)      {
1695     return (pe/_Cmi_mynodesize);
1696 }
1697 int CmiRankOf(int pe)      {
1698     return pe%_Cmi_mynodesize;
1699 }
1700
1701
1702 /* optional, these functions are implemented in "machine-smp.c", so including
1703    this file avoid the necessity to reimplement them.
1704  */
1705 void CmiNodeBarrier(void);
1706 void CmiNodeAllBarrier(void);
1707 CmiNodeLock CmiCreateLock();
1708 void CmiDestroyLock(CmiNodeLock lock);
1709
1710 #endif
1711
1712 /** IMMEDIATE MESSAGES
1713
1714  * If immediate messages are supported, the following function is needed. There
1715  * is an exeption if the machine progress is also defined (see later for this).
1716
1717  * Moreover, the file "immediate.c" should be included, otherwise all its
1718  * functions and variables have to be redefined.
1719 */
1720
1721 #if CMK_CCS_AVAILABLE
1722
1723 #include "immediate.c"
1724
1725 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1726 void CmiProbeImmediateMsg();
1727 #endif
1728
1729 #endif
1730
1731
1732 /** MACHINE PROGRESS DEFINED
1733
1734  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1735  * to be pulled out of the network manually. For this reason the following
1736  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1737  * not be defined anymore.
1738  */
1739
1740 #if CMK_MACHINE_PROGRESS_DEFINED
1741
1742
1743
1744 void CmiMachineProgressImpl() {
1745
1746 #if !CMK_SMP
1747     AdvanceCommunications();
1748 #else
1749     /*Not implemented yet. Communication server does not seem to be
1750       thread safe */
1751 #endif
1752 }
1753
1754 #endif
1755
1756 /* Dummy implementation */
1757 extern int CmiBarrier() {
1758     //Use DCMF barrier later
1759 }
1760
1761 #if CMK_NODE_QUEUE_AVAILABLE
1762 static void CmiSendNodeSelf(char *msg) {
1763 #if CMK_IMMEDIATE_MSG
1764     if (CmiIsImmediate(msg)) {
1765         //printf("SendNodeSelf: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
1766         CmiPushImmediateMsg(msg);
1767 #if CMK_MULTICORE
1768         CmiHandleImmediate();
1769 #endif
1770         return;
1771     }
1772 #endif    
1773     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1774     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1775     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1776 }
1777
1778 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg) {
1779     CmiAbort ("Async Node Send not supported\n");
1780 }
1781
1782 void CmiFreeNodeSendFn(int node, int size, char *msg) {
1783
1784     CMI_SET_BROADCAST_ROOT(msg,0);
1785     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1786     ((CmiMsgHeaderBasic *)msg)->size = size;
1787     CMI_SET_CHECKSUM(msg, size);
1788     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeSendFn on comm thd on node %d\n", CmiMyNode());
1789     
1790     CQdCreate(CpvAccess(cQdState), 1);
1791
1792     if (node == _Cmi_mynode) {
1793         CmiSendNodeSelf(msg);
1794     } else {
1795         CmiGeneralFreeSendN(node, SMP_NODEMESSAGE, size, msg);
1796     }
1797 }
1798
1799 void CmiSyncNodeSendFn(int p, int s, char *m) {
1800     char *dupmsg;
1801     dupmsg = (char *)CmiAlloc(s);
1802     CmiMemcpy(dupmsg,m,s);
1803     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeSendFn on comm thd on node %d\n", CmiMyNode());
1804     CmiFreeNodeSendFn(p, s, dupmsg);
1805 }
1806
1807 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m) {
1808     return NULL;
1809 }
1810
1811 void SendSpanningChildrenNode(int size, char *msg) {
1812     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1813     //printf("on node %d rank %d, send node spanning children with root %d\n", CmiMyNode(), CmiMyRank(), startnode);
1814     assert(startnode>=0 && startnode<CmiNumNodes());
1815
1816     int dist = CmiMyNode()-startnode;
1817     if (dist<0) dist += CmiNumNodes();
1818     int i;
1819     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1820         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1821         if (nid > CmiNumNodes() - 1) break;
1822         nid += startnode;
1823         nid = nid%CmiNumNodes();
1824         assert(nid>=0 && nid<CmiNumNodes() && nid!=CmiMyNode());
1825         char *dupmsg = (char *)CmiAlloc(size);
1826         CmiMemcpy(dupmsg,msg,size);
1827         //printf("In SendSpanningChildrenNode, sending bcast msg (root %d) from node %d to node %d\n", startnode, CmiMyNode(), nid);
1828         CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg);
1829     }
1830 }
1831
1832 /* need */
1833 void CmiFreeNodeBroadcastFn(int s, char *m) {
1834 #if CMK_BROADCAST_SPANNING_TREE
1835     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1836     
1837     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1838
1839     int mynode = CmiMyNode();
1840     CMI_SET_BROADCAST_ROOT(m, -mynode-1);
1841     CMI_MAGIC(m) = CHARM_MAGIC_NUMBER;
1842     ((CmiMsgHeaderBasic *)m)->size = s;
1843     CMI_SET_CHECKSUM(m, s);
1844     //printf("In CmiFreeNodeBroadcastFn, sending bcast msg from root node %d\n", CMI_BROADCAST_ROOT(m));
1845
1846     SendSpanningChildrenNode(s, m);
1847 #else
1848     int i;
1849     for (i=0; i<CmiNumNodes(); i++) {
1850         if (i==CmiMyNode()) continue;
1851         char *dupmsg = (char *)CmiAlloc(s);
1852         CmiMemcpy(dupmsg,m,s);
1853         CmiFreeNodeSendFn(i, s, dupmsg);
1854     }
1855 #endif
1856     CmiFree(m);    
1857 }
1858
1859 void CmiSyncNodeBroadcastFn(int s, char *m) {
1860     char *dupmsg;
1861     dupmsg = (char *)CmiAlloc(s);
1862     CmiMemcpy(dupmsg,m,s);
1863     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1864     CmiFreeNodeBroadcastFn(s, dupmsg);
1865 }
1866
1867 /* need */
1868 void CmiFreeNodeBroadcastAllFn(int s, char *m) {
1869     char *dupmsg = (char *)CmiAlloc(s);
1870     CmiMemcpy(dupmsg,m,s);
1871     CMI_MAGIC(dupmsg) = CHARM_MAGIC_NUMBER;
1872     ((CmiMsgHeaderBasic *)dupmsg)->size = s;
1873     CMI_SET_CHECKSUM(dupmsg, s);
1874
1875     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1876     
1877     CQdCreate(CpvAccess(cQdState), 1);
1878     CmiSendNodeSelf(dupmsg);
1879
1880     CmiFreeNodeBroadcastFn(s, m);
1881 }
1882
1883 void CmiSyncNodeBroadcastAllFn(int s, char *m) {
1884     char *dupmsg;
1885     dupmsg = (char *)CmiAlloc(s);
1886     CmiMemcpy(dupmsg,m,s);
1887     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1888     CmiFreeNodeBroadcastAllFn(s, dupmsg);
1889 }
1890
1891
1892 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m) {
1893     return NULL;
1894 }
1895 #endif //end of CMK_NODE_QUEUE_AVAILABLE
1896
1897 #include "manytomany.c"
1898
1899
1900 /*********************************************************************************************
1901 This section is for CmiDirect. This is a variant of the  persistent communication in which
1902 the user can transfer data between processors without using Charm++ messages. This lets the user
1903 send and receive data from the middle of his arrays without any copying on either send or receive
1904 side
1905 *********************************************************************************************/
1906
1907
1908
1909
1910 #ifdef BGP_USE_AM_DIRECT
1911
1912 #include "cmidirect.h"
1913
1914 /* We can avoid a receiver side lookup by just sending the whole shebang.
1915    DCMF header is in units of quad words (16 bytes), so we'd need less than a
1916    quad word for the handle if we just sent that and did a lookup. Or exactly
1917    2 quad words for the buffer pointer, callback pointer, callback
1918    data pointer, and DCMF_Request_t pointer with no lookup.
1919
1920    Since CmiDirect is generally going to be used for messages which aren't
1921    tiny, the extra 16 bytes is not likely to impact performance noticably and
1922    not having to lookup handles in tables simplifies the code enormously.
1923
1924    EJB   2008/4/2
1925 */
1926
1927
1928 /**
1929  To be called on the receiver to create a handle and return its number
1930 **/
1931 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue) {
1932     /* with two-sided primitives we just bundle the buffer and callback info into the handle so the sender can remind us about it later. */
1933     struct infiDirectUserHandle userHandle;
1934     userHandle.handle=1; /* doesn't matter on BG/P*/
1935     userHandle.senderNode=senderNode;
1936     userHandle.recverNode=_Cmi_mynode;
1937     userHandle.recverBufSize=recvBufSize;
1938     userHandle.recverBuf=recvBuf;
1939     userHandle.initialValue=initialValue;
1940     userHandle.callbackFnPtr=callbackFnPtr;
1941     userHandle.callbackData=callbackData;
1942     userHandle.DCMF_rq_trecv=(DCMF_Request_t *) ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
1943 #if CMI_DIRECT_DEBUG
1944     CmiPrintf("[%d] RDMA create addr %p %d callback %p callbackdata %p\n",CmiMyPe(),userHandle.recverBuf,userHandle.recverBufSize, userHandle.callbackFnPtr, userHandle.callbackData);
1945 #endif
1946     return userHandle;
1947 }
1948
1949 /****
1950  To be called on the sender to attach the sender's buffer to this handle
1951 ******/
1952
1953 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize) {
1954
1955     /* one-sided primitives would require registration of memory */
1956
1957     /* with two-sided primitives we just record the sender buf in the handle */
1958     userHandle->senderBuf=sendBuf;
1959     CmiAssert(sendBufSize==userHandle->recverBufSize);
1960     userHandle->DCMF_rq_tsend = (DCMF_Request_t *) ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
1961 #if CMI_DIRECT_DEBUG
1962     CmiPrintf("[%d] RDMA assoc addr %p %d to receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,sendBufSize, userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
1963 #endif
1964
1965 }
1966
1967 /****
1968 To be called on the sender to do the actual data transfer
1969 ******/
1970 void CmiDirect_put(struct infiDirectUserHandle *userHandle) {
1971     /** invoke a DCMF_Send with the direct callback */
1972     DCMF_Protocol_t *protocol = NULL;
1973     protocol = &cmi_dcmf_direct_registration;
1974     /* local copy */
1975     CmiAssert(userHandle->recverBuf!=NULL);
1976     CmiAssert(userHandle->senderBuf!=NULL);
1977     CmiAssert(userHandle->recverBufSize>0);
1978     if (userHandle->recverNode== _Cmi_mynode) {
1979 #if CMI_DIRECT_DEBUG
1980         CmiPrintf("[%d] RDMA local put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
1981 #endif
1982
1983         CmiMemcpy(userHandle->recverBuf,userHandle->senderBuf,userHandle->recverBufSize);
1984         (*(userHandle->callbackFnPtr))(userHandle->callbackData);
1985     } else {
1986         dcmfDirectMsgHeader msgHead;
1987         msgHead.recverBuf=userHandle->recverBuf;
1988         msgHead.callbackFnPtr=userHandle->callbackFnPtr;
1989         msgHead.callbackData=userHandle->callbackData;
1990         msgHead.DCMF_rq_t=(DCMF_Request_t *) userHandle->DCMF_rq_trecv;
1991 #if CMK_SMP
1992         DCMF_CriticalSection_enter (0);
1993 #endif
1994 #if CMI_DIRECT_DEBUG
1995         CmiPrintf("[%d] RDMA put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
1996 #endif
1997         DCMF_Send (protocol,
1998                    (DCMF_Request_t *) userHandle->DCMF_rq_tsend,
1999                    directcb, DCMF_MATCH_CONSISTENCY, userHandle->recverNode,
2000                    userHandle->recverBufSize, userHandle->senderBuf,
2001                    (struct DCQuad *) &(msgHead), 2);
2002
2003 #if CMK_SMP
2004         DCMF_CriticalSection_exit (0);
2005 #endif
2006     }
2007 }
2008
2009 void CmiDirect_get(struct infiDirectUserHandle *userHandle) {
2010     CmiAbort("Not Implemented, switch to #define BGP_USE_RDMA_DIRECT");
2011 }
2012
2013 /**** up to the user to safely call this */
2014 void CmiDirect_deassocLocalBuffer(struct infiDirectUserHandle *userHandle)
2015 {
2016     CmiAssert(userHandle->senderNode==_Cmi_mynode);
2017 #if CMK_SMP
2018     DCMF_CriticalSection_enter (0);
2019 #endif
2020     CmiFree(userHandle->DCMF_rq_tsend);
2021 #if CMK_SMP
2022     DCMF_CriticalSection_exit (0);
2023 #endif
2024
2025 }
2026
2027 /**** up to the user to safely call this */
2028 void CmiDirect_destroyHandle(struct infiDirectUserHandle *userHandle){
2029     CmiAssert(userHandle->recverNode==_Cmi_mynode);
2030 #if CMK_SMP
2031     DCMF_CriticalSection_enter (0);
2032 #endif
2033     CmiFree(userHandle->DCMF_rq_trecv);
2034
2035 #if CMK_SMP
2036     DCMF_CriticalSection_exit (0);
2037 #endif
2038 }
2039
2040
2041 /**** Should not be called the first time *********/
2042 void CmiDirect_ready(struct infiDirectUserHandle *userHandle) {
2043     /* no op on BGP */
2044 }
2045
2046 /**** Should not be called the first time *********/
2047 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle) {
2048     /* no op on BGP */
2049 }
2050
2051 /**** Should not be called the first time *********/
2052 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle) {
2053     /* no op on BGP */
2054 }
2055
2056 #endif /* BGP_USE_AM_DIRECT*/
2057
2058 #ifdef BGP_USE_RDMA_DIRECT
2059
2060 #include "cmidirect.h"
2061
2062 /* 
2063    Notification protocol passes callback function and data in a single
2064    quadword.  This occurs in a message triggered by the sender side ack
2065    callback and therefore has higher latency than polling, but is guaranteed
2066    to be semantically correct.  The latency for a single packet that isn't
2067    hitting charm/converse should be pretty minimal, but you could run into
2068    sender side progress issues.  The alternative of polling on the out of band
2069    byte scheme creates correctness issues in that the data really has to be
2070    out of band and you rely on the buffer being written in order.  It also has
2071    annoying polling issues.  A third scheme could add a second put to a
2072    control region to poll upon and force sequential consistency between
2073    puts. Its not really clear that this would be faster or avoid the progress
2074    issue since you run into the same issues to enforce that sequential
2075    consistency.
2076
2077    EJB   2011/1/20
2078 */
2079
2080
2081 /* local function to use the ack as our signal to send a remote notify */
2082 static void CmiNotifyRemoteRDMA(void *handle, struct DCMF_Error_t *error)
2083 {
2084     struct infiDirectUserHandle *userHandle= (struct infiDirectUserHandle *) handle;
2085     dcmfDirectRDMAMsgHeader msgHead;
2086     msgHead.callbackFnPtr=userHandle->callbackFnPtr;
2087     msgHead.callbackData=userHandle->callbackData;
2088 #if CMK_SMP
2089     DCMF_CriticalSection_enter (0);
2090 #endif
2091 #if CMI_DIRECT_DEBUG
2092     CmiPrintf("[%d] RDMA notify put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p \n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2093 #endif
2094     DCMF_Result res=DCMF_Send (&cmi_dcmf_direct_rdma_registration,
2095                userHandle->DCMF_rq_tsend,
2096                directcb, DCMF_MATCH_CONSISTENCY, userHandle->recverNode,
2097                sizeof(dcmfDirectRDMAMsgHeader), 
2098
2099                                userHandle->DCMF_notify_buf,
2100                (struct DCQuad *) &(msgHead), 1);
2101 //    CmiAssert(res==DCMF_SUCCESS);
2102 #if CMK_SMP
2103     DCMF_CriticalSection_exit (0);
2104 #endif    
2105 }
2106
2107 /**
2108  To be called on the receiver to create a handle and return its number
2109 **/
2110
2111
2112 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue) {
2113     /* one-sided primitives require registration of memory */
2114     struct infiDirectUserHandle userHandle;
2115     size_t numbytesRegistered=0;
2116     DCMF_Result regresult=DCMF_Memregion_create( &userHandle.DCMF_recverMemregion,
2117                                                  &numbytesRegistered,
2118                                                  recvBufSize,
2119                                                  recvBuf,
2120                                                  0);
2121     CmiAssert(numbytesRegistered==recvBufSize);
2122     CmiAssert(regresult==DCMF_SUCCESS);
2123     
2124
2125     userHandle.handle=1; /* doesn't matter on BG/P*/
2126     userHandle.senderNode=senderNode;
2127     userHandle.recverNode=_Cmi_mynode;
2128     userHandle.recverBufSize=recvBufSize;
2129     userHandle.recverBuf=recvBuf;
2130     userHandle.initialValue=initialValue;
2131     userHandle.callbackFnPtr=callbackFnPtr;
2132     userHandle.callbackData=callbackData;
2133     userHandle.DCMF_rq_trecv=(DCMF_Request_t *) ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2134 #if CMI_DIRECT_DEBUG
2135     CmiPrintf("[%d] RDMA create addr %p %d callback %p callbackdata %p\n",CmiMyPe(),userHandle.recverBuf,userHandle.recverBufSize, userHandle.callbackFnPtr, userHandle.callbackData);
2136 #endif
2137     return userHandle;
2138 }
2139
2140 /****
2141  To be called on the sender to attach the sender's buffer to this handle
2142 ******/
2143
2144 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize) {
2145     /* one-sided primitives would require registration of memory */
2146     userHandle->senderBuf=sendBuf;
2147     CmiAssert(sendBufSize==userHandle->recverBufSize);
2148     userHandle->DCMF_rq_tsend =(DCMF_Request_t *) ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+16));
2149     size_t numbytesRegistered=0;  // set as return value from create
2150     userHandle->DCMF_notify_buf=ALIGN_16(CmiAlloc(sizeof(DCMF_Request_t)+32));
2151     userHandle->DCMF_notify_cb.function=CmiNotifyRemoteRDMA; 
2152     userHandle->DCMF_notify_cb.clientdata=userHandle;
2153     DCMF_Result regresult=DCMF_Memregion_create( &userHandle->DCMF_senderMemregion,
2154                                                  &numbytesRegistered,
2155                                                  sendBufSize,
2156                                                  sendBuf,
2157                                                  0);
2158     CmiAssert(numbytesRegistered==sendBufSize);
2159     CmiAssert(regresult==DCMF_SUCCESS);
2160
2161 #if CMI_DIRECT_DEBUG
2162     CmiPrintf("[%d] RDMA assoc addr %p %d to receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,sendBufSize, userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2163 #endif
2164
2165 }
2166
2167
2168 /****
2169 To be called on the sender to do the actual data transfer
2170 ******/
2171 void CmiDirect_put(struct infiDirectUserHandle *userHandle) {
2172     /** invoke a DCMF_Put with the direct callback */
2173
2174     CmiAssert(userHandle->recverBuf!=NULL);
2175     CmiAssert(userHandle->senderBuf!=NULL);
2176     CmiAssert(userHandle->recverBufSize>0);
2177     if (userHandle->recverNode== _Cmi_mynode) {     /* local copy */
2178 #if CMI_DIRECT_DEBUG
2179         CmiPrintf("[%d] RDMA local put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2180 #endif
2181
2182         CmiMemcpy(userHandle->recverBuf,userHandle->senderBuf,userHandle->recverBufSize);
2183         (*(userHandle->callbackFnPtr))(userHandle->callbackData);
2184     } else {
2185 #if CMK_SMP
2186         DCMF_CriticalSection_enter (0);
2187 #endif
2188 #if CMI_DIRECT_DEBUG
2189         CmiPrintf("[%d] RDMA put addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2190 #endif
2191         DCMF_Result 
2192             Res= DCMF_Put(&cmi_dcmf_direct_put_registration,
2193                           userHandle->DCMF_rq_tsend,
2194                           directcb, DCMF_RELAXED_CONSISTENCY, 
2195                           userHandle->recverNode,
2196                           userHandle->recverBufSize,
2197                           &userHandle->DCMF_senderMemregion,
2198                           &userHandle->DCMF_recverMemregion,
2199                           0, /* offsets are zero */
2200                           0, 
2201                           userHandle->DCMF_notify_cb
2202                           );
2203         CmiAssert(Res==DCMF_SUCCESS); 
2204 #if CMK_SMP
2205         DCMF_CriticalSection_exit (0);
2206 #endif
2207     }
2208 }
2209
2210 /****
2211 To be called on the receiver to initiate the actual data transfer
2212 ******/
2213 void CmiDirect_get(struct infiDirectUserHandle *userHandle) {
2214     /** invoke a DCMF_Get with the direct callback */
2215
2216     CmiAssert(userHandle->recverBuf!=NULL);
2217     CmiAssert(userHandle->senderBuf!=NULL);
2218     CmiAssert(userHandle->recverBufSize>0);
2219     if (userHandle->recverNode== _Cmi_mynode) {     /* local copy */
2220 #if CMI_DIRECT_DEBUG
2221         CmiPrintf("[%d] RDMA local get addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2222 #endif
2223
2224         CmiMemcpy(userHandle->senderBuf,userHandle->recverBuf,userHandle->recverBufSize);
2225         (*(userHandle->callbackFnPtr))(userHandle->callbackData);
2226     } else {
2227         struct DCMF_Callback_t done_cb;
2228         done_cb.function=userHandle->callbackFnPtr;
2229         done_cb.clientdata=userHandle->callbackData;
2230 #if CMK_SMP
2231         DCMF_CriticalSection_enter (0);
2232 #endif
2233 #if CMI_DIRECT_DEBUG
2234         CmiPrintf("[%d] RDMA get addr %p %d to recverNode %d receiver addr %p callback %p callbackdata %p\n",CmiMyPe(),userHandle->senderBuf,userHandle->recverBufSize, userHandle->recverNode,userHandle->recverBuf, userHandle->callbackFnPtr, userHandle->callbackData);
2235 #endif
2236         DCMF_Result 
2237             Res= DCMF_Get(&cmi_dcmf_direct_get_registration,
2238                           (DCMF_Request_t *) userHandle->DCMF_rq_tsend,
2239                           done_cb, DCMF_RELAXED_CONSISTENCY, 
2240                           userHandle->recverNode,
2241                           userHandle->recverBufSize,
2242                           & userHandle->DCMF_recverMemregion,
2243                           & userHandle->DCMF_senderMemregion,
2244                           0, /* offsets are zero */
2245                           0
2246                           );
2247         CmiAssert(Res==DCMF_SUCCESS); 
2248
2249
2250 #if CMK_SMP
2251         DCMF_CriticalSection_exit (0);
2252 #endif
2253     }
2254 }
2255
2256 /**** up to the user to safely call this */
2257 void CmiDirect_deassocLocalBuffer(struct infiDirectUserHandle *userHandle)
2258 {
2259     CmiAssert(userHandle->senderNode==_Cmi_mynode);
2260 #if CMK_SMP
2261     DCMF_CriticalSection_enter (0);
2262 #endif
2263
2264     DCMF_Memregion_destroy((DCMF_Memregion_t*) userHandle->DCMF_senderMemregion);
2265     CmiFree(userHandle->DCMF_notify_buf);
2266     CmiFree(userHandle->DCMF_rq_tsend);
2267 #if CMK_SMP
2268     DCMF_CriticalSection_exit (0);
2269 #endif
2270
2271 }
2272
2273 /**** up to the user to safely call this */
2274 void CmiDirect_destroyHandle(struct infiDirectUserHandle *userHandle){
2275     CmiAssert(userHandle->recverNode==_Cmi_mynode);
2276 #if CMK_SMP
2277     DCMF_CriticalSection_enter (0);
2278 #endif
2279
2280     DCMF_Memregion_destroy((DCMF_Memregion_t*) userHandle->DCMF_recverMemregion);
2281     CmiFree(userHandle->DCMF_rq_trecv);
2282
2283 #if CMK_SMP
2284     DCMF_CriticalSection_exit (0);
2285 #endif
2286 }
2287
2288
2289
2290 /**** Should not be called the first time *********/
2291 void CmiDirect_ready(struct infiDirectUserHandle *userHandle) {
2292     /* no op on BGP */
2293 }
2294
2295 /**** Should not be called the first time *********/
2296 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle) {
2297     /* no op on BGP */
2298 }
2299
2300 /**** Should not be called the first time *********/
2301 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle) {
2302     /* no op on BGP */
2303 }
2304
2305 #endif /* BGP_USE_RDMA_DIRECT*/
2306