Merge of pami-bgq-commthr.
authorSameer Kumar <sameerk@us.ibm.com>
Fri, 5 Oct 2012 18:34:20 +0000 (13:34 -0500)
committerSameer Kumar <sameerk@us.ibm.com>
Fri, 5 Oct 2012 18:34:20 +0000 (13:34 -0500)
20 files changed:
src/arch/mpi-bluegeneq/cc-xlc.sh
src/arch/pami-bluegeneq/L2AtomicQueue.h
src/arch/pami-bluegeneq/cc-xlc.sh
src/arch/pami-bluegeneq/conv-mach-smp.h
src/arch/pami-bluegeneq/conv-mach.h
src/arch/pami-bluegeneq/memalloc.c [new file with mode: 0644]
src/arch/pami/conv-common.h
src/arch/pami/machine.c
src/arch/pami/manytomany.c [new file with mode: 0644]
src/ck-core/msgalloc.C
src/conv-core/convcore.c
src/conv-core/converse.h
src/libs/ck-libs/pencilfft/Makefile
src/libs/ck-libs/pencilfft/pencil_api.h
src/libs/ck-libs/pencilfft/pencilfft.C
src/libs/ck-libs/pencilfft/pencilfft.ci
src/libs/ck-libs/pencilfft/pencilfft.h
src/util/cmimemcpy_qpx.h [new file with mode: 0644]
tests/charm++/penciltest/Makefile
tests/charm++/penciltest/testpencil.C

index a2b1c8fdda43b2db3b8ec43571262ad0ecafe9aa..d10916179b01fcf9b1c1449fa5c0eac56eb10a71 100644 (file)
@@ -1,6 +1,12 @@
-XLC_TYPICAL_PRE=/soft/compilers/ibmcmp-feb2012/vacpp/bg/12.1
+XLC_TYPICAL_PRE=/opt/ibmcmp/
+if test -d /soft/compilers/ibmcmp-may2012
+then
+XLC_TYPICAL_PRE=/soft/compilers/ibmcmp-may2012
+fi
+
+XLC_PRE=$XLC_TYPICAL_PRE/vacpp/bg/12.1
+
 XLF_TYPICAL_PRE=/soft/compilers/ibmcmp-feb2012/xlf/bg/14.1
-XLC_PRE=$XLC_TYPICAL_PRE
 
 XLC_TYPICAL_POST=bin/bg
 XLC_POST=$XLC_TYPICAL_POST
index d70573dd984f540ec731ba39a8950e0e556c9dc4..d6e7f8ec0d25fd4452e206f75b7cbb7079a6e6e8 100644 (file)
 #define L2_ATOMIC_FULL        0x8000000000000000UL
 #define L2_ATOMIC_EMPTY       0x8000000000000000UL
 
+#define L2A_SUCCESS  0
+#define L2A_EAGAIN  -1
+#define L2A_FAIL    -2
+
 typedef  void* L2AtomicQueueElement;
 
 typedef struct _l2atomicstate {
@@ -26,52 +30,69 @@ typedef struct _l2atomicstate {
 typedef struct _l2atomicq {
   L2AtomicState               * _l2state;
   volatile void * volatile    * _array;
+  int                           _useOverflowQ;
+  int                           _qsize;
   PCQueue                       _overflowQ;
   pthread_mutex_t               _overflowMutex;
 } L2AtomicQueue;
 
-void L2AtomicQueueInit(void *l2mem, size_t l2memsize, L2AtomicQueue *queue) {
+void L2AtomicQueueInit      (void           * l2mem, 
+                            size_t           l2memsize, 
+                            L2AtomicQueue  * queue,
+                            int              use_overflow,
+                            int              nelem) 
+{
   pami_result_t rc;
   
   //Verify counter array is 64-byte aligned 
   assert( (((uintptr_t) l2mem) & (0x1F)) == 0 );  
   assert (sizeof(L2AtomicState) <= l2memsize);
   
+  queue->_useOverflowQ = use_overflow;
+
+  int qsize = 2;
+  while (qsize < nelem) 
+    qsize *= 2;
+  queue->_qsize = qsize;
+
   queue->_l2state = (L2AtomicState *)l2mem;
   pthread_mutex_init(&queue->_overflowMutex, NULL);
   queue->_overflowQ = PCQueueCreate();
   L2_AtomicStore(&queue->_l2state->Consumer, 0);
   L2_AtomicStore(&queue->_l2state->Producer, 0);
-  L2_AtomicStore(&queue->_l2state->UpperBound, DEFAULT_SIZE);
+  L2_AtomicStore(&queue->_l2state->UpperBound, qsize);
   
   rc = posix_memalign ((void **)&queue->_array,
                       64, /*L1 line size for BG/Q */
-                      sizeof(L2AtomicQueueElement) * DEFAULT_SIZE);
+                      sizeof(L2AtomicQueueElement) * qsize);
 
   assert(rc == PAMI_SUCCESS);
-  memset((void*)queue->_array, 0, sizeof(L2AtomicQueueElement)*DEFAULT_SIZE);
+  memset((void*)queue->_array, 0, sizeof(L2AtomicQueueElement)*qsize);
 }
 
-void L2AtomicEnqueue (L2AtomicQueue          * queue,
-                     void                   * element) 
+int L2AtomicEnqueue (L2AtomicQueue          * queue,
+                    void                   * element) 
 {
   //fprintf(stderr,"Insert message %p\n", element);
 
+  register int qsize_1 = queue->_qsize - 1;
   uint64_t index = L2_AtomicLoadIncrementBounded(&queue->_l2state->Producer);
-  ppc_msync();
+  L1P_FlushRequests();
   if (index != L2_ATOMIC_FULL) {
-    queue->_array[index & (DEFAULT_SIZE-1)] = element;
-    return;
+    queue->_array[index & qsize_1] = element;
+    return L2A_SUCCESS;
   }
   
+  //We don't want to use the overflow queue
+  if (!queue->_useOverflowQ)
+    return L2A_EAGAIN; //Q is full, try later
+  
+  //No ordering is guaranteed if there is overflow
   pthread_mutex_lock(&queue->_overflowMutex);
-  // must check again to avoid race
-  if ((index = L2_AtomicLoadIncrementBounded(&queue->_l2state->Producer)) != L2_ATOMIC_FULL) {
-    queue->_array[index & (DEFAULT_SIZE-1)] = element;
-  } else {
-    PCQueuePush(queue->_overflowQ, element);
-  }
+  PCQueuePush(queue->_overflowQ, element);
   pthread_mutex_unlock(&queue->_overflowMutex);
+  
+  return L2A_SUCCESS;
 }
 
 void * L2AtomicDequeue (L2AtomicQueue    *queue)
@@ -79,42 +100,38 @@ void * L2AtomicDequeue (L2AtomicQueue    *queue)
   uint64_t head, tail;
   tail = queue->_l2state->Producer;
   head = queue->_l2state->Consumer;
+  register int qsize_1 = queue->_qsize-1;
 
   volatile void *e = NULL;
   if (head < tail) {    
-    e = queue->_array[head & (DEFAULT_SIZE-1)];
+    e = queue->_array[head & qsize_1];
     while (e == NULL) 
-      e = queue->_array[head & (DEFAULT_SIZE-1)];
+      e = queue->_array[head & qsize_1];
 
     //fprintf(stderr,"Found message %p\n", e);
 
-    queue->_array[head & (DEFAULT_SIZE-1)] = NULL;
+    queue->_array[head & qsize_1] = NULL;
     ppc_msync();
 
     head ++;
     queue->_l2state->Consumer = head;    
     
-    if (head == tail) {
-      pthread_mutex_lock(&queue->_overflowMutex);      
-      if (PCQueueLength(queue->_overflowQ) == 0) {
-       uint64_t n = head + DEFAULT_SIZE;
-       // is atomic-store needed?
-       L2_AtomicStore(&queue->_l2state->UpperBound, n);
-      }
-      pthread_mutex_unlock(&queue->_overflowMutex);
-    }
+    //Charm++ does not require message ordering,
+    //so we don't acquire the overflow mutex here
+    uint64_t n = head + queue->_qsize;
+    // is atomic-store needed?
+    L2_AtomicStore(&queue->_l2state->UpperBound, n);
     return (void*) e;
   }
+
+  //We don't have an overflowQ
+  if (!queue->_useOverflowQ)
+    return NULL;
   
   /* head == tail (head cannot be greater than tail) */
   if (PCQueueLength(queue->_overflowQ) > 0) {
     pthread_mutex_lock(&queue->_overflowMutex);      
     e = PCQueuePop (queue->_overflowQ);    
-    if (PCQueueLength(queue->_overflowQ) == 0) {
-      uint64_t n = head + DEFAULT_SIZE;
-      // is atomic-store needed?
-      L2_AtomicStore(&queue->_l2state->UpperBound, n);
-    }
     pthread_mutex_unlock(&queue->_overflowMutex);      
     
     return (void *) e;
@@ -128,4 +145,56 @@ int L2AtomicQueueEmpty (L2AtomicQueue *queue) {
           (queue->_l2state->Producer == queue->_l2state->Consumer) );
 }
 
+//spin block in the L2 atomic queue till there is a message. fail and
+//return after n iterations
+int L2AtomicQueueSpinWait (L2AtomicQueue    * queue,
+                          int                n)
+{
+  if (!L2AtomicQueueEmpty(queue))
+    return 0;  //queue is not empty so return
+  
+  uint64_t head, tail;
+  head = queue->_l2state->Consumer;
+  
+  size_t i = n;
+  do {
+    tail = queue->_l2state->Producer;    
+    i--;
+  }
+  //While the queue is empty and the iteration budget (i counts down from n) is not exhausted
+  while (head == tail && i != 0);
+  
+  return 0; //fail queue is empty
+}
+
+//spin block in the L2 atomic queue till there is a message. fail and
+//return after n iterations
+int L2AtomicQueue2QSpinWait (L2AtomicQueue    * queue0,
+                            L2AtomicQueue    * queue1,
+                            int                n)
+{
+  if (!L2AtomicQueueEmpty(queue0))
+    return 0;  //queue0 is not empty so return
+  
+  if (!L2AtomicQueueEmpty(queue1))
+    return 0;  //queue is not empty so return  
+
+  uint64_t head0, tail0;
+  uint64_t head1, tail1;
+  
+  head0 = queue0->_l2state->Consumer;  
+  head1 = queue1->_l2state->Consumer;
+  
+  size_t i = n;
+  do {
+    tail0 = queue0->_l2state->Producer;    
+    tail1 = queue1->_l2state->Producer;    
+    i --;
+  } while (head0==tail0 && head1==tail1 && i!=0);   
+  return 0; 
+}
+
+
+
 #endif
index 8cb15c02b0bb87ee57a855a957def24d078d15c9..429c88a85f29234666f4363aa22faa2ded43eee9 100644 (file)
@@ -1,4 +1,10 @@
+
+XLC_TYPICAL_PRE=/opt/ibmcmp/
+if test -d /soft/compilers/ibmcmp-may2012
+then
 XLC_TYPICAL_PRE=/soft/compilers/ibmcmp-may2012
+fi
+
 XLC_PRE=$XLC_TYPICAL_PRE/vacpp/bg/12.1
 
 XLC_TYPICAL_POST=bin/bg
index 8d8d2ed9bf85f316f921313da9ba1de88010f823..2da099f1862845f7a4e15d86d84929fb639453c0 100644 (file)
@@ -24,3 +24,6 @@
 #define CMK_SMP_NO_COMMTHD                                 1
 
 #define CMK_FAKE_SCHED_YIELD                               1
+
+#define CMK_USE_L2ATOMICS                                  1
+
index 074a9568070f926835916b4f92c2ab994bc2c764..5887a07807989d99fda9e2683eea797e3f885dca 100644 (file)
@@ -69,6 +69,7 @@
 #define CMK_CCS_AVAILABLE                                 0
 
 #define CMK_BLUEGENEQ                                      1
+#define CMK_BLUEGENEQ_OPTCOPY                              1
 
 #define CMK_NO_ISO_MALLOC                                  1
 
diff --git a/src/arch/pami-bluegeneq/memalloc.c b/src/arch/pami-bluegeneq/memalloc.c
new file mode 100644 (file)
index 0000000..88f323d
--- /dev/null
@@ -0,0 +1,100 @@
+
+#include <converse.h>
+
+#define ALIGNMENT        64
+#define SMSG_SIZE        4096
+#define N_SMSG_ELEM      512
+#define LMSG_SIZE        16384
+#define N_LMSG_ELEM      128
+
+L2AtomicQueue *sL2MemallocVec;
+L2AtomicQueue *bL2MemallocVec;
+
+typedef struct CmiMemAllocHdr_bgq_t {
+  int rank;
+  int size;
+  //Align the application buffer to 64 bytes (ALIGNMENT)
+  char dummy[ALIGNMENT - sizeof(CmiChunkHeader) - 2*sizeof(int)];
+} CmiMemAllocHdr_bgq;
+
+static int _nodeStart;
+
+void *CmiAlloc_bgq (int size) {
+  CmiMemAllocHdr_bgq *hdr = NULL;
+  char *buf;
+  
+  int myrank = Kernel_ProcessorID() - _nodeStart;
+
+  if (size <= SMSG_SIZE) {
+    hdr = L2AtomicDequeue (&sL2MemallocVec[myrank]);
+    if (hdr == NULL) 
+      hdr = (CmiMemAllocHdr_bgq *)
+       //malloc_nomigrate(SMSG_SIZE + sizeof(CmiMemAllocHdr_bgq));      
+       memalign(ALIGNMENT, SMSG_SIZE + sizeof(CmiMemAllocHdr_bgq));      
+
+    hdr->size = SMSG_SIZE;
+  }
+  else if (size <= LMSG_SIZE) {
+    hdr = L2AtomicDequeue (&bL2MemallocVec[myrank]);
+    if (hdr == NULL) 
+      hdr = (CmiMemAllocHdr_bgq *)
+       //malloc_nomigrate(LMSG_SIZE + sizeof(CmiMemAllocHdr_bgq));      
+       memalign(ALIGNMENT, LMSG_SIZE + sizeof(CmiMemAllocHdr_bgq));  
+    hdr->size = LMSG_SIZE;
+  }
+  else {
+    hdr = (CmiMemAllocHdr_bgq *)
+      //malloc_nomigrate(size + sizeof(CmiMemAllocHdr_bgq));      
+      memalign(ALIGNMENT, size + sizeof(CmiMemAllocHdr_bgq));
+    hdr->size = size;
+  }
+
+  hdr->rank = myrank;
+  buf = (char*)hdr + sizeof(CmiMemAllocHdr_bgq);
+
+  return buf;
+}
+
+void CmiFree_bgq (void *buf) {
+  CmiMemAllocHdr_bgq *hdr = (CmiMemAllocHdr_bgq *)((char*)buf - sizeof(CmiMemAllocHdr_bgq));  
+  int rc = L2A_EAGAIN;
+  
+  if (hdr->size == SMSG_SIZE) 
+    rc = L2AtomicEnqueue (&sL2MemallocVec[hdr->rank], hdr);
+  else if (hdr->size == LMSG_SIZE)
+    rc = L2AtomicEnqueue (&bL2MemallocVec[hdr->rank], hdr);
+
+  if (rc == L2A_EAGAIN)
+    //queues are full or large buf
+    free_nomigrate(hdr);
+}
+
+
+void CmiMemAllocInit_bgq (void   * l2mem,
+                         size_t   l2memsize) 
+{
+  int i = 0;
+  int node_size = 64/Kernel_ProcessCount();
+  _nodeStart = node_size * Kernel_MyTcoord();
+  //We want to align headers to 64 bytes (ALIGNMENT)
+  CmiAssert(sizeof(CmiMemAllocHdr_bgq)+sizeof(CmiChunkHeader) == ALIGNMENT);
+
+  CmiAssert (l2memsize >= 2 * node_size * sizeof(L2AtomicState));
+  sL2MemallocVec = (L2AtomicQueue *)malloc_nomigrate(sizeof(L2AtomicQueue)*node_size);
+  bL2MemallocVec = (L2AtomicQueue *)malloc_nomigrate(sizeof(L2AtomicQueue)*node_size);
+
+  for (i = 0; i < node_size; ++i) {
+    L2AtomicQueueInit ((char *)l2mem + 2*i*sizeof(L2AtomicState),
+                      sizeof(L2AtomicState),
+                      &sL2MemallocVec[i],
+                      0, /*No Overflow*/
+                      N_SMSG_ELEM /*512 entries in short q*/);
+
+    L2AtomicQueueInit ((char *)l2mem + (2*i+1)*sizeof(L2AtomicState),
+                      sizeof(L2AtomicState),
+                      &bL2MemallocVec[i],
+                      0,
+                      N_LMSG_ELEM /*128 entries in long q*/);
+  }
+}
+
index a3686b47fe25d6b0f8f4cfdd3ab1edfd1b825120..9055340481f345b374889e6e8babb4d5ec7ffab1 100644 (file)
@@ -5,7 +5,13 @@
 
 #define CMK_HANDLE_SIGUSR                                  1
 
+//#define CMK_ENABLE_ASYNC_PROGRESS                          1
+
+#if CMK_ENABLE_ASYNC_PROGRESS
+#define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, stratid; unsigned char cksum, magic; int root, size, dstnode; CmiUInt2 redID, padding; char work[6*sizeof(uintptr_t)]; 
+#else
 #define CMK_MSG_HEADER_EXT_    CmiUInt2 rank, hdl,xhdl,info, stratid; unsigned char cksum, magic; int root, size; CmiUInt2 redID, padding; 
+#endif
 #define CMK_MSG_HEADER_BASIC  CMK_MSG_HEADER_EXT
 #define CMK_MSG_HEADER_EXT    { CMK_MSG_HEADER_EXT_ }
 #define CMK_MSG_HEADER_BIGSIM_    { CMK_MSG_HEADER_EXT_ CMK_BIGSIM_FIELDS }
@@ -34,3 +40,5 @@
 //#define CMI_DIRECT_MANY_TO_MANY_DEFINED                    0
 
 #define CMK_PERSISTENT_COMM                                0
+
+#define  CMI_DIRECT_MANY_TO_MANY_DEFINED                   1
index 0705a4bae60867584497676e7db05dba80fa8c36..99ecb9931d7a8cb801588fcf56323b5ad939aa81 100644 (file)
@@ -1,4 +1,3 @@
-
 #include <stdio.h>
 #include <errno.h>
 #include <stdlib.h>
 #include "malloc.h"
 
 #include <hwi/include/bqc/A2_inlines.h>
+#include "spi/include/kernel/process.h"
+#include "spi/include/kernel/memory.h"
 #include "pami.h"
 #include "pami_sys.h"
 
-#if CMK_SMP
-#define CMK_USE_L2ATOMICS   1
+//#if CMK_SMP
+//#define CMK_USE_L2ATOMICS   1
+//#endif
+
+#if !CMK_SMP
+#if CMK_ENABLE_ASYNC_PROGRESS
+#error "async progress not supported with non-smp"
+#endif
 #endif
 
 #if CMK_SMP && CMK_USE_L2ATOMICS
 #include "L2AtomicQueue.h"
+#include "memalloc.c"
 #endif
 
+#define CMI_LIKELY(x)    (__builtin_expect(x,1))
+#define CMI_UNLIKELY(x)  (__builtin_expect(x,0))
+
 char *ALIGN_32(char *p) {
   return((char *)((((unsigned long)p)+0x1f) & (~0x1FUL)));
 }
 
-#if CMK_SMP && CMK_USE_L2ATOMICS
-CpvDeclare(L2AtomicQueue*, broadcast_q);          
-#else
-CpvDeclare(PCQueue, broadcast_q);                 //queue to send broadcast messages
-#endif
-
-#if CMK_NODE_QUEUE_AVAILABLE
-CsvDeclare(PCQueue, node_bcastq);
-CsvDeclare(CmiNodeLock, node_bcastLock);
-#endif
-
-//#define ENABLE_BROADCAST_THROTTLE 1
-
 /*To reduce the buffer used in broadcast and distribute the load from
   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
   spanning tree broadcast algorithm.
@@ -69,6 +67,15 @@ CsvDeclare(CmiNodeLock, node_bcastLock);
 /* FIXME: need a random number that everyone agrees ! */
 #define CHARM_MAGIC_NUMBER               126
 
+
+#define CMI_PAMI_SHORT_DISPATCH           7
+#define CMI_PAMI_RZV_DISPATCH             8
+#define CMI_PAMI_ACK_DISPATCH             9
+#define CMI_PAMI_DISPATCH                10
+
+#define SHORT_CUTOFF   128
+#define EAGER_CUTOFF   4096
+
 #if !CMK_OPTIMIZE
 static int checksum_flag = 0;
 extern unsigned char computeCheckSum(unsigned char *data, int len);
@@ -80,6 +87,7 @@ extern unsigned char computeCheckSum(unsigned char *data, int len);
         }
 
 #define CMI_CHECK_CHECKSUM(msg, len)    \
+  int count; \                         
         if (checksum_flag)      \
           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
@@ -115,26 +123,40 @@ CpvDeclare(void*, CmiLocalQueue);
 
 
 typedef struct ProcState {
-    /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
+    /* PCQueue      sendMsgBuf; */  /* per processor message sending queue */
 #if CMK_SMP && CMK_USE_L2ATOMICS
     L2AtomicQueue   atomic_queue;
 #endif
-    CmiNodeLock  recvLock;              /* for cs->recv */
-    CmiNodeLock bcastLock;
+  /* CmiNodeLock  recvLock;  */            /* for cs->recv */
 } ProcState;
 
 static ProcState  *procState;
 
+#if CMK_SMP && CMK_USE_L2ATOMICS
+static L2AtomicQueue node_recv_atomic_q;
+#endif
+
 #if CMK_SMP && !CMK_MULTICORE
 //static volatile int commThdExit = 0;
 //static CmiNodeLock commThdExitLock = 0;
+
+//The random seed to pick destination context
+__thread uint32_t r_seed = 0xdeadbeef;
+__thread int32_t _cmi_bgq_incommthread = 0;
 #endif
 
+//int CmiInCommThread () {
+//  //if (_cmi_bgq_incommthread)
+//  //printf ("CmiInCommThread: %d\n", _cmi_bgq_incommthread);
+//  return _cmi_bgq_incommthread;
+//}
+
 void ConverseRunPE(int everReturn);
 static void CommunicationServer(int sleepTime);
 static void CommunicationServerThread(int sleepTime);
 
-static void CmiNetworkBarrier();
+static void CmiNetworkBarrier(int async);
+static void CmiSendPeer (int rank, int size, char *msg);
 
 //So far we dont define any comm threads
 int Cmi_commthread = 0;
@@ -143,8 +165,12 @@ int Cmi_commthread = 0;
 CsvDeclare(CmiNodeState, NodeState);
 #include "immediate.c"
 
+#if CMK_ENABLE_ASYNC_PROGRESS  
+//Immediate messages not supported yet
+#define AdvanceCommunications() 
+#else
 void AdvanceCommunications();
-
+#endif
 
 #if !CMK_SMP
 /************ non SMP **************/
@@ -170,30 +196,29 @@ static void CmiStartThreads(char **argv) {
 //int received_immediate;
 //int received_broadcast;
 
+void _alias_rank (int rank);
+
 /*Add a message to this processor's receive queue, pe is a rank */
 void CmiPushPE(int pe,void *msg) {
-    CmiState cs = CmiGetStateN(pe);
-    MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
+    CmiState cs = CmiGetStateN(pe);    
 #if CMK_IMMEDIATE_MSG
     if (CmiIsImmediate(msg)) {
-        /**(CmiUInt2 *)msg = pe;*/
-        //received_immediate = 1;
-        //printf("PushPE: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
-        //CMI_DEST_RANK(msg) = pe;
-        CmiPushImmediateMsg(msg);
-        return;
+      //_alias_rank(CMI_DEST_RANK(msg));
+      //CmiLock(CsvAccess(NodeState).immRecvLock);
+      CmiHandleImmediateMessage(msg);
+      //CmiUnlock(CsvAccess(NodeState).immRecvLock);
+      //_alias_rank(0);
+      return;
     }
 #endif
-
+    
 #if CMK_SMP && CMK_USE_L2ATOMICS
     L2AtomicEnqueue(&procState[pe].atomic_queue, msg);
 #else
     PCQueuePush(cs->recv,(char *)msg);
 #endif
-    //printf("%d: PCQueue length = %d, msg = %x\n", CmiMyPe(), PCQueueLength(cs->recv), msg);
-
-    CmiIdleLock_addMessage(&cs->idle);
-    MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
+    
+    //CmiIdleLock_addMessage(&cs->idle);
 }
 
 #if CMK_NODE_QUEUE_AVAILABLE
@@ -202,19 +227,21 @@ static void CmiPushNode(void *msg) {
     MACHSTATE(3,"Pushing message into NodeRecv queue");
 #if CMK_IMMEDIATE_MSG
     if (CmiIsImmediate(msg)) {
-        //printf("PushNode: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
-        //CMI_DEST_RANK(msg) = 0;
-        CmiPushImmediateMsg(msg);
-        return;
+      //CmiLock(CsvAccess(NodeState).immRecvLock);
+      CmiHandleImmediateMessage(msg);
+      //CmiUnlock(CsvAccess(NodeState).immRecvLock);   
+      return;
     }
 #endif
+#if CMK_SMP && CMK_USE_L2ATOMICS
+    L2AtomicEnqueue(&node_recv_atomic_q, msg);    
+#else
     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
     PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
-    {
-        CmiState cs=CmiGetStateN(0);
-        CmiIdleLock_addMessage(&cs->idle);
-    }
+#endif
+    //CmiState cs=CmiGetStateN(0);
+    //CmiIdleLock_addMessage(&cs->idle);
 }
 #endif /* CMK_NODE_QUEUE_AVAILABLE */
 
@@ -229,8 +256,15 @@ static void CmiPushNode(void *msg) {
 #if CMK_PAMI_MULTI_CONTEXT
 volatile int msgQueueLen [MAX_NUM_CONTEXTS];
 volatile int outstanding_recvs [MAX_NUM_CONTEXTS];
-#define THREADS_PER_CONTEXT 4
-#define LTPS                2 //Log Threads Per Context (TPS)
+
+//#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+#define THREADS_PER_CONTEXT 2
+#define LTPS                1 //Log Threads Per Context (TPS)
+//#else
+//#define THREADS_PER_CONTEXT 4
+//#define LTPS                2 //Log Threads Per Context (TPS)
+//#endif
+
 #define  MY_CONTEXT_ID() (CmiMyRank() >> LTPS)
 #define  MY_CONTEXT()    (cmi_pami_contexts[CmiMyRank() >> LTPS])
 
@@ -241,6 +275,7 @@ volatile int outstanding_recvs [MAX_NUM_CONTEXTS];
 #define  DECR_ORECVS()   //(outstanding_recvs[CmiMyRank() >> LTPS] --)
 #define  ORECVS()        0 //(outstanding_recvs[CmiMyRank() >> LTPS])
 #else
+#define LTPS    1 
 volatile int msgQueueLen;
 volatile int outstanding_recvs;
 #define  MY_CONTEXT_ID() (0)
@@ -254,7 +289,7 @@ volatile int outstanding_recvs;
 #define  ORECVS()        (outstanding_recvs)
 #endif
 
-#if CMK_SMP 
+#if CMK_SMP  && !CMK_ENABLE_ASYNC_PROGRESS
 #define PAMIX_CONTEXT_LOCK_INIT(x)
 #define PAMIX_CONTEXT_LOCK(x)        if(LTPS) PAMI_Context_lock(x)
 #define PAMIX_CONTEXT_UNLOCK(x)      if(LTPS) {ppc_msync(); PAMI_Context_unlock(x);}
@@ -263,6 +298,7 @@ volatile int outstanding_recvs;
 #define PAMIX_CONTEXT_LOCK_INIT(x)
 #define PAMIX_CONTEXT_LOCK(x)
 #define PAMIX_CONTEXT_UNLOCK(x)
+#define PAMIX_CONTEXT_TRYLOCK(x)      1
 #endif
 
 
@@ -277,10 +313,17 @@ extern void CthInit(char **argv);
 
 static void SendMsgsUntil(int);
 
+#define A_PRIME 13
+#define B_PRIME 19
+
+static inline unsigned myrand (unsigned *seed) {
+  *seed = A_PRIME * (*seed) + B_PRIME;
+  return *seed;
+}
 
-void SendSpanningChildren(int size, char *msg);
+void SendSpanningChildren(int size, char *msg, int from_rdone);
 #if CMK_NODE_QUEUE_AVAILABLE
-void SendSpanningChildrenNode(int size, char *msg);
+void SendSpanningChildrenNode(int size, char *msg, int from_rdone);
 #endif
 
 typedef struct {
@@ -297,7 +340,6 @@ static CmiIdleState *CmiNotifyGetState(void) {
     return s;
 }
 
-
 static void send_done(pami_context_t ctxt, void *data, pami_result_t result) 
 {
   CmiFree(data);
@@ -310,11 +352,12 @@ static void recv_done(pami_context_t ctxt, void *clientdata, pami_result_t resul
 {
     char *msg = (char *) clientdata;
     int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
+    //int rank = *(int *) (msg + sndlen); //get rank from bottom of the message
+    //CMI_DEST_RANK(msg) = rank;
 
     //fprintf (stderr, "%d Recv message done \n", CmiMyPe());
     /* then we do what PumpMsgs used to do:
      * push msg to recv queue */
-    int count=0;
     CMI_CHECK_CHECKSUM(msg, sndlen);
     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
         CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
@@ -322,36 +365,15 @@ static void recv_done(pami_context_t ctxt, void *clientdata, pami_result_t resul
     }
 
 #if CMK_BROADCAST_SPANNING_TREE 
-    if (CMI_IS_BCAST_ON_CORES(msg) ) {
-        int pe = CmiMyRank(); //CMI_DEST_RANK(msg);
-       char *copymsg;
-        copymsg = (char *)CmiAlloc(sndlen);
-        CmiMemcpy(copymsg,msg,sndlen);
-
-#if   CMK_SMP && CMK_USE_L2ATOMICS
-       L2AtomicEnqueue(CpvAccessOther(broadcast_q, pe), copymsg);      
-#elif CMK_SMP
-        CmiLock(procState[pe].bcastLock);
-        PCQueuePush(CpvAccessOther(broadcast_q, pe), copymsg); 
-        CmiUnlock(procState[pe].bcastLock);
-#else
-        PCQueuePush(CpvAccess(broadcast_q), copymsg);
-#endif
-    }
+    if (CMI_IS_BCAST_ON_CORES(msg) ) 
+        //Forward along spanning tree
+        SendSpanningChildren(sndlen, msg, 1);
 #endif
 
 #if CMK_NODE_QUEUE_AVAILABLE
 #if CMK_BROADCAST_SPANNING_TREE
-    if (CMI_IS_BCAST_ON_NODES(msg)) {
-        //printf ("%d: Receiving node bcast message from %d with %d bytes for %d\n", CmiMyPe(), CMI_BROADCAST_ROOT(msg), sndlen, CMI_DEST_RANK(msg));
-        char *copymsg = (char *)CmiAlloc(sndlen);
-        CmiMemcpy(copymsg,msg,sndlen);
-        //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
-        CmiLock(CsvAccess(node_bcastLock));
-        PCQueuePush(CsvAccess(node_bcastq), copymsg);
-        CmiUnlock(CsvAccess(node_bcastLock));
-        //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
-    }
+    if (CMI_IS_BCAST_ON_NODES(msg)) 
+      SendSpanningChildrenNode(sndlen, msg, 1);
 #endif
     if (CMI_DEST_RANK(msg) == SMP_NODEMESSAGE)
       CmiPushNode(msg);
@@ -362,19 +384,34 @@ static void recv_done(pami_context_t ctxt, void *clientdata, pami_result_t resul
     DECR_ORECVS();
 }
 
-static void pkt_dispatch (pami_context_t       context,      /**< IN: PAMI context */
-                         void               * clientdata,   /**< IN: dispatch cookie */
-                         const void         * header_addr,  /**< IN: header address */
-                         size_t               header_size,  /**< IN: header size */
-                         const void         * pipe_addr,    /**< IN: address of PAMI pipe buffer */
-                         size_t               pipe_size,    /**< IN: size of PAMI pipe buffer */
+typedef struct _cmi_pami_rzv {
+  void           * buffer;
+  size_t           offset;
+  int              bytes;
+  int              dst_context;
+}CmiPAMIRzv_t;  
+
+typedef struct _cmi_pami_rzv_recv {
+  void           * msg;
+  void           * src_buffer;
+  int              src_ep;
+} CmiPAMIRzvRecv_t;
+
+static void pkt_dispatch (pami_context_t       context,      
+                         void               * clientdata,   
+                         const void         * header_addr,  
+                         size_t               header_size,  
+                         const void         * pipe_addr,    
+                         size_t               pipe_size,    
                          pami_endpoint_t      origin,
-                         pami_recv_t         * recv)        /**< OUT: receive message structure */
+                         pami_recv_t         * recv)        
 {
     //fprintf (stderr, "Received Message of size %d %p\n", pipe_size, recv);
     INCR_ORECVS();    
     int alloc_size = pipe_size;
     char * buffer  = (char *)CmiAlloc(alloc_size);
+    //char * buffer  = (char *)CmiAlloc(alloc_size + sizeof(int));
+    //*(int *)(buffer+alloc_size) = *(int *)header_addr;
 
     if (recv) {
       recv->local_fn = recv_done;
@@ -390,77 +427,55 @@ static void pkt_dispatch (pami_context_t       context,      /**< IN: PAMI conte
     }
 }
 
-
-#if CMK_NODE_QUEUE_AVAILABLE
-void sendBroadcastMessagesNode() __attribute__((__noinline__));
-void sendBroadcastMessagesNode() {
-    //node broadcast message could be always handled by any cores (including
-    //comm thd) on this node
-    //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
-    CmiLock(CsvAccess(node_bcastLock));
-    char *msg = PCQueuePop(CsvAccess(node_bcastq));
-    CmiUnlock(CsvAccess(node_bcastLock));
-    //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
-    while (msg) {
-        //printf("sendBroadcastMessagesNode: node %d rank %d with msg root %d\n", CmiMyNode(), CmiMyRank(), CMI_BROADCAST_ROOT(msg));
-        SendSpanningChildrenNode(((CmiMsgHeaderBasic *) msg)->size, msg);
-        CmiFree(msg);
-        //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
-        CmiLock(CsvAccess(node_bcastLock));
-        msg = PCQueuePop(CsvAccess(node_bcastq));
-        CmiUnlock(CsvAccess(node_bcastLock));
-        //CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
-    }
+static void short_pkt_dispatch (pami_context_t       context,      
+                               void               * clientdata,   
+                               const void         * header_addr,  
+                               size_t               header_size,  
+                               const void         * pipe_addr,    
+                               size_t               pipe_size,    
+                               pami_endpoint_t      origin,
+                               pami_recv_t         * recv)        
+{
+  int alloc_size = pipe_size;
+  char * buffer  = (char *)CmiAlloc(alloc_size);
+  //char * buffer  = (char *)CmiAlloc(alloc_size + sizeof(int));
+  //*(int *)(buffer+alloc_size) = *(int *)header_addr;
+  
+  memcpy (buffer, pipe_addr, pipe_size);
+  char *smsg = (char *)pipe_addr;
+  char *msg  = (char *)buffer;
+
+  CMI_CHECK_CHECKSUM(smsg, pipe_size);  
+  if (CMI_MAGIC(smsg) != CHARM_MAGIC_NUMBER) {
+    /* received a non-charm msg */
+    CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");     
+  }
+  CmiPushPE(CMI_DEST_RANK(smsg), (void *)msg);
 }
-#endif
-
-void sendBroadcastMessages() __attribute__((__noinline__));
-void sendBroadcastMessages() {
-#if CMK_SMP && CMK_USE_L2ATOMICS
-    L2AtomicQueue *toPullQ;
-#else
-    PCQueue toPullQ;
-#endif
 
-    toPullQ = CpvAccess(broadcast_q);
-    
-#if CMK_SMP && !CMK_USE_L2ATOMICS
-    CmiLock(procState[CmiMyRank()].bcastLock);
-#endif
-    
-#if CMK_SMP && CMK_USE_L2ATOMICS
-    char *msg = (char *) L2AtomicDequeue(toPullQ);
-#else
-    char *msg = (char *) PCQueuePop(toPullQ);
-#endif
-    
-#if CMK_SMP && !CMK_USE_L2ATOMICS
-    CmiUnlock(procState[CmiMyRank()].bcastLock);
-#endif
-    
-    while (msg) {      
-#if CMK_BROADCAST_SPANNING_TREE
-        SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
-#endif
-
-        CmiFree (msg);
-
-#if CMK_SMP && !CMK_USE_L2ATOMICS
-        CmiLock(procState[CmiMyRank()].bcastLock);
-#endif
-
-#if CMK_SMP && CMK_USE_L2ATOMICS
-       msg = (char *) L2AtomicDequeue(toPullQ);
-#else
-        msg = (char *) PCQueuePop(toPullQ);
-#endif
-
-#if CMK_SMP && !CMK_USE_L2ATOMICS
-        CmiUnlock(procState[CmiMyRank()].bcastLock);
-#endif
-    }
-}
 
+void rzv_pkt_dispatch (pami_context_t       context,   
+                      void               * clientdata,
+                      const void         * header_addr,
+                      size_t               header_size,
+                      const void         * pipe_addr,  
+                      size_t               pipe_size,  
+                      pami_endpoint_t      origin,
+                      pami_recv_t         * recv);
+
+void ack_pkt_dispatch (pami_context_t       context,   
+                      void               * clientdata,
+                      const void         * header_addr,
+                      size_t               header_size,
+                      const void         * pipe_addr,  
+                      size_t               pipe_size,  
+                      pami_endpoint_t      origin,
+                      pami_recv_t         * recv);
+
+void rzv_recv_done   (pami_context_t     ctxt, 
+                     void             * clientdata, 
+                     pami_result_t      result); 
 
 //approx sleep command
 size_t mysleep_iter = 0;
@@ -493,12 +508,128 @@ pami_geometry_t    world_geometry;
 pami_xfer_t        pami_barrier;
 char clientname[] = "Converse";
 
-#define CMI_PAMI_DISPATCH   10
+#if 1
+typedef struct _cmi_pami_mregion_t {
+  pami_memregion_t   mregion;
+  void             * baseVA;
+} CmiPAMIMemRegion_t;
 
-#include "malloc.h"
+//one for each of the 64 possible contexts
+CmiPAMIMemRegion_t  cmi_pami_memregion[64];
+#endif
 
+#include "malloc.h"
 void *l2atomicbuf;
 
+void _alias_rank (int rank) {
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+
+  CmiState cs = CmiGetState();
+  CmiState cs_r = CmiGetStateN(rank);
+
+  cs->rank = cs_r->rank;
+  cs->pe   = cs_r->pe;
+#endif
+}
+
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+
+pami_result_t init_comm_thread (pami_context_t   context,
+                               void           * cookie)
+{
+  CmiState cs  = CmiGetState();
+  CmiState cs0 = CmiGetStateN(0);
+  *cs = *cs0; //Alias comm thread to rank 0
+  //printf("Initialized comm thread, my rank %d, my pe %d\n", 
+  // CmiMyRank(), 
+  // CmiMyPe());
+
+  //Notify main thread comm thread has been initialized
+  *(int*)cookie = 0;
+
+#if 1
+  //set the seed to choose destination context
+  uint64_t rseedl = r_seed;
+  rseedl |= (uint64_t)context;
+  r_seed = ((uint32_t)rseedl)^((uint32_t)(rseedl >> 32));
+#endif
+
+  _cmi_bgq_incommthread = 1;
+
+  return PAMI_SUCCESS;
+}
+
+typedef void (*pamix_progress_function) (pami_context_t context, void *cookie);
+typedef pami_result_t (*pamix_progress_register_fn) 
+  (pami_context_t            context,
+   pamix_progress_function   progress_fn,
+   pamix_progress_function   suspend_fn,
+   pamix_progress_function   resume_fn,
+   void                     * cookie);
+typedef pami_result_t (*pamix_progress_enable_fn)(pami_context_t   context,
+                                                 int              event_type);
+typedef pami_result_t (*pamix_progress_disable_fn)(pami_context_t  context,
+                                                  int             event_type);
+#define PAMI_EXTENSION_OPEN(client, name, ext)  \
+({                                              \
+  pami_result_t rc;                             \
+  rc = PAMI_Extension_open(client, name, ext);  \
+  CmiAssert (rc == PAMI_SUCCESS);      \
+})
+#define PAMI_EXTENSION_FUNCTION(type, name, ext)        \
+({                                                      \
+  void* fn;                                             \
+  fn = PAMI_Extension_symbol(ext, name);                \
+  CmiAssert (fn != NULL);                              \
+  (type)fn;                                             \
+})
+
+pami_extension_t            cmi_ext_progress;
+pamix_progress_register_fn  cmi_progress_register;
+pamix_progress_enable_fn    cmi_progress_enable;
+pamix_progress_disable_fn   cmi_progress_disable;
+
+int CMI_Progress_init(int start, int ncontexts) {
+  if (CmiMyPe() == 0)
+    printf("Enabling communication threads\n");
+  
+  PAMI_EXTENSION_OPEN(cmi_pami_client,"EXT_async_progress",&cmi_ext_progress);
+  cmi_progress_register = PAMI_EXTENSION_FUNCTION(pamix_progress_register_fn, "register", cmi_ext_progress);
+  cmi_progress_enable   = PAMI_EXTENSION_FUNCTION(pamix_progress_enable_fn,   "enable",   cmi_ext_progress);
+  cmi_progress_disable  = PAMI_EXTENSION_FUNCTION(pamix_progress_disable_fn,  "disable",  cmi_ext_progress);
+  
+  int i = 0;
+  for (i = start; i < start+ncontexts; ++i) {
+    //fprintf(stderr, "Enabling progress on context %d\n", i);
+    cmi_progress_register (cmi_pami_contexts[i], 
+                          NULL, 
+                          NULL, 
+                          NULL, NULL);
+    cmi_progress_enable   (cmi_pami_contexts[i], 0 /*progress all*/);  
+  }
+
+  pami_work_t  work;
+  volatile int x;
+  for (i = start; i < start+ncontexts; ++i) {
+    x = 1;
+    PAMI_Context_post(cmi_pami_contexts[i], &work, 
+                     init_comm_thread, (void*)&x);
+    while(x);
+  }
+  
+  return 0;
+}
+
+int CMI_Progress_finalize(int start, int ncontexts) {
+  int i = 0;
+  for (i = start; i < start+ncontexts; ++i) 
+    cmi_progress_disable  (cmi_pami_contexts[i], 0 /*progress all*/);    
+  PAMI_Extension_close (cmi_ext_progress); return 0;
+}
+#endif
+
+#include "manytomany.c"
+
 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret) {
     int n, i, count;
 
@@ -527,6 +658,8 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
     }
     cmi_pami_numcontexts = _n;
 
+    //fprintf(stderr,"Creating %d pami contexts\n", _n);
+
     pami_configuration_t configuration;
     pami_result_t result;
     
@@ -540,14 +673,54 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
 
     pami_dispatch_hint_t options = (pami_dispatch_hint_t) {0};
     pami_dispatch_callback_function pfn;
-    pfn.p2p = pkt_dispatch;
-    for (i = 0; i < _n; ++i)
+    for (i = 0; i < _n; ++i) {
+      pfn.p2p = pkt_dispatch;
       PAMI_Dispatch_set (cmi_pami_contexts[i],
                         CMI_PAMI_DISPATCH,
                         pfn,
                         NULL,
                         options);
-    
+      
+      pfn.p2p = ack_pkt_dispatch;
+      PAMI_Dispatch_set (cmi_pami_contexts[i],
+                        CMI_PAMI_ACK_DISPATCH,
+                        pfn,
+                        NULL,
+                        options);
+      
+      pfn.p2p = rzv_pkt_dispatch;
+      PAMI_Dispatch_set (cmi_pami_contexts[i],
+                        CMI_PAMI_RZV_DISPATCH,
+                        pfn,
+                        NULL,
+                        options);      
+
+      pfn.p2p = short_pkt_dispatch;
+      PAMI_Dispatch_set (cmi_pami_contexts[i],
+                        CMI_PAMI_SHORT_DISPATCH,
+                        pfn,
+                        NULL,
+                        options);      
+    }
+
+#if 1
+    size_t bytes_out;
+    void * buf = malloc(sizeof(long));    
+    uint32_t retval;
+    Kernel_MemoryRegion_t k_mregion;
+    retval = Kernel_CreateMemoryRegion (&k_mregion, buf, sizeof(long));
+    assert(retval==0);  
+    for (i = 0; i < _n; ++i) {
+      cmi_pami_memregion[i].baseVA = k_mregion.BaseVa;
+      PAMI_Memregion_create (cmi_pami_contexts[i],
+                            k_mregion.BaseVa,
+                            k_mregion.Bytes,
+                            &bytes_out,
+                            &cmi_pami_memregion[i].mregion);
+    }
+    free(buf);
+#endif
+
     //fprintf(stderr, "%d Initializing Converse PAMI machine Layer on %d tasks\n", _Cmi_mynode, _Cmi_numnodes);
 
     ///////////---------------------------------/////////////////////
@@ -593,9 +766,21 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
                                            must_query_algo,
                                            must_query_md,
                                            num_algorithm[1]);
+    
+    int opt_alg = 0;
+    for (int nalg = 0; nalg < num_algorithm[0]; ++nalg) 
+      if (strstr(always_works_md[nalg].name, "GI") != NULL) {
+       opt_alg = nalg;
+       break;
+      }
+
+    if (_Cmi_mynode == 0)
+      printf ("Choosing optimized barrier algorithm name %s\n",
+             always_works_md[opt_alg].name);
+             
     pami_barrier.cb_done   = pami_barrier_done;
     pami_barrier.cookie    = (void*) & pami_barrier_flag;
-    pami_barrier.algorithm = always_works_algo[0];
+    pami_barrier.algorithm = always_works_algo[opt_alg];
 
     /* Docs06:  Query the algorithm lists */
     if (result != PAMI_SUCCESS)
@@ -604,9 +789,9 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
        return;
       }
 
-    CmiNetworkBarrier();
-    CmiNetworkBarrier();
-    CmiNetworkBarrier();
+    CmiNetworkBarrier(0);
+    CmiNetworkBarrier(0);
+    CmiNetworkBarrier(0);
 
     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
     Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
@@ -628,20 +813,16 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
     CsvInitialize(CmiNodeState, NodeState);
     CmiNodeStateInit(&CsvAccess(NodeState));
 
-#if CMK_NODE_QUEUE_AVAILABLE
-    CsvInitialize(PCQueue, node_bcastq);
-    CsvAccess(node_bcastq) = PCQueueCreate();
-    CsvInitialize(CmiNodeLock, node_bcastLock);
-    CsvAccess(node_bcastLock) = CmiCreateLock();
-#endif
-
-    int actualNodeSize = _Cmi_mynodesize;
-
 #if CMK_SMP && CMK_USE_L2ATOMICS
+    //max available hardware threads
+    int actualNodeSize = 64/Kernel_ProcessCount(); 
+    //printf("Ranks per node %d, actualNodeSize %d CmiMyNodeSize() %d\n",
+    //    Kernel_ProcessCount(), actualNodeSize, _Cmi_mynodesize);
+    
     //pami_result_t rc;
     pami_extension_t l2;
     pamix_proc_memalign_fn PAMIX_L2_proc_memalign;
-    size_t size = 2 * actualNodeSize * sizeof(L2AtomicState);
+    size_t size = (4*actualNodeSize+1) * sizeof(L2AtomicState);
     rc = PAMI_Extension_open(NULL, "EXT_bgq_l2atomic", &l2);
     CmiAssert (rc == 0);
     PAMIX_L2_proc_memalign = (pamix_proc_memalign_fn)PAMI_Extension_symbol(l2, "proc_memalign");
@@ -649,20 +830,41 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
     CmiAssert (rc == 0);    
 #endif
 
-    procState = (ProcState *)CmiAlloc((actualNodeSize) * sizeof(ProcState));
-    for (i=0; i<actualNodeSize; i++) {
+    procState = (ProcState *)malloc((_Cmi_mynodesize) * sizeof(ProcState));
+    for (i=0; i<_Cmi_mynodesize; i++) {
         /*    procState[i].sendMsgBuf = PCQueueCreate();   */
-        procState[i].recvLock = CmiCreateLock();
-        procState[i].bcastLock = CmiCreateLock();
+        //procState[i].recvLock = CmiCreateLock();
 #if CMK_SMP && CMK_USE_L2ATOMICS
        L2AtomicQueueInit ((char *) l2atomicbuf + sizeof(L2AtomicState)*i,
                           sizeof(L2AtomicState),
-                          &procState[i].atomic_queue);
+                          &procState[i].atomic_queue,
+                          1, /*use overflow*/
+                          DEFAULT_SIZE /*1024 entries*/);
 #endif
     }
 
+#if CMK_SMP && CMK_USE_L2ATOMICS    
+    CmiMemAllocInit_bgq ((char*)l2atomicbuf + 
+                        2*actualNodeSize*sizeof(L2AtomicState),
+                        2*actualNodeSize*sizeof(L2AtomicState)); 
+    L2AtomicQueueInit ((char*)l2atomicbuf + 
+                      4*actualNodeSize*sizeof(L2AtomicState),
+                      sizeof(L2AtomicState),
+                      &node_recv_atomic_q,
+                      1, /*use overflow*/
+                      DEFAULT_SIZE /*1024 entries*/);                 
+#endif
+    
+    //Initialize the manytomany api
+    _cmidirect_m2m_initialize (cmi_pami_contexts, _n);
+
     //printf ("Starting Threads\n");
     CmiStartThreads(argv);
+
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+    CMI_Progress_init(0, _n);
+#endif
+
     ConverseRunPE(initret);
 }
 
@@ -675,7 +877,7 @@ int PerrorExit (char *err) {
 
 
 void ConverseRunPE(int everReturn) {
-    //printf ("ConverseRunPE on rank %d\n", CmiMyPe());
+    //    printf ("ConverseRunPE on rank %d\n", CmiMyPe());    
 
     CmiIdleState *s=CmiNotifyGetState();
     CmiState cs;
@@ -693,34 +895,25 @@ void ConverseRunPE(int everReturn) {
 
     CthInit(CmiMyArgv);
 
-#if CMK_SMP && CMK_USE_L2ATOMICS
-    CpvInitialize(L2AtomicQueue*, broadcast_q);
-    //Initialize broadcastq
-    L2AtomicQueue *l2q = malloc(sizeof(L2AtomicQueue));
-    CpvAccess(broadcast_q) = l2q;
-    L2AtomicQueueInit (l2atomicbuf + sizeof(L2AtomicState)*(_Cmi_mynodesize + CmiMyRank()),
-                      sizeof(L2AtomicState), l2q);
-#else
-    CpvInitialize(PCQueue, broadcast_q);
-    CpvAccess(broadcast_q) = PCQueueCreate();
-#endif
-
     //printf ("Before Converse Common Init\n");
     ConverseCommonInit(CmiMyArgv);
 
     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
 
+    //printf ("before calling CmiBarrier() \n");
     CmiBarrier();
 
     /* Converse initialization finishes, immediate messages can be processed.
        node barrier previously should take care of the node synchronization */
     _immediateReady = 1;
 
+    //printf("calling the startfn\n");
+
     if (!everReturn) {
       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
       if (Cmi_usrsched==0) CsdScheduler(-1);
       ConverseExit();
-    }
+    }    
 }
 
 void ConverseExit(void) {
@@ -742,7 +935,10 @@ void ConverseExit(void) {
     int rank0 = 0;
     if (CmiMyRank() == 0) {
         rank0 = 1;
-        //CmiFree(procState);
+        //free(procState);     
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+       CMI_Progress_finalize(0, cmi_pami_numcontexts);
+#endif
        PAMI_Context_destroyv(cmi_pami_contexts, cmi_pami_numcontexts);
        PAMI_Client_destroy(&cmi_pami_client);
     }
@@ -751,8 +947,10 @@ void ConverseExit(void) {
     //  CmiNodeAllBarrier ();
     //fprintf(stderr, "Before Exit\n");
 #if CMK_SMP
-    if (rank0)
-      exit(1);
+    if (rank0) {
+      Delay(100000);
+      exit(0);
+    }
     else
       pthread_exit(0);
 #else
@@ -776,20 +974,31 @@ void CmiAbort(const char * message) {
 
 #if CMK_NODE_QUEUE_AVAILABLE
 char *CmiGetNonLocalNodeQ(void) {
-    CmiState cs = CmiGetState();
+    //CmiState cs = CmiGetState();
     char *result = 0;
-    CmiIdleLock_checkMessage(&cs->idle);
-    if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
-        MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
+    //CmiIdleLock_checkMessage(&cs->idle);
 
-        if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
-            //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
-            result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
-            CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
-        }
+#if CMK_SMP && CMK_USE_L2ATOMICS
+    if (!L2AtomicQueueEmpty(&node_recv_atomic_q)) {
+      if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
+       result = (char*)L2AtomicDequeue(&node_recv_atomic_q);
+       CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
+      }
+    }
+#else
+    if (!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
+      MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
+      
+      if (CmiTryLock(CsvAccess(NodeState).CmiNodeRecvLock) == 0) {
+       //CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
+       result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
+       CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
+      }
 
-        MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
+      MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
     }
+#endif
+    
     return result;
 }
 #endif
@@ -799,14 +1008,16 @@ void *CmiGetNonLocal() {
 
     void *msg = NULL;
     CmiState cs = CmiGetState();
-    CmiIdleLock_checkMessage(&cs->idle);
+    //CmiIdleLock_checkMessage(&cs->idle);
 
 #if CMK_SMP && CMK_USE_L2ATOMICS
     msg = L2AtomicDequeue(&procState[CmiMyRank()].atomic_queue);
+#if !(CMK_ENABLE_ASYNC_PROGRESS)
     if (msg == NULL) {
       AdvanceCommunications();     
       msg = L2AtomicDequeue(&procState[CmiMyRank()].atomic_queue);
     }
+#endif
 #else
     if (PCQueueLength(cs->recv)==0)
       AdvanceCommunications();
@@ -814,19 +1025,19 @@ void *CmiGetNonLocal() {
     msg =  PCQueuePop(cs->recv);
 #endif
 
+    //if (msg != NULL)
+    //fprintf(stderr, "%d: Returning a message\n", CmiMyPe());
+
     return msg;
 }
 
 static void CmiSendSelf(char *msg) {
 #if CMK_IMMEDIATE_MSG
     if (CmiIsImmediate(msg)) {
-        /* CmiBecomeNonImmediate(msg); */
-        //printf("In SendSelf, N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
-        CmiPushImmediateMsg(msg);
-#if CMK_MULTICORE
-        CmiHandleImmediate();
-#endif
-        return;
+      //CmiLock(CsvAccess(NodeState).immRecvLock);
+      CmiHandleImmediateMessage(msg);
+      //CmiUnlock(CsvAccess(NodeState).immRecvLock);
+      return;
     }
 #endif
     
@@ -835,22 +1046,7 @@ static void CmiSendSelf(char *msg) {
 
 #if CMK_SMP
 static void CmiSendPeer (int rank, int size, char *msg) {
-#if CMK_BROADCAST_SPANNING_TREE
-    if (CMI_BROADCAST_ROOT(msg) != 0) {
-        char *copymsg;
-        copymsg = (char *)CmiAlloc(size);
-        CmiMemcpy(copymsg,msg,size);
-
-#if CMK_USE_L2ATOMICS
-       L2AtomicEnqueue(CpvAccessOther(broadcast_q, rank), copymsg);
-#else
-        CmiLock(procState[rank].bcastLock);
-        PCQueuePush(CpvAccessOther(broadcast_q, rank), copymsg);
-        CmiUnlock(procState[rank].bcastLock);
-#endif
-    }
-#endif
-    
+    //fprintf(stderr, "%d Send messages to peer\n", CmiMyPe());
     CmiPushPE (rank, msg);
 }
 #endif
@@ -864,11 +1060,10 @@ void CmiGeneralFreeSendN (int node, int rank, int size, char * msg, int to_lock)
  */
 void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
 
-  if (destPE < 0 || destPE > CmiNumPes ())
-    printf ("Sending to %d\n", destPE);
-
-  CmiAssert (destPE >= 0 && destPE < CmiNumPes());
-
+    if (destPE < 0 || destPE >= CmiNumPes ())
+      printf ("Sending to %d\n", destPE);
+    
+    CmiAssert (destPE >= 0 && destPE < CmiNumPes());    
     CmiState cs = CmiGetState();
 
     if (destPE==cs->pe) {
@@ -879,59 +1074,105 @@ void  CmiGeneralFreeSend(int destPE, int size, char* msg) {
     CmiGeneralFreeSendN (CmiNodeOf (destPE), CmiRankOf (destPE), size, msg, 1);
 }
 
-void CmiGeneralFreeSendN (int node, int rank, int size, char * msg, int to_lock) {
 
-    //printf ("%d, %d: Sending Message to node %d rank %d \n", CmiMyPe(),
-    //  CmiMyNode(), node, rank);
+pami_result_t machine_send_handoff (pami_context_t context, void *msg);
+void  machine_send       (pami_context_t      context, 
+                         int                 node, 
+                         int                 rank, 
+                         int                 size, 
+                         char              * msg, 
+                         int                 to_lock)__attribute__((always_inline));
 
+void CmiGeneralFreeSendN(int node, int rank, int size, char * msg, int to_lock)
+{
 #if CMK_SMP
     CMI_DEST_RANK(msg) = rank;
-    //CMI_SET_CHECKSUM(msg, size);
-
     if (node == CmiMyNode()) {
         CmiSendPeer (rank, size, msg);
         return;
     }
 #endif
 
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+    int c = node % cmi_pami_numcontexts;
+    //int c = myrand(&r_seed) % cmi_pami_numcontexts;
+    pami_context_t my_context = cmi_pami_contexts[c];    
+    CmiMsgHeaderBasic *hdr = (CmiMsgHeaderBasic *)msg;
+    hdr->dstnode = node;
+    hdr->rank    = rank;
+
+    PAMI_Context_post(my_context, (pami_work_t *)hdr->work, 
+                     machine_send_handoff, msg);
+#else
+    pami_context_t my_context = MY_CONTEXT();    
+    machine_send (my_context, node, rank, size, msg, to_lock);
+#endif
+}
+
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+pami_result_t machine_send_handoff (pami_context_t context, void *msg) {
+  CmiMsgHeaderBasic *hdr = (CmiMsgHeaderBasic *)msg;
+  int node = hdr->dstnode;
+  int rank = hdr->rank;
+  int size = hdr->size;
+  
+  //As this is executed on the comm thread no locking is necessary
+  machine_send(context, node, rank, size, msg, 0);
+  return PAMI_SUCCESS;
+}
+#endif
+
+void  machine_send       (pami_context_t      context, 
+                         int                 node, 
+                         int                 rank, 
+                         int                 size, 
+                         char              * msg, 
+                         int                 to_lock) 
+{
+    CMI_DEST_RANK(msg) = rank;
+
     pami_endpoint_t target;
 #if CMK_PAMI_MULTI_CONTEXT
-    size_t dst_context = (rank != SMP_NODEMESSAGE) ? (rank>>LTPS) : 0;
+    //size_t dst_context = (rank != SMP_NODEMESSAGE) ? (rank>>LTPS) : (rand_r(&r_seed) % cmi_pami_numcontexts);
+    //Choose a context at random
+    size_t dst_context = myrand(&r_seed) % cmi_pami_numcontexts;
 #else
     size_t dst_context = 0;
 #endif
     PAMI_Endpoint_create (cmi_pami_client, (pami_task_t)node, dst_context, &target);
-
-    //CmiAssert ((MY_CONTEXT_ID()) >= 0 &&
-    //        (MY_CONTEXT_ID()) < cmi_pami_numcontexts);
-
+    
     //fprintf (stderr, "Calling PAMI Send to %d magic %d size %d\n", node, CMI_MAGIC(msg), size);
-    if (size < 128) {
+    if (CMI_LIKELY(size < SHORT_CUTOFF)) {
       pami_send_immediate_t parameters;
+      
       parameters.dispatch        = CMI_PAMI_DISPATCH;
-      parameters.header.iov_base = NULL;
-      parameters.header.iov_len  = 0;
+      if ( CMI_LIKELY(CMI_BROADCAST_ROOT(msg) == 0))
+#if CMK_NODE_QUEUE_AVAILABLE
+       if ( CMI_LIKELY(rank != SMP_NODEMESSAGE) )
+#endif
+         //use short callback if not a bcast and not an SMP node message
+         parameters.dispatch        = CMI_PAMI_SHORT_DISPATCH;
+
+      parameters.header.iov_base = NULL; //&rank;
+      parameters.header.iov_len  = 0;    //sizeof(int);
       parameters.data.iov_base   = msg;
       parameters.data.iov_len    = size;
       parameters.dest = target;
       
-      pami_context_t my_context = MY_CONTEXT();
-      //CmiAssert (my_context != NULL);
-      
       if(to_lock)
-       PAMIX_CONTEXT_LOCK(my_context);
-
-      PAMI_Send_immediate (my_context, &parameters);
-
+       PAMIX_CONTEXT_LOCK(context);
+      
+      PAMI_Send_immediate (context, &parameters);
+      
       if(to_lock)
-       PAMIX_CONTEXT_UNLOCK(my_context);
+       PAMIX_CONTEXT_UNLOCK(context);
       CmiFree(msg);
     }
-    else {
+    else if (size < EAGER_CUTOFF) {
       pami_send_t parameters;
       parameters.send.dispatch        = CMI_PAMI_DISPATCH;
-      parameters.send.header.iov_base = NULL;
-      parameters.send.header.iov_len  = 0;
+      parameters.send.header.iov_base = NULL; //&rank;
+      parameters.send.header.iov_len  = 0;    //sizeof(int);
       parameters.send.data.iov_base   = msg;
       parameters.send.data.iov_len    = size;
       parameters.events.cookie        = msg;
@@ -940,15 +1181,35 @@ void CmiGeneralFreeSendN (int node, int rank, int size, char * msg, int to_lock)
       memset(&parameters.send.hints, 0, sizeof(parameters.send.hints));
       parameters.send.dest = target;
       
-      pami_context_t my_context = MY_CONTEXT();
-      //CmiAssert (my_context != NULL);
-      
       if (to_lock)
-       PAMIX_CONTEXT_LOCK(my_context);
+       PAMIX_CONTEXT_LOCK(context);
       INCR_MSGQLEN();
-      PAMI_Send (my_context, &parameters);
+      PAMI_Send (context, &parameters);
       if (to_lock)
-       PAMIX_CONTEXT_UNLOCK(my_context);
+       PAMIX_CONTEXT_UNLOCK(context);
+    }
+    else {
+      CmiPAMIRzv_t   rzv;
+      rzv.bytes       = size;
+      rzv.buffer      = msg;
+      rzv.offset      = (size_t)msg - (size_t)cmi_pami_memregion[0].baseVA;
+      rzv.dst_context = dst_context;
+
+      pami_send_immediate_t parameters;
+      parameters.dispatch        = CMI_PAMI_RZV_DISPATCH;
+      parameters.header.iov_base = &rzv;
+      parameters.header.iov_len  = sizeof(rzv);
+      parameters.data.iov_base   = &cmi_pami_memregion[0].mregion;      
+      parameters.data.iov_len    = sizeof(pami_memregion_t);
+      parameters.dest = target;
+      
+      if(to_lock)
+       PAMIX_CONTEXT_LOCK(context);
+      
+      PAMI_Send_immediate (context, &parameters);
+      
+      if(to_lock)
+       PAMIX_CONTEXT_UNLOCK(context);
     }
 }
 
@@ -979,17 +1240,14 @@ void CmiSyncSendFn1(int destPE, int size, char *msg) {
     copymsg = (char *)CmiAlloc(size);
     CmiMemcpy(copymsg, msg, size);
 
-    //  asm volatile("sync" ::: "memory");
-
     CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
     ((CmiMsgHeaderBasic *)copymsg)->size = size;
-    CMI_SET_CHECKSUM(copymsg, size);
-
+    CMI_SET_CHECKSUM(copymsg, size);    
     CmiGeneralFreeSend(destPE,size,copymsg);
 }
 
 /* send msg to its spanning children in broadcast. G. Zheng */
-void SendSpanningChildren(int size, char *msg) {
+void SendSpanningChildren(int size, char *msg, int from_rdone) {
     int startnode = CMI_BROADCAST_ROOT(msg)-1;
     int myrank = CMI_DEST_RANK(msg);
     int i;
@@ -1013,7 +1271,7 @@ void SendSpanningChildren(int size, char *msg) {
        ((CmiMsgHeaderBasic *)copymsg)->size = size;
        CMI_SET_CHECKSUM(copymsg, size);
        
-       CmiGeneralFreeSendN(p,0,size,copymsg,1);        
+       CmiGeneralFreeSendN(p,0,size,copymsg, !from_rdone);     
     }    
 
 #if CMK_SMP    
@@ -1049,7 +1307,7 @@ void CmiFreeBroadcastFn(int size, char *msg) {
 
     CMI_SET_BROADCAST_ROOT(msg, CmiMyNode()+1);
     CMI_DEST_RANK(msg) = CmiMyRank();
-    SendSpanningChildren(size, msg);
+    SendSpanningChildren(size, msg, 0);
     CmiFree(msg);
 #else
     int i;
@@ -1078,7 +1336,6 @@ void CmiFreeBroadcastAllFn(int size, char *msg) {
 
     CmiState cs = CmiGetState();
 #if CMK_BROADCAST_SPANNING_TREE
-
     //printf ("%d: Starting Spanning Tree Broadcast of size %d bytes\n", CmiMyPe(), size);
     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeBroadcastAllFn on comm thd on node %d\n", CmiMyNode());
 
@@ -1088,7 +1345,7 @@ void CmiFreeBroadcastAllFn(int size, char *msg) {
 
     CMI_SET_BROADCAST_ROOT(msg, CmiMyNode()+1);
     CMI_DEST_RANK(msg) = CmiMyRank();
-    SendSpanningChildren(size, msg);
+    SendSpanningChildren(size, msg, 0);
     CmiFree(msg);
 #else
     int i ;
@@ -1102,13 +1359,13 @@ void CmiFreeBroadcastAllFn(int size, char *msg) {
 #endif
 }
 
+#if !CMK_ENABLE_ASYNC_PROGRESS  
+//threads have to progress contexts themselves   
 void AdvanceCommunications() {
-
     pami_context_t my_context = MY_CONTEXT();
-   
+
 #if CMK_SMP
-    CmiAssert (my_context != NULL);
-    //PAMI_Context_trylock_advancev(&my_context, 1, 1);
+    //CmiAssert (my_context != NULL);
     if (PAMIX_CONTEXT_TRYLOCK(my_context))
     {
       PAMI_Context_advance(my_context, 1);
@@ -1117,41 +1374,32 @@ void AdvanceCommunications() {
 #else
     PAMI_Context_advance(my_context, 1);
 #endif
-
-#if CMK_SMP && CMK_USE_L2ATOMICS
-    if (!L2AtomicQueueEmpty(CpvAccess(broadcast_q)))
-#else        
-    if (PCQueueLength(CpvAccess(broadcast_q)) > 0)
-#endif
-      sendBroadcastMessages();
-#if CMK_NODE_QUEUE_AVAILABLE
-    if (PCQueueLength(CsvAccess(node_bcastq)) > 0)
-      sendBroadcastMessagesNode();
-#endif
-    
-    
-#if CMK_IMMEDIATE_MSG && CMK_MULTICORE
-    CmiHandleImmediate();
-#endif
-}
-
-#if 0
-static void SendMsgsUntil(int targetm) {
-
-    pami_context_t my_context = MY_CONTEXT();
-
-    while (MSGQLEN() > targetm) {
-#if CMK_SMP
-      PAMI_Context_trylock_advancev(&my_context, 1, 1);
-#else
-      PAMI_Context_advance(my_context, 1);
-#endif    
-    }
 }
 #endif
 
+
 void CmiNotifyIdle() {
   AdvanceCommunications();
+#if CMK_SMP && CMK_PAMI_MULTI_CONTEXT
+#if !CMK_ENABLE_ASYNC_PROGRESS && CMK_USE_L2ATOMICS
+  //Wait on the atomic queue to get a message with very low core
+  //overheads. One thread calls advance more frequently
+  if ((CmiMyRank()% THREADS_PER_CONTEXT) == 0)
+    //spin wait for 2-4us when idle
+    //process node queue messages every 10us
+    //Idle cores will only use one LMQ slot and an int sum
+    L2AtomicQueue2QSpinWait(&procState[CmiMyRank()].atomic_queue, 
+                           &node_recv_atomic_q,
+                           10);
+  else
+#endif
+#if CMK_USE_L2ATOMICS
+    //spin wait for 50-100us when idle waiting for a message
+    L2AtomicQueue2QSpinWait(&procState[CmiMyRank()].atomic_queue, 
+                           &node_recv_atomic_q,
+                           1000);
+#endif
+#endif
 }
 
 
@@ -1209,31 +1457,84 @@ void CmiSyncListSendFn(int npes, int *pes, int size, char *msg) {
     CmiFreeListSendFn(npes, pes, size, msg);
 }
 
-//#define OPTIMIZED_MULTICAST  0
+typedef struct ListMulticastVec_t {
+  int   *pes;
+  int    npes;
+  char  *msg;
+  int    size;
+} ListMulticastVec;
+
+void machineFreeListSendFn(pami_context_t    context, 
+                          int               npes, 
+                          int             * pes, 
+                          int               size, 
+                          char            * msg);
+
+pami_result_t machineFreeList_handoff(pami_context_t context, void *cookie)
+{
+  ListMulticastVec *lvec = (ListMulticastVec *) cookie;
+  machineFreeListSendFn(context, lvec->npes, lvec->pes, lvec->size, lvec->msg);
+  CmiFree(cookie); return PAMI_SUCCESS;
+}
 
 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
+    //printf("%d: In Free List Send Fn imm %d\n", CmiMyPe(), CmiIsImmediate(msg));
 
     CMI_SET_BROADCAST_ROOT(msg,0);
     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
-    ((CmiMsgHeaderBasic *)msg)->size = size;
+    CmiMsgHeaderBasic *hdr = (CmiMsgHeaderBasic *)msg;
+    hdr->size = size;
     CMI_SET_CHECKSUM(msg, size);
 
-    //printf("CmiFreeListSendFn on node %d rank %d\n", CmiMyNode(), CmiMyRank());
-    //printf("%d: In Free List Send Fn\n", CmiMyPe());
-    int new_npes = 0;
-
-    int i, count = 0, my_loc = -1;
-    for (i=0; i<npes; i++) {
-        if (CmiNodeOf(pes[i]) == CmiMyNode()) 
-            CmiSyncSend(pes[i], size, msg);
+    //Fast path
+    if (npes == 1) {
+      CmiGeneralFreeSendN(CmiNodeOf(pes[0]), CmiRankOf(pes[0]), size, msg, 1);
+      return;
     }
 
     pami_context_t my_context = MY_CONTEXT();
-    PAMIX_CONTEXT_LOCK(my_context);
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+    ListMulticastVec *lvec = (ListMulticastVec *) 
+      CmiAlloc(sizeof(ListMulticastVec) + sizeof(int)*npes);
+    lvec->pes = (int*)((char*)lvec + sizeof(ListMulticastVec));
+    int i = 0;
+    for (i=0; i<npes; i++) 
+      lvec->pes[i] = pes[i];
+    lvec->npes = npes;
+    lvec->msg  = msg;
+    lvec->size = size;
+    PAMI_Context_post(my_context, (pami_work_t*)hdr->work, 
+                     machineFreeList_handoff, lvec);
+#else
+    machineFreeListSendFn(my_context, npes, pes, size, msg);
+#endif
+}
 
+void machineFreeListSendFn(pami_context_t my_context, int npes, int *pes, int size, char *msg) {
+    int i;
     char *copymsg;
+#if CMK_SMP
+    for (i=0; i<npes; i++) {
+      if (CmiNodeOf(pes[i]) == CmiMyNode()) {
+       //CmiSyncSend(pes[i], size, msg);
+       copymsg = (char *)CmiAlloc(size);
+       CmiAssert(copymsg != NULL);
+       CmiMemcpy(copymsg,msg,size);      
+       CmiSendPeer(CmiRankOf(pes[i]), size, copymsg);
+      }
+    }
+#else
+    for (i=0; i<npes; i++) {
+      if (CmiNodeOf(pes[i]) == CmiMyNode()) {
+       CmiSyncSend(pes[i], size, msg);
+      }
+    }
+#endif
+
+    PAMIX_CONTEXT_LOCK(my_context);
+    
     for (i=0;i<npes;i++) {
-      if (CmiNodeOf(pes[i]) == CmiMyNode());
+        if (CmiNodeOf(pes[i]) == CmiMyNode());
         else if (i < npes - 1) {
 #if !CMK_SMP
            CmiReference(msg);
@@ -1243,13 +1544,14 @@ void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
            CmiAssert(copymsg != NULL);
            CmiMemcpy(copymsg,msg,size);
 #endif
-            CmiGeneralFreeSendN(CmiNodeOf(pes[i]), CmiRankOf(pes[i]), size, copymsg, 0);
+           CmiGeneralFreeSendN(CmiNodeOf(pes[i]), CmiRankOf(pes[i]), size, copymsg, 0);
+           //machine_send(my_context, CmiNodeOf(pes[i]), CmiRankOf(pes[i]), size, copymsg, 0);
         }
     }
 
     if (npes  && CmiNodeOf(pes[npes-1]) != CmiMyNode())
-      //CmiSyncSendAndFree(pes[npes-1], size, msg); //Sameto CmiFreeSendFn
       CmiGeneralFreeSendN(CmiNodeOf(pes[npes-1]), CmiRankOf(pes[npes-1]), size, msg, 0);
+      //machine_send(my_context, CmiNodeOf(pes[npes-1]), CmiRankOf(pes[npes-1]), size, msg, 0);      
     else
       CmiFree(msg);    
 
@@ -1351,48 +1653,71 @@ void CmiProbeImmediateMsg();
 extern int CmiBarrier() {
   CmiNodeBarrier();
   if (CmiMyRank() == 0)
-    CmiNetworkBarrier();
+    CmiNetworkBarrier(1);
   CmiNodeBarrier();
   return 0;
 }
 
-static void CmiNetworkBarrier() {
-    //mysleep(1000000000UL);
 
-    pami_result_t result;
-    pami_barrier_flag = 1;
-    pami_context_t my_context = cmi_pami_contexts[0];
-    PAMIX_CONTEXT_LOCK(my_context);
-    result = PAMI_Collective(my_context, &pami_barrier);   
-    PAMIX_CONTEXT_UNLOCK(my_context);
-    
+static pami_result_t machine_network_barrier(pami_context_t   my_context, 
+                                            int              to_lock) 
+{
+    pami_result_t result = PAMI_SUCCESS;    
+    if (to_lock)
+      PAMIX_CONTEXT_LOCK(my_context);    
+    result = PAMI_Collective(my_context, &pami_barrier);       
+    if (to_lock)
+      PAMIX_CONTEXT_UNLOCK(my_context);
+
     if (result != PAMI_SUCCESS)
-    {
       fprintf (stderr, "Error. Unable to issue  collective. result = %d\n", result);
-      return;
+
+    return result;
+}
+
+pami_result_t network_barrier_handoff(pami_context_t context, void *msg)
+{
+  return machine_network_barrier(context, 0);
+}
+
+static void CmiNetworkBarrier(int async) {
+    pami_context_t my_context = cmi_pami_contexts[0];
+    pami_barrier_flag = 1;
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+    if (async) {
+      pami_work_t work;
+      PAMI_Context_post(my_context, &work, network_barrier_handoff, NULL);
+      while (pami_barrier_flag);
+      //fprintf (stderr, "After Network Barrier\n");
+    }
+    else 
+#endif
+    {
+      machine_network_barrier(my_context, 1);    
+      PAMIX_CONTEXT_LOCK(my_context);
+      while (pami_barrier_flag)
+       PAMI_Context_advance (my_context, 100);
+      PAMIX_CONTEXT_UNLOCK(my_context);
     }
-    
-    PAMIX_CONTEXT_LOCK(my_context);
-    while (pami_barrier_flag)
-      result = PAMI_Context_advance (my_context, 100);
-    PAMIX_CONTEXT_UNLOCK(my_context);
 }
 
 #if CMK_NODE_QUEUE_AVAILABLE
 static void CmiSendNodeSelf(char *msg) {
 #if CMK_IMMEDIATE_MSG
     if (CmiIsImmediate(msg)) {
-        //printf("SendNodeSelf: N[%d]P[%d]R[%d] received an imm msg with hdl: %p\n", CmiMyNode(), CmiMyPe(), CmiMyRank(), CmiGetHandler(msg));
-        CmiPushImmediateMsg(msg);
-#if CMK_MULTICORE
-        CmiHandleImmediate();
-#endif
-        return;
+      //CmiLock(CsvAccess(NodeState).immRecvLock);
+      CmiHandleImmediateMessage(msg);
+      //CmiUnlock(CsvAccess(NodeState).immRecvLock);
+      return;
     }
 #endif    
+#if CMK_SMP && CMK_USE_L2ATOMICS
+    L2AtomicEnqueue(&node_recv_atomic_q, msg);    
+#else
     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
+#endif
 }
 
 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg) {
@@ -1407,7 +1732,7 @@ void CmiFreeNodeSendFn(int node, int size, char *msg) {
     CMI_SET_CHECKSUM(msg, size);
     //if(CmiMyRank()==CmiMyNodeSize()) printf("CmiFreeNodeSendFn on comm thd on node %d\n", CmiMyNode());
     
-    CQdCreate(CpvAccess(cQdState), 1);
+    CQdCreate(CpvAccessOther(cQdState, CmiMyRank()), 1);
 
     if (node == _Cmi_mynode) {
         CmiSendNodeSelf(msg);
@@ -1428,7 +1753,7 @@ CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m) {
     return NULL;
 }
 
-void SendSpanningChildrenNode(int size, char *msg) {
+void SendSpanningChildrenNode(int size, char *msg, int from_rdone) {
     int startnode = -CMI_BROADCAST_ROOT(msg)-1;
     //printf("on node %d rank %d, send node spanning children with root %d\n", CmiMyNode(), CmiMyRank(), startnode);
     assert(startnode>=0 && startnode<CmiNumNodes());
@@ -1445,7 +1770,7 @@ void SendSpanningChildrenNode(int size, char *msg) {
         char *dupmsg = (char *)CmiAlloc(size);
         CmiMemcpy(dupmsg,msg,size);
         //printf("In SendSpanningChildrenNode, sending bcast msg (root %d) from node %d to node %d\n", startnode, CmiMyNode(), nid);
-        CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg,1);
+        CmiGeneralFreeSendN(nid, SMP_NODEMESSAGE, size, dupmsg,!from_rdone);
     }
 }
 
@@ -1465,7 +1790,7 @@ void CmiFreeNodeBroadcastFn(int s, char *m) {
     CMI_SET_CHECKSUM(m, s);
     //printf("In CmiFreeNodeBroadcastFn, sending bcast msg from root node %d\n", CMI_BROADCAST_ROOT(m));
 
-    SendSpanningChildrenNode(s, m);
+    SendSpanningChildrenNode(s, m, 0);
 #else
     int i;
     for (i=0; i<CmiNumNodes(); i++) {
@@ -1518,7 +1843,88 @@ CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m) {
 #endif //end of CMK_NODE_QUEUE_AVAILABLE
 
 
-//void bzero (void *__s, size_t __n) {
-//  memset(__s, 0, __n);
-//}
+static void  sendAck (pami_context_t      context,
+                     CmiPAMIRzvRecv_t      *recv) 
+{
+  pami_send_immediate_t parameters;
+  parameters.dispatch        = CMI_PAMI_ACK_DISPATCH;
+  parameters.header.iov_base = &recv->src_buffer; 
+  parameters.header.iov_len  = sizeof(void *);    
+  parameters.data.iov_base   = NULL;
+  parameters.data.iov_len    = 0;
+  parameters.dest            = recv->src_ep;
+  
+  //Called from advance and hence we dont need a mutex
+  PAMI_Send_immediate (context, &parameters);
+}
+
+
+void rzv_recv_done   (pami_context_t     ctxt, 
+                     void             * clientdata, 
+                     pami_result_t      result) 
+{
+  CmiPAMIRzvRecv_t recv = *(CmiPAMIRzvRecv_t *)clientdata;
+  recv_done(ctxt, recv.msg, PAMI_SUCCESS);
+  sendAck(ctxt, &recv);
+}
+
+void rzv_pkt_dispatch (pami_context_t       context,   
+                      void               * clientdata,
+                      const void         * header_addr,
+                      size_t               header_size,
+                      const void         * pipe_addr,  
+                      size_t               pipe_size,  
+                      pami_endpoint_t      origin,
+                      pami_recv_t         * recv) 
+{
+  INCR_ORECVS();    
+  
+  CmiPAMIRzv_t  *rzv_hdr = (CmiPAMIRzv_t *) header_addr;
+  CmiAssert (header_size == sizeof(CmiPAMIRzv_t));  
+  int alloc_size = rzv_hdr->bytes;
+  char * buffer  = (char *)CmiAlloc(alloc_size + sizeof(CmiPAMIRzvRecv_t));
+  //char *buffer=(char*)CmiAlloc(alloc_size+sizeof(CmiPAMIRzvRecv_t)+sizeof(int))
+  //*(int *)(buffer+alloc_size) = *(int *)header_addr;  
+  CmiAssert (recv == NULL);
+
+  CmiPAMIRzvRecv_t *rzv_recv = (CmiPAMIRzvRecv_t *)(buffer+alloc_size);
+  rzv_recv->msg        = buffer;
+  rzv_recv->src_ep     = origin;
+  rzv_recv->src_buffer = rzv_hdr->buffer;
+  
+  CmiAssert (pipe_addr != NULL);
+  pami_memregion_t *mregion = (pami_memregion_t *) pipe_addr;
+  CmiAssert (pipe_size == sizeof(pami_memregion_t));
+
+  //Rzv inj fifos are on the 17th core shared by all contexts
+  pami_rget_simple_t  rget;
+  rget.rma.dest    = origin;
+  rget.rma.bytes   = rzv_hdr->bytes;
+  rget.rma.cookie  = rzv_recv;
+  rget.rma.done_fn = rzv_recv_done;
+  rget.rma.hints.buffer_registered = PAMI_HINT_ENABLE;
+  rget.rma.hints.use_rdma = PAMI_HINT_ENABLE;
+  rget.rdma.local.mr      = &cmi_pami_memregion[rzv_hdr->dst_context].mregion;  
+  rget.rdma.local.offset  = (size_t)buffer - 
+    (size_t)cmi_pami_memregion[rzv_hdr->dst_context].baseVA;
+  rget.rdma.remote.mr     = mregion; //from message payload
+  rget.rdma.remote.offset = rzv_hdr->offset;
+
+  //printf ("starting rget\n");
+  PAMI_Rget (context, &rget);  
+}
+
+void ack_pkt_dispatch (pami_context_t       context,   
+                      void               * clientdata,
+                      const void         * header_addr,
+                      size_t               header_size,
+                      const void         * pipe_addr,  
+                      size_t               pipe_size,  
+                      pami_endpoint_t      origin,
+                      pami_recv_t         * recv) 
+{
+  char **buf = (char **)header_addr;
+  CmiFree (*buf);
+}
 
+#include "cmimemcpy_qpx.h"
diff --git a/src/arch/pami/manytomany.c b/src/arch/pami/manytomany.c
new file mode 100644 (file)
index 0000000..26862bb
--- /dev/null
@@ -0,0 +1,534 @@
+
+#include "converse.h"
+#include "cmidirectmanytomany.h"
+#define MAX_CONN   8
+
+#define M2M_PAMI_S8DISPATCH 13
+#define M2M_PAMI_SDISPATCH  14
+#define M2M_PAMI_DISPATCH   15
+
+typedef struct _pami_m2mhdr {
+  int8_t    dstrank;
+  int8_t    connid;
+  int32_t   srcindex;
+} PAMI_M2mHeader; 
+
+typedef struct _pami_m2m_work {
+  pami_work_t    work;
+  int            start;
+  int            end;
+  void         * handle;
+  pami_context_t context;
+} PAMI_M2mWork_t;
+
+typedef struct _m2m_completionmsg {
+  char  hdr [CmiMsgHeaderSizeBytes];
+  void  *handle;
+  int    rank;
+} M2mCompletionMsg;
+
+#define MAX_NWORK 8
+
+typedef struct _pami_cmidhandle {
+  int                   myrank;
+  unsigned              m2m_rcvcounter ;
+  unsigned              m2m_nzrcvranks;  
+  char                * m2m_rcvbuf     ;
+  unsigned            * m2m_rcvlens    ;
+  unsigned            * m2m_rdispls    ;
+
+  unsigned              m2m_nsndranks;
+  unsigned              m2m_srankIndex;                      
+  char                * m2m_sndbuf     ;
+  unsigned            * m2m_sndlens    ;
+  unsigned            * m2m_sdispls    ;
+  unsigned              m2m_sndcounter ;
+  unsigned            * m2m_permutation;
+  unsigned            * m2m_lranks     ;
+  pami_endpoint_t     * m2m_node_eps;
+
+  PAMI_M2mWork_t        swork[MAX_NWORK];  
+  int                   n_work;
+
+  CmiDirectM2mHandler   m2m_rdone;
+  void                * m2m_rdonecontext;
+  PAMI_M2mHeader      * m2m_hdrs;
+  M2mCompletionMsg      cmsg;
+
+  unsigned              m2m_ntotalrcvranks;
+  unsigned              m2m_initialized;  
+  unsigned              m2m_rrankIndex; 
+  CmiDirectM2mHandler   m2m_sdone;
+  void                * m2m_sdonecontext;
+} PAMICmiDirectM2mHandle;  
+
+CpvDeclare(PAMICmiDirectM2mHandle*, _handle);
+CpvDeclare(int, _completion_handler);
+
+static void m2m_recv_done(pami_context_t ctxt, void *clientdata, pami_result_t result) 
+{
+  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)clientdata;  
+  //acquire lock if processed by many comm threads and contexts?
+  handle->m2m_rcvcounter ++;
+    
+  if (handle->m2m_rcvcounter == handle->m2m_nzrcvranks) {
+    //printf ("Calling manytomany rdone for handle %p on rank %d counter %d nexp %d\n", 
+    //    handle, CmiMyPe(),
+    //    handle->m2m_rcvcounter, handle->m2m_nzrcvranks);
+    handle->m2m_rcvcounter = 0;
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+    //Called from comm thread
+    CmiSendPeer (handle->myrank, sizeof(M2mCompletionMsg), (char*)&handle->cmsg);
+#else
+    if (handle->m2m_rdone)
+      handle->m2m_rdone(handle->m2m_rdonecontext);
+#endif
+  }
+}
+
+static void m2m_send_done(pami_context_t ctxt, void *clientdata, pami_result_t result) 
+{
+  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)clientdata;  
+  //acquire lock if processed by many comm threads and contexts?
+  handle->m2m_sndcounter ++;
+  if (handle->m2m_sndcounter == handle->m2m_nsndranks) {
+    //if in comm thread send a converse message
+    //else
+    handle->m2m_sndcounter = 0;
+    if (handle->m2m_sdone)
+      handle->m2m_sdone(handle->m2m_sdonecontext); 
+  }
+}
+
+static void m2m_rdone_mainthread (void *m) {
+  M2mCompletionMsg *msg = (M2mCompletionMsg *) m;
+  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)msg->handle;
+  if (handle->m2m_rdone)
+    handle->m2m_rdone(handle->m2m_rdonecontext);
+}
+
+static void m2m_s8_dispatch (pami_context_t       context,  
+                            void               * clientdata,
+                            const void         * header_addr, 
+                            size_t               header_size, 
+                            const void         * pipe_addr,   
+                            size_t               pipe_size,   
+                            pami_endpoint_t      origin,
+                            pami_recv_t         * recv)       
+{
+  PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
+  PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);  
+  PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];
+  char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];
+
+  //Copy 8 bytes
+  *(uint64_t *)buffer = *(uint64_t*)pipe_addr;
+  m2m_recv_done (context, handle, PAMI_SUCCESS);
+}
+
+
+static void m2m_spkt_dispatch (pami_context_t       context,  
+                             void               * clientdata,
+                             const void         * header_addr, 
+                             size_t               header_size, 
+                             const void         * pipe_addr,   
+                             size_t               pipe_size,   
+                             pami_endpoint_t      origin,
+                             pami_recv_t         * recv)       
+{
+  PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
+  PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);   
+  PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];
+
+  char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];
+  memcpy (buffer, pipe_addr, pipe_size);
+  m2m_recv_done (context, handle, PAMI_SUCCESS);
+}
+
+
+
+static void m2m_pkt_dispatch (pami_context_t       context,  
+                             void               * clientdata,
+                             const void         * header_addr, 
+                             size_t               header_size, 
+                             const void         * pipe_addr,   
+                             size_t               pipe_size,   
+                             pami_endpoint_t      origin,
+                             pami_recv_t         * recv)       
+{
+  PAMI_M2mHeader *hdr = (PAMI_M2mHeader *) header_addr;
+
+  //CmiAssert (hdr->dstrank < CmiMyNodeSize());
+  //CmiAssert (hdr->connid  < MAX_CONN);
+
+  PAMICmiDirectM2mHandle *handlevec = CpvAccessOther(_handle, hdr->dstrank);
+  //CmiAssert (handlevec != NULL);
+  
+  //fprintf(stderr, "m2m_pkt_dispatch: mype %d connid %d dstrank %d handlevec %p\n",
+  //  CmiMyPe(), hdr->connid, hdr->dstrank, handlevec);
+  
+  PAMICmiDirectM2mHandle *handle = &handlevec[hdr->connid];
+
+  char *buffer = handle->m2m_rcvbuf + handle->m2m_rdispls[hdr->srcindex];
+
+  if (recv) {
+    recv->local_fn = m2m_recv_done;
+    recv->cookie   = handle;
+    recv->type     = PAMI_TYPE_BYTE;
+    recv->addr     = buffer;
+    recv->offset   = 0;
+    recv->data_fn  = PAMI_DATA_COPY;
+  }
+  else {
+    memcpy (buffer, pipe_addr, pipe_size);
+    m2m_recv_done (context, handle, PAMI_SUCCESS);
+  }
+}
+
+
+void * CmiDirect_manytomany_allocate_handle () {  
+  if (!CpvInitialized(_handle))
+    CpvInitialize(PAMICmiDirectM2mHandle*, _handle);
+  if (!CpvInitialized(_completion_handler))
+    CpvInitialize(int, _completion_handler);  
+  ppc_msync();
+  
+  if (CpvAccess(_handle) == NULL) {
+    CpvAccess(_handle) = (PAMICmiDirectM2mHandle *)malloc (MAX_CONN *sizeof(PAMICmiDirectM2mHandle));
+    memset (CpvAccess(_handle),0,MAX_CONN*sizeof (PAMICmiDirectM2mHandle));
+    CpvAccess(_completion_handler) = CmiRegisterHandler(m2m_rdone_mainthread);
+  }
+  
+  //printf ("allocate_handle on rank %d %p\n", CmiMyPe(), CpvAccess(_handle));
+  return CpvAccess(_handle);
+}
+
+
+void   CmiDirect_manytomany_initialize_recvbase(void                 * h,
+                                               unsigned               tag,
+                                               CmiDirectM2mHandler    donecb,
+                                               void                 * context,
+                                               char                 * rcvbuf,
+                                               unsigned               nranks,
+                                               unsigned               myIdx )
+{
+  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
+  //PAMICmiDirectM2mHandle *handle = &(CpvAccess(_handle)[tag]);
+
+  //printf ("manytomany recvbase on rank %d handle %p conn %d nranks %d\n", 
+  //  CmiMyPe(), handle, tag, nranks);
+
+  handle->myrank = CmiMyRank();
+  handle->cmsg.handle = handle;
+  CmiSetHandler (&handle->cmsg, CpvAccess(_completion_handler));
+
+  handle->m2m_initialized = 1;
+  assert ( tag < MAX_CONN  );
+  handle->m2m_rcvbuf   = rcvbuf;
+
+  handle->m2m_rdone        = donecb;
+  handle->m2m_rdonecontext = context;
+  handle->m2m_ntotalrcvranks    = nranks;
+  
+  //Receiver is not sender
+  //if (myIdx == (unsigned)-1) 
+  //(handle->m2m_ntotalrcvranks)++;
+    
+  handle->m2m_rcvlens   = malloc (sizeof(int) * handle->m2m_ntotalrcvranks);
+  handle->m2m_rdispls   = malloc (sizeof(int) * handle->m2m_ntotalrcvranks);
+  
+  assert (handle->m2m_rcvlens != NULL);
+  
+  memset (handle->m2m_rcvlens, 0, handle->m2m_ntotalrcvranks * sizeof(int));
+  memset (handle->m2m_rdispls, 0, handle->m2m_ntotalrcvranks * sizeof(int));
+  
+  //Receiver is not sender
+  //if (myIdx == (unsigned)-1) {
+  //Receiver doesnt send any data
+  //  myIdx =     handle->m2m_ntotalrcvranks - 1;
+  //CmiDirect_manytomany_initialize_recv (h, tag,  myIdx, 0, 0, CmiMyPe());
+  //}
+  handle->m2m_rrankIndex = myIdx;
+}
+
+void   CmiDirect_manytomany_initialize_recv ( void          * h,
+                                             unsigned        tag,
+                                             unsigned        idx,
+                                             unsigned        displ,
+                                             unsigned        bytes,
+                                             unsigned        rank )
+{
+  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
+  assert ( tag < MAX_CONN  );
+  
+  if (handle->m2m_rcvlens[idx] == 0 && bytes > 0)
+    handle->m2m_nzrcvranks ++;
+
+  handle->m2m_rcvlens  [idx]   = bytes;
+  handle->m2m_rdispls  [idx]   = displ;
+}
+
+
+void   CmiDirect_manytomany_initialize_sendbase( void                 * h,
+                                                unsigned               tag,
+                                                CmiDirectM2mHandler    donecb,
+                                                void                 * context,
+                                                char                 * sndbuf,
+                                                unsigned               nranks,
+                                                unsigned               myIdx )
+{
+  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
+  assert ( tag < MAX_CONN  );
+  handle->m2m_sndbuf       = sndbuf;
+  handle->m2m_sdone        = donecb;
+  handle->m2m_sdonecontext = context;
+  
+  handle->m2m_nsndranks    = nranks;
+  handle->m2m_srankIndex   = myIdx;  
+  handle->m2m_sndlens      = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
+  handle->m2m_sdispls      = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
+  handle->m2m_lranks       = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
+  handle->m2m_node_eps     = (pami_endpoint_t *) malloc (sizeof(pami_endpoint_t) * nranks);
+  handle->m2m_permutation  = (unsigned int *) malloc (sizeof(unsigned int) * nranks);
+  handle->m2m_hdrs = (PAMI_M2mHeader *) malloc(sizeof(PAMI_M2mHeader) * nranks);
+
+  memset (handle->m2m_sndlens,    0, nranks * sizeof(int));
+  memset (handle->m2m_sdispls,    0, nranks * sizeof(int));
+  memset (handle->m2m_lranks,     0, nranks * sizeof(int));
+  memset (handle->m2m_node_eps,   0, nranks * sizeof(pami_endpoint_t));
+  memset (handle->m2m_permutation,0, nranks * sizeof(int));  
+
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+  //we have a completion callback
+  if (handle->m2m_sdone != NULL) {
+    handle->swork[0].start = 0;
+    handle->swork[0].end   = handle->m2m_nsndranks;   
+    handle->swork[0].handle = handle;
+    handle->n_work = 1;
+
+    int context_id = MY_CONTEXT_ID();
+    context_id ++;
+    if (context_id >= cmi_pami_numcontexts)
+      context_id = 0;        
+    pami_context_t context = cmi_pami_contexts[context_id];    
+    handle->swork[0].context = context;
+  }
+  else {
+    int i = 0;
+    int context_id = MY_CONTEXT_ID();
+    pami_context_t context = NULL;
+    int start = 0, nranks = 0;
+    int ncontexts = cmi_pami_numcontexts;
+    if (ncontexts > MAX_NWORK)
+      ncontexts = MAX_NWORK;
+    if (ncontexts > handle->m2m_nsndranks)
+      ncontexts = handle->m2m_nsndranks;
+    handle->n_work = ncontexts;
+
+    nranks = handle->m2m_nsndranks / ncontexts;   
+    for (i = 0; i < ncontexts; ++i) {
+      handle->swork[i].start  = start;
+      handle->swork[i].end    = start + nranks;   
+      handle->swork[i].handle = handle;
+      start += nranks;
+      if (i == ncontexts - 1)
+       handle->swork[i].end  = handle->m2m_nsndranks;
+
+      context_id ++;
+      if (context_id >= cmi_pami_numcontexts)
+       context_id = 0;       
+      context = cmi_pami_contexts[context_id];
+      handle->swork[i].context = context;
+    }
+  }
+#else
+  PAMIX_CONTEXT_LOCK(MY_CONTEXT());
+  handle->swork[0].start = 0;
+  handle->swork[0].end   = handle->m2m_nsndranks;   
+  handle->swork[0].handle = handle;
+  handle->n_work = 1;
+  handle->swork[0].context = MY_CONTEXT();
+  PAMIX_CONTEXT_UNLOCK(MY_CONTEXT());
+#endif
+}
+
+#define PRIME_A  3010349UL
+#define PRIME_B  3571UL
+
+void   CmiDirect_manytomany_initialize_send ( void        * h,
+                                             unsigned      tag, 
+                                             unsigned      idx,
+                                             unsigned      displ,
+                                             unsigned      bytes,
+                                             unsigned      pe )
+{
+  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
+  assert ( tag < MAX_CONN  );  
+  handle->m2m_sndlens    [idx]   = bytes;
+  handle->m2m_sdispls    [idx]   = displ;
+  
+  int lrank                      = CmiRankOf(pe);
+  handle->m2m_lranks     [idx]   = lrank;
+  
+  pami_endpoint_t target;
+  //get the destination context
+#if CMK_PAMI_MULTI_CONTEXT 
+  size_t dst_context = (lrank>>LTPS);
+#else
+  size_t dst_context = 0;
+#endif
+  PAMI_Endpoint_create (cmi_pami_client, (pami_task_t)CmiNodeOf(pe), 
+                       dst_context, &target);
+  handle->m2m_node_eps   [idx]   = target;
+
+  //uint64_t p_rand = ((uint64_t)idx+1)*PRIME_A + PRIME_B*(CmiMyPe()+1);
+  unsigned seed = CmiMyPe()+1;
+  //start at a random location and move linearly from there
+  uint64_t p_rand = rand_r(&seed) + idx + 1;
+  //uint64_t p_rand = (uint64_t)idx + 1 + CmiMyPe();
+  //uint64_t p_rand   =  idx + 1;
+  handle->m2m_permutation[idx]   = (uint32_t)(p_rand%handle->m2m_nsndranks);
+  handle->m2m_hdrs[idx].connid   = tag;  
+  handle->m2m_hdrs[idx].dstrank  = lrank; 
+  handle->m2m_hdrs[idx].srcindex = handle->m2m_srankIndex;
+}
+
+static void  _internal_machine_send   ( pami_context_t      context, 
+                                       pami_endpoint_t     target_ep, 
+                                       int                 rank, 
+                                       int                 hdrsize,
+                                       char              * hdr,
+                                       int                 size, 
+                                       char              * msg,
+                                       pami_event_function cb_done,
+                                       void              * cd)
+{
+  if (size < 128) {
+    pami_send_immediate_t parameters;
+    parameters.dispatch        = (size == 8)? M2M_PAMI_S8DISPATCH : M2M_PAMI_SDISPATCH;
+    //parameters.dispatch        = M2M_PAMI_SDISPATCH;
+    parameters.header.iov_base = hdr;
+    parameters.header.iov_len  = hdrsize;
+    parameters.data.iov_base   = msg;
+    parameters.data.iov_len    = size;
+    parameters.dest            = target_ep;
+    
+    PAMI_Send_immediate (context, &parameters);
+    //if (cb_done)
+    //cb_done (context, cd, PAMI_SUCCESS);
+  }
+  else {
+    pami_send_t parameters;
+    parameters.send.dispatch        = M2M_PAMI_DISPATCH;
+    parameters.send.header.iov_base = hdr;
+    parameters.send.header.iov_len  = hdrsize;
+    parameters.send.data.iov_base   = msg;
+    parameters.send.data.iov_len    = size;
+    parameters.events.cookie        = cd;
+    parameters.events.local_fn      = cb_done;
+    parameters.events.remote_fn     = NULL;
+    memset(&parameters.send.hints, 0, sizeof(parameters.send.hints));
+    parameters.send.dest            = target_ep;
+    
+    PAMI_Send (context, &parameters);
+  }
+}
+
+pami_result_t   _cmidirect_m2m_send_post_handler (pami_context_t     context,
+                                                 void             * cd) 
+{
+  PAMI_M2mWork_t  *work = (PAMI_M2mWork_t *) cd;
+  PAMICmiDirectM2mHandle *handle = (PAMICmiDirectM2mHandle *)work->handle;
+  
+  int i = 0;
+  int pidx = 0;
+  char *buffer = NULL;
+  int bytes = 0;
+
+  pami_event_function cb_done = m2m_send_done;
+  void *clientdata = handle;
+
+  if (handle->m2m_sdone == NULL) {
+    cb_done     = NULL;
+    clientdata  = NULL;
+  }
+
+  for (i = work->start; i < work->end; ++i) {
+    pidx   = handle->m2m_permutation[i];
+    buffer = handle->m2m_sndbuf + handle->m2m_sdispls[pidx];
+    bytes  = handle->m2m_sndlens[pidx];
+    
+    _internal_machine_send(context,
+                          handle->m2m_node_eps[pidx],
+                          handle->m2m_lranks[pidx],
+                          sizeof(PAMI_M2mHeader),
+                          (char*)&(handle->m2m_hdrs[pidx]),
+                          bytes, 
+                          buffer,
+                          cb_done,
+                          clientdata);
+  }  
+
+  return PAMI_SUCCESS;
+}
+
+
+void _cmidirect_m2m_initialize (pami_context_t *contexts, int nc) {
+  pami_dispatch_hint_t options = (pami_dispatch_hint_t) {0};
+  pami_dispatch_callback_function pfn;
+  int i = 0;
+  for (i = 0; i < nc; ++i) {
+    pfn.p2p = m2m_pkt_dispatch;
+    PAMI_Dispatch_set (contexts[i],
+                      M2M_PAMI_DISPATCH,
+                      pfn,
+                      NULL,
+                      options);
+
+    pfn.p2p = m2m_spkt_dispatch;
+    PAMI_Dispatch_set (contexts[i],
+                      M2M_PAMI_SDISPATCH,
+                      pfn,
+                      NULL,
+                      options);
+
+    pfn.p2p = m2m_s8_dispatch;
+    PAMI_Dispatch_set (contexts[i],
+                      M2M_PAMI_S8DISPATCH,
+                      pfn,
+                      NULL,
+                      options);
+  }
+}
+
+
+void   CmiDirect_manytomany_start ( void       * h,
+                                   unsigned     tag ) {
+  PAMICmiDirectM2mHandle *handle = &(((PAMICmiDirectM2mHandle *) h)[tag]);
+  assert (tag < MAX_CONN);
+
+  //printf ("Calling manytomany_start for conn %d handle %p on rank %d\n", tag, 
+  //  handle, CmiMyPe());
+  
+#if CMK_SMP && CMK_ENABLE_ASYNC_PROGRESS
+  //we have a completion callback
+  if (handle->m2m_sdone != NULL) {
+    PAMI_Context_post ( handle->swork[0].context, 
+                      &handle->swork[0].work, 
+                      _cmidirect_m2m_send_post_handler,
+                      &handle->swork[0]);
+  }
+  else {
+    for (int i = 0; i < handle->n_work; ++i) {
+      PAMI_Context_post( handle->swork[i].context, 
+                       &handle->swork[i].work, 
+                       _cmidirect_m2m_send_post_handler,
+                       &handle->swork[i]);
+    }
+  }
+#else
+  PAMIX_CONTEXT_LOCK(MY_CONTEXT());
+  _cmidirect_m2m_send_post_handler (MY_CONTEXT(), &handle->swork[0]);
+  PAMIX_CONTEXT_UNLOCK(MY_CONTEXT());
+#endif
+}
index b7085921b947441cd939ea30585ca666cbacc20b..ba407e5dc9671d910443e25d821155eeeed2fc3d 100644 (file)
@@ -68,6 +68,7 @@ void* CkCopyMsg(void **pMsg)
   register int size = UsrToEnv(srcMsg)->getTotalsize();
   register envelope *newenv = (envelope *) CmiAlloc(size);
   CmiMemcpy(newenv, UsrToEnv(srcMsg), size);
+  //memcpy(newenv, UsrToEnv(srcMsg), size);
   if(UsrToEnv(srcMsg)->isPacked() && _msgTable[msgidx]->unpack) {
     srcMsg = _msgTable[msgidx]->unpack(srcMsg);
     UsrToEnv(srcMsg)->setPacked(0);
index 46c011b2e0c9465a3a22eac925f4e67f180d6e20..3eec22ca9b14cef341fb1f918a8a768fa7e43c13 100644 (file)
@@ -217,6 +217,10 @@ void infi_freeMultipleSend(void *ptr);
 void infi_unregAndFreeMeta(void *ch);
 #endif
 
+#if CMK_BLUEGENEQ && CMK_USE_L2ATOMICS
+void * CmiAlloc_bgq (int     size);
+void   CmiFree_bgq  (void  * buf);
+#endif
 
 #if CMK_GRID_QUEUE_AVAILABLE
 CpvDeclare(void *, CkGridObject);
@@ -2853,6 +2857,8 @@ void *CmiAlloc(int size)
   res =(char *) CmiPoolAlloc(size+sizeof(CmiChunkHeader));
 #elif USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
   MPI_Alloc_mem(size+sizeof(CmiChunkHeader), MPI_INFO_NULL, &res);
+#elif CMK_SMP && CMK_BLUEGENEQ && CMK_USE_L2ATOMICS
+  res = (char *) CmiAlloc_bgq(size+sizeof(CmiChunkHeader));
 #else
   res =(char *) malloc_nomigrate(size+sizeof(CmiChunkHeader));
 #endif
@@ -2956,6 +2962,8 @@ void CmiFree(void *blk)
     CmiPoolFree(BLKSTART(parentBlk));
 #elif USE_MPI_CTRLMSG_SCHEME && CMK_CONVERSE_MPI
     MPI_Free_mem(parentBlk);
+#elif CMK_SMP && CMK_BLUEGENEQ && CMK_USE_L2ATOMICS
+    CmiFree_bgq(BLKSTART(parentBlk));
 #else
     free_nomigrate(BLKSTART(parentBlk));
 #endif
index 1b5029376ce26a6f41700025db0f7bc8d206be1a..a3400415a637f37db1eec045f7cae9dc922b189c 100644 (file)
@@ -170,6 +170,8 @@ typedef int CmiNodeLock;
 #define CmiTryLock(lock)  ((lock)?1:((lock)=1,0))
 #define CmiDestroyLock(lock) /*empty*/
 
+#define CmiInCommThread() (0)
+
 #endif
 
 #if CMK_SHARED_VARS_POSIX_THREADS_SMP /*Used by the net-*-smp versions*/
@@ -224,12 +226,25 @@ extern CmiNodeLock CmiMemLock_lock;
 #define CmiMemUnlock() do{if (CmiMemLock_lock) CmiUnlock(CmiMemLock_lock);} while (0)
 
 
+#if CMK_BLUEGENEQ && CMK_ENABLE_ASYNC_PROGRESS
+extern __thread int32_t _cmi_bgq_incommthread;
+#define CmiInCommThread()  (_cmi_bgq_incommthread)
+#else
+#define CmiInCommThread()  (CmiMyRank() == CmiMyNodeSize())
 #endif
 
+#endif //POSIX_THREADS_SMP
+
 #include "string.h"
 
 #if CMK_BLUEGENEL || CMK_BLUEGENEP
 #include "cmimemcpy.h"
+#elif CMK_BLUEGENEQ && CMK_BLUEGENEQ_OPTCOPY  
+void CmiMemcpy_qpx (void *dst, const void *src, size_t n);
+#define CmiMemcpy(dst,src,n)                                   \
+  if ( (n > 512+32) &&  ((((size_t)dst|(size_t)src) & 0x1F)==0) ) \
+    CmiMemcpy_qpx(dst, src, n);                        \
+  else memcpy(dst,src,n);
 #else
 #define CmiMemcpy(dest, src, size) memcpy((dest), (src), (size))
 #endif
@@ -435,9 +450,14 @@ for each processor in the node.
        } \
     } while(0)
 #define CpvInitialized(v) (0!=CMK_TAG(Cpv_,v))
+
+#if CMK_BLUEGENEQ && CMK_ENABLE_ASYNC_PROGRESS && CMK_IMMEDIATE_MSG
+  #define CpvAccess(v) (*(CMK_TAG(Cpv_addr_,v)[CmiMyRank()]))
+#else
 #define CpvAccess(v) (*CMK_TAG(Cpv_,v))
-#define CpvAccessOther(v, r) (*(CMK_TAG(Cpv_addr_,v)[r]))
+#endif
 
+#define CpvAccessOther(v, r) (*(CMK_TAG(Cpv_addr_,v)[r]))
 #else
 
 #define CpvDeclare(t,v) t* CMK_TAG(Cpv_,v)
index 7974bc20c171e3e2544ce0e32801fa458600213f..4878aeb07fcff60ce06f398de5296b664ff675ab 100644 (file)
@@ -1,12 +1,12 @@
 include ../common.mk
 
-FFTW_HOME=$(HOME)/fftw
+FFTW_HOME=$(HOME)/fftw-bgq
 
 FFTW_LIBDIR=$(FFTW_HOME)/lib
 
 OPTS=-g -O3
 
-INCLUDE=-I$(FFTW_HOME)/include -DFFTW_ENABLE_FLOAT=1
+INCLUDE=-I$(FFTW_HOME)/include -DFFTW_ENABLE_FLOAT=1 -DUSE_FFTW_DECLS=1
 COMPILER=$(CHARMC) $(INCLUDE)
 
 LIB=libmodulePencilFFT.a
index 5325c04fd83d790098175d5996ea3f3ba8f97ccf..2d334ccd90df89dac2b35775c6a5081c9e9a5039 100644 (file)
@@ -9,7 +9,8 @@ inline void configureLineFFTInfo (LineFFTInfo *info,
                                  int grainX, int grainY, int grainZ,
                                  CkCallback  *kcallback,
                                  LineFFTCompletion complid,
-                                 bool              normalize) 
+                                 bool              normalize,
+                                 int               numiter) 
 {  
   info->sizeX   =  sizeX;
   info->sizeY   =  sizeY;
@@ -24,6 +25,7 @@ inline void configureLineFFTInfo (LineFFTInfo *info,
   
   info->completionId = complid;
   info->normalize    = normalize;
+  info->numIter      = numiter;
 }
 
 
@@ -37,50 +39,58 @@ inline void  createLineFFTArray (LineFFTInfo *info) {
   CkAssert (info->grainY > 0);
   CkAssert (info->grainZ > 0);
   
-  info->xProxy = CProxy_LineFFTArray::ckNew();
-  info->yProxy = CProxy_LineFFTArray::ckNew();
-  info->zProxy = CProxy_LineFFTArray::ckNew();
+  int nx, ny, nz;
 
-  int x, y, z;
+  nx = info->sizeX / info->grainX;
+  ny = info->sizeY / info->grainY;
+  nz = info->sizeZ / info->grainZ;
 
-  double pe = 0.0;
-  double stride = 
-    (1.0 *CkNumPes() * info->grainZ * info->grainY)/
-    (info->sizeZ * info->sizeY);  
+  printf ("Creating Line FFT Array (%dx%dx%d, %dx%dx%d) on %d nodes, %d PEs\n",
+         info->sizeX, 
+         info->sizeY, 
+         info->sizeZ, 
+         info->grainX, 
+         info->grainY, 
+         info->grainZ,
+         CmiNumNodes(),
+         CkNumPes());
 
-  for (pe = 0.0, z = 0; z < (info->sizeZ)/(info->grainZ); z ++) {
-    for (y = 0; y < (info->sizeY)/(info->grainY); y++) {
-      if(pe >= CkNumPes()) pe = pe - CkNumPes();
-      info->xProxy(y, z).insert(*info, (int) PHASE_X, (int) pe);
-      pe +=  stride;
-    }
-  }
-  info->xProxy.doneInserting();
+  info->mapx =  CProxy_PencilMapX::ckNew(*info);
+  info->mapy =  CProxy_PencilMapY::ckNew(*info);
+  info->mapz =  CProxy_PencilMapZ::ckNew(*info);
+  
+  CkArrayOptions optsx;
+  optsx.setMap (info->mapx);
+  info->xProxy = CProxy_LineFFTArray::ckNew(*info, 
+                                           (int) PHASE_X, optsx);
 
-  stride = 
-    (1.0 *CkNumPes() * info->grainX * info->grainZ)/
-    (info->sizeX * info->sizeZ);  
+  CkArrayOptions optsy;
+  optsy.setMap (info->mapy);
+  info->yProxy = CProxy_LineFFTArray::ckNew(*info, 
+                                           (int) PHASE_Y, optsy);
 
-  for (pe=1.0, x = 0; x < (info->sizeX)/(info->grainX); x ++) {
-    for (z = 0; z < (info->sizeZ)/(info->grainZ); z ++) {
-      if(pe >= CkNumPes()) pe = pe - CkNumPes();
-      info->yProxy(z, x).insert(*info, (int) PHASE_Y, (int) pe);
-      pe += stride;
-    }
-  }
+  CkArrayOptions optsz;
+  optsz.setMap (info->mapz);
+  info->zProxy = CProxy_LineFFTArray::ckNew(*info, 
+                                           (int) PHASE_Z, optsz);
+  
+  int x,y,z;
+  for (z = 0; z < (info->sizeZ)/(info->grainZ); z ++) 
+    for (y = 0; y < (info->sizeY)/(info->grainY); y++)
+      info->xProxy(y, z).insert(*info, (int) PHASE_X);
+  
+  info->xProxy.doneInserting();
+  
+  for (x = 0; x < (info->sizeX)/(info->grainX); x ++)
+    for (z = 0; z < (info->sizeZ)/(info->grainZ); z ++) 
+      info->yProxy(z, x).insert(*info, (int) PHASE_Y);
+  
   info->yProxy.doneInserting();
   
-  stride = 
-    (1.0 *CkNumPes() * info->grainY * info->grainX)/
-    (info->sizeY * info->sizeX);  
-
-  for (pe=0.0, y = 0; y < (info->sizeY)/(info->grainY); y ++) {
-    for (x = 0; x < (info->sizeX)/(info->grainX); x ++) {
-      if(pe >= CkNumPes()) pe = pe - CkNumPes();
-      info->zProxy(x, y).insert(*info, (int) PHASE_Z, (int) pe);
-      pe += stride;
-    }
-  }
+  for (y = 0; y < (info->sizeY)/(info->grainY); y ++) 
+    for (x = 0; x < (info->sizeX)/(info->grainX); x ++) 
+      info->zProxy(x, y).insert(*info, (int) PHASE_Z);
+  
   info->zProxy.doneInserting();
 }
 
index 5f76ea165bfbe5502e79d5336640a3c6a538455f..33a7de797833cdcc347f248d5be70bae5cc9edf4 100644 (file)
@@ -1,9 +1,15 @@
 
 #include "pencilfft.h"
 
+#define PRIORITY_SIZE ((int) sizeof(int)*8)
+
 #define CONST_A  1001
 #define CONST_B  17
 
+//#define VERIFY_FFT        1
+
+CmiNodeLock fftw_plan_lock;
+
 ///
 /// \brief main constructor
 ///
@@ -37,23 +43,86 @@ LineFFTArray::LineFFTArray (LineFFTInfo &info, int phase): _info (info)
     break;
   };    
 
-  _line = new complex [_nElements];
+  _line = new complex[_nElements];
+  memset(_line, 0, sizeof(complex) * _nElements);
   
   LineFFTAssert ((_nFFTMessages % CONST_B) != 0);
-
-#if 0 //__LINEFFT_DEBUG__
+  
+#ifdef VERIFY_FFT 
   for (int count = 0; count < _nElements; count++)
-    _line[count] = complex (1.0 * count, 0.0);
-#endif
+    _line[count] = complex (1.0, 0.0);
+#endif  
 
+  CmiLock (fftw_plan_lock);
   _fwdplan = fftw_create_plan(_fftSize, FFTW_FORWARD, FFTW_IN_PLACE | 
                              FFTW_MEASURE | FFTW_USE_WISDOM);
   _bwdplan = fftw_create_plan(_fftSize, FFTW_BACKWARD, FFTW_IN_PLACE | 
                              FFTW_MEASURE | FFTW_USE_WISDOM);
+  CmiUnlock (fftw_plan_lock);
 
   _curFFTMessages  = 0;
   _curGridMessages = 0;
   _nGridMessages   = 0;
+  _iter = 0;
+
+#if USE_MANY_TO_MANY
+  initialize_many_to_many();
+#endif
+}
+
+void LineFFTArray::receive_transpose (int           chunk_id, 
+                                     complex     * data,
+                                     int           gx,
+                                     int           gy,
+                                     int           gz) 
+{
+  int start = 0;
+  int localindex = 0;
+  int index = 0;
+  switch (_phase) {
+  case PHASE_X:
+    start = chunk_id * gx;
+    for (int z = 0; z < gz; z++) 
+      for (int y = 0; y < gy; y++) 
+       for (int x = start; x < start + gx; x++) {
+         localindex = _info.sizeX * gy * z + _info.sizeX * y + x;
+#if __LINEFFT_DEBUG__
+         LineFFTAssert (localindex < _nElements);
+#endif
+         _line [localindex] = data[index++];   
+       }    
+    break;
+    
+  case PHASE_Y:  
+    start = chunk_id * gy;
+    for (int z = 0; z < gz; z++) 
+      for (int y = start; y < start+gy; y++) 
+       for (int x = 0; x < gx; x++) {
+         localindex = _info.sizeY * gz * x + _info.sizeY * z + y;
+#if __LINEFFT_DEBUG__
+         LineFFTAssert (localindex < _nElements);
+#endif
+         _line [localindex] = data[index++];   
+       }
+    break;
+    
+  case PHASE_Z:
+    start = chunk_id * gz;
+    for (int z = start; z < start+gz; z++) 
+      for (int y = 0; y < gy; y++) 
+       for (int x = 0; x < gx; x++) {
+         localindex = _info.sizeZ * gx * y + _info.sizeZ * x + z;
+#if __LINEFFT_DEBUG__
+         LineFFTAssert (localindex < _nElements);
+#endif
+         _line [localindex] = data[index++];   
+       }
+    break;
+
+  default:
+    CkAbort ("Phase undefined");
+    break;
+  };    
 }
 
 
@@ -73,59 +142,72 @@ void LineFFTArray::receiveFFTMessage (LineFFTMsg *msg) {
   LineFFTAssert (msg->phase == _phase);
 #endif
 
+  receive_transpose(msg->chunk_id, msg->data, 
+                   _info.grainX, _info.grainY, _info.grainZ);    
+  _curFFTMessages ++;
+  
+  if (_curFFTMessages == _nFFTMessages) {
+    _curFFTMessages = 0;
+    start_fft (msg->direction);
+  }
+  
+  CkFreeMsg (msg);
+}
+
+void LineFFTArray::send_transpose (int           chunk_id, 
+                                  complex   *   data,
+                                  int           gx,
+                                  int           gy,
+                                  int           gz) 
+{
+  int start = 0;
+  int localindex = 0;
+  int index = 0;
   switch (_phase) {
   case PHASE_X:
-    start = msg->chunk_id * _info.grainX;
-    for (int z = 0; z < _info.grainZ; z++) 
-      for (int y = 0; y < _info.grainY; y++) 
-       for (int x = start; x < start + _info.grainX; x++) {
-         localindex = _info.sizeX * _info.grainY * z + _info.sizeX * y + x;
+    start = chunk_id * gx;
+    for (int z = 0; z < gz; z++) 
+      for (int y = 0; y < gy; y++) 
+       for (int x = start; x < start+gx; x++) {
+         localindex = _info.sizeX * gy * z + _info.sizeX * y + x;
 #if __LINEFFT_DEBUG__
          LineFFTAssert (localindex < _nElements);
 #endif
-         _line [localindex] = msg->data[index++];   
-       }    
+         data[index++] = _line [localindex];
+       }        
     break;
     
-  case PHASE_Y:  
-    start = msg->chunk_id * _info.grainY;
-    for (int z = 0; z < _info.grainZ; z++) 
-      for (int y = start; y < start+_info.grainY; y++) 
-       for (int x = 0; x < _info.grainX; x++) {
-         localindex = _info.sizeY * _info.grainZ * x + _info.sizeY * z + y;
+  case PHASE_Y:
+    start = chunk_id * gy;
+    for (int z = 0; z < gz; z++) 
+      for (int y = start; y < start+gy; y++) 
+       for (int x = 0; x < gx; x++) {
+         localindex = _info.sizeY * gz * x + _info.sizeY * z + y;
 #if __LINEFFT_DEBUG__
          LineFFTAssert (localindex < _nElements);
 #endif
-         _line [localindex] = msg->data[index++];   
+         data[index++] = _line [localindex];
        }
-    break;
     
+    break;
+      
   case PHASE_Z:
-    start = msg->chunk_id * _info.grainZ;
-    for (int z = start; z < start+_info.grainZ; z++) 
-      for (int y = 0; y < _info.grainY; y++) 
-       for (int x = 0; x < _info.grainX; x++) {
-         localindex = _info.sizeZ * _info.grainX * y + _info.sizeZ * x + z;
+    start = chunk_id * gz;
+    for (int z = start; z < start+gz; z++) 
+      for (int y = 0; y < gy; y++) 
+       for (int x = 0; x < gx; x++) {
+         localindex = _info.sizeZ * gx * y + _info.sizeZ * x + z;
 #if __LINEFFT_DEBUG__
          LineFFTAssert (localindex < _nElements);
 #endif
-         _line [localindex] = msg->data[index++];   
-       }
+         data[index++] = _line [localindex];
+       }        
     break;
-
+    
   default:
     CkAbort ("Phase undefined");
     break;
   };
-    
-  _curFFTMessages ++;
-
-  if (_curFFTMessages == _nFFTMessages) {
-    _curFFTMessages = 0;
-    startFFT (msg->direction);
-  }
-
-  CkFreeMsg (msg);
 }
 
 ///
@@ -141,10 +223,12 @@ void LineFFTArray::sendFFTMessages (int dir) {
   int arridx_x = 0, arridx_y =0;
 
   for (int count = 0; count < _nFFTMessages; count ++) {
-    int start = 0, index = 0, localindex =0;
+    int index = 0, localindex =0;
     int chunk_id = (CONST_A*CkMyPe() + CONST_B * count) % _nFFTMessages;
 
     LineFFTMsg *msg = new (size, sizeof(int) * 8) LineFFTMsg;
+    send_transpose(chunk_id, msg->data,
+                  _info.grainX, _info.grainY, _info.grainZ);    
 
     if (dir == FFTW_FORWARD) {
       arridx_x = thisIndex.y;
@@ -159,35 +243,12 @@ void LineFFTArray::sendFFTMessages (int dir) {
 
     switch (_phase) {
     case PHASE_X:
-      start = chunk_id * _info.grainX;
-      for (int z = 0; z < _info.grainZ; z++) 
-       for (int y = 0; y < _info.grainY; y++) 
-         for (int x = start; x < start+_info.grainX; x++) {
-           localindex = _info.sizeX * _info.grainY * z + _info.sizeX * y + x;
-#if __LINEFFT_DEBUG__
-           LineFFTAssert (localindex < _nElements);
-#endif
-           msg->data[index++] = _line [localindex];
-         }
-  
       LineFFTAssert (dir == FFTW_FORWARD);
       proxy = _info.yProxy;
-      msg->phase = PHASE_Y;
-      
+      msg->phase = PHASE_Y;      
       break;
       
     case PHASE_Y:
-      start = chunk_id * _info.grainY;
-      for (int z = 0; z < _info.grainZ; z++) 
-       for (int y = start; y < start+_info.grainY; y++) 
-         for (int x = 0; x < _info.grainX; x++) {
-           localindex = _info.sizeY * _info.grainZ * x + _info.sizeY * z + y;
-#if __LINEFFT_DEBUG__
-           LineFFTAssert (localindex < _nElements);
-#endif
-           msg->data[index++] = _line [localindex];
-         }
-
       if (dir == FFTW_FORWARD) {
        proxy = _info.zProxy;
        msg->phase = PHASE_Z;
@@ -196,38 +257,20 @@ void LineFFTArray::sendFFTMessages (int dir) {
        proxy = _info.xProxy;
        msg->phase = PHASE_X;
       }
-
       break;
     
     case PHASE_Z:
-      start = chunk_id * _info.grainZ;
-      for (int z = start; z < start+_info.grainZ; z++) 
-       for (int y = 0; y < _info.grainY; y++) 
-         for (int x = 0; x < _info.grainX; x++) {
-           localindex = _info.sizeZ * _info.grainX * y + _info.sizeZ * x + z;
-#if __LINEFFT_DEBUG__
-           LineFFTAssert (localindex < _nElements);
-#endif
-           msg->data[index++] = _line [localindex];
-         }
-
       LineFFTAssert (dir == FFTW_BACKWARD);
       proxy = _info.yProxy;
-      msg->phase = PHASE_Y;
-
-      break;
-      
-    default:
-      CkAbort ("Phase undefined");
+      msg->phase = PHASE_Y;      
       break;
+      //Default case already verified
     };
-
+    
     msg->direction =  dir;
-
     //Send data to the intersecting neighbor
     proxy(arridx_x, arridx_y).receiveFFTMessage (msg);
-  }
-  
+  }  
 }
 
 ///
@@ -235,7 +278,16 @@ void LineFFTArray::sendFFTMessages (int dir) {
 /// or intiated when all the grid/fft messages have been received.
 ///
 void LineFFTArray::startFFT (int dir) {
-  
+  _iter = 0;
+  start_fft(dir);
+}
+
+///
+/// \brief Start the 3D fft operation. This can be called externally
+/// or initiated when all the grid/fft messages have been received.
+///
+void LineFFTArray::start_fft (int dir) {
+
 #if __LINEFFT_DEBUG__
   CkPrintf ("[%d, %d, %d, %d] Pencil FFT start fft %d,%d\n", 
            thisIndex.x, thisIndex.y, _phase, dir, _nElements, _fftSize);
@@ -248,38 +300,70 @@ void LineFFTArray::startFFT (int dir) {
   else
     plan = _bwdplan;
   
-  for (int count = 0; count < _nElements; count += _fftSize) {
-    CmiNetworkProgress();
+  for (int count = 0; count < _nElements; count += _fftSize)
     fftw_one (plan, (fftw_complex *) (_line + count), NULL);
-  }
-  
-  if (dir == FFTW_FORWARD && _phase == PHASE_Z) {
-    if (_info.normalize) {
-      fftw_real scale = 1.0 / (_info.sizeX * _info.sizeY * _info.sizeZ);
-      for (int count = 0; count < _nElements; count ++)
-       _line [count] *= scale;
-    }
-    
+
+  if (dir == FFTW_FORWARD && _phase == PHASE_Z) { 
     //call_kspace_callback ();    
-    startFFT (FFTW_BACKWARD);
+    start_fft (FFTW_BACKWARD);
     return;
   }
 
+  //Increment iter in backward phase
+  if (dir == FFTW_BACKWARD)
+    _iter ++;    
   if (dir == FFTW_BACKWARD && _phase == PHASE_X) {
-#if __LINEFFT_DEBUG__
+#ifdef VERIFY_FFT
+    if ( _info.normalize ) {
+      fftw_real scale = 1.0 / (_info.sizeX * _info.sizeY * _info.sizeZ);
+      for (int count = 0; count < _nElements; count ++)
+       _line [count].re *= scale;
+    }
     for (int count = 0; count < _nElements; count++) {
-      if (!((_line[count].re <= 1.001 * count)
-           && (_line[count].re >= 0.999 * count)))
-       CkPrintf ("[%d, %d] val = (%5.3f, %5.3f) \n", thisIndex.x, thisIndex.y, 
+      if (!((_line[count].re <= 1.001)
+           && (_line[count].re >= 0.999)))
+       CkPrintf ("[%d, %d] val[%d] = (%5.3f, %5.3f) \n", 
+                 thisIndex.x, thisIndex.y,
+                 count,
                  _line[count].re, _line[count].im);
     }
-#endif
 
-    call_donecallback ();
-    return;
+    for (int count = 0; count < _nElements; count++)
+      _line[count] = complex (1.0, 0.0);
+#endif      
+    
+    if (_iter == _info.numIter) {
+      call_donecallback ();      
+      return;
+    }
+    
+    dir = FFTW_FORWARD;
+    start_fft(dir);
+    return; //We are starting the next iteration
   }
   
-  sendFFTMessages (dir);
+
+#if USE_MANY_TO_MANY  
+  if (_iter > 2) {
+    if (_info.grainX == 1 && _info.grainY == 1 && _info.grainZ ==1)
+      for (int idx = 0; idx < _nFFTMessages; ++idx) 
+       many_to_many_data[idx] = _line[idx];
+    else {
+      int size = _info.grainX * _info.grainY * _info.grainZ;
+      for (int idx = 0; idx < _nFFTMessages; ++idx) 
+       send_transpose(idx, many_to_many_data + idx * size,
+                      _info.grainX, _info.grainY, _info.grainZ);    
+    }
+    
+    int id = 0;
+    if (dir == FFTW_BACKWARD)
+      id = 1;
+    CmiAssert (tag_s[id] != -1);
+    CmiDirect_manytomany_start(handle, tag_s[id]);
+  }
+  else
+#endif
+    sendFFTMessages (dir);
 }
 
 
@@ -323,10 +407,10 @@ void LineFFTArray::receiveGridMessage (LineFFTGridMsg   *msg) {
 
   CkFreeMsg (msg);
   _curGridMessages ++;
-
+  
   if (_curGridMessages == _nGridMessages) {
     _curGridMessages = 0;
-    startFFT (FFTW_FORWARD);
+    start_fft (FFTW_FORWARD);
   }
 }
 
@@ -371,5 +455,200 @@ void LineFFTArray::sendGridMessages () {
   }
 }
 
+#if USE_MANY_TO_MANY
+
+void call_ck_cb(void *m) {
+  LineFFTCbWrapper *cw = (LineFFTCbWrapper *) m;
+  //cw->cb.send(cw->msg);
+  cw->me->many_to_many_recvFFT(cw->msg);
+}
+
+void LineFFTArray::initialize_many_to_many () {
+  bool sfwd = false, sbwd=false;
+  bool rfwd = false, rbwd=false;
+  int fchunk_id = thisIndex.x;
+  int bchunk_id = thisIndex.y;
+  
+  CProxy_Group sf_proxy;
+  if (_phase == PHASE_X) {
+    sfwd = true;
+    sf_proxy = _info.mapy;
+  }
+  else if (_phase == PHASE_Y) {
+    sfwd = true;    
+    sf_proxy = _info.mapz;
+  }
+
+  CProxy_Group rf_proxy;
+  if (_phase == PHASE_Y) {
+    rfwd = true;
+    rf_proxy = _info.mapx;
+  }
+  else if (_phase == PHASE_Z) {
+    rfwd = true;
+    rf_proxy = _info.mapy;
+  }
+
+  CProxy_Group sb_proxy;
+  if (_phase == PHASE_Z) {
+    sbwd = true;
+    sb_proxy = _info.mapy;
+  }
+  else if (_phase == PHASE_Y) {
+    sbwd = true;    
+    sb_proxy = _info.mapx;
+  }
+
+  CProxy_Group rb_proxy;
+  if (_phase == PHASE_Y) {
+    rbwd = true;
+    rb_proxy = _info.mapz;
+  }
+  else if (_phase == PHASE_X) {
+    rbwd = true;
+    rb_proxy = _info.mapy;
+  }
+
+  handle = CmiDirect_manytomany_allocate_handle();
+  many_to_many_data = new complex[_nElements];
+  memset(many_to_many_data, 0, _nElements * sizeof(complex));
+
+  tag_s[0] = tag_s[1] = -1;
+
+  if (sfwd) { //we have forward send
+    CmiDirect_manytomany_initialize_sendbase (handle, _phase+1, NULL, NULL, 
+                                             (char *)many_to_many_data, 
+                                             _nFFTMessages, fchunk_id);
+    tag_s[0] = _phase+1;
+  }
+  
+  if (sbwd) {//we have backward send
+    CmiDirect_manytomany_initialize_sendbase (handle, _phase-1+NUM_PHASES, 
+                                             NULL, NULL, 
+                                             (char *)many_to_many_data, 
+                                             _nFFTMessages, bchunk_id);
+    tag_s[1] = _phase - 1 + NUM_PHASES;
+  }
+
+  //printf ("After send base\n");
+    
+  int arridx_x, arridx_y;
+  int bytes = _info.grainX * _info.grainY * _info.grainZ *sizeof(complex);
+  int index = 0;    
+  if (sfwd || sbwd) {
+    for (index = 0; index < _nFFTMessages; index++) {
+      if (sfwd) {
+       arridx_x = thisIndex.y;
+       arridx_y = index;
+       CkArrayMap *amap = (CkArrayMap *) CkLocalBranch(sf_proxy);
+       CkArrayIndex2D idx (arridx_x, arridx_y);    
+       int pe = amap->procNum (0, idx);    
+       LineFFTAssert (pe >= 0);
+       LineFFTAssert (pe < CkNumPes());
+       
+       CmiDirect_manytomany_initialize_send (handle, _phase+1, index, 
+                                             bytes*index, bytes, pe);        
+      }
+      if (sbwd) {
+       arridx_x = index;
+       arridx_y = thisIndex.x;
+       CkArrayMap *amap = (CkArrayMap *) CkLocalBranch(sb_proxy);
+       CkArrayIndex2D idx (arridx_x, arridx_y);    
+       int pe = amap->procNum (0, idx);    
+       LineFFTAssert (pe >= 0);
+       LineFFTAssert (pe < CkNumPes());
+       
+       CmiDirect_manytomany_initialize_send (handle, _phase-1+NUM_PHASES,
+                                             index, bytes*index, bytes, pe); 
+      }
+    }
+  }
+  
+  //printf("After initialize send\n");
+
+  if (rfwd) {
+    CkArrayIndex2D aidx(thisIndex.x, thisIndex.y);
+    cbf_recv.cb = CkCallback(CkIndex_LineFFTArray::many_to_many_recvFFT(NULL), 
+                            aidx, thisProxy.ckGetArrayID());
+    LineFFTCompleteMsg *lmsg = new (PRIORITY_SIZE) LineFFTCompleteMsg;
+    lmsg->dir = FFTW_FORWARD;
+    cbf_recv.msg = lmsg;
+    cbf_recv.me  = this;
+    CmiDirect_manytomany_initialize_recvbase (handle, 
+                                             _phase, 
+                                             call_ck_cb, 
+                                             &cbf_recv, 
+                                             (char *) many_to_many_data, 
+                                             _nFFTMessages,
+                                             -1); //no local messages
+  }
+  
+  if (rbwd) {
+    CkArrayIndex2D aidx(thisIndex.x, thisIndex.y);
+    cbb_recv.cb = CkCallback(CkIndex_LineFFTArray::many_to_many_recvFFT(NULL), 
+                            aidx, thisProxy.ckGetArrayID());
+    
+    LineFFTCompleteMsg *lmsg = new (PRIORITY_SIZE) LineFFTCompleteMsg;
+    lmsg->dir = FFTW_BACKWARD;
+    cbb_recv.msg = lmsg;
+    cbb_recv.me  = this;
+
+    CmiDirect_manytomany_initialize_recvbase (handle, 
+                                             _phase + NUM_PHASES, 
+                                             call_ck_cb, 
+                                             &cbb_recv, 
+                                             (char *) many_to_many_data, 
+                                             _nFFTMessages,
+                                             -1); //no local messages
+  }
+
+  if (rfwd || rbwd) {
+    for (index = 0; index < _nFFTMessages; index++) {
+      if (rfwd) {
+       arridx_x = index; 
+       arridx_y = thisIndex.x; 
+       CkArrayMap *amap = (CkArrayMap *) CkLocalBranch(rf_proxy);
+       CkArrayIndex2D idx (arridx_x, arridx_y);    
+       int pe = amap->procNum (0, idx);    
+       LineFFTAssert (pe >= 0);
+       LineFFTAssert (pe < CkNumPes());         
+       
+       CmiDirect_manytomany_initialize_recv (handle, _phase, index, 
+                                             bytes*index, bytes, pe);        
+      }
+      if (rbwd) {
+       arridx_x = thisIndex.y;
+       arridx_y = index; 
+       CkArrayMap *amap = (CkArrayMap *) CkLocalBranch(rb_proxy);
+       CkArrayIndex2D idx (arridx_x, arridx_y);    
+       int pe = amap->procNum (0, idx);    
+       LineFFTAssert (pe >= 0);
+       LineFFTAssert (pe < CkNumPes());         
+       
+       CmiDirect_manytomany_initialize_recv (handle, _phase+NUM_PHASES, 
+                                             index, bytes*index, bytes, pe);
+      }
+    }    
+  }  
+}
+
+void LineFFTArray::many_to_many_recvFFT (LineFFTCompleteMsg *msg) {
+  //printf("in many_to_many_recvFFT\n");
+  if (_info.grainX == 1 && _info.grainY == 1 && _info.grainZ == 1) {
+    for (int idx = 0; idx < _nFFTMessages; ++idx) 
+      _line[idx] = many_to_many_data[idx];
+  }
+  else {
+    int size = _info.grainX *_info.grainY *_info.grainZ;    
+    for (int idx = 0; idx < _nFFTMessages; ++idx )
+      receive_transpose(idx, many_to_many_data + idx * size,
+                       _info.grainX, _info.grainY, _info.grainZ);    
+  }
+
+  start_fft(msg->dir);
+}
+
+#endif
+
 #include "PencilFFT.def.h"
 
index 72e4a60b06ba9fd6dea12c06918ab1022ce1112d..c1a25ffd0ffca0ef909a6800e0add1cde1126702 100644 (file)
@@ -3,12 +3,26 @@ module PencilFFT {
 
        message LineFFTMsg     {complex   data[];};
        message LineFFTGridMsg {fftw_real data[];};
+       message LineFFTCompleteMsg;
 
        array [2D] LineFFTArray {
                entry LineFFTArray (LineFFTInfo info, int phase);
                entry void receiveGridMessage (LineFFTGridMsg *msg);
                entry void setNumGridMessages (int n, CkCallback cb);
                entry void receiveFFTMessage  (LineFFTMsg *msg);
+               entry void many_to_many_recvFFT  (LineFFTCompleteMsg *msg);
                entry void startFFT (void);
        };
+
+       group PencilMapX : CkArrayMap {
+               entry  PencilMapX(LineFFTInfo  info);
+       }
+
+       group PencilMapY : CkArrayMap {
+               entry  PencilMapY(LineFFTInfo  info);
+       }
+
+       group PencilMapZ : CkArrayMap {
+               entry  PencilMapZ(LineFFTInfo  info);
+       }       
 };
index 72e9d0282eb7c304c16f5d1252dafcf829462df9..a979f5e25069235b486c7ac59f0053a293da6e66 100644 (file)
@@ -4,9 +4,14 @@
 
 #include "charm++.h"
 #include "ckcomplex.h"
-
 #include "sfftw.h"
 
+//#define USE_MANY_TO_MANY  1
+
+#if USE_MANY_TO_MANY
+#include "cmidirectmanytomany.h"
+#endif
+
 #define LineFFTAssert(x)  //CkAssert(x)
 #define __LINEFFT_DEBUG__  0
 
@@ -16,7 +21,8 @@
 enum LineFFTPhase {
   PHASE_X = 0,
   PHASE_Y,
-  PHASE_Z
+  PHASE_Z,
+  NUM_PHASES
 };
 
 
@@ -66,15 +72,19 @@ struct LineFFTInfo {
   CProxy_LineFFTArray     yProxy; 
   CProxy_LineFFTArray     zProxy;
 
+  CProxy_PencilMapX       mapx;
+  CProxy_PencilMapY       mapy;
+  CProxy_PencilMapZ       mapz;
+
   CkCallback              kSpaceCallback;
   LineFFTCompletion       completionId;
 
   bool                    normalize;
+  int                     numIter;
 };
 
 PUPbytes (LineFFTInfo)
 
-
 ///////////////////////////////////////////////////////////////
 //             The Pencil FFT Array Class
 //
@@ -105,6 +115,20 @@ class LineFFTGridMsg : public CMessage_LineFFTGridMsg {
   fftw_real         * data;  
 };
 
+class LineFFTCompleteMsg :  public CMessage_LineFFTCompleteMsg {
+ public:
+  int     dir;
+};
+
+class LineFFTArray;
+
+struct LineFFTCbWrapper {
+  CkCallback                    cb;
+  LineFFTCompleteMsg          * msg;
+  LineFFTArray                * me;
+};
+
+PUPbytes (LineFFTCbWrapper)
 
 ///
 /// Main Pencil FFT Chare Array
@@ -126,6 +150,8 @@ class LineFFTArray : public CBase_LineFFTArray {
   
   LineFFTPhase  _phase;  //X, Y or Z chare
 
+  ///\brief Iteration count
+  int         _iter;
 
   ///
   /// \brief the current number of grid messages received so far
@@ -197,18 +223,28 @@ class LineFFTArray : public CBase_LineFFTArray {
   ///
   void receiveGridMessage (LineFFTGridMsg   *msg);
 
+  ///
+  /// \brief transpose an intermediate FFT message
+  ///
+  void receive_transpose (int id, complex *data, int, int, int);
+
   ///
   /// \brief Receive an intermediate FFT message
   ///
   void receiveFFTMessage (LineFFTMsg *msg);
 
+  ///
+  /// \brief Start the 3D fft operation. This can be called externally
+  /// or initiated when all the grid/fft messages have been received.
+  ///
+  void start_fft (int direction = FFTW_FORWARD);
+
   ///
   /// \brief Start the 3D fft operation. This can be called externally
   /// or intiated when all the grid/fft messages have been received.
   ///
   void startFFT (int direction = FFTW_FORWARD);
 
-  
   ///
   /// \brief Set the number of grid messages to expect
   ///
@@ -224,12 +260,36 @@ class LineFFTArray : public CBase_LineFFTArray {
     contribute (sizeof (int), &x, CkReduction::sum_int, cb);
   }
 
+#if USE_MANY_TO_MANY 
+  void             * handle;   //many to many handle
+  int                tag_s[2]; //fwd and bwd send tags
+  complex          * many_to_many_data;
+  LineFFTCbWrapper   cbf_recv; //fwd recv completion
+  LineFFTCbWrapper   cbb_recv; //bwd recv completion
+
+  void initialize_many_to_many ();
+
+  ///
+  /// \brief many to many recv completion
+  ///
+  void many_to_many_recvFFT(LineFFTCompleteMsg *msg);
+#else
+  void many_to_many_recvFFT(LineFFTCompleteMsg *msg) {
+    //CmiAbort();
+  }
+#endif
+
   /////////////////////////////////////////////////////////////
   ///           End Entry Methods
   /////////////////////////////////////////////////////////////
   
  private:
 
+  ///
+  /// \brief Send the fft messages for the next few phases
+  /// 
+  void send_transpose (int id, complex *data, int, int, int);
+
   ///
   /// \brief Send the fft messages for the next few phases
   /// 
@@ -258,4 +318,187 @@ class LineFFTArray : public CBase_LineFFTArray {
   }
 };   // -- End class LineFFTArray
 
+extern CmiNodeLock fftw_plan_lock;
+
+class PencilMapX : public CBase_PencilMapX
+{
+  LineFFTInfo   _info;
+  int         * _mapcache;
+  int           _sizeY;
+
+ public:
+  PencilMapX(LineFFTInfo  &info) { 
+    _info = info;         
+    if (CmiMyRank() == 0)
+      if (fftw_plan_lock == NULL)
+       fftw_plan_lock = CmiCreateLock();
+    
+    initialize();
+  }
+  PencilMapX(CkMigrateMessage *m){}
+
+  void initialize () {    
+    double pe = 0.0;
+    int y = 0, z = 0;    
+    int nelem = (_info.sizeY * _info.sizeZ)/
+      (_info.grainY * _info.grainZ);    
+    if ((CkNumPes() % nelem) != 0)
+      nelem ++;
+    double stride = (int) ((1.0 *CkNumPes())/ nelem);
+#if USE_MANY_TO_MANY
+    CmiAssert (stride >= 1.0);
+#endif
+    _mapcache = new int[nelem];
+    memset (_mapcache, 0, sizeof(int)*nelem);
+    _sizeY = _info.sizeY/_info.grainY; 
+
+    int idx = 0;
+    for (pe = 0.0, z = 0; z < (_info.sizeZ)/(_info.grainZ); z ++) {
+      for (y = 0; y < (_info.sizeY)/(_info.grainY); y++) {
+       if(pe >= CkNumPes()) {
+         pe = pe - CkNumPes(); 
+         if ((int)pe == 0)
+           pe++;
+       }
+       idx = z * _sizeY + y;   
+       _mapcache[idx] = (int)pe;
+       pe +=  stride;
+      }
+    }
+    //if (CkMyPe() == 0)
+    //printf("X Last allocation to %d\n", (int)pe);    
+  }
+
+  int procNum(int, const CkArrayIndex &idx) {
+    CkArrayIndex2D idx2d = *(CkArrayIndex2D *) &idx;    
+    int y = idx2d.index[0];
+    int z = idx2d.index[1];
+    int id = z * _sizeY + y;
+
+    return _mapcache[id];
+  }
+};
+
+
+class PencilMapY : public CBase_PencilMapY
+{
+  LineFFTInfo   _info;
+  int         * _mapcache;
+  int           _sizeZ;
+
+ public:
+  PencilMapY(LineFFTInfo  &info) { 
+    _info = info;         
+    if (CmiMyRank() == 0)
+      if (fftw_plan_lock == NULL)
+       fftw_plan_lock = CmiCreateLock();
+    
+    initialize();
+  }
+  PencilMapY(CkMigrateMessage *m){}
+
+  void initialize () {    
+    double pe = 0.0;
+    int z = 0, x = 0;    
+    int nelem = (_info.sizeZ * _info.sizeX)/
+      (_info.grainZ * _info.grainX);    
+    if ((CkNumPes() % nelem) != 0)
+      nelem ++;
+    double stride = (int)((1.0 *CkNumPes())/ nelem);
+#if USE_MANY_TO_MANY
+    CmiAssert (stride >= 1.0);
+#endif
+    _mapcache = new int[nelem];
+    memset (_mapcache, 0, sizeof(int)*nelem);
+    _sizeZ = _info.sizeZ/_info.grainZ; 
+
+    int idx = 0;
+    for (pe = 1.0, x = 0; x < (_info.sizeX)/(_info.grainX); x ++) {
+      for (z = 0; z < (_info.sizeZ)/(_info.grainZ); z++) {
+       if(pe >= CkNumPes()) {
+         pe = pe - CkNumPes(); 
+         if ((int)pe == 1)
+           pe++;
+       }
+       idx = x * _sizeZ + z;   
+       _mapcache[idx] = (int)pe;
+       pe +=  stride;
+      }
+    }
+
+    //if (CkMyPe() == 0)
+    //printf("Y Last allocation to %d\n", (int)pe);
+  }
+
+  int procNum(int, const CkArrayIndex &idx) {
+    CkArrayIndex2D idx2d = *(CkArrayIndex2D *) &idx;    
+    int z = idx2d.index[0];
+    int x = idx2d.index[1];
+    int id = x * _sizeZ + z;
+
+    return _mapcache[id];
+  }
+};
+
+////////////////////////////////////////////
+
+class PencilMapZ : public CBase_PencilMapZ
+{
+  LineFFTInfo   _info;
+  int         * _mapcache;
+  int           _sizeX;
+
+ public:
+  PencilMapZ(LineFFTInfo  &info) { 
+    _info = info;         
+    if (CmiMyRank() == 0)
+      if (fftw_plan_lock == NULL)
+       fftw_plan_lock = CmiCreateLock();
+    
+    initialize();
+  }
+  PencilMapZ(CkMigrateMessage *m){}
+
+  void initialize () {    
+    double pe = 0.0;
+    int x = 0, y = 0;    
+    int nelem = (_info.sizeX * _info.sizeY)/
+      (_info.grainX * _info.grainY);    
+    if ((CkNumPes() % nelem) != 0)
+      nelem ++;
+    double stride = (int)((1.0 *CkNumPes())/ nelem);
+#if USE_MANY_TO_MANY
+    CmiAssert (stride >= 1.0);
+#endif
+    _mapcache = new int[nelem];    
+    memset (_mapcache, 0, sizeof(int)*nelem);
+    _sizeX = _info.sizeX/_info.grainX; 
+
+    int idx = 0;
+    for (pe = 0.0, x = 0; x < (_info.sizeX)/(_info.grainX); x ++) {
+      for (y = 0; y < (_info.sizeY)/(_info.grainY); y++) {
+       if(pe >= CkNumPes()) {
+         pe = pe - CkNumPes(); 
+         if((int) pe == 0)
+           pe++;
+       }
+       idx = y * _sizeX + x;   
+       _mapcache[idx] = (int)pe;
+       pe +=  stride;
+      }
+    }
+    //if (CkMyPe() == 0)
+    //printf("Z Last allocation to %d\n", (int)pe);
+  }
+
+  int procNum(int, const CkArrayIndex &idx) {
+    CkArrayIndex2D idx2d = *(CkArrayIndex2D *) &idx;    
+    int x = idx2d.index[0];
+    int y = idx2d.index[1];
+    int id = y * _sizeX + x;
+
+    return _mapcache[id];
+  }
+};
+
 #endif
diff --git a/src/util/cmimemcpy_qpx.h b/src/util/cmimemcpy_qpx.h
new file mode 100644 (file)
index 0000000..b2d25ca
--- /dev/null
@@ -0,0 +1,124 @@
+
+#ifndef  __CMI_MEMCPY_QPX__
+#define  __CMI_MEMCPY_QPX__
+
+#include <string.h>
+
+/* QPX_LOAD(si,sb,fp): quad-vector load.  Emits "qvlfdx fp,si,sb" to
+ * load one 32-byte quad (4 doubles) from address si+sb into the QPX
+ * register bound to fp.  si is the base pointer ("b" constraint),
+ * sb the byte offset in a GPR ("r"). */
+#define QPX_LOAD(si,sb,fp) \
+  do {                                                                 \
+  asm volatile("qvlfdx %0,%1,%2": "=f"(fp) : "b" (si), "r" (sb));      \
+  } while(0);
+
+/* QPX_STORE(si,sb,fp): quad-vector store of fp to si+sb.  The "memory"
+ * clobber stops the compiler reordering the store past later accesses
+ * to the destination buffer. */
+#define QPX_STORE(si,sb,fp)                                            \
+  do {                                                                 \
+  asm volatile("qvstfdx %2,%0,%1": : "b" (si), "r" (sb), "f"(fp) :"memory"); \
+  } while(0);
+
+/* Bind a local double to a fixed FP register so the qvlfdx/qvstfdx
+ * mnemonics above address the full-width QPX vector register of the
+ * same number.  XL C and GNU C spell register names differently.
+ * NOTE(review): the #ifndef __GNUC__ branch uses "f"#i and the GNU
+ * branch "fr"#i -- confirm the spellings are on the intended arms. */
+#ifndef __GNUC__
+#define FP_REG(i)   asm("f"#i)
+#define FP_REG1(i)  "fr"#i
+#else
+#define FP_REG(i)  asm("fr"#i)
+#define FP_REG1(i)  "fr"#i
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Copy one 512-byte chunk between 32-byte aligned buffers using QPX
+ * quad-word (4 x double = 32-byte) vector loads and stores.
+ *
+ * The chunk is moved as two interleaved passes: the "even" 32-byte
+ * quads (byte offsets 0,64,...,448 from src) are all loaded into
+ * f0..f7 first; then each store of an even quad is paired with the
+ * load of the corresponding "odd" quad (offset+32, base fpp1_2) so
+ * loads and stores overlap in the pipeline, and finally the odd quads
+ * are stored.
+ *
+ * dest and src MUST be 32-byte aligned (QPX alignment requirement).
+ * src is read-only, so it is taken as const -- the original char*
+ * signature forced callers holding const buffers (e.g. CmiMemcpy_qpx)
+ * into an invalid const-dropping call in C++.  Always returns 0; the
+ * size_t return is kept only for signature compatibility. */
+static inline size_t quad_copy_512( char* dest, const char* src ) {
+    const double *fpp1_1, *fpp1_2;   /* source: even / odd quad bases  */
+    double *fpp2_1, *fpp2_2;         /* destination: even / odd bases  */
+
+    /* Pin eight locals to FP registers f0..f7 for the asm mnemonics. */
+    register double f0 FP_REG(0);
+    register double f1 FP_REG(1);
+    register double f2 FP_REG(2);
+    register double f3 FP_REG(3);
+    register double f4 FP_REG(4);
+    register double f5 FP_REG(5);
+    register double f6 FP_REG(6);
+    register double f7 FP_REG(7);
+
+    /* Byte offsets of the eight 64-byte strides within the chunk. */
+    int r0 = 0;
+    int r1 = 64;
+    int r2 = 128;
+    int r3 = 192;
+    int r4 = 256;
+    int r5 = 320;
+    int r6 = 384;
+    int r7 = 448;
+
+    fpp1_1 = (const double *)src;        /* even quads: src + 0   */
+    fpp1_2 = (const double *)src + 4;    /* odd quads:  src + 32B */
+
+    fpp2_1 = (double *)dest;
+    fpp2_2 = (double *)dest + 4;
+
+    /* Pass 1: load all even quads. */
+    QPX_LOAD(fpp1_1,r0,f0);
+    QPX_LOAD(fpp1_1,r1,f1);
+    QPX_LOAD(fpp1_1,r2,f2);
+    QPX_LOAD(fpp1_1,r3,f3);
+    QPX_LOAD(fpp1_1,r4,f4);
+    QPX_LOAD(fpp1_1,r5,f5);
+    QPX_LOAD(fpp1_1,r6,f6);
+    QPX_LOAD(fpp1_1,r7,f7);
+
+    /* Pass 2: store even quad i, immediately reload fi with odd quad i. */
+    QPX_STORE(fpp2_1,r0,f0);
+    QPX_LOAD(fpp1_2,r0,f0);
+    QPX_STORE(fpp2_1,r1,f1);
+    QPX_LOAD(fpp1_2,r1,f1);
+    QPX_STORE(fpp2_1,r2,f2);
+    QPX_LOAD(fpp1_2,r2,f2);
+    QPX_STORE(fpp2_1,r3,f3);
+    QPX_LOAD(fpp1_2,r3,f3);
+    QPX_STORE(fpp2_1,r4,f4);
+    QPX_LOAD(fpp1_2,r4,f4);
+    QPX_STORE(fpp2_1,r5,f5);
+    QPX_LOAD(fpp1_2,r5,f5);
+    QPX_STORE(fpp2_1,r6,f6);
+    QPX_LOAD(fpp1_2,r6,f6);
+    QPX_STORE(fpp2_1,r7,f7);
+    QPX_LOAD(fpp1_2,r7,f7);
+
+    /* Pass 3: store all odd quads. */
+    QPX_STORE(fpp2_2,r0,f0);
+    QPX_STORE(fpp2_2,r1,f1);
+    QPX_STORE(fpp2_2,r2,f2);
+    QPX_STORE(fpp2_2,r3,f3);
+    QPX_STORE(fpp2_2,r4,f4);
+    QPX_STORE(fpp2_2,r5,f5);
+    QPX_STORE(fpp2_2,r6,f6);
+    QPX_STORE(fpp2_2,r7,f7);
+
+    return 0;
+}
+
+/* memcpy tuned for Blue Gene/Q: copies n bytes from src to dst using
+ * the QPX fast path (quad_copy_512) for every full 512-byte chunk and
+ * plain memcpy for the trailing n % 512 bytes.
+ *
+ * Both pointers must be 32-byte aligned for the QPX path to be legal;
+ * alignment is the caller's responsibility.
+ *
+ * NOTE(review): this is a non-inline definition in a header -- if the
+ * header is ever included from more than one translation unit it will
+ * cause duplicate-symbol link errors; consider "static inline". */
+void CmiMemcpy_qpx (void *dst, const void *src, size_t n) 
+{
+  /* Explicit casts keep this valid C++ as well as C: void* does not
+     implicitly convert to char* in C++, so the original initializers
+     failed to compile there. */
+  const char *s = (const char *) src;
+  char *d = (char *) dst;
+  size_t n512 = n >> 9;       /* number of whole 512-byte chunks */
+  while (n512 --) {
+    /* Cast drops const only to match quad_copy_512's signature; the
+       source buffer is never written. */
+    quad_copy_512 (d, (char *) s);
+    d += 512;
+    s += 512;
+  }
+
+  /* Remainder smaller than one chunk. */
+  if ( (n & 511UL) != 0 ) 
+    memcpy (d, s, n & 511UL);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
index 459be82067ab146e41ccbd2bc6747debc07c6a5a..54458ba3f05c2a2844a844a9a1d538499da9fb24 100644 (file)
@@ -1,12 +1,12 @@
 #Set FFTW_HOME home to the fftw-2.1.5 local installation
 
 #FFTW_HOME=/bgl/local/fftw-2.1.5/
-FFTW_HOME=$(HOME)/fftw
+FFTW_HOME=$(HOME)/fftw-bgq
 OPTS=-g -O3 
 
 CHARMC=../../../bin/charmc $(OPTS)
 INCLUDES=-I$(FFTW_HOME)/include -I$(CHARMBASE)/include/fftlib \
-        -DFFTW_ENABLE_FLOAT=1
+        -DFFTW_ENABLE_FLOAT=1 -DUSE_FFTW_DECLS=1
 
 LIBS=-module PencilFFT -L$(FFTW_HOME)/lib -language charm++ -lsfftw -lsrfftw
 
index 15708010a847334f86dd528823b5568baef1e507..ab905804d54961082abc1038b5f2730bade5140f 100644 (file)
@@ -3,8 +3,9 @@
 #include "pencilfft/pencil_api.h"
 #include "testpencil.decl.h"
 
-#define START_TIMING   10
-#define MAX_ITERATIONS 100
+#define NUM_FFT_ITER   200
+#define START_TIMING   1
+#define MAX_ITERATIONS 5
 
 LineFFTInfo   info;
 int           iteration;
@@ -13,7 +14,8 @@ double        startTime;
 void red_handler (void *param, int size, void *data) {
 
   iteration ++;
-
+  //printf ("Iteration %d Complete\n", iteration);
+  
   if (iteration == START_TIMING)
     startTime = CmiWallTimer ();
   
@@ -24,10 +26,11 @@ void red_handler (void *param, int size, void *data) {
     CkAssert (MAX_ITERATIONS > START_TIMING);
     CkPrintf ("Time to perform a pair of (%d, %d, %d) 3D FFT operations %g ms\n", 
              info.sizeX, info.sizeY, info.sizeZ,
-             (endTime - startTime) * 1000.0/ (MAX_ITERATIONS - START_TIMING));
+             (endTime - startTime) * 1000.0/ 
+             (NUM_FFT_ITER * (MAX_ITERATIONS - START_TIMING)));
     CkExit ();
   }
-
+  
   startLineFFTArray (&info);
 }
 
@@ -67,7 +70,10 @@ main::main (CkArgMsg *m) {
   CkPrintf ("Calling Configure\n");
   configureLineFFTInfo (&info, sizeX, sizeY, sizeZ, 
                        grainX, grainY, grainZ,
-                       NULL, ARRAY_REDUCTION, true);
+                       NULL, 
+                       ARRAY_REDUCTION, 
+                       true,
+                       NUM_FFT_ITER);
 
   CkPrintf ("Calling Create\n");
   createLineFFTArray (&info);