99ecb9931d7a8cb801588fcf56323b5ad939aa81
[charm.git] / src / arch / pami / machine.c
1 #include <stdio.h>
2 #include <errno.h>
3 #include <stdlib.h>
4 #include <unistd.h>
5 #include <math.h>
6 #include <string.h>
7 #include "machine.h"
8 #include "converse.h"
9 #include "pcqueue.h"
10 #include "assert.h"
11 #include "malloc.h"
12
13 #include <hwi/include/bqc/A2_inlines.h>
14 #include "spi/include/kernel/process.h"
15 #include "spi/include/kernel/memory.h"
16 #include "pami.h"
17 #include "pami_sys.h"
18
19 //#if CMK_SMP
20 //#define CMK_USE_L2ATOMICS   1
21 //#endif
22
23 #if !CMK_SMP
24 #if CMK_ENABLE_ASYNC_PROGRESS
25 #error "async progress non supported with non-smp
26 #endif
27 #endif
28
29 #if CMK_SMP && CMK_USE_L2ATOMICS
30 #include "L2AtomicQueue.h"
31 #include "memalloc.c"
32 #endif
33
34 #define CMI_LIKELY(x)    (__builtin_expect(x,1))
35 #define CMI_UNLIKELY(x)  (__builtin_expect(x,0))
36
37 char *ALIGN_32(char *p) {
38   return((char *)((((unsigned long)p)+0x1f) & (~0x1FUL)));
39 }
40
41 /*To reduce the buffer used in broadcast and distribute the load from
42   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
43   spanning tree broadcast algorithm.
44   This will use the fourth short in message as an indicator of spanning tree
45   root.
46 */
47 #if CMK_SMP
48 #define CMK_BROADCAST_SPANNING_TREE    1
49 #else
50 #define CMK_BROADCAST_SPANNING_TREE    1
51 #endif /* CMK_SMP */
52
53 #define BROADCAST_SPANNING_FACTOR     4
54
55 //The root of the message infers the type of the message
56 // 1. root is 0, then it is a normal point-to-point message
57 // 2. root is larger than 0 (>=1), then it is a broadcast message across all processors (cores)
58 // 3. root is less than 0 (<=-1), then it is a broadcast message across all nodes
59 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
60 #define CMI_IS_BCAST_ON_CORES(msg) (CMI_BROADCAST_ROOT(msg) > 0)
61 #define CMI_IS_BCAST_ON_NODES(msg) (CMI_BROADCAST_ROOT(msg) < 0)
62 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
63
64 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
65 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
66
67 /* FIXME: need a random number that everyone agrees ! */
68 #define CHARM_MAGIC_NUMBER               126
69
70
71 #define CMI_PAMI_SHORT_DISPATCH           7
72 #define CMI_PAMI_RZV_DISPATCH             8
73 #define CMI_PAMI_ACK_DISPATCH             9
74 #define CMI_PAMI_DISPATCH                10
75
76 #define SHORT_CUTOFF   128
77 #define EAGER_CUTOFF   4096
78
79 #if !CMK_OPTIMIZE
80 static int checksum_flag = 0;
81 extern unsigned char computeCheckSum(unsigned char *data, int len);
82
83 #define CMI_SET_CHECKSUM(msg, len)      \
84         if (checksum_flag)  {   \
85           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
86           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
87         }
88
89 #define CMI_CHECK_CHECKSUM(msg, len)    \
90   int count; \                          
91         if (checksum_flag)      \
92           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
93             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
94             for(count = 0; count < len; count++) { \
95                 printf("%2x", msg[count]);                 \
96             }                                             \
97             printf("------------------------------\n\n"); \
98             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
99           }
100 #else
101 #define CMI_SET_CHECKSUM(msg, len)
102 #define CMI_CHECK_CHECKSUM(msg, len)
103 #endif
104
105 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
106
107 #  define CMI_SET_CYCLE(msg, cycle)
108
109 int               _Cmi_numpes;
110 int               _Cmi_mynode;    /* Which address space am I */
111 int               _Cmi_mynodesize;/* Number of processors in my address space */
112 int               _Cmi_numnodes;  /* Total number of address spaces */
113 int                Cmi_nodestart; /* First processor in this address space */
114 CpvDeclare(void*, CmiLocalQueue);
115
116
117 #if CMK_NODE_QUEUE_AVAILABLE
118 #define SMP_NODEMESSAGE   (0xFB) // rank of the node message when node queue
119                                  // is available
120 #define NODE_BROADCAST_OTHERS (-1)
121 #define NODE_BROADCAST_ALL    (-2)
122 #endif
123
124
125 typedef struct ProcState {
126     /* PCQueue      sendMsgBuf; */  /* per processor message sending queue */
127 #if CMK_SMP && CMK_USE_L2ATOMICS
128     L2AtomicQueue   atomic_queue;
129 #endif
130   /* CmiNodeLock  recvLock;  */            /* for cs->recv */
131 } ProcState;
132
133 static ProcState  *procState;
134
135 #if CMK_SMP && CMK_USE_L2ATOMICS
136 static L2AtomicQueue node_recv_atomic_q;
137 #endif
138
139 #if CMK_SMP && !CMK_MULTICORE
140 //static volatile int commThdExit = 0;
141 //static CmiNodeLock commThdExitLock = 0;
142
143 //The random seed to pick destination context
144 __thread uint32_t r_seed = 0xdeadbeef;
145 __thread int32_t _cmi_bgq_incommthread = 0;
146 #endif
147
148 //int CmiInCommThread () {
149 //  //if (_cmi_bgq_incommthread)
150 //  //printf ("CmiInCommThread: %d\n", _cmi_bgq_incommthread);
151 //  return _cmi_bgq_incommthread;
152 //}
153
154 void ConverseRunPE(int everReturn);
155 static void CommunicationServer(int sleepTime);
156 static void CommunicationServerThread(int sleepTime);
157
158 static void CmiNetworkBarrier(int async);
159 static void CmiSendPeer (int rank, int size, char *msg);
160
161 //So far we dont define any comm threads
162 int Cmi_commthread = 0;
163
164 #include "machine-smp.c"
165 CsvDeclare(CmiNodeState, NodeState);
166 #include "immediate.c"
167
168 #if CMK_ENABLE_ASYNC_PROGRESS  
169 //Immediate messages not supported yet
170 #define AdvanceCommunications() 
171 #else
172 void AdvanceCommunications();
173 #endif
174
175 #if !CMK_SMP
176 /************ non SMP **************/
177 static struct CmiStateStruct Cmi_state;
178 int _Cmi_mype;
179 int _Cmi_myrank;
180
181 void CmiMemLock(void) {}
182 void CmiMemUnlock(void) {}
183
184 #define CmiGetState() (&Cmi_state)
185 #define CmiGetStateN(n) (&Cmi_state)
186
187 //void CmiYield(void) { sleep(0); }
188
189 static void CmiStartThreads(char **argv) {
190     CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
191     _Cmi_mype = Cmi_nodestart;
192     _Cmi_myrank = 0;
193 }
194 #endif  /* !CMK_SMP */
195
196 //int received_immediate;
197 //int received_broadcast;
198
199 void _alias_rank (int rank);
200
201 /*Add a message to this processor's receive queue, pe is a rank */
202 void CmiPushPE(int pe,void *msg) {
203     CmiState cs = CmiGetStateN(pe);    
204 #if CMK_IMMEDIATE_MSG
205     if (CmiIsImmediate(msg)) {
206       //_alias_rank(CMI_DEST_RANK(msg));
207       //CmiLock(CsvAccess(NodeState).immRecvLock);
208       CmiHandleImmediateMessage(msg);
209       //CmiUnlock(CsvAccess(NodeState).immRecvLock);
210       //_alias_rank(0);
211       return;
212     }
213 #endif
214     
215 #if CMK_SMP && CMK_USE_L2ATOMICS
216     L2AtomicEnqueue(&procState[pe].atomic_queue, msg);
217 #else
218     PCQueuePush(cs->recv,(char *)msg);
219 #endif
220     
221     //CmiIdleLock_addMessage(&cs->idle);
222 }
223
224 #if CMK_NODE_QUEUE_AVAILABLE
225 /*Add a message to this processor's receive queue */
226 static void CmiPushNode(void *msg) {
227     MACHSTATE(3,"Pushing message into NodeRecv queue");
228 #if CMK_IMMEDIATE_MSG
229     if (CmiIsImmediate(msg)) {
230       //CmiLock(CsvAccess(NodeState).immRecvLock);
231       CmiHandleImmediateMessage(msg);
232       //CmiUnlock(CsvAccess(NodeState).immRecvLock);    
233       return;
234     }
235 #endif
236 #if CMK_SMP && CMK_USE_L2ATOMICS
237     L2AtomicEnqueue(&node_recv_atomic_q, msg);    
238 #else
239     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
240     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
241     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
242 #endif
243     //CmiState cs=CmiGetStateN(0);
244     //CmiIdleLock_addMessage(&cs->idle);
245 }
246 #endif /* CMK_NODE_QUEUE_AVAILABLE */
247
248 #define MAX_NUM_CONTEXTS  64
249
250 #if CMK_SMP 
251 #define CMK_PAMI_MULTI_CONTEXT  1
252 #else
253 #define CMK_PAMI_MULTI_CONTEXT  0
254 #endif
255
256 #if CMK_PAMI_MULTI_CONTEXT
257 volatile int msgQueueLen [MAX_NUM_CONTEXTS];
258 volatile int outstanding_recvs [MAX_NUM_CONTEXTS];
259
260 //#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
261 #define THREADS_PER_CONTEXT 2
262 #define LTPS                1 //Log Threads Per Context (TPS)
263 //#else
264 //#define THREADS_PER_CONTEXT 4
265 //#define LTPS                2 //Log Threads Per Context (TPS)
266 //#endif
267
268 #define  MY_CONTEXT_ID() (CmiMyRank() >> LTPS)
269 #define  MY_CONTEXT()    (cmi_pami_contexts[CmiMyRank() >> LTPS])
270
271 #define  INCR_MSGQLEN()  //(msgQueueLen[CmiMyRank() >> LTPS] ++)
272 #define  DECR_MSGQLEN()  //(msgQueueLen[CmiMyRank() >> LTPS] --)
273 #define  MSGQLEN()       0 //(msgQueueLen[CmiMyRank() >> LTPS])
274 #define  INCR_ORECVS()   //(outstanding_recvs[CmiMyRank() >> LTPS] ++)
275 #define  DECR_ORECVS()   //(outstanding_recvs[CmiMyRank() >> LTPS] --)
276 #define  ORECVS()        0 //(outstanding_recvs[CmiMyRank() >> LTPS])
277 #else
278 #define LTPS    1 
279 volatile int msgQueueLen;
280 volatile int outstanding_recvs;
281 #define  MY_CONTEXT_ID() (0)
282 #define  MY_CONTEXT()    (cmi_pami_contexts[0])
283
284 #define  INCR_MSGQLEN()  (msgQueueLen ++)
285 #define  DECR_MSGQLEN()  (msgQueueLen --)
286 #define  MSGQLEN()       (msgQueueLen)
287 #define  INCR_ORECVS()   (outstanding_recvs ++)
288 #define  DECR_ORECVS()   (outstanding_recvs --)
289 #define  ORECVS()        (outstanding_recvs)
290 #endif
291
292 #if CMK_SMP  && !CMK_ENABLE_ASYNC_PROGRESS
293 #define PAMIX_CONTEXT_LOCK_INIT(x)
294 #define PAMIX_CONTEXT_LOCK(x)        if(LTPS) PAMI_Context_lock(x)
295 #define PAMIX_CONTEXT_UNLOCK(x)      if(LTPS) {ppc_msync(); PAMI_Context_unlock(x);}
296 #define PAMIX_CONTEXT_TRYLOCK(x)     ((LTPS)?(PAMI_Context_trylock(x) == PAMI_SUCCESS):(1))
297 #else
298 #define PAMIX_CONTEXT_LOCK_INIT(x)
299 #define PAMIX_CONTEXT_LOCK(x)
300 #define PAMIX_CONTEXT_UNLOCK(x)
301 #define PAMIX_CONTEXT_TRYLOCK(x)      1
302 #endif
303
304
305 static char     **Cmi_argv;
306 static char     **Cmi_argvcopy;
307 static CmiStartFn Cmi_startfn;   /* The start function */
308 static int        Cmi_usrsched;  /* Continue after start function finishes? */
309
310 extern void ConverseCommonInit(char **argv);
311 extern void ConverseCommonExit(void);
312 extern void CthInit(char **argv);
313
314 static void SendMsgsUntil(int);
315
316 #define A_PRIME 13
317 #define B_PRIME 19
318
319 static inline unsigned myrand (unsigned *seed) {
320   *seed = A_PRIME * (*seed) + B_PRIME;
321   return *seed;
322 }
323
324 void SendSpanningChildren(int size, char *msg, int from_rdone);
325 #if CMK_NODE_QUEUE_AVAILABLE
326 void SendSpanningChildrenNode(int size, char *msg, int from_rdone);
327 #endif
328
329 typedef struct {
330     int sleepMs; /*Milliseconds to sleep while idle*/
331     int nIdles; /*Number of times we've been idle in a row*/
332     CmiState cs; /*Machine state*/
333 } CmiIdleState;
334
335 static CmiIdleState *CmiNotifyGetState(void) {
336     CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
337     s->sleepMs=0;
338     s->nIdles=0;
339     s->cs=CmiGetState();
340     return s;
341 }
342
343 static void send_done(pami_context_t ctxt, void *data, pami_result_t result) 
344 {
345   CmiFree(data);
346   DECR_MSGQLEN();
347 }
348
349
350 static void recv_done(pami_context_t ctxt, void *clientdata, pami_result_t result) 
351 /* recv done callback: push the recved msg to recv queue */
352 {
353     char *msg = (char *) clientdata;
354     int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
355     //int rank = *(int *) (msg + sndlen); //get rank from bottom of the message
356     //CMI_DEST_RANK(msg) = rank;
357
358     //fprintf (stderr, "%d Recv message done \n", CmiMyPe());
359     /* then we do what PumpMsgs used to do:
360      * push msg to recv queue */
361     CMI_CHECK_CHECKSUM(msg, sndlen);
362     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
363         CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
364         return;
365     }
366
367 #if CMK_BROADCAST_SPANNING_TREE 
368     if (CMI_IS_BCAST_ON_CORES(msg) ) 
369         //Forward along spanning tree
370         SendSpanningChildren(sndlen, msg, 1);
371 #endif
372
373 #if CMK_NODE_QUEUE_AVAILABLE
374 #if CMK_BROADCAST_SPANNING_TREE
375     if (CMI_IS_BCAST_ON_NODES(msg)) 
376       SendSpanningChildrenNode(sndlen, msg, 1);
377 #endif
378     if (CMI_DEST_RANK(msg) == SMP_NODEMESSAGE)
379       CmiPushNode(msg);
380     else
381 #endif
382       CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
383
384     DECR_ORECVS();
385 }
386
387 typedef struct _cmi_pami_rzv {
388   void           * buffer;
389   size_t           offset;
390   int              bytes;
391   int              dst_context;
392 }CmiPAMIRzv_t;  
393
394 typedef struct _cmi_pami_rzv_recv {
395   void           * msg;
396   void           * src_buffer;
397   int              src_ep;
398 } CmiPAMIRzvRecv_t;
399
400 static void pkt_dispatch (pami_context_t       context,      
401                           void               * clientdata,   
402                           const void         * header_addr,  
403                           size_t               header_size,  
404                           const void         * pipe_addr,    
405                           size_t               pipe_size,    
406                           pami_endpoint_t      origin,
407                           pami_recv_t         * recv)        
408 {
409     //fprintf (stderr, "Received Message of size %d %p\n", pipe_size, recv);
410     INCR_ORECVS();    
411     int alloc_size = pipe_size;
412     char * buffer  = (char *)CmiAlloc(alloc_size);
413     //char * buffer  = (char *)CmiAlloc(alloc_size + sizeof(int));
414     //*(int *)(buffer+alloc_size) = *(int *)header_addr;
415
416     if (recv) {
417       recv->local_fn = recv_done;
418       recv->cookie   = buffer;
419       recv->type     = PAMI_TYPE_BYTE;
420       recv->addr     = buffer;
421       recv->offset   = 0;
422       recv->data_fn  = PAMI_DATA_COPY;
423     }
424     else {
425       memcpy (buffer, pipe_addr, pipe_size);
426       recv_done (NULL, buffer, PAMI_SUCCESS);
427     }
428 }
429
430 static void short_pkt_dispatch (pami_context_t       context,      
431                                 void               * clientdata,   
432                                 const void         * header_addr,  
433                                 size_t               header_size,  
434                                 const void         * pipe_addr,    
435                                 size_t               pipe_size,    
436                                 pami_endpoint_t      origin,
437                                 pami_recv_t         * recv)        
438 {
439   int alloc_size = pipe_size;
440   char * buffer  = (char *)CmiAlloc(alloc_size);
441   //char * buffer  = (char *)CmiAlloc(alloc_size + sizeof(int));
442   //*(int *)(buffer+alloc_size) = *(int *)header_addr;
443   
444   memcpy (buffer, pipe_addr, pipe_size);
445   char *smsg = (char *)pipe_addr;
446   char *msg  = (char *)buffer;
447
448   CMI_CHECK_CHECKSUM(smsg, pipe_size);  
449   if (CMI_MAGIC(smsg) != CHARM_MAGIC_NUMBER) {
450     /* received a non-charm msg */
451     CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");     
452   }
453  
454   CmiPushPE(CMI_DEST_RANK(smsg), (void *)msg);
455 }
456
457
458 void rzv_pkt_dispatch (pami_context_t       context,   
459                        void               * clientdata,
460                        const void         * header_addr,
461                        size_t               header_size,
462                        const void         * pipe_addr,  
463                        size_t               pipe_size,  
464                        pami_endpoint_t      origin,
465                        pami_recv_t         * recv);
466
467 void ack_pkt_dispatch (pami_context_t       context,   
468                        void               * clientdata,
469                        const void         * header_addr,
470                        size_t               header_size,
471                        const void         * pipe_addr,  
472                        size_t               pipe_size,  
473                        pami_endpoint_t      origin,
474                        pami_recv_t         * recv);
475
476 void rzv_recv_done   (pami_context_t     ctxt, 
477                       void             * clientdata, 
478                       pami_result_t      result); 
479
480 //approx sleep command
481 size_t mysleep_iter = 0;
482 void mysleep (unsigned long cycles) {
483     unsigned long start = GetTimeBase();
484     unsigned long end = start + cycles;
485
486     while (start < end) {
487       mysleep_iter ++;
488       start = GetTimeBase();
489     }
490
491     return;
492 }
493
494 static void * test_buf;
495 volatile int pami_barrier_flag = 0;
496 typedef pami_result_t (*pamix_proc_memalign_fn) (void**, size_t, size_t, const char*);
497
498 void pami_barrier_done (void *ctxt, void * clientdata, pami_result_t err)
499 {
500   int * active = (int *) clientdata;
501   (*active)--;
502 }
503
504 pami_client_t      cmi_pami_client;
505 pami_context_t   * cmi_pami_contexts;
506 size_t             cmi_pami_numcontexts;
507 pami_geometry_t    world_geometry;
508 pami_xfer_t        pami_barrier;
509 char clientname[] = "Converse";
510
511 #if 1
512 typedef struct _cmi_pami_mregion_t {
513   pami_memregion_t   mregion;
514   void             * baseVA;
515 } CmiPAMIMemRegion_t;
516
517 //one for each of the 64 possible contexts
518 CmiPAMIMemRegion_t  cmi_pami_memregion[64];
519 #endif
520
521 #include "malloc.h"
522 void *l2atomicbuf;
523
524 void _alias_rank (int rank) {
525 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
526
527   CmiState cs = CmiGetState();
528   CmiState cs_r = CmiGetStateN(rank);
529
530   cs->rank = cs_r->rank;
531   cs->pe   = cs_r->pe;
532 #endif
533 }
534
535 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
536
537 pami_result_t init_comm_thread (pami_context_t   context,
538                                 void           * cookie)
539 {
540   CmiState cs  = CmiGetState();
541   CmiState cs0 = CmiGetStateN(0);
542   *cs = *cs0; //Alias comm thread to rank 0
543   //printf("Initialized comm thread, my rank %d, my pe %d\n", 
544   // CmiMyRank(), 
545   // CmiMyPe());
546
547   //Notify main thread comm thread has been initialized
548   *(int*)cookie = 0;
549
550 #if 1
551   //set the seed to choose destination context
552   uint64_t rseedl = r_seed;
553   rseedl |= (uint64_t)context;
554   r_seed = ((uint32_t)rseedl)^((uint32_t)(rseedl >> 32));
555 #endif
556
557   _cmi_bgq_incommthread = 1;
558
559   return PAMI_SUCCESS;
560 }
561
562 typedef void (*pamix_progress_function) (pami_context_t context, void *cookie);
563 typedef pami_result_t (*pamix_progress_register_fn) 
564   (pami_context_t            context,
565    pamix_progress_function   progress_fn,
566    pamix_progress_function   suspend_fn,
567    pamix_progress_function   resume_fn,
568    void                     * cookie);
569 typedef pami_result_t (*pamix_progress_enable_fn)(pami_context_t   context,
570                                                   int              event_type);
571 typedef pami_result_t (*pamix_progress_disable_fn)(pami_context_t  context,
572                                                    int             event_type);
573 #define PAMI_EXTENSION_OPEN(client, name, ext)  \
574 ({                                              \
575   pami_result_t rc;                             \
576   rc = PAMI_Extension_open(client, name, ext);  \
577   CmiAssert (rc == PAMI_SUCCESS);      \
578 })
579 #define PAMI_EXTENSION_FUNCTION(type, name, ext)        \
580 ({                                                      \
581   void* fn;                                             \
582   fn = PAMI_Extension_symbol(ext, name);                \
583   CmiAssert (fn != NULL);                               \
584   (type)fn;                                             \
585 })
586
587 pami_extension_t            cmi_ext_progress;
588 pamix_progress_register_fn  cmi_progress_register;
589 pamix_progress_enable_fn    cmi_progress_enable;
590 pamix_progress_disable_fn   cmi_progress_disable;
591
592 int CMI_Progress_init(int start, int ncontexts) {
593   if (CmiMyPe() == 0)
594     printf("Enabling communication threads\n");
595   
596   PAMI_EXTENSION_OPEN(cmi_pami_client,"EXT_async_progress",&cmi_ext_progress);
597   cmi_progress_register = PAMI_EXTENSION_FUNCTION(pamix_progress_register_fn, "register", cmi_ext_progress);
598   cmi_progress_enable   = PAMI_EXTENSION_FUNCTION(pamix_progress_enable_fn,   "enable",   cmi_ext_progress);
599   cmi_progress_disable  = PAMI_EXTENSION_FUNCTION(pamix_progress_disable_fn,  "disable",  cmi_ext_progress);
600   
601   int i = 0;
602   for (i = start; i < start+ncontexts; ++i) {
603     //fprintf(stderr, "Enabling progress on context %d\n", i);
604     cmi_progress_register (cmi_pami_contexts[i], 
605                            NULL, 
606                            NULL, 
607                            NULL, NULL);
608     cmi_progress_enable   (cmi_pami_contexts[i], 0 /*progress all*/);  
609   }
610
611   pami_work_t  work;
612   volatile int x;
613   for (i = start; i < start+ncontexts; ++i) {
614     x = 1;
615     PAMI_Context_post(cmi_pami_contexts[i], &work, 
616                       init_comm_thread, (void*)&x);
617     while(x);
618   }
619   
620   return 0;
621 }
622
623 int CMI_Progress_finalize(int start, int ncontexts) {
624   int i = 0;
625   for (i = start; i < start+ncontexts; ++i) 
626     cmi_progress_disable  (cmi_pami_contexts[i], 0 /*progress all*/);    
627   PAMI_Extension_close (cmi_ext_progress);
628 }
629 #endif
630
631 #include "manytomany.c"
632
633 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
634     int n, i, count;
635
636     /* processor per node */
637     _Cmi_mynodesize = 1;
638     CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
639 #if ! CMK_SMP
640     if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
641       CmiAbort("+ppn cannot be used in non SMP version!\n");
642 #endif
643     
644     PAMI_Client_create (clientname, &cmi_pami_client, NULL, 0);
645     size_t _n = 1;
646 #if CMK_PAMI_MULTI_CONTEXT
647     if ((_Cmi_mynodesize % THREADS_PER_CONTEXT) == 0)
648       _n = _Cmi_mynodesize / THREADS_PER_CONTEXT;  //have a context for each four threads
649     else
650       _n = 1 + (_Cmi_mynodesize / THREADS_PER_CONTEXT);  //have a context for each four threads
651 #endif
652
653     cmi_pami_contexts = (pami_context_t *) malloc (sizeof(pami_context_t) * _n);
654     pami_result_t rc = PAMI_Context_createv (cmi_pami_client, NULL, 0, cmi_pami_contexts, _n);
655     if (rc != PAMI_SUCCESS) {
656       fprintf(stderr, "PAMI_Context_createv failed for %d contexts\n", _n);
657       assert(0);
658     }
659     cmi_pami_numcontexts = _n;
660
661     //fprintf(stderr,"Creating %d pami contexts\n", _n);
662
663     pami_configuration_t configuration;
664     pami_result_t result;
665     
666     configuration.name = PAMI_CLIENT_TASK_ID;
667     result = PAMI_Client_query(cmi_pami_client, &configuration, 1);
668     _Cmi_mynode = configuration.value.intval;
669
670     configuration.name = PAMI_CLIENT_NUM_TASKS;
671     result = PAMI_Client_query(cmi_pami_client, &configuration, 1);
672     _Cmi_numnodes = configuration.value.intval;
673
674     pami_dispatch_hint_t options = (pami_dispatch_hint_t) {0};
675     pami_dispatch_callback_function pfn;
676     for (i = 0; i < _n; ++i) {
677       pfn.p2p = pkt_dispatch;
678       PAMI_Dispatch_set (cmi_pami_contexts[i],
679                          CMI_PAMI_DISPATCH,
680                          pfn,
681                          NULL,
682                          options);
683       
684       pfn.p2p = ack_pkt_dispatch;
685       PAMI_Dispatch_set (cmi_pami_contexts[i],
686                          CMI_PAMI_ACK_DISPATCH,
687                          pfn,
688                          NULL,
689                          options);
690       
691       pfn.p2p = rzv_pkt_dispatch;
692       PAMI_Dispatch_set (cmi_pami_contexts[i],
693                          CMI_PAMI_RZV_DISPATCH,
694                          pfn,
695                          NULL,
696                          options);      
697
698       pfn.p2p = short_pkt_dispatch;
699       PAMI_Dispatch_set (cmi_pami_contexts[i],
700                          CMI_PAMI_SHORT_DISPATCH,
701                          pfn,
702                          NULL,
703                          options);      
704     }
705
706 #if 1
707     size_t bytes_out;
708     void * buf = malloc(sizeof(long));    
709     uint32_t retval;
710     Kernel_MemoryRegion_t k_mregion;
711     retval = Kernel_CreateMemoryRegion (&k_mregion, buf, sizeof(long));
712     assert(retval==0);  
713     for (i = 0; i < _n; ++i) {
714       cmi_pami_memregion[i].baseVA = k_mregion.BaseVa;
715       PAMI_Memregion_create (cmi_pami_contexts[i],
716                              k_mregion.BaseVa,
717                              k_mregion.Bytes,
718                              &bytes_out,
719                              &cmi_pami_memregion[i].mregion);
720     }
721     free(buf);
722 #endif
723
724     //fprintf(stderr, "%d Initializing Converse PAMI machine Layer on %d tasks\n", _Cmi_mynode, _Cmi_numnodes);
725
726     ///////////---------------------------------/////////////////////
727     //////////----------- Initialize Barrier -------////////////////
728     size_t               num_algorithm[2];
729     pami_algorithm_t    *always_works_algo = NULL;
730     pami_metadata_t     *always_works_md = NULL;
731     pami_algorithm_t    *must_query_algo = NULL;
732     pami_metadata_t     *must_query_md = NULL;
733     pami_xfer_type_t     xfer_type = PAMI_XFER_BARRIER;
734
735     /* Docs01:  Get the World Geometry */
736     result = PAMI_Geometry_world (cmi_pami_client,&world_geometry);
737     if (result != PAMI_SUCCESS)
738       {
739         fprintf (stderr, "Error. Unable to get world geometry: result = %d\n", result);
740         return;
741       }
742
743     result = PAMI_Geometry_algorithms_num(world_geometry,
744                                           xfer_type,
745                                           (size_t*)num_algorithm);
746
747     if (result != PAMI_SUCCESS || num_algorithm[0]==0)
748       {
749         fprintf (stderr,
750                  "Error. Unable to query algorithm, or no algorithms available result = %d\n",
751                  result);
752         return;
753       }
754
755     always_works_algo = (pami_algorithm_t*)malloc(sizeof(pami_algorithm_t)*num_algorithm[0]);
756     always_works_md  = (pami_metadata_t*)malloc(sizeof(pami_metadata_t)*num_algorithm[0]);
757     must_query_algo   = (pami_algorithm_t*)malloc(sizeof(pami_algorithm_t)*num_algorithm[1]);
758     must_query_md    = (pami_metadata_t*)malloc(sizeof(pami_metadata_t)*num_algorithm[1]);
759
760     /* Docs05:  Query the algorithm lists */
761     result = PAMI_Geometry_algorithms_query(world_geometry,
762                                             xfer_type,
763                                             always_works_algo,
764                                             always_works_md,
765                                             num_algorithm[0],
766                                             must_query_algo,
767                                             must_query_md,
768                                             num_algorithm[1]);
769     
770     int opt_alg = 0;
771     for (int nalg = 0; nalg < num_algorithm[0]; ++nalg) 
772       if (strstr(always_works_md[nalg].name, "GI") != NULL) {
773         opt_alg = nalg;
774         break;
775       }
776
777     if (_Cmi_mynode == 0)
778       printf ("Choosing optimized barrier algorithm name %s\n",
779               always_works_md[opt_alg]);
780               
781     pami_barrier.cb_done   = pami_barrier_done;
782     pami_barrier.cookie    = (void*) & pami_barrier_flag;
783     pami_barrier.algorithm = always_works_algo[opt_alg];
784
785     /* Docs06:  Query the algorithm lists */
786     if (result != PAMI_SUCCESS)
787       {
788         fprintf (stderr, "Error. Unable to get query algorithm. result = %d\n", result);
789         return;
790       }
791
792     CmiNetworkBarrier(0);
793     CmiNetworkBarrier(0);
794     CmiNetworkBarrier(0);
795
796     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
797     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
798     Cmi_argvcopy = CmiCopyArgs(argv);
799     Cmi_argv = argv;
800     Cmi_startfn = fn;
801     Cmi_usrsched = usched;
802
803     /* checksum flag */
804     if (CmiGetArgFlag(argv,"+checksum")) {
805 #if !CMK_OPTIMIZE
806         checksum_flag = 1;
807         if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
808 #else
809         if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
810 #endif
811     }
812
813     CsvInitialize(CmiNodeState, NodeState);
814     CmiNodeStateInit(&CsvAccess(NodeState));
815
816 #if CMK_SMP && CMK_USE_L2ATOMICS
817     //max available hardware threads
818     int actualNodeSize = 64/Kernel_ProcessCount(); 
819     //printf("Ranks per node %d, actualNodeSize %d CmiMyNodeSize() %d\n",
820     //     Kernel_ProcessCount(), actualNodeSize, _Cmi_mynodesize);
821     
822     //pami_result_t rc;
823     pami_extension_t l2;
824     pamix_proc_memalign_fn PAMIX_L2_proc_memalign;
825     size_t size = (4*actualNodeSize+1) * sizeof(L2AtomicState);
826     rc = PAMI_Extension_open(NULL, "EXT_bgq_l2atomic", &l2);
827     CmiAssert (rc == 0);
828     PAMIX_L2_proc_memalign = (pamix_proc_memalign_fn)PAMI_Extension_symbol(l2, "proc_memalign");
829     rc = PAMIX_L2_proc_memalign(&l2atomicbuf, 64, size, NULL);
830     CmiAssert (rc == 0);    
831 #endif
832
833     procState = (ProcState *)malloc((_Cmi_mynodesize) * sizeof(ProcState));
834     for (i=0; i<_Cmi_mynodesize; i++) {
835         /*    procState[i].sendMsgBuf = PCQueueCreate();   */
836         //procState[i].recvLock = CmiCreateLock();
837 #if CMK_SMP && CMK_USE_L2ATOMICS
838         L2AtomicQueueInit ((char *) l2atomicbuf + sizeof(L2AtomicState)*i,
839                            sizeof(L2AtomicState),
840                            &procState[i].atomic_queue,
841                            1, /*use overflow*/
842                            DEFAULT_SIZE /*1024 entries*/);
843 #endif
844     }
845
846 #if CMK_SMP && CMK_USE_L2ATOMICS    
847     CmiMemAllocInit_bgq ((char*)l2atomicbuf + 
848                          2*actualNodeSize*sizeof(L2AtomicState),
849                          2*actualNodeSize*sizeof(L2AtomicState)); 
850     L2AtomicQueueInit ((char*)l2atomicbuf + 
851                        4*actualNodeSize*sizeof(L2AtomicState),
852                        sizeof(L2AtomicState),
853                        &node_recv_atomic_q,
854                        1, /*use overflow*/
855                        DEFAULT_SIZE /*1024 entries*/);                 
856 #endif
857     
858     //Initialize the manytomany api
859     _cmidirect_m2m_initialize (cmi_pami_contexts, _n);
860
861     //printf ("Starting Threads\n");
862     CmiStartThreads(argv);
863
864 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
865     CMI_Progress_init(0, _n);
866 #endif
867
868     ConverseRunPE(initret);
869 }
870
871
872 int PerrorExit (char *err) {
873   fprintf (stderr, "err\n\n");
874     exit (-1);
875     return -1;
876 }
877
878
879 void ConverseRunPE(int everReturn) {
880     //    printf ("ConverseRunPE on rank %d\n", CmiMyPe());    
881
882     CmiIdleState *s=CmiNotifyGetState();
883     CmiState cs;
884     char** CmiMyArgv;
885     CmiNodeAllBarrier();
886
887     cs = CmiGetState();
888     CpvInitialize(void *,CmiLocalQueue);
889     CpvAccess(CmiLocalQueue) = cs->localqueue;
890
891     if (CmiMyRank())
892         CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
893     else
894         CmiMyArgv=Cmi_argv;
895
896     CthInit(CmiMyArgv);
897
898     //printf ("Before Converse Common Init\n");
899     ConverseCommonInit(CmiMyArgv);
900
901     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
902
903     //printf ("before calling CmiBarrier() \n");
904     CmiBarrier();
905
906     /* Converse initialization finishes, immediate messages can be processed.
907        node barrier previously should take care of the node synchronization */
908     _immediateReady = 1;
909
910     //printf("calling the startfn\n");
911
912     if (!everReturn) {
913       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
914       if (Cmi_usrsched==0) CsdScheduler(-1);
915       ConverseExit();
916     }    
917 }
918
919 void ConverseExit(void) {
920
921     while (MSGQLEN() > 0 || ORECVS() > 0) {
922       AdvanceCommunications();
923     }
924     
925     CmiNodeBarrier();
926     ConverseCommonExit();
927
928     if (CmiMyPe() == 0) {
929         printf("End of program\n");
930     }
931
932     CmiNodeBarrier();
933 //  CmiNodeAllBarrier ();
934
935     int rank0 = 0;
936     if (CmiMyRank() == 0) {
937         rank0 = 1;
938         //free(procState);      
939 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
940         CMI_Progress_finalize(0, cmi_pami_numcontexts);
941 #endif
942         PAMI_Context_destroyv(cmi_pami_contexts, cmi_pami_numcontexts);
943         PAMI_Client_destroy(&cmi_pami_client);
944     }
945
946     CmiNodeBarrier();
947     //  CmiNodeAllBarrier ();
948     //fprintf(stderr, "Before Exit\n");
949 #if CMK_SMP
950     if (rank0) {
951       Delay(100000);
952       exit(0);
953     }
954     else
955       pthread_exit(0);
956 #else
957     exit(0);
958 #endif
959 }
960
961 /* exit() called on any node would abort the whole program */
962 void CmiAbort(const char * message) {
963     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
964              "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),
965              MSGQLEN(), ORECVS(), message);
966
967     //CmiPrintStackTrace(0);
968     //while (msgQueueLen > 0 || outstanding_recvs > 0) {
969     //  AdvanceCommunications();
970     //}    
971     //CmiBarrier();
972     assert (0);
973 }
974
975 #if CMK_NODE_QUEUE_AVAILABLE
976 char *CmiGetNonLocalNodeQ(void) {
977     //CmiState cs = CmiGetState();
978     char *result = 0;
979     //CmiIdleLock_checkMessage(&cs->idle);
980
981 #if CMK_SMP && CMK_USE_L2ATOMICS
982     if (!L2AtomicQueueEmpty(&node_recv_atomic_q)) {
983       if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
984         result = (char*)L2AtomicDequeue(&node_recv_atomic_q);
985         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
986       }
987     }
988 #else
989     if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
990       MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
991       
992       if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
993         //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
994         result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
995         CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
996       }
997
998       MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
999     }
1000 #endif
1001     
1002     return result;
1003 }
1004 #endif
1005
1006
1007 void *CmiGetNonLocal() {
1008
1009     void *msg = NULL;
1010     CmiState cs = CmiGetState();
1011     //CmiIdleLock_checkMessage(&cs->idle);
1012
1013 #if CMK_SMP && CMK_USE_L2ATOMICS
1014     msg = L2AtomicDequeue(&procState[CmiMyRank()].atomic_queue);
1015 #if !(CMK_ENABLE_ASYNC_PROGRESS)
1016     if (msg == NULL) {
1017       AdvanceCommunications();     
1018       msg = L2AtomicDequeue(&procState[CmiMyRank()].atomic_queue);
1019     }
1020 #endif
1021 #else
1022     if (PCQueueLength(cs->recv)==0)
1023       AdvanceCommunications();
1024     if (PCQueueLength(cs->recv)==0) return NULL;
1025     msg =  PCQueuePop(cs->recv);
1026 #endif
1027
1028     //if (msg != NULL)
1029     //fprintf(stderr, "%d: Returning a message\n", CmiMyPe());
1030
1031     return msg;
1032 }
1033
1034 static void CmiSendSelf(char *msg) {
1035 #if CMK_IMMEDIATE_MSG
1036     if (CmiIsImmediate(msg)) {
1037       //CmiLock(CsvAccess(NodeState).immRecvLock);
1038       CmiHandleImmediateMessage(msg);
1039       //CmiUnlock(CsvAccess(NodeState).immRecvLock);
1040       return;
1041     }
1042 #endif
1043     
1044     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1045 }
1046
1047 #if CMK_SMP
1048 static void CmiSendPeer (int rank, int size, char *msg) {
1049     //fprintf(stderr, "%d Send messages to peer\n", CmiMyPe());
1050     CmiPushPE (rank, msg);
1051 }
1052 #endif
1053
1054
1055 void CmiGeneralFreeSendN (int node, int rank, int size, char * msg, int to_lock);
1056
1057
1058 /* The general free send function
1059  * Send is synchronous, and free msg after posted
1060  */
1061 void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
1062
1063     if (destPE < 0 || destPE > CmiNumPes ())
1064       printf ("Sending to %d\n", destPE);
1065     
1066     CmiAssert (destPE >= 0 && destPE < CmiNumPes());    
1067     CmiState cs = CmiGetState();
1068
1069     if (destPE==cs->pe) {
1070         CmiSendSelf(msg);
1071         return;
1072     }
1073
1074     CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg, 1);
1075 }
1076
1077
1078 pami_result_t machine_send_handoff (pami_context_t context, void *msg);
1079 void  machine_send       (pami_context_t      context, 
1080                           int                 node, 
1081                           int                 rank, 
1082                           int                 size, 
1083                           char              * msg, 
1084                           int                 to_lock)__attribute__((always_inline));
1085
1086 void CmiGeneralFreeSendN(int node, int rank, int size, char * msg, int to_lock)
1087 {
1088 #if CMK_SMP
1089     CMI_DEST_RANK(msg) = rank;
1090     if (node == CmiMyNode()) {
1091         CmiSendPeer (rank, size, msg);
1092         return;
1093     }
1094 #endif
1095
1096 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
1097     int c = node % cmi_pami_numcontexts;
1098     //int c = myrand(&r_seed) % cmi_pami_numcontexts;
1099     pami_context_t my_context = cmi_pami_contexts[c];    
1100     CmiMsgHeaderBasic *hdr = (CmiMsgHeaderBasic *)msg;
1101     hdr->dstnode = node;
1102     hdr->rank    = rank;
1103
1104     PAMI_Context_post(my_context, (pami_work_t *)hdr->work, 
1105                       machine_send_handoff, msg);
1106 #else
1107     pami_context_t my_context = MY_CONTEXT();    
1108     machine_send (my_context, node, rank, size, msg, to_lock);
1109 #endif
1110 }
1111
1112 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
1113 pami_result_t machine_send_handoff (pami_context_t context, void *msg) {
1114   CmiMsgHeaderBasic *hdr = (CmiMsgHeaderBasic *)msg;
1115   int node = hdr->dstnode;
1116   int rank = hdr->rank;
1117   int size = hdr->size;
1118   
1119   //As this is executed on the comm thread no locking is necessary
1120   machine_send(context, node, rank, size, msg, 0);
1121   return PAMI_SUCCESS;
1122 }
1123 #endif
1124
1125 void  machine_send       (pami_context_t      context, 
1126                           int                 node, 
1127                           int                 rank, 
1128                           int                 size, 
1129                           char              * msg, 
1130                           int                 to_lock) 
1131 {
1132     CMI_DEST_RANK(msg) = rank;
1133
1134     pami_endpoint_t target;
1135 #if CMK_PAMI_MULTI_CONTEXT
1136     //size_t dst_context = (rank != SMP_NODEMESSAGE) ? (rank>>LTPS) : (rand_r(&r_seed) % cmi_pami_numcontexts);
1137     //Choose a context at random
1138     size_t dst_context = myrand(&r_seed) % cmi_pami_numcontexts;
1139 #else
1140     size_t dst_context = 0;
1141 #endif
1142     PAMI_Endpoint_create (cmi_pami_client, (pami_task_t)node, dst_context, &target);
1143     
1144     //fprintf (stderr, "Calling PAMI Send to %d magic %d size %d\n", node, CMI_MAGIC(msg), size);
1145     if (CMI_LIKELY(size < SHORT_CUTOFF)) {
1146       pami_send_immediate_t parameters;
1147       
1148       parameters.dispatch        = CMI_PAMI_DISPATCH;
1149       if ( CMI_LIKELY(CMI_BROADCAST_ROOT(msg) == 0))
1150 #if CMK_NODE_QUEUE_AVAILABLE
1151         if ( CMI_LIKELY(rank != SMP_NODEMESSAGE) )
1152 #endif
1153           //use short callback if not a bcast and not an SMP node message
1154           parameters.dispatch        = CMI_PAMI_SHORT_DISPATCH;
1155
1156       parameters.header.iov_base = NULL; //&rank;
1157       parameters.header.iov_len  = 0;    //sizeof(int);
1158       parameters.data.iov_base   = msg;
1159       parameters.data.iov_len    = size;
1160       parameters.dest = target;
1161       
1162       if(to_lock)
1163         PAMIX_CONTEXT_LOCK(context);
1164       
1165       PAMI_Send_immediate (context, &parameters);
1166       
1167       if(to_lock)
1168         PAMIX_CONTEXT_UNLOCK(context);
1169       CmiFree(msg);
1170     }
1171     else if (size < EAGER_CUTOFF) {
1172       pami_send_t parameters;
1173       parameters.send.dispatch        = CMI_PAMI_DISPATCH;
1174       parameters.send.header.iov_base = NULL; //&rank;
1175       parameters.send.header.iov_len  = 0;    //sizeof(int);
1176       parameters.send.data.iov_base   = msg;
1177       parameters.send.data.iov_len    = size;
1178       parameters.events.cookie        = msg;
1179       parameters.events.local_fn      = send_done;
1180       parameters.events.remote_fn     = NULL;
1181       memset(&parameters.send.hints, 0, sizeof(parameters.send.hints));
1182       parameters.send.dest = target;
1183       
1184       if (to_lock)
1185         PAMIX_CONTEXT_LOCK(context);
1186       INCR_MSGQLEN();
1187       PAMI_Send (context, &parameters);
1188       if (to_lock)
1189         PAMIX_CONTEXT_UNLOCK(context);
1190     }
1191     else {
1192       CmiPAMIRzv_t   rzv;
1193       rzv.bytes       = size;
1194       rzv.buffer      = msg;
1195       rzv.offset      = (size_t)msg - (size_t)cmi_pami_memregion[0].baseVA;
1196       rzv.dst_context = dst_context;
1197
1198       pami_send_immediate_t parameters;
1199       parameters.dispatch        = CMI_PAMI_RZV_DISPATCH;
1200       parameters.header.iov_base = &rzv;
1201       parameters.header.iov_len  = sizeof(rzv);
1202       parameters.data.iov_base   = &cmi_pami_memregion[0].mregion;      
1203       parameters.data.iov_len    = sizeof(pami_memregion_t);
1204       parameters.dest = target;
1205       
1206       if(to_lock)
1207         PAMIX_CONTEXT_LOCK(context);
1208       
1209       PAMI_Send_immediate (context, &parameters);
1210       
1211       if(to_lock)
1212         PAMIX_CONTEXT_UNLOCK(context);
1213     }
1214 }
1215
1216 void CmiSyncSendFn(int destPE, int size, char *msg) {
1217     char *copymsg;
1218     copymsg = (char *)CmiAlloc(size);
1219     CmiAssert(copymsg != NULL);
1220     CmiMemcpy(copymsg,msg,size);
1221     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncSendFn on comm thd on node %d\n", CmiMyNode());
1222     CmiFreeSendFn(destPE,size,copymsg);
1223 }
1224
1225 void CmiFreeSendFn(int destPE, int size, char *msg) {    
1226     CQdCreate(CpvAccess(cQdState), 1);
1227     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeSendFn on comm thd on node %d\n", CmiMyNode());
1228
1229     CMI_SET_BROADCAST_ROOT(msg,0);
1230     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1231     ((CmiMsgHeaderBasic *)msg)->size = size;
1232     CMI_SET_CHECKSUM(msg, size);
1233
1234     CmiGeneralFreeSend(destPE,size,msg);
1235 }
1236
1237 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1238 void CmiSyncSendFn1(int destPE, int size, char *msg) {
1239     char *copymsg;
1240     copymsg = (char *)CmiAlloc(size);
1241     CmiMemcpy(copymsg, msg, size);
1242
1243     CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
1244     ((CmiMsgHeaderBasic *)copymsg)->size = size;
1245     CMI_SET_CHECKSUM(copymsg, size);    
1246     CmiGeneralFreeSend(destPE,size,copymsg);
1247 }
1248
1249 /* send msg to its spanning children in broadcast. G. Zheng */
1250 void SendSpanningChildren(int size, char *msg, int from_rdone) {
1251     int startnode = CMI_BROADCAST_ROOT(msg)-1;
1252     int myrank = CMI_DEST_RANK(msg);
1253     int i;
1254
1255     //printf ("%d [%d]: In Send Spanning Tree\n",  CmiMyPe(), CmiMyNode());
1256
1257     CmiAssert(startnode>=0 && startnode<_Cmi_numnodes);
1258     int dist = CmiMyNode() - startnode;
1259     if (dist < 0) dist+=_Cmi_numnodes;
1260     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1261         int p = BROADCAST_SPANNING_FACTOR*dist + i;
1262         if (p > _Cmi_numnodes - 1) break;
1263         p += startnode;
1264         p = p%_Cmi_numnodes;
1265         CmiAssert(p>=0 && p<_Cmi_numnodes && p!= CmiMyNode());
1266
1267         char *copymsg = (char *)CmiAlloc(size);
1268         CmiMemcpy(copymsg, msg, size);
1269
1270         CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
1271         ((CmiMsgHeaderBasic *)copymsg)->size = size;
1272         CMI_SET_CHECKSUM(copymsg, size);
1273         
1274         CmiGeneralFreeSendN(p,0,size,copymsg, !from_rdone);     
1275     }    
1276
1277 #if CMK_SMP    
1278     //Send data within the nodes
1279     for (i =0; i < _Cmi_mynodesize; ++i) {
1280       if (i != myrank) {
1281         char *copymsg = (char *)CmiAlloc(size);
1282         CmiMemcpy(copymsg, msg, size);                  
1283         CmiPushPE (i, copymsg);
1284       }
1285     }
1286 #endif
1287 }
1288
1289 void CmiSyncBroadcastFn(int size, char *msg) {
1290     char *copymsg;
1291     copymsg = (char *)CmiAlloc(size);
1292     CmiMemcpy(copymsg,msg,size);
1293     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastFn on comm thd on node %d\n", CmiMyNode());
1294     CmiFreeBroadcastFn(size,copymsg);
1295 }
1296
1297 void CmiFreeBroadcastFn(int size, char *msg) {
1298
1299     //  printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1300
1301     CmiState cs = CmiGetState();
1302 #if CMK_BROADCAST_SPANNING_TREE    
1303     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1304     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastFn on comm thd on node %d\n", CmiMyNode());
1305
1306     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1307
1308     CMI_SET_BROADCAST_ROOT(msg, CmiMyNode()+1);
1309     CMI_DEST_RANK(msg) = CmiMyRank();
1310     SendSpanningChildren(size, msg, 0);
1311     CmiFree(msg);
1312 #else
1313     int i;
1314
1315     for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1316         CmiSyncSendFn(i,size,msg);
1317
1318     for ( i=0; i<cs->pe; i++ )
1319         CmiSyncSendFn(i,size,msg);
1320
1321     CmiFree(msg);
1322 #endif
1323 }
1324
1325 void CmiSyncBroadcastAllFn(int size, char *msg) {
1326     char *copymsg;
1327     copymsg = (char *)CmiAlloc(size);
1328     CmiMemcpy(copymsg,msg,size);
1329     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1330     CmiFreeBroadcastAllFn(size,copymsg);
1331 }
1332
1333 void CmiFreeBroadcastAllFn(int size, char *msg) {
1334
1335     //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1336
1337     CmiState cs = CmiGetState();
1338 #if CMK_BROADCAST_SPANNING_TREE
1339     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
1340     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
1341
1342     CmiSyncSendFn(cs->pe,size,msg);
1343     
1344     CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1345
1346     CMI_SET_BROADCAST_ROOT(msg, CmiMyNode()+1);
1347     CMI_DEST_RANK(msg) = CmiMyRank();
1348     SendSpanningChildren(size, msg, 0);
1349     CmiFree(msg);
1350 #else
1351     int i ;
1352
1353     for ( i=0; i<_Cmi_numpes; i++ ) {
1354         CmiSyncSendFn(i,size,msg);      
1355     }
1356     //SendMsgsUntil (0);
1357
1358     CmiFree(msg);
1359 #endif
1360 }
1361
1362 #if !CMK_ENABLE_ASYNC_PROGRESS  
1363 //threads have to progress contexts themselves   
1364 void AdvanceCommunications() {
1365     pami_context_t my_context = MY_CONTEXT();
1366
1367 #if CMK_SMP
1368     //CmiAssert (my_context != NULL);
1369     if (PAMIX_CONTEXT_TRYLOCK(my_context))
1370     {
1371       PAMI_Context_advance(my_context, 1);
1372       PAMIX_CONTEXT_UNLOCK(my_context);
1373     }
1374 #else
1375     PAMI_Context_advance(my_context, 1);
1376 #endif
1377 }
1378 #endif
1379
1380
1381 void CmiNotifyIdle() {
1382   AdvanceCommunications();
1383 #if CMK_SMP && CMK_PAMI_MULTI_CONTEXT
1384 #if !CMK_ENABLE_ASYNC_PROGRESS && CMK_USE_L2ATOMICS
1385   //Wait on the atomic queue to get a message with very low core
1386   //overheads. One thread calls advance more frequently
1387   if ((CmiMyRank()% THREADS_PER_CONTEXT) == 0)
1388     //spin wait for 2-4us when idle
1389     //process node queue messages every 10us
1390     //Idle cores will only use one LMQ slot and an int sum
1391     L2AtomicQueue2QSpinWait(&procState[CmiMyRank()].atomic_queue, 
1392                             &node_recv_atomic_q,
1393                             10);
1394   else
1395 #endif
1396 #if CMK_USE_L2ATOMICS
1397     //spin wait for 50-100us when idle waiting for a message
1398     L2AtomicQueue2QSpinWait(&procState[CmiMyRank()].atomic_queue, 
1399                             &node_recv_atomic_q,
1400                             1000);
1401 #endif
1402 #endif
1403 }
1404
1405
1406 /*==========================================================*/
1407 /*==========================================================*/
1408 /*==========================================================*/
1409
1410 /************ Recommended routines ***********************/
1411 /************ You dont have to implement these but they are supported
1412  in the converse syntax and some rare programs may crash. But most
1413  programs dont need them. *************/
1414
1415 CmiCommHandle CmiAsyncSendFn(int dest, int size, char *msg) {
1416     CmiAbort("CmiAsyncSendFn not implemented.");
1417     return (CmiCommHandle) 0;
1418 }
1419
1420 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg) {
1421     CmiAbort("CmiAsyncBroadcastFn not implemented.");
1422     return (CmiCommHandle) 0;
1423 }
1424
1425 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg) {
1426     CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1427     return (CmiCommHandle) 0;
1428 }
1429
1430 int           CmiAsyncMsgSent(CmiCommHandle handle) {
1431     CmiAbort("CmiAsyncMsgSent not implemented.");
1432     return 0;
1433 }
1434 void          CmiReleaseCommHandle(CmiCommHandle handle) {
1435     CmiAbort("CmiReleaseCommHandle not implemented.");
1436 }
1437
1438
1439 /*==========================================================*/
1440 /*==========================================================*/
1441 /*==========================================================*/
1442
1443 /* Optional routines which could use common code which is shared with
1444    other machine layer implementations. */
1445
1446 /* MULTICAST/VECTOR SENDING FUNCTIONS
1447
1448  * In relations to some flags, some other delivery functions may be needed.
1449  */
1450
1451 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1452
1453 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg) {
1454     char *copymsg;
1455     copymsg = (char *)CmiAlloc(size);
1456     CmiMemcpy(copymsg,msg,size);
1457     CmiFreeListSendFn(npes, pes, size, msg);
1458 }
1459
1460 typedef struct ListMulticastVec_t {
1461   int   *pes;
1462   int    npes;
1463   char  *msg;
1464   int    size;
1465 } ListMulticastVec;
1466
1467 void machineFreeListSendFn(pami_context_t    context, 
1468                            int               npes, 
1469                            int             * pes, 
1470                            int               size, 
1471                            char            * msg);
1472
1473 pami_result_t machineFreeList_handoff(pami_context_t context, void *cookie)
1474 {
1475   ListMulticastVec *lvec = (ListMulticastVec *) cookie;
1476   machineFreeListSendFn(context, lvec->npes, lvec->pes, lvec->size, lvec->msg);
1477   CmiFree(cookie);
1478 }
1479
1480 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1481     //printf("%d: In Free List Send Fn imm %d\n", CmiMyPe(), CmiIsImmediate(msg));
1482
1483     CMI_SET_BROADCAST_ROOT(msg,0);
1484     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1485     CmiMsgHeaderBasic *hdr = (CmiMsgHeaderBasic *)msg;
1486     hdr->size = size;
1487     CMI_SET_CHECKSUM(msg, size);
1488
1489     //Fast path
1490     if (npes == 1) {
1491       CmiGeneralFreeSendN(CmiNodeOf(pes[0]), CmiRankOf(pes[0]), size, msg, 1);
1492       return;
1493     }
1494
1495     pami_context_t my_context = MY_CONTEXT();
1496 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
1497     ListMulticastVec *lvec = (ListMulticastVec *) 
1498       CmiAlloc(sizeof(ListMulticastVec) + sizeof(int)*npes);
1499     lvec->pes = (int*)((char*)lvec + sizeof(ListMulticastVec));
1500     int i = 0;
1501     for (i=0; i<npes; i++) 
1502       lvec->pes[i] = pes[i];
1503     lvec->npes = npes;
1504     lvec->msg  = msg;
1505     lvec->size = size;
1506     PAMI_Context_post(my_context, (pami_work_t*)hdr->work, 
1507                       machineFreeList_handoff, lvec);
1508 #else
1509     machineFreeListSendFn(my_context, npes, pes, size, msg);
1510 #endif
1511 }
1512
1513 void machineFreeListSendFn(pami_context_t my_context, int npes, int *pes, int size, char *msg) {
1514     int i;
1515     char *copymsg;
1516 #if CMK_SMP
1517     for (i=0; i<npes; i++) {
1518       if (CmiNodeOf(pes[i]) == CmiMyNode()) {
1519         //CmiSyncSend(pes[i], size, msg);
1520         copymsg = (char *)CmiAlloc(size);
1521         CmiAssert(copymsg != NULL);
1522         CmiMemcpy(copymsg,msg,size);      
1523         CmiSendPeer(CmiRankOf(pes[i]), size, copymsg);
1524       }
1525     }
1526 #else
1527     for (i=0; i<npes; i++) {
1528       if (CmiNodeOf(pes[i]) == CmiMyNode()) {
1529         CmiSyncSend(pes[i], size, msg);
1530       }
1531     }
1532 #endif
1533
1534     PAMIX_CONTEXT_LOCK(my_context);
1535     
1536     for (i=0;i<npes;i++) {
1537         if (CmiNodeOf(pes[i]) == CmiMyNode());
1538         else if (i < npes - 1) {
1539 #if !CMK_SMP
1540             CmiReference(msg);
1541             copymsg = msg;
1542 #else
1543             copymsg = (char *)CmiAlloc(size);
1544             CmiAssert(copymsg != NULL);
1545             CmiMemcpy(copymsg,msg,size);
1546 #endif
1547             CmiGeneralFreeSendN(CmiNodeOf(pes[i]), CmiRankOf(pes[i]), size, copymsg, 0);
1548             //machine_send(my_context, CmiNodeOf(pes[i]), CmiRankOf(pes[i]), size, copymsg, 0);
1549         }
1550     }
1551
1552     if (npes  && CmiNodeOf(pes[npes-1]) != CmiMyNode())
1553       CmiGeneralFreeSendN(CmiNodeOf(pes[npes-1]), CmiRankOf(pes[npes-1]), size, msg, 0);
1554       //machine_send(my_context, CmiNodeOf(pes[npes-1]), CmiRankOf(pes[npes-1]), size, msg, 0);      
1555     else
1556       CmiFree(msg);    
1557
1558     PAMIX_CONTEXT_UNLOCK(my_context);
1559 }
1560
1561 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg) {
1562     CmiAbort("CmiAsyncListSendFn not implemented.");
1563     return (CmiCommHandle) 0;
1564 }
1565 #endif
1566
1567 /** NODE SENDING FUNCTIONS
1568
1569  * If there is a node queue, and we consider also nodes as entity (tipically in
1570  * SMP versions), these functions are needed.
1571  */
1572
1573 #if CMK_NODE_QUEUE_AVAILABLE
1574
1575 void          CmiSyncNodeSendFn(int, int, char *);
1576 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1577 void          CmiFreeNodeSendFn(int, int, char *);
1578
1579 void          CmiSyncNodeBroadcastFn(int, char *);
1580 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1581 void          CmiFreeNodeBroadcastFn(int, char *);
1582
1583 void          CmiSyncNodeBroadcastAllFn(int, char *);
1584 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1585 void          CmiFreeNodeBroadcastAllFn(int, char *);
1586
1587 #endif
1588
1589
1590 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1591
1592 int CmiMyPe();
1593 int CmiMyRank();
1594 int CmiNodeFirst(int node);
1595 int CmiNodeSize(int node);
1596 int CmiNodeOf(int pe);
1597 int CmiRankOf(int pe);
1598
1599 int CmiMyPe(void) {
1600     return CmiGetState()->pe;
1601 }
1602
1603 int CmiMyRank(void) {
1604     return CmiGetState()->rank;
1605 }
1606
1607 int CmiNodeFirst(int node) {
1608     return node*_Cmi_mynodesize;
1609 }
1610 int CmiNodeSize(int node)  {
1611     return _Cmi_mynodesize;
1612 }
1613
1614 int CmiNodeOf(int pe)      {
1615     return (pe/_Cmi_mynodesize);
1616 }
1617 int CmiRankOf(int pe)      {
1618     return pe%_Cmi_mynodesize;
1619 }
1620
1621
1622 /* optional, these functions are implemented in "machine-smp.c", so including
1623    this file avoid the necessity to reimplement them.
1624  */
1625 void CmiNodeBarrier(void);
1626 void CmiNodeAllBarrier(void);
1627 CmiNodeLock CmiCreateLock();
1628 void CmiDestroyLock(CmiNodeLock lock);
1629
1630 #endif
1631
1632 /** IMMEDIATE MESSAGES
1633
1634  * If immediate messages are supported, the following function is needed. There
1635  * is an exeption if the machine progress is also defined (see later for this).
1636
1637  * Moreover, the file "immediate.c" should be included, otherwise all its
1638  * functions and variables have to be redefined.
1639 */
1640
1641 #if CMK_CCS_AVAILABLE
1642
1643 #include "immediate.c"
1644
1645 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1646 void CmiProbeImmediateMsg();
1647 #endif
1648
1649 #endif
1650
1651
1652 /* Dummy implementation */
1653 extern int CmiBarrier() {
1654   CmiNodeBarrier();
1655   if (CmiMyRank() == 0)
1656     CmiNetworkBarrier(1);
1657   CmiNodeBarrier();
1658   return 0;
1659 }
1660
1661
1662 static pami_result_t machine_network_barrier(pami_context_t   my_context, 
1663                                              int              to_lock) 
1664 {
1665     pami_result_t result = PAMI_SUCCESS;    
1666     if (to_lock)
1667       PAMIX_CONTEXT_LOCK(my_context);    
1668     result = PAMI_Collective(my_context, &pami_barrier);       
1669     if (to_lock)
1670       PAMIX_CONTEXT_UNLOCK(my_context);
1671
1672     if (result != PAMI_SUCCESS)
1673       fprintf (stderr, "Error. Unable to issue  collective. result = %d\n", result);
1674
1675     return result;
1676 }
1677
1678 pami_result_t network_barrier_handoff(pami_context_t context, void *msg)
1679 {
1680   return machine_network_barrier(context, 0);
1681 }
1682
1683 static void CmiNetworkBarrier(int async) {
1684     pami_context_t my_context = cmi_pami_contexts[0];
1685     pami_barrier_flag = 1;
1686 #if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
1687     if (async) {
1688       pami_work_t work;
1689       PAMI_Context_post(my_context, &work, network_barrier_handoff, NULL);
1690       while (pami_barrier_flag);
1691       //fprintf (stderr, "After Network Barrier\n");
1692     }
1693     else 
1694 #endif
1695     {
1696       machine_network_barrier(my_context, 1);    
1697       PAMIX_CONTEXT_LOCK(my_context);
1698       while (pami_barrier_flag)
1699         PAMI_Context_advance (my_context, 100);
1700       PAMIX_CONTEXT_UNLOCK(my_context);
1701     }
1702 }
1703
1704 #if CMK_NODE_QUEUE_AVAILABLE
1705 static void CmiSendNodeSelf(char *msg) {
1706 #if CMK_IMMEDIATE_MSG
1707     if (CmiIsImmediate(msg)) {
1708       //CmiLock(CsvAccess(NodeState).immRecvLock);
1709       CmiHandleImmediateMessage(msg);
1710       //CmiUnlock(CsvAccess(NodeState).immRecvLock);
1711       return;
1712     }
1713 #endif    
1714 #if CMK_SMP && CMK_USE_L2ATOMICS
1715     L2AtomicEnqueue(&node_recv_atomic_q, msg);    
1716 #else
1717     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1718     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1719     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1720 #endif
1721 }
1722
1723 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg) {
1724     CmiAbort ("Async Node Send not supported\n");
1725 }
1726
1727 void CmiFreeNodeSendFn(int node, int size, char *msg) {
1728
1729     CMI_SET_BROADCAST_ROOT(msg,0);
1730     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1731     ((CmiMsgHeaderBasic *)msg)->size = size;
1732     CMI_SET_CHECKSUM(msg, size);
1733     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeSendFn on comm thd on node %d\n", CmiMyNode());
1734     
1735     CQdCreate(CpvAccessOther(cQdState, CmiMyRank()), 1);
1736
1737     if (node == _Cmi_mynode) {
1738         CmiSendNodeSelf(msg);
1739     } else {
1740       CmiGeneralFreeSendN(node, SMP_NODEMESSAGE, size, msg, 1);
1741     }
1742 }
1743
1744 void CmiSyncNodeSendFn(int p, int s, char *m) {
1745     char *dupmsg;
1746     dupmsg = (char *)CmiAlloc(s);
1747     CmiMemcpy(dupmsg,m,s);
1748     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeSendFn on comm thd on node %d\n", CmiMyNode());
1749     CmiFreeNodeSendFn(p, s, dupmsg);
1750 }
1751
1752 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m) {
1753     return NULL;
1754 }
1755
1756 void SendSpanningChildrenNode(int size, char *msg, int from_rdone) {
1757     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
1758     //printf("on node %d rank %d, send node spanning children with root %d\n", CmiMyNode(), CmiMyRank(), startnode);
1759     assert(startnode>=0 && startnode<CmiNumNodes());
1760
1761     int dist = CmiMyNode()-startnode;
1762     if (dist<0) dist += CmiNumNodes();
1763     int i;
1764     for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
1765         int nid = BROADCAST_SPANNING_FACTOR*dist + i;
1766         if (nid > CmiNumNodes() - 1) break;
1767         nid += startnode;
1768         nid = nid%CmiNumNodes();
1769         assert(nid>=0 && nid<CmiNumNodes() && nid!=CmiMyNode());
1770         char *dupmsg = (char *)CmiAlloc(size);
1771         CmiMemcpy(dupmsg,msg,size);
1772         //printf("In SendSpanningChildrenNode, sending bcast msg (root %d) from node %d to node %d\n", startnode, CmiMyNode(), nid);
1773         CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg,!from_rdone);
1774     }
1775 }
1776
1777 /* need */
1778 void CmiFreeNodeBroadcastFn(int s, char *m) {
1779   //printf("%d: In FreeNodeBroadcastAllFn\n", CmiMyPe());
1780
1781 #if CMK_BROADCAST_SPANNING_TREE
1782     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1783     
1784     CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1785
1786     int mynode = CmiMyNode();
1787     CMI_SET_BROADCAST_ROOT(m, -mynode-1);
1788     CMI_MAGIC(m) = CHARM_MAGIC_NUMBER;
1789     ((CmiMsgHeaderBasic *)m)->size = s;
1790     CMI_SET_CHECKSUM(m, s);
1791     //printf("In CmiFreeNodeBroadcastFn, sending bcast msg from root node %d\n", CMI_BROADCAST_ROOT(m));
1792
1793     SendSpanningChildrenNode(s, m, 0);
1794 #else
1795     int i;
1796     for (i=0; i<CmiNumNodes(); i++) {
1797         if (i==CmiMyNode()) continue;
1798         char *dupmsg = (char *)CmiAlloc(s);
1799         CmiMemcpy(dupmsg,m,s);
1800         CmiFreeNodeSendFn(i, s, dupmsg);
1801     }
1802 #endif
1803     CmiFree(m);    
1804 }
1805
1806 void CmiSyncNodeBroadcastFn(int s, char *m) {
1807     char *dupmsg;
1808     dupmsg = (char *)CmiAlloc(s);
1809     CmiMemcpy(dupmsg,m,s);
1810     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastFn on comm thd on node %d\n", CmiMyNode());
1811     CmiFreeNodeBroadcastFn(s, dupmsg);
1812 }
1813
1814 /* need */
1815 void CmiFreeNodeBroadcastAllFn(int s, char *m) {
1816   
1817     char *dupmsg = (char *)CmiAlloc(s);
1818     CmiMemcpy(dupmsg,m,s);
1819     CMI_MAGIC(dupmsg) = CHARM_MAGIC_NUMBER;
1820     ((CmiMsgHeaderBasic *)dupmsg)->size = s;
1821     CMI_SET_CHECKSUM(dupmsg, s);
1822
1823     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1824     
1825     CQdCreate(CpvAccess(cQdState), 1);
1826     CmiSendNodeSelf(dupmsg);
1827
1828     CmiFreeNodeBroadcastFn(s, m);
1829 }
1830
1831 void CmiSyncNodeBroadcastAllFn(int s, char *m) {
1832     char *dupmsg;
1833     dupmsg = (char *)CmiAlloc(s);
1834     CmiMemcpy(dupmsg,m,s);
1835     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiSyncNodeBcastAllFn on comm thd on node %d\n", CmiMyNode());
1836     CmiFreeNodeBroadcastAllFn(s, dupmsg);
1837 }
1838
1839
1840 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m) {
1841     return NULL;
1842 }
1843 #endif //end of CMK_NODE_QUEUE_AVAILABLE
1844
1845
1846 static void  sendAck (pami_context_t      context,
1847                       CmiPAMIRzvRecv_t      *recv) 
1848 {
1849   pami_send_immediate_t parameters;
1850   parameters.dispatch        = CMI_PAMI_ACK_DISPATCH;
1851   parameters.header.iov_base = &recv->src_buffer; 
1852   parameters.header.iov_len  = sizeof(void *);    
1853   parameters.data.iov_base   = NULL;
1854   parameters.data.iov_len    = 0;
1855   parameters.dest            = recv->src_ep;
1856   
1857   //Called from advance and hence we dont need a mutex
1858   PAMI_Send_immediate (context, &parameters);
1859 }
1860
1861
1862 void rzv_recv_done   (pami_context_t     ctxt, 
1863                       void             * clientdata, 
1864                       pami_result_t      result) 
1865 {
1866   CmiPAMIRzvRecv_t recv = *(CmiPAMIRzvRecv_t *)clientdata;
1867   recv_done(ctxt, recv.msg, PAMI_SUCCESS);
1868   sendAck(ctxt, &recv);
1869 }
1870
1871 void rzv_pkt_dispatch (pami_context_t       context,   
1872                        void               * clientdata,
1873                        const void         * header_addr,
1874                        size_t               header_size,
1875                        const void         * pipe_addr,  
1876                        size_t               pipe_size,  
1877                        pami_endpoint_t      origin,
1878                        pami_recv_t         * recv) 
1879 {
1880   INCR_ORECVS();    
1881   
1882   CmiPAMIRzv_t  *rzv_hdr = (CmiPAMIRzv_t *) header_addr;
1883   CmiAssert (header_size == sizeof(CmiPAMIRzv_t));  
1884   int alloc_size = rzv_hdr->bytes;
1885   char * buffer  = (char *)CmiAlloc(alloc_size + sizeof(CmiPAMIRzvRecv_t));
1886   //char *buffer=(char*)CmiAlloc(alloc_size+sizeof(CmiPAMIRzvRecv_t)+sizeof(int))
1887   //*(int *)(buffer+alloc_size) = *(int *)header_addr;  
1888   CmiAssert (recv == NULL);
1889
1890   CmiPAMIRzvRecv_t *rzv_recv = (CmiPAMIRzvRecv_t *)(buffer+alloc_size);
1891   rzv_recv->msg        = buffer;
1892   rzv_recv->src_ep     = origin;
1893   rzv_recv->src_buffer = rzv_hdr->buffer;
1894   
1895   CmiAssert (pipe_addr != NULL);
1896   pami_memregion_t *mregion = (pami_memregion_t *) pipe_addr;
1897   CmiAssert (pipe_size == sizeof(pami_memregion_t));
1898
1899   //Rzv inj fifos are on the 17th core shared by all contexts
1900   pami_rget_simple_t  rget;
1901   rget.rma.dest    = origin;
1902   rget.rma.bytes   = rzv_hdr->bytes;
1903   rget.rma.cookie  = rzv_recv;
1904   rget.rma.done_fn = rzv_recv_done;
1905   rget.rma.hints.buffer_registered = PAMI_HINT_ENABLE;
1906   rget.rma.hints.use_rdma = PAMI_HINT_ENABLE;
1907   rget.rdma.local.mr      = &cmi_pami_memregion[rzv_hdr->dst_context].mregion;  
1908   rget.rdma.local.offset  = (size_t)buffer - 
1909     (size_t)cmi_pami_memregion[rzv_hdr->dst_context].baseVA;
1910   rget.rdma.remote.mr     = mregion; //from message payload
1911   rget.rdma.remote.offset = rzv_hdr->offset;
1912
1913   //printf ("starting rget\n");
1914   PAMI_Rget (context, &rget);  
1915 }
1916
1917 void ack_pkt_dispatch (pami_context_t       context,   
1918                        void               * clientdata,
1919                        const void         * header_addr,
1920                        size_t               header_size,
1921                        const void         * pipe_addr,  
1922                        size_t               pipe_size,  
1923                        pami_endpoint_t      origin,
1924                        pami_recv_t         * recv) 
1925 {
1926   char **buf = (char **)header_addr;
1927   CmiFree (*buf);
1928 }
1929
1930 #include "cmimemcpy_qpx.h"