Add pxshm and persistent communication support for the gemini_gni (Gemini/Cray XE) machine layer
author     Gengbin Zheng <gzheng@illinois.edu>
           Mon, 12 Sep 2011 04:19:13 +0000 (21:19 -0700)
committer  Gengbin Zheng <gzheng@illinois.edu>
           Mon, 12 Sep 2011 04:19:13 +0000 (21:19 -0700)
12 files changed:
src/arch/gemini_gni-crayxe/conv-mach-pxshm.h [new file with mode: 0644]
src/arch/gemini_gni-crayxe/conv-mach-pxshm.sh [new file with mode: 0644]
src/arch/gemini_gni/Makefile.machine
src/arch/gemini_gni/conv-common.h
src/arch/gemini_gni/machine-persistent.c [new file with mode: 0644]
src/arch/gemini_gni/machine-persistent.h [new file with mode: 0644]
src/arch/util/machine-common-core.c
src/arch/util/machine-lrts.h
src/arch/util/machine-pxshm.c [new file with mode: 0644]
src/arch/util/persist-comm.c
src/conv-core/persistent.h
src/scripts/Makefile

diff --git a/src/arch/gemini_gni-crayxe/conv-mach-pxshm.h b/src/arch/gemini_gni-crayxe/conv-mach-pxshm.h
new file mode 100644 (file)
index 0000000..03ff919
--- /dev/null
@@ -0,0 +1,13 @@
+#undef CMK_USE_PXSHM
+#define CMK_USE_PXSHM                  1
+
+#undef CMK_IMMEDIATE_MSG
+#define CMK_IMMEDIATE_MSG       0
+
+#undef CMK_BROADCAST_HYPERCUBE
+#define CMK_BROADCAST_HYPERCUBE                                   1
+
+#undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
+#define CMK_WHEN_PROCESSOR_IDLE_USLEEP  0
+
+#define PXSHM_LOCK                      1
diff --git a/src/arch/gemini_gni-crayxe/conv-mach-pxshm.sh b/src/arch/gemini_gni-crayxe/conv-mach-pxshm.sh
new file mode 100644 (file)
index 0000000..a269d3c
--- /dev/null
@@ -0,0 +1 @@
+#CMK_LIBS="$CMK_LIBS -lrt"
diff --git a/src/arch/gemini_gni/Makefile.machine b/src/arch/gemini_gni/Makefile.machine
index 68dc45d014dee110576cb90a9252064957bed538..41bac81e582e4db164aa9bfd7593ab820a982ac5 100644 (file)
@@ -1,2 +1,2 @@
-$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h
+$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h machine-pxshm.c mempool.c
 
diff --git a/src/arch/gemini_gni/conv-common.h b/src/arch/gemini_gni/conv-common.h
index 54f48da7725243c504d9e354468d1e6370b8f71b..cc46b9626035b5ab8c151fe4334b580eca614cc1 100644 (file)
@@ -36,5 +36,5 @@
 
 #define CMK_LB_CPUTIMER                                           0
 
-#define CMK_PERSISTENT_COMM                                0
+#define CMK_PERSISTENT_COMM                                1
 
diff --git a/src/arch/gemini_gni/machine-persistent.c b/src/arch/gemini_gni/machine-persistent.c
new file mode 100644 (file)
index 0000000..b8a2866
--- /dev/null
@@ -0,0 +1,196 @@
+/** @file
+ * Persistent communication for the uGNI (Gemini) machine layer
+ * @ingroup Machine
+*/
+
+/*
+  included in machine.c
+  Gengbin Zheng, 9/6/2011
+*/
+
+/*
+  machine specific persistent comm functions:
+  * CmiSendPersistentMsg
+  * CmiSyncSendPersistent
+  * PumpPersistent
+  * PerAlloc PerFree      // persistent message memory allocation/free functions
+  * persist_machine_init  // machine specific initialization call
+*/
+
+
+void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
+{
+  CmiAssert(h!=NULL);
+  PersistentSendsTable *slot = (PersistentSendsTable *)h;
+  CmiAssert(slot->used == 1);
+  CmiAssert(slot->destPE == destPE);
+  if (size > slot->sizeMax) {
+    CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
+    CmiAbort("Abort: Invalid size\n");
+  }
+
+/*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destPE=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), destPE, slot->destAddress[0], size);*/
+
+  if (slot->destBuf[0].destAddress) {
+#if 0
+    ELAN_EVENT *e1, *e2;
+    int strategy = STRATEGY_ONE_PUT;
+    /* if (size > 280) strategy = STRATEGY_TWO_ELANPUT; */
+    int *footer = (int*)((char*)m + size);
+    footer[0] = size;
+    footer[1] = 1;
+    if (strategy == STRATEGY_ONE_PUT) CMI_MESSAGE_SIZE(m) = size;
+    else CMI_MESSAGE_SIZE(m) = 0;
+    e1 = elan_put(elan_base->state, m, slot->destAddress[0], size+sizeof(int)*2, destPE);
+    switch (strategy ) {
+    case STRATEGY_ONE_PUT:
+    case STRATEGY_TWO_PUT:  {
+      PMSG_LIST *msg_tmp;
+      NEW_PMSG_LIST(e1, m, size, destPE, slot->destSizeAddress[0], h, strategy);
+      APPEND_PMSG_LIST(msg_tmp);
+      swapSendSlotBuffers(slot);
+      break;
+      }
+    case 2:
+      elan_wait(e1, ELAN_POLL_EVENT);
+      e2 = elan_put(elan_base->state, &size, slot->destSizeAddress[0], sizeof(int), destPE);
+      elan_wait(e2, ELAN_POLL_EVENT);
+      CMI_MESSAGE_SIZE(m) = 0;
+      /*CmiPrintf("[%d] elan finished. \n", CmiMyPe());*/
+      CmiFree(m);
+    }
+#else
+     // uGNI part: remote put not implemented yet; just free the message for now
+    CmiFree(m);
+#endif
+  }
+  else {
+#if 1
+    if (slot->messageBuf != NULL) {
+      CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
+      CmiAbort("");
+    }
+    slot->messageBuf = m;
+    slot->messageSize = size;
+#else
+    /* normal send */
+    PersistentHandle  *phs_tmp = phs;
+    int phsSize_tmp = phsSize;
+    phs = NULL; phsSize = 0;
+    CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
+    CmiSyncSendAndFree(slot->destPE, size, m);
+    phs = phs_tmp; phsSize = phsSize_tmp;
+#endif
+  }
+}
+
+void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
+{
+  CmiState cs = CmiGetState();
+  char *dupmsg = (char *) CmiAlloc(size);
+  memcpy(dupmsg, msg, size);
+
+  /*  CmiPrintf("Setting root to %d\n", 0); */
+  CMI_SET_BROADCAST_ROOT(dupmsg, 0);
+
+  if (cs->pe==destPE) {
+    CQdCreate(CpvAccess(cQdState), 1);
+    CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
+  }
+  else
+    CmiSendPersistentMsg(h, destPE, size, dupmsg);
+}
+
+extern void CmiReference(void *blk);
+
+#if 0
+
+/* called in PumpMsgs */
+int PumpPersistent()
+{
+  int status = 0;
+  PersistentReceivesTable *slot = persistentReceivesTableHead;
+  while (slot) {
+    char *msg = slot->messagePtr[0];
+    int size = *(slot->recvSizePtr[0]);
+    if (size)
+    {
+      int *footer = (int*)(msg + size);
+      if (footer[0] == size && footer[1] == 1) {
+/*CmiPrintf("[%d] PumpPersistent messagePtr=%p size:%d\n", CmiMyPe(), slot->messagePtr, size);*/
+
+#if 0
+      void *dupmsg;
+      dupmsg = CmiAlloc(size);
+                                                                                
+      _MEMCHECK(dupmsg);
+      memcpy(dupmsg, msg, size);
+      memset(msg, 0, size+2*sizeof(int));
+      msg = dupmsg;
+#else
+      /* return messagePtr directly and user MUST make sure not to delete it. */
+      /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
+
+      CmiReference(msg);
+      swapRecvSlotBuffers(slot);
+#endif
+
+      CmiPushPE(CMI_DEST_RANK(msg), msg);
+#if CMK_BROADCAST_SPANNING_TREE
+      if (CMI_BROADCAST_ROOT(msg))
+          SendSpanningChildren(size, msg);
+#endif
+      /* clear footer after message used */
+      *(slot->recvSizePtr[0]) = 0;
+      footer[0] = footer[1] = 0;
+
+#if 0
+      /* not safe at all! */
+      /* instead of clear before use, do it earlier */
+      msg=slot->messagePtr[0];
+      size = *(slot->recvSizePtr[0]);
+      footer = (int*)(msg + size);
+      *(slot->recvSizePtr[0]) = 0;
+      footer[0] = footer[1] = 0;
+#endif
+      status = 1;
+      }
+    }
+    slot = slot->next;
+  }
+  return status;
+}
+
+#endif
+
+void *PerAlloc(int size)
+{
+  return CmiAlloc(size);
+}
+                                                                                
+void PerFree(char *msg)
+{
+  //elan_CmiStaticFree(msg);
+  CmiFree(msg);
+}
+
+/* machine dependent init call */
+void persist_machine_init(void)
+{
+}
+
+void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
+{
+  int i;
+  for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
+    char *buf = PerAlloc(maxBytes+sizeof(int)*2);
+    _MEMCHECK(buf);
+    memset(buf, 0, maxBytes+sizeof(int)*2);
+    slot->destBuf[i].destAddress = buf;
+    /* note: assume the first integer in the converse header is the msg size */
+    slot->destBuf[i].destSizeAddress = (unsigned int*)buf;
+  }
+  slot->sizeMax = maxBytes;
+}
+
+
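The receive slot set up above reserves maxBytes for the payload plus a two-int footer, and treats the first integer of the converse header as the message size. Together with the (currently #if 0'd) Elan-era PumpPersistent() logic, this implies a simple completion convention: the sender deposits the payload followed by a [size, 1] footer, and the receiver considers the message arrived once the footer matches. The standalone sketch below isolates that convention; the helper names are illustrative only, since the actual uGNI put path in CmiSendPersistentMsg() is still a stub here.

/* Illustrative sketch of the size/flag footer convention, assuming the
 * receive buffer was allocated as maxBytes + 2*sizeof(int), as in
 * setupRecvSlot().  Not part of the patch itself. */
#include <string.h>

static void deposit_with_footer(char *destBuf, const char *msg, int size)
{
    int footer[2];
    memcpy(destBuf, msg, size);             /* payload first                 */
    footer[0] = size;                       /* footer word 0: message length */
    footer[1] = 1;                          /* footer word 1: "ready" flag   */
    memcpy(destBuf + size, footer, sizeof(footer));
}

/* Returns the message size if a complete message is present, 0 otherwise. */
static int poll_footer(const char *destBuf)
{
    int size = *(const int *)destBuf;       /* first int of the converse header holds the size */
    int footer[2];
    if (size == 0) return 0;
    memcpy(footer, destBuf + size, sizeof(footer));
    return (footer[0] == size && footer[1] == 1) ? size : 0;
}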
diff --git a/src/arch/gemini_gni/machine-persistent.h b/src/arch/gemini_gni/machine-persistent.h
new file mode 100644 (file)
index 0000000..9304f3b
--- /dev/null
@@ -0,0 +1,53 @@
+/** @file
+ * General implementation of persistent communication support
+ * @ingroup Machine
+ */
+
+/**
+ * \addtogroup Machine
+*/
+/*@{*/
+
+#define PERSIST_BUFFERS_NUM             1
+
+typedef struct  _PersistentBuf {
+  void *destAddress;
+  void *destSizeAddress;
+  gni_mem_handle_t    mem_hndl;
+} PersistentBuf;
+
+typedef struct _PersistentSendsTable {
+  int destPE;
+  int sizeMax;
+  PersistentHandle   destHandle;  
+  PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];
+  void *messageBuf;
+  int messageSize;
+  char used;
+} PersistentSendsTable;
+
+typedef struct _PersistentReceivesTable {
+#if 0
+  void *messagePtr[PERSIST_BUFFERS_NUM];      /* preallocated message buffer of size "sizeMax" */
+  unsigned int *recvSizePtr[PERSIST_BUFFERS_NUM];   /* pointer to the size */
+#endif
+  PersistentBuf     destBuf[PERSIST_BUFFERS_NUM];
+  int sizeMax;
+  struct _PersistentReceivesTable *prev, *next;
+} PersistentReceivesTable;
+
+extern PersistentReceivesTable *persistentReceivesTableHead;
+extern PersistentReceivesTable *persistentReceivesTableTail;
+
+extern PersistentHandle  *phs;
+extern int phsSize;
+
+void *PerAlloc(int size);
+void PerFree(char *msg);
+void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m);
+int PumpPersistent();
+void swapSendSlotBuffers(PersistentSendsTable *slot);
+void swapRecvSlotBuffers(PersistentReceivesTable *slot);
+void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes);
+
+/*@}*/
diff --git a/src/arch/util/machine-common-core.c b/src/arch/util/machine-common-core.c
index b0b60b448d634d50593f21d4820048ce43a0aa38..0ce523eb8c27625220d25c9de6932b6531e8f841 100644 (file)
@@ -440,12 +440,30 @@ void CmiSyncSendFn(int destPE, int size, char *msg) {
     CmiFreeSendFn(destPE, size, dupmsg);
 }
 
+#if CMK_USE_PXSHM
+inline int CmiValidPxshm(int dst, int size);
+void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot);
+void CmiInitPxshm(char **argv);
+inline void CommunicationServerPxshm();
+void CmiExitPxshm();
+#endif
+
 void CmiFreeSendFn(int destPE, int size, char *msg) {
     CMI_SET_BROADCAST_ROOT(msg, 0);
     CQdCreate(CpvAccess(cQdState), 1);
     if (CmiMyPe()==destPE) {
         CmiSendSelf(msg);
     } else {
+#if CMK_USE_PXSHM
+        int refcount = 0;
+        int ret=CmiValidPxshm(destPE, size);
+        if (ret) {
+          CMI_DEST_RANK(msg) = CmiRankOf(destPE);
+          CmiSendMessagePxshm(msg, size, destPE, &refcount, CmiRankOf(destPE), 0);
+          //for (int i=0; i<refcount; i++) CmiReference(msg);
+          return;
+        }
+#endif
         int destNode = CmiNodeOf(destPE);
 #if CMK_SMP
         if (CmiMyNode()==destNode) {
@@ -568,6 +586,7 @@ if (  MSG_STATISTIC)
     for(_ii=0; _ii<22; _ii++)
         msg_histogram[_ii] = 0;
 }
+
     LrtsInit(&argc, &argv, &_Cmi_numnodes, &_Cmi_mynode);
 
     _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
@@ -577,6 +596,10 @@ if (  MSG_STATISTIC)
     Cmi_startfn = fn;
     Cmi_usrsched = usched;
 
+#if CMK_USE_PXSHM
+    CmiInitPxshm(argv);
+#endif
+
     /* CmiTimerInit(); */
 #if CMK_BROADCAST_HYPERCUBE
     /* CmiNodesDim = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
@@ -604,6 +627,7 @@ if (  MSG_STATISTIC)
 #endif
 
     CmiStartThreads(argv);
+
     ConverseRunPE(initret);
 }
 
@@ -677,6 +701,10 @@ static void ConverseRunPE(int everReturn) {
 static INLINE_KEYWORD void AdvanceCommunication() {
     int doProcessBcast = 1;
 
+#if CMK_USE_PXSHM
+    CommunicationServerPxshm();
+#endif
+
     LrtsAdvanceCommunication();
 
 #if CMK_OFFLOAD_BCAST_PROCESS
@@ -727,6 +755,9 @@ void ConverseExit(void) {
     LrtsDrainResources();
 #endif
 
+#if CMK_USE_PXSHM
+        CmiExitPxshm();
+#endif
     ConverseCommonExit();
 
 if (MSG_STATISTIC)
@@ -863,3 +894,10 @@ static char *CopyMsg(char *msg, int len) {
     memcpy(copy, msg, len);
     return copy;
 }
+
+
+#if CMK_USE_PXSHM
+#include "machine-pxshm.c"
+#endif
+
+
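The hunks above hook pxshm into the common core at four points: CmiInitPxshm() at startup, a fast-path check in CmiFreeSendFn() ahead of the usual node-queue/LrtsSendFunc() path, CommunicationServerPxshm() from the AdvanceCommunication() polling loop, and CmiExitPxshm() at shutdown. The fast path is taken only when CmiValidPxshm() (defined in machine-pxshm.c, which is #included at the end of this file) accepts the message, i.e. the destination PE is on the same physical node and the payload fits in the shared-memory buffer. A standalone sketch of that check, with illustrative names:

#define SKETCH_SHMBUFLEN (1024 * 1024)   /* mirrors SHMBUFLEN in machine-pxshm.c */

typedef struct {
    int nodestart;   /* first PE on this physical node */
    int nodeend;     /* last PE on this physical node  */
} NodeRangeSketch;

/* Take the pxshm fast path only for small, intra-node messages. */
static int valid_for_pxshm(const NodeRangeSketch *node, int dstPE, int size)
{
    return dstPE >= node->nodestart &&
           dstPE <= node->nodeend   &&
           size  <  SKETCH_SHMBUFLEN;
}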
diff --git a/src/arch/util/machine-lrts.h b/src/arch/util/machine-lrts.h
index 67c6bb866ae620fadd76ee00612c80c373b656d0..4c70b4b354b8297a6350f29ec89374046c71f997 100644 (file)
@@ -1,5 +1,8 @@
 #ifndef  _MACHINE_LRTS_H_
 #define  _MACHINE_LRTS_H_
+
+void LrtsPrepareEnvelope(char *msg, int size);
+
 /* The machine-specific send function */
 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode);
 
diff --git a/src/arch/util/machine-pxshm.c b/src/arch/util/machine-pxshm.c
new file mode 100644 (file)
index 0000000..4c2a563
--- /dev/null
@@ -0,0 +1,893 @@
+/** @file
+ * pxshm -- a POSIX shared memory based layer for communication between
+ * processes on the same node.
+ * It is not the primary mode of communication; it is used only for
+ * messages below a certain size exchanged between processes on the same
+ * node, and only in the non-SMP build.
+ * @ingroup NET
+ *
+ * Contains the pxshm code for:
+ * - CmiInitPxshm()
+ * - CmiSendMessagePxshm()
+ * - CommunicationServerPxshm()
+ * - CmiExitPxshm()
+ *
+ * This file is #included into machine-common-core.c when CMK_USE_PXSHM
+ * is enabled.
+ *
+ * There are three options for synchronization:
+ *   PXSHM_FENCE      the default; uses memory fences
+ *   PXSHM_OSSPINLOCK uses OSSpinLock (available on OS X)
+ *   PXSHM_LOCK       uses POSIX semaphores
+ *
+ * Created by Sayantan Chakravorty, sayantan@gmail.com, 21 March 2007
+ */
+
+/**
+ * @addtogroup NET
+ * @{
+ */
+
+#include <sys/types.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+
+
+/************** 
+   Determine which type of synchronization to use 
+*/
+#if PXSHM_OSSPINLOCK
+#include <libkern/OSAtomic.h>
+#elif PXSHM_LOCK
+#include <semaphore.h>
+#else
+/* Default to using fences */
+#define PXSHM_FENCE 1
+#endif
+
+
+#define MEMDEBUG(x) //x
+
+#define PXSHM_STATS 0
+
+
+/*** The following code was copied verbatim from pcqueue.h file ***/
+#undef CmiMemoryWriteFence
+#if PXSHM_FENCE
+#ifdef POWER_PC
+#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
+#else
+#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
+//#define CmiMemoryWriteFence(startPtr,nBytes) 
+#endif
+#else
+#undef CmiMemoryWriteFence
+#define CmiMemoryWriteFence(startPtr,nBytes)  
+#endif
+
+#undef CmiMemoryReadFence
+#if PXSHM_FENCE
+#ifdef POWER_PC
+#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
+#else
+#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
+//#define CmiMemoryReadFence(startPtr,nBytes) 
+#endif
+#else
+#define CmiMemoryReadFence(startPtr,nBytes) 
+#endif
+
+/***************************************************************************************/
+
+enum entities {SENDER,RECEIVER};
+
+/************************
+ *     Implementation currently assumes that
+ *     1) all nodes have the same number of processors
+ *  2) in the nodelist all processors in a node are listed in sequence
+ *   0 1 2 3      4 5 6 7 
+ *   -------      -------
+ *    node 1       node 2 
+ ************************/
+
+#define NAMESTRLEN 60
+#define PREFIXSTRLEN 50 
+
+#define SHMBUFLEN (1024*1024*1)
+
+#define SENDQSTARTSIZE 128
+
+
+/// This struct is used as the first portion of a shared memory region, followed by data
+typedef struct {
+       int count; //number of messages
+       int bytes; //number of bytes
+
+#if PXSHM_OSSPINLOCK
+       OSSpinLock lock;
+#endif
+
+#if PXSHM_FENCE
+       volatile int flagSender;
+        CmiMemorySMPSeparation_t pad1;
+       volatile int flagReceiver;
+        CmiMemorySMPSeparation_t pad2;
+       volatile int turn;
+       int dummy;
+#endif 
+
+} sharedBufHeader;
+
+
+typedef struct {
+#if PXSHM_LOCK
+       sem_t *mutex;
+#endif
+       sharedBufHeader *header;        
+       char *data;
+} sharedBufData;
+
+#if ! CMK_NET_VERSION
+typedef struct OutgoingMsgRec
+{
+  char *data;
+  int  *refcount;
+  int   size;
+}
+OutgoingMsgRec;
+#endif
+
+typedef struct {
+       int size; //total size of data array
+       int begin; //position of first element
+       int end;        //position of next element
+       int numEntries; //number of entries
+
+       OutgoingMsgRec *data;
+
+} PxshmSendQ;
+
+typedef struct {
+       int nodesize;
+       int noderank;
+       int nodestart,nodeend;//proc numbers for the start and end of this node
+       char prefixStr[PREFIXSTRLEN];
+       char **recvBufNames;
+       char **sendBufNames;
+
+       sharedBufData *recvBufs;
+       sharedBufData *sendBufs;
+
+       PxshmSendQ **sendQs;
+
+
+#if PXSHM_STATS
+       int sendCount;
+       int validCheckCount;
+       int lockRecvCount;
+       double validCheckTime;
+       double sendTime;
+       double commServerTime;
+#endif
+
+} PxshmContext;
+
+
+
+PxshmContext *pxshmContext=NULL; //global context
+
+
+void calculateNodeSizeAndRank(char **);
+void setupSharedBuffers();
+void initAllSendQs();
+
+/******************
+ *     Initialization routine
+ *     currently just testing start up
+ * ****************/
+void CmiInitPxshm(char **argv){
+       MACHSTATE(3,"CminitPxshm start");
+       pxshmContext = (PxshmContext *)calloc(1,sizeof(PxshmContext));
+
+#if CMK_NET_VERSION
+       if(Cmi_charmrun_pid <= 0){
+               CmiAbort("pxshm must be run with charmrun");
+       }
+#endif
+       calculateNodeSizeAndRank(argv);
+       if(pxshmContext->nodesize == 1) return;
+       
+       MACHSTATE1(3,"CminitPxshm  %d calculateNodeSizeAndRank",pxshmContext->nodesize);
+
+#if ! CMK_NET_VERSION
+        srand(getpid());
+        int Cmi_charmrun_pid = rand();
+        PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
+#endif
+       snprintf(&(pxshmContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
+
+       MACHSTATE2(3,"CminitPxshm %s %d pre setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
+
+       setupSharedBuffers();
+
+       MACHSTATE2(3,"CminitPxshm %s %d setupSharedBuffers",pxshmContext->prefixStr,pxshmContext->nodesize);
+
+       initAllSendQs();
+       
+       MACHSTATE2(3,"CminitPxshm %s %d initAllSendQs",pxshmContext->prefixStr,pxshmContext->nodesize);
+
+       MACHSTATE2(3,"CminitPxshm %s %d done",pxshmContext->prefixStr,pxshmContext->nodesize);
+
+#if PXSHM_STATS
+       pxshmContext->sendCount=0;
+       pxshmContext->sendTime=0.0;
+       pxshmContext->validCheckCount=0;
+       pxshmContext->validCheckTime=0.0;
+       pxshmContext->commServerTime = 0;
+       pxshmContext->lockRecvCount = 0;
+#endif
+
+};
+
+/**************
+ * shutdown shmem objects and semaphores
+ *
+ * *******************/
+void tearDownSharedBuffers();
+
+void CmiExitPxshm(){
+       int i=0;
+       
+       if(pxshmContext->nodesize != 1){
+               tearDownSharedBuffers();
+       
+               for(i=0;i<pxshmContext->nodesize;i++){
+                       if(i != pxshmContext->noderank){
+                               break;
+                       }
+               }
+               free(pxshmContext->recvBufNames[i]);
+               free(pxshmContext->sendBufNames[i]);
+
+               free(pxshmContext->recvBufNames);
+               free(pxshmContext->sendBufNames);
+
+               free(pxshmContext->recvBufs);
+               free(pxshmContext->sendBufs);
+
+       }
+#if PXSHM_STATS
+CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
+#endif
+       free(pxshmContext);
+}
+
+/******************
+ *Should this message be sent using PxShm or not ?
+ * ***********************/
+
+inline int CmiValidPxshm(int dst, int size){
+#if PXSHM_STATS
+       pxshmContext->validCheckCount++;
+#endif
+
+/*     if(pxshmContext->nodesize == 1){
+               return 0;
+       }*/
+       //replace by bitmap later
+       //if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
+       if(dst >= pxshmContext->nodestart && dst <= pxshmContext->nodeend && size < SHMBUFLEN ){
+               return 1;
+       }else{
+               return 0;
+       }
+};
+
+
+inline int PxshmRank(int dst){
+       return dst - pxshmContext->nodestart;
+}
+inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
+inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ);
+inline int flushSendQ(int dstRank);
+
+inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
+  return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
+}
+
+/***************
+ *
+ *Send this message through shared memory
+ *if you cannot get lock, put it in the sendQ
+ *Before sending messages pick them from sendQ
+ *
+ * ****************************/
+
+void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
+{
+
+#if PXSHM_STATS
+       double _startSendTime = CmiWallTimer();
+#endif
+
+        LrtsPrepareEnvelope(msg, size);
+       
+       int dstRank = PxshmRank(dstpe);
+       MEMDEBUG(CmiMemoryCheck());
+  
+       MACHSTATE4(3,"Send Msg Pxshm msg %p size %d dstpe %d dstRank %d",msg,size,dstpe,dstRank);
+
+
+       CmiAssert(dstRank >=0 && dstRank != pxshmContext->noderank);
+       
+       sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
+
+
+#if PXSHM_OSSPINLOCK
+       if(! OSSpinLockTry(&dstBuf->header->lock)){
+#elif PXSHM_LOCK
+       if(sem_trywait(dstBuf->mutex) < 0){
+#elif PXSHM_FENCE
+       dstBuf->header->flagSender = 1;
+       dstBuf->header->turn = RECEIVER;
+       CmiMemoryReadFence(0,0);
+       CmiMemoryWriteFence(0,0);
+       //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
+       if(dstBuf->header->flagReceiver){
+               dstBuf->header->flagSender = 0;
+#endif
+               /**failed to get the lock 
+               insert into q and retain the message*/
+
+               pushSendQ(pxshmContext->sendQs[dstRank], msg, size, refcount);
+               (*refcount)++;
+               MEMDEBUG(CmiMemoryCheck());
+               return;
+       }else{
+
+               /***
+                * We got the lock for this buffer
+                * first write all the messages in the sendQ and then write this guy
+                * */
+                if(pxshmContext->sendQs[dstRank]->numEntries == 0){
+                               // send message user event
+                               int ret = sendMessage(msg,size,refcount,dstBuf,pxshmContext->sendQs[dstRank]);
+                               MACHSTATE(3,"Pxshm Send succeeded immediately");
+                }else{
+                               (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
+                               pushSendQ(pxshmContext->sendQs[dstRank],msg,size,refcount);
+                               MACHSTATE3(3,"Pxshm msg %p pushed to sendQ length %d refcount %d",msg,pxshmContext->sendQs[dstRank]->numEntries,*refcount);
+                               int sent = flushSendQ(dstRank);
+                               (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
+                               MACHSTATE1(3,"Pxshm flushSendQ sent %d messages",sent);
+                }
+                /* unlock the recvbuffer*/
+
+#if PXSHM_OSSPINLOCK
+                OSSpinLockUnlock(&dstBuf->header->lock);
+#elif PXSHM_LOCK
+                sem_post(dstBuf->mutex);
+#elif PXSHM_FENCE
+                CmiMemoryReadFence(0,0);                       
+                CmiMemoryWriteFence(0,0);
+                dstBuf->header->flagSender = 0;
+#endif
+       }
+#if PXSHM_STATS
+               pxshmContext->sendCount ++;
+               pxshmContext->sendTime += (CmiWallTimer()-_startSendTime);
+#endif
+       MEMDEBUG(CmiMemoryCheck());
+
+};
+
+inline void emptyAllRecvBufs();
+inline void flushAllSendQs();
+
+/**********
+ * Extract all the messages from the recvBuffers you can
+ * Flush all sendQs
+ * ***/
+inline void CommunicationServerPxshm(){
+       
+#if PXSHM_STATS
+       double _startCommServerTime =CmiWallTimer();
+#endif 
+       
+       MEMDEBUG(CmiMemoryCheck());
+       emptyAllRecvBufs();
+       flushAllSendQs();
+
+#if PXSHM_STATS
+       pxshmContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
+#endif
+
+       MEMDEBUG(CmiMemoryCheck());
+};
+
+static void CmiNotifyStillIdlePxshm(CmiIdleState *s){
+       CommunicationServerPxshm();
+}
+
+
+static void CmiNotifyBeginIdlePxshm(CmiIdleState *s)
+{
+       CmiNotifyStillIdle(s);
+}
+
+
+void calculateNodeSizeAndRank(char **argv){
+       pxshmContext->nodesize=1;
+       MACHSTATE(3,"calculateNodeSizeAndRank start");
+       //CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
+       CmiGetArgIntDesc(argv, "+nodesize", &(pxshmContext->nodesize),"Number of cores in this node");
+       MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",pxshmContext->nodesize);
+
+       pxshmContext->noderank = _Cmi_mynode % (pxshmContext->nodesize);
+       
+       MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",pxshmContext->noderank);
+       
+       pxshmContext->nodestart = _Cmi_mynode -pxshmContext->noderank;
+       
+       MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
+
+       pxshmContext->nodeend = pxshmContext->nodestart + pxshmContext->nodesize -1;
+
+       if(pxshmContext->nodeend >= _Cmi_numnodes){
+               pxshmContext->nodeend = _Cmi_numnodes-1;
+               pxshmContext->nodesize = (pxshmContext->nodeend - pxshmContext->nodestart) +1;
+       }
+       
+       MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",pxshmContext->nodestart,pxshmContext->nodesize,pxshmContext->noderank);
+}
+
+void allocBufNameStrings(char ***bufName);
+void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames);
+/***************
+ *  Calculate the names of the shared memory objects and the semaphores,
+ *  then open them.
+ *
+ *  Naming scheme:
+ *    shared memory: charm_pxshm_<receiver node rank>_<sender node rank>
+ *    semaphore:     same name as the corresponding shared memory object;
+ *                   the POSIX library adds its own semaphore suffix (on
+ *                   Linux at least -- other systems may need more portable
+ *                   code).
+ * *********/
+void setupSharedBuffers(){
+       int i=0;
+
+       allocBufNameStrings(&(pxshmContext->recvBufNames));
+       
+       MACHSTATE(3,"allocBufNameStrings for recvBufNames done");
+       MEMDEBUG(CmiMemoryCheck());
+
+       allocBufNameStrings((&pxshmContext->sendBufNames));
+       
+       MACHSTATE(3,"allocBufNameStrings for sendBufNames done");
+
+       for(i=0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank){
+                       snprintf(pxshmContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,pxshmContext->noderank+pxshmContext->nodestart,i+pxshmContext->nodestart);
+                       MACHSTATE2(3,"recvBufName %s with rank %d",pxshmContext->recvBufNames[i],i)
+                       snprintf(pxshmContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",pxshmContext->prefixStr,i+pxshmContext->nodestart,pxshmContext->noderank+pxshmContext->nodestart);
+                       MACHSTATE2(3,"sendBufName %s with rank %d",pxshmContext->sendBufNames[i],i);
+               }
+       }
+       
+       createShmObjectsAndSems(&(pxshmContext->recvBufs),pxshmContext->recvBufNames);
+       createShmObjectsAndSems(&(pxshmContext->sendBufs),pxshmContext->sendBufNames);
+       
+       for(i=0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank){
+                       //CmiAssert(pxshmContext->sendBufs[i].header->count == 0);
+                       pxshmContext->sendBufs[i].header->count = 0;
+                       pxshmContext->sendBufs[i].header->bytes = 0;
+               }
+       }
+}
+
+void allocBufNameStrings(char ***bufName){
+       int i,count;
+       
+       int totalAlloc = sizeof(char)*NAMESTRLEN*(pxshmContext->nodesize-1);
+       char *tmp = malloc(totalAlloc);
+       
+       MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
+
+       *bufName = (char **)malloc(sizeof(char *)*pxshmContext->nodesize);
+       
+       for(i=0,count=0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank){
+                       (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
+                       count++;
+               }else{
+                       (*bufName)[i] = NULL;
+               }
+       }
+}
+
+void createShmObject(char *name,int size,char **pPtr);
+
+void createShmObjectsAndSems(sharedBufData **bufs,char **bufNames){
+       int i=0;
+       
+       *bufs = (sharedBufData *)calloc(pxshmContext->nodesize, sizeof(sharedBufData));
+       
+       for(i=0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank){
+                       createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
+                        memset(((*bufs)[i].header), 0, SHMBUFLEN+sizeof(sharedBufHeader));
+                       (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
+#if PXSHM_OSSPINLOCK
+                       (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
+#elif PXSHM_LOCK
+                       (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
+#endif
+               }else{
+                       (*bufs)[i].header = NULL;
+                       (*bufs)[i].data = NULL;
+#if PXSHM_LOCK
+                       (*bufs)[i].mutex = NULL;
+#endif
+               }
+       }       
+}
+
+
+void createShmObject(char *name,int size,char **pPtr){
+       int fd=-1;
+       int flags;      // opening flags for shared object
+       int open_repeat_count = 0;
+
+       flags= O_RDWR | O_CREAT; // open file in read-write mode and create it if its not there
+       
+       while(fd<0 && open_repeat_count < 100){
+         open_repeat_count++;
+         fd = shm_open(name,flags, S_IRUSR | S_IWUSR); // create the shared object with permissions for only the user to read and write
+         
+         if(fd < 0 && open_repeat_count > 10){
+           fprintf(stderr,"Error(attempt=%d) from shm_open %s while opening %s \n",open_repeat_count, strerror(errno),name);
+           fflush(stderr);
+         }
+       }
+
+       CmiAssert(fd >= 0);
+
+       ftruncate(fd,size); //set the size of the shared memory object
+
+       *pPtr = mmap(NULL,size,PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 
+       CmiAssert(*pPtr != NULL);
+
+       close(fd);
+}
+
+void tearDownSharedBuffers(){
+       int i;
+       for(i= 0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank){
+                       if(     shm_unlink(pxshmContext->recvBufNames[i]) < 0){
+                               fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
+                       }
+
+#if PXSHM_LOCK
+                       sem_close(pxshmContext->recvBufs[i].mutex);
+                       //sem_unlink(pxshmContext->recvBufNames[i]);
+                       sem_close(pxshmContext->sendBufs[i].mutex);
+#endif
+               }
+       }
+};
+
+
+void initSendQ(PxshmSendQ *q,int size);
+
+void initAllSendQs(){
+       int i=0;
+       pxshmContext->sendQs = (PxshmSendQ **) malloc(sizeof(PxshmSendQ *)*pxshmContext->nodesize);
+       for(i=0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank){
+                       (pxshmContext->sendQs)[i] = (PxshmSendQ *)calloc(1, sizeof(PxshmSendQ));
+                       initSendQ((pxshmContext->sendQs)[i],SENDQSTARTSIZE);
+               }else{
+                       (pxshmContext->sendQs)[i] = NULL;
+               }
+       }
+};
+
+
+/****************
+ *copy this message into the sharedBuf
+ If it does not succeed
+ *put it into the sendQ 
+ *NOTE: This method is called only after obtaining the corresponding mutex
+ * ********/
+int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,PxshmSendQ *dstSendQ){
+
+       if(dstBuf->header->bytes+size <= SHMBUFLEN){
+               /**copy  this message to sharedBuf **/
+               dstBuf->header->count++;
+               memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
+               dstBuf->header->bytes += size;
+               MACHSTATE4(3,"Pxshm send done msg %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",msg,size,dstBuf->header->count,dstBuf->header->bytes);
+                CmiFree(msg);
+               return 1;
+       }
+       /***
+        * Shared Buffer is too full for this message
+        * **/
+       printf("send buffer is too full\n");
+       pushSendQ(dstSendQ,msg,size,refcount);
+       (*refcount)++;
+       MACHSTATE3(3,"Pxshm send msg %p size %d queued refcount %d",msg,size,*refcount);
+       return 0;
+}
+
+inline OutgoingMsgRec* popSendQ(PxshmSendQ *q);
+
+/****
+ *Try to send all the messages in the sendq to this destination rank
+ *NOTE: This method is called only after obtaining the corresponding mutex
+ * ************/
+
+inline int flushSendQ(int dstRank){
+       sharedBufData *dstBuf = &(pxshmContext->sendBufs[dstRank]);
+       PxshmSendQ *dstSendQ = pxshmContext->sendQs[dstRank];
+       int count=dstSendQ->numEntries;
+       int sent=0;
+       while(count > 0){
+               OutgoingMsgRec *ogm = popSendQ(dstSendQ);
+               (*ogm->refcount)--;
+               MACHSTATE4(3,"Pxshm trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
+               int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
+               if(ret==1){
+                       sent++;
+#if CMK_NET_VERSION
+                        GarbageCollectMsg(ogm);
+#endif
+               }
+               count--;
+       }
+       return sent;
+}
+
+inline void emptyRecvBuf(sharedBufData *recvBuf);
+
+inline void emptyAllRecvBufs(){
+       int i;
+       for(i=0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank){
+                       sharedBufData *recvBuf = &(pxshmContext->recvBufs[i]);
+                       if(recvBuf->header->count > 0){
+
+#if PXSHM_STATS
+                               pxshmContext->lockRecvCount++;
+#endif
+
+
+#if PXSHM_OSSPINLOCK
+                               if(! OSSpinLockTry(&recvBuf->header->lock)){
+#elif PXSHM_LOCK
+                               if(sem_trywait(recvBuf->mutex) < 0){
+#elif PXSHM_FENCE
+                               recvBuf->header->flagReceiver = 1;
+                               recvBuf->header->turn = SENDER;
+                               CmiMemoryReadFence(0,0);
+                               CmiMemoryWriteFence(0,0);
+                               //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
+                               if((recvBuf->header->flagSender)){
+                                       recvBuf->header->flagReceiver = 0;
+#endif
+                               }else{
+
+
+                                       MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
+                                       emptyRecvBuf(recvBuf);
+
+#if PXSHM_OSSPINLOCK
+                                       OSSpinLockUnlock(&recvBuf->header->lock);
+#elif PXSHM_LOCK
+                                       sem_post(recvBuf->mutex);
+#elif PXSHM_FENCE
+                                       CmiMemoryReadFence(0,0);
+                                       CmiMemoryWriteFence(0,0);
+                                       recvBuf->header->flagReceiver = 0;
+#endif
+
+                               }
+                       
+                       }
+               }
+       }
+};
+
+inline void flushAllSendQs(){
+       int i=0;
+       
+       for(i=0;i<pxshmContext->nodesize;i++){
+               if(i != pxshmContext->noderank && pxshmContext->sendQs[i]->numEntries > 0){
+       
+#if PXSHM_OSSPINLOCK
+                       if(OSSpinLockTry(&pxshmContext->sendBufs[i].header->lock)){
+#elif PXSHM_LOCK
+                       if(sem_trywait(pxshmContext->sendBufs[i].mutex) >= 0){
+#elif PXSHM_FENCE
+                       pxshmContext->sendBufs[i].header->flagSender = 1;
+                       pxshmContext->sendBufs[i].header->turn = RECEIVER;
+                       CmiMemoryReadFence(0,0);                        
+                       CmiMemoryWriteFence(0,0);
+                       if(!(pxshmContext->sendBufs[i].header->flagReceiver && pxshmContext->sendBufs[i].header->turn == RECEIVER)){
+#endif
+
+                               MACHSTATE1(3,"flushSendQ %d",i);
+                               flushSendQ(i);
+
+
+                               
+#if PXSHM_OSSPINLOCK   
+                               OSSpinLockUnlock(&pxshmContext->sendBufs[i].header->lock);
+#elif PXSHM_LOCK
+                               sem_post(pxshmContext->sendBufs[i].mutex);
+#elif PXSHM_FENCE
+                               CmiMemoryReadFence(0,0);                        
+                               CmiMemoryWriteFence(0,0);
+                               pxshmContext->sendBufs[i].header->flagSender = 0;
+#endif
+                       }else{
+
+#if PXSHM_FENCE
+                         pxshmContext->sendBufs[i].header->flagSender = 0;
+#endif                         
+
+                       }
+
+               }        
+       }       
+};
+
+void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
+
+void emptyRecvBuf(sharedBufData *recvBuf){
+       int numMessages = recvBuf->header->count;
+       int i=0;
+
+       char *ptr=recvBuf->data;
+
+       for(i=0;i<numMessages;i++){
+               int size;
+               int rank, srcpe, seqno, magic, i;
+               unsigned int broot;
+               char *msg = ptr;
+               char *newMsg;
+
+#if CMK_NET_VERSION
+               DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
+               size = CmiMsgHeaderGetLength(msg);
+#else
+                size = CmiGetMsgSize(msg);
+#endif
+       
+               newMsg = (char *)CmiAlloc(size);
+               memcpy(newMsg,msg,size);
+
+#if CMK_NET_VERSION
+               handoverPxshmMessage(newMsg,size,rank,broot);
+#else
+                handleOneRecvedMsg(size, newMsg);
+#endif
+               
+               ptr += size;
+
+               MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
+       }
+#if 1
+  if(ptr - recvBuf->data != recvBuf->header->bytes){
+               CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
+       }
+#endif
+       CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
+       recvBuf->header->count=0;
+       recvBuf->header->bytes=0;
+}
+
+
+void static inline handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
+#if CMK_NET_VERSION
+       CmiAssert(rank == 0);
+#if CMK_BROADCAST_SPANNING_TREE
+        if (rank == DGRAM_BROADCAST
+#if CMK_NODE_QUEUE_AVAILABLE
+          || rank == DGRAM_NODEBROADCAST
+#endif
+         ){
+               SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
+                                       }
+#elif CMK_BROADCAST_HYPERCUBE
+        if (rank == DGRAM_BROADCAST
+#if CMK_NODE_QUEUE_AVAILABLE
+          || rank == DGRAM_NODEBROADCAST
+#endif
+         ){
+                       SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
+                                       }
+#endif
+
+               switch (rank) {
+       case DGRAM_BROADCAST: {
+          CmiPushPE(0, newmsg);
+          break;
+      }
+        default:
+                               {
+                                       
+          CmiPushPE(rank, newmsg);
+                               }
+       }    /* end of switch */
+#endif
+}
+
+
+/**************************
+ *sendQ helper functions
+ * ****************/
+
+void initSendQ(PxshmSendQ *q,int size){
+       q->data = (OutgoingMsgRec *)malloc(sizeof(OutgoingMsgRec)*size);
+
+       q->size = size;
+       q->numEntries = 0;
+
+       q->begin = 0;
+       q->end = 0;
+}
+
+void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount){
+       if(q->numEntries == q->size){
+               //need to resize 
+               OutgoingMsgRec *oldData = q->data;
+               int newSize = q->size<<1;
+               q->data = (OutgoingMsgRec *)malloc(sizeof(OutgoingMsgRec)*newSize);
+               //copy head to the beginning of the new array
+               
+               CmiAssert(q->begin == q->end);
+
+               CmiAssert(q->begin < q->size);
+               memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));
+
+               if(q->end != 0){
+                       memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
+               }
+               free(oldData);
+               q->begin = 0;
+               q->end = q->size;
+               q->size = newSize;
+       }
+       OutgoingMsgRec *omg = &q->data[q->end];
+        omg->size = size;
+        omg->data = msg;
+        omg->refcount = refcount;
+       (q->end)++;
+       if(q->end >= q->size){
+               q->end -= q->size;
+       }
+       q->numEntries++;
+}
+
+OutgoingMsgRec * popSendQ(PxshmSendQ *q){
+       OutgoingMsgRec * ret;
+       if(0 == q->numEntries){
+               return NULL;
+       }
+
+       ret = &q->data[q->begin];
+       (q->begin)++;
+       if(q->begin >= q->size){
+               q->begin -= q->size;
+       }
+       
+       q->numEntries--;
+       return ret;
+}
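Of the three synchronization options described at the top of machine-pxshm.c, PXSHM_FENCE is the default and the least obvious: sender and receiver each raise a flag, issue a full memory fence, and back off if the peer's flag is already up (the turn field is written but, per the commented-out tests, not currently consulted). The standalone sketch below isolates the sender-side try-acquire used in CmiSendMessagePxshm(); the names are illustrative, and __sync_synchronize() stands in for the CmiMemoryReadFence()/CmiMemoryWriteFence() pair.

typedef struct {
    volatile int flagSender;
    volatile int flagReceiver;
    volatile int turn;
} FenceLockSketch;

enum { SKETCH_SENDER, SKETCH_RECEIVER };

/* Returns 1 if the sender now owns the shared buffer, 0 if it must queue. */
static int sender_trylock(FenceLockSketch *l)
{
    l->flagSender = 1;
    l->turn = SKETCH_RECEIVER;     /* concede the tie to the receiver        */
    __sync_synchronize();          /* full barrier: publish flag, read peer  */
    if (l->flagReceiver) {         /* receiver may be draining the buffer    */
        l->flagSender = 0;         /* back off; caller pushes to the sendQ   */
        return 0;
    }
    return 1;
}

static void sender_unlock(FenceLockSketch *l)
{
    __sync_synchronize();
    l->flagSender = 0;
}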
diff --git a/src/arch/util/persist-comm.c b/src/arch/util/persist-comm.c
index f3b1432c2424b336c88135cd91f090f3b3933476..ce09bc53e0f1aebd6d186a6aa1984ef2db88a79d 100644 (file)
@@ -12,7 +12,8 @@
 
 #if CMK_PERSISTENT_COMM
 
-#include "persist_impl.h"
+#include "gni_pub.h"
+#include "machine-persistent.h"
 
 #define TABLESIZE  512
 PersistentSendsTable persistentSendsTable[TABLESIZE];
@@ -31,8 +32,11 @@ typedef struct _PersistentRequestMsg {
 
 typedef struct _PersistentReqGrantedMsg {
   char core[CmiMsgHeaderSizeBytes];
+/*
   void *msgAddr[PERSIST_BUFFERS_NUM];
   void *slotFlagAddress[PERSIST_BUFFERS_NUM];
+*/
+  PersistentBuf    buf[PERSIST_BUFFERS_NUM];
   PersistentHandle sourceHandlerIndex;
   PersistentHandle destHandlerIndex;
 } PersistentReqGrantedMsg;
@@ -61,10 +65,13 @@ void initSendSlot(PersistentSendsTable *slot)
   slot->destPE = -1;
   slot->sizeMax = 0;
   slot->destHandle = 0; 
+#if 0
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
     slot->destAddress[i] = NULL;
     slot->destSizeAddress[i] = NULL;
   }
+#endif
+  memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
   slot->messageBuf = 0;
   slot->messageSize = 0;
 }
@@ -72,22 +79,31 @@ void initSendSlot(PersistentSendsTable *slot)
 void swapSendSlotBuffers(PersistentSendsTable *slot)
 {
   if (PERSIST_BUFFERS_NUM == 2) {
+#if 0
   void *tmp = slot->destAddress[0];
   slot->destAddress[0] = slot->destAddress[1];
   slot->destAddress[1] = tmp;
   tmp = slot->destSizeAddress[0];
   slot->destSizeAddress[0] = slot->destSizeAddress[1];
   slot->destSizeAddress[1] = tmp;
+#else
+  PersistentBuf tmp = slot->destBuf[0];
+  slot->destBuf[0] = slot->destBuf[1];
+  slot->destBuf[1] = tmp;
+#endif
   }
 }
 
 void initRecvSlot(PersistentReceivesTable *slot)
 {
   int i;
+#if 0
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
     slot->messagePtr[i] = NULL;
     slot->recvSizePtr[i] = NULL;
   }
+#endif
+  memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
   slot->sizeMax = 0;
   slot->prev = slot->next = NULL;
 }
@@ -95,12 +111,18 @@ void initRecvSlot(PersistentReceivesTable *slot)
 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
 {
   if (PERSIST_BUFFERS_NUM == 2) {
+#if 0
   void *tmp = slot->messagePtr[0];
   slot->messagePtr[0] = slot->messagePtr[1];
   slot->messagePtr[1] = tmp;
   tmp = slot->recvSizePtr[0];
   slot->recvSizePtr[0] = slot->recvSizePtr[1];
   slot->recvSizePtr[1] = tmp;
+#else
+  PersistentBuf tmp = slot->destBuf[0];
+  slot->destBuf[0] = slot->destBuf[1];
+  slot->destBuf[1] = tmp;
+#endif
   }
 }
 
@@ -188,8 +210,12 @@ static void persistentRequestHandler(void *env)
   setupRecvSlot(slot, msg->maxBytes);
 
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
+#if 0
     gmsg->msgAddr[i] = slot->messagePtr[i];
     gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
+#else
+    gmsg->buf[i] = slot->destBuf[i];
+#endif
   }
 
   gmsg->sourceHandlerIndex = msg->sourceHandlerIndex;
@@ -210,8 +236,12 @@ static void persistentReqGrantedHandler(void *env)
   PersistentSendsTable *slot = (PersistentSendsTable *)h;
   CmiAssert(slot->used == 1);
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
+#if 0
     slot->destAddress[i] = msg->msgAddr[i];
     slot->destSizeAddress[i] = msg->slotFlagAddress[i];
+#else
+    slot->destBuf[i] = msg->buf[i];
+#endif
   }
   slot->destHandle = msg->destHandlerIndex;
 
@@ -239,9 +269,15 @@ PersistentReq CmiCreateReceiverPersistent(int maxBytes)
   ret.pe = CmiMyPe();
   ret.maxBytes = maxBytes;
   ret.myHand = h;
+  ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
+#if 0
     ret.messagePtr[i] = slot->messagePtr[i];
     ret.recvSizePtr[i] = slot->recvSizePtr[i];
+#else
+    /* copy the buffer descriptor into the flat array pointed to by bufPtr */
+    memcpy(&((PersistentBuf *)ret.bufPtr)[i], &slot->destBuf[i], sizeof(PersistentBuf));
+#endif
   }
 
   return ret;
@@ -257,10 +293,14 @@ PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
   slot->destPE = recvHand.pe;
   slot->sizeMax = recvHand.maxBytes;
 
+#if 0
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
     slot->destAddress[i] = recvHand.messagePtr[i];
     slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
   }
+#else
+  memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
+#endif
   slot->destHandle = recvHand.myHand;
   return h;
 }
@@ -292,8 +332,8 @@ void persistentDestoryHandler(void *env)
     persistentReceivesTableTail = slot->prev;
 
   for (i=0; i<PERSIST_BUFFERS_NUM; i++) 
-    if (slot->messagePtr[i]) /*elan_CmiStaticFree(slot->messagePtr);*/
-      PerFree((char*)slot->messagePtr[i]);
+    if (slot->destBuf[i].destAddress) /*elan_CmiStaticFree(slot->messagePtr);*/
+      PerFree((char*)slot->destBuf[i].destAddress);
 
   CmiFree(slot);
 }
@@ -335,9 +375,9 @@ void CmiDestoryAllPersistent()
     PersistentReceivesTable *next = slot->next;
     int i;
     for (i=0; i<PERSIST_BUFFERS_NUM; i++)  {
-      if (slot->recvSizePtr[i])
+      if (slot->destBuf[i].destSizeAddress)
         CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
-      if (slot->messagePtr[i]) PerFree((char*)slot->messagePtr[i]);
+      if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
     }
     CmiFree(slot);
     slot = next;
diff --git a/src/conv-core/persistent.h b/src/conv-core/persistent.h
index a221288cf8e1e03caa066995e5abcdb35d9ae1e5..d60ed90dc452bfb5ef002a37f282ef8736de4c5b 100644 (file)
@@ -50,8 +50,7 @@ typedef void * PersistentHandle;
 typedef struct {
   int pe;
   int maxBytes;
-  void *messagePtr[2];
-  void *recvSizePtr[2];
+  void **bufPtr;
   PersistentHandle myHand;
 } PersistentReq;
 
diff --git a/src/scripts/Makefile b/src/scripts/Makefile
index ec6d8072154af05ebf0c6a61d84a538ef7c2ca73..93fdbea16c524f757705a6c7dcbdd33505ed61ca 100644 (file)
@@ -524,7 +524,7 @@ graph.o: graph.c $(CVHEADERS) $(UTILHEADERS)
 RTH.o: RTH.C RTH.h $(CVHEADERS)
        $(CHARMC) -o RTH.o RTH.C
 
-persist-comm.o: persist-comm.c persist_impl.h $(CVHEADERS)
+persist-comm.o: persist-comm.c $(CVHEADERS)
        $(CHARMC) -o persist-comm.o persist-comm.c
 
 conv-lists.o: conv-lists.C $(UTILHEADERS) $(CVHEADERS)