an alternative to pxshm for intra-node communication, using the Linux kernel module xpmem
authorGengbin Zheng <gzheng@illinois.edu>
Wed, 1 Feb 2012 06:26:39 +0000 (22:26 -0800)
committerGengbin Zheng <gzheng@illinois.edu>
Wed, 1 Feb 2012 06:26:39 +0000 (22:26 -0800)
src/arch/gemini_gni-crayxe/conv-mach-xpmem.h [new file with mode: 0644]
src/arch/gemini_gni-crayxe/conv-mach-xpmem.sh [new file with mode: 0644]
src/arch/util/machine-xpmem.c [new file with mode: 0644]
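
Build note: conv-mach-xpmem.h/.sh follow the usual Charm++ build-option naming, so this
layer is presumably selected by adding "xpmem" as a build option for the Gemini target,
for example (the exact invocation depends on the local setup):

    ./build charm++ gemini_gni-crayxe xpmem -O2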

diff --git a/src/arch/gemini_gni-crayxe/conv-mach-xpmem.h b/src/arch/gemini_gni-crayxe/conv-mach-xpmem.h
new file mode 100644 (file)
index 0000000..95265f3
--- /dev/null
@@ -0,0 +1,18 @@
+#ifndef _CONV_MACH_XPMEM_
+#define _CONV_MACH_XPMEM_
+
+#undef CMK_USE_XPMEM
+#define CMK_USE_XPMEM                  1
+
+#undef CMK_IMMEDIATE_MSG
+#define CMK_IMMEDIATE_MSG       0
+
+#undef CMK_BROADCAST_HYPERCUBE
+#define CMK_BROADCAST_HYPERCUBE                                   1
+
+#undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
+#define CMK_WHEN_PROCESSOR_IDLE_USLEEP  0
+
+#define XPMEM_LOCK                      1
+
+#endif
diff --git a/src/arch/gemini_gni-crayxe/conv-mach-xpmem.sh b/src/arch/gemini_gni-crayxe/conv-mach-xpmem.sh
new file mode 100644 (file)
index 0000000..a269d3c
--- /dev/null
@@ -0,0 +1 @@
+#CMK_LIBS="$CMK_LIBS -lrt"
diff --git a/src/arch/util/machine-xpmem.c b/src/arch/util/machine-xpmem.c
new file mode 100644 (file)
index 0000000..85687e0
--- /dev/null
@@ -0,0 +1,987 @@
+/** @file
+
+   There are three options here for synchronization:
+      XPMEM_FENCE (the default) uses memory fences
+      PXSHM_OSSPINLOCK uses OSSpinLocks (available on OS X)
+      XPMEM_LOCK uses POSIX semaphores
+
+   created by
+       Gengbin Zheng, September 2011
+*/
+
+/**
+ * @addtogroup NET
+ * @{
+ */
+
+#include <sys/types.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/ioctl.h>
+
+#include "xpmem.h"
+
+/************** 
+   Determine which type of synchronization to use 
+*/
+#if PXSHM_OSSPINLOCK
+#include <libkern/OSAtomic.h>
+#elif XPMEM_LOCK
+#include <semaphore.h>
+#else
+/* Default to using fences */
+#define XPMEM_FENCE 1
+#endif
+
+
+#define MEMDEBUG(x) //x
+
+#define PXSHM_STATS 0
+
+
+/*** The following code was copied verbatim from pcqueue.h file ***/
+#undef CmiMemoryWriteFence
+#if XPMEM_FENCE
+#ifdef POWER_PC
+#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("eieio":::"memory")
+#else
+#define CmiMemoryWriteFence(startPtr,nBytes) asm volatile("sfence":::"memory")
+//#define CmiMemoryWriteFence(startPtr,nBytes) 
+#endif
+#else
+#undef CmiMemoryWriteFence
+#define CmiMemoryWriteFence(startPtr,nBytes)  
+#endif
+
+#undef CmiMemoryReadFence
+#if XPMEM_FENCE
+#ifdef POWER_PC
+#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("eieio":::"memory")
+#else
+#define CmiMemoryReadFence(startPtr,nBytes) asm volatile("lfence":::"memory")
+//#define CmiMemoryReadFence(startPtr,nBytes) 
+#endif
+#else
+#define CmiMemoryReadFence(startPtr,nBytes) 
+#endif
+
+#if CMK_SMP
+#error  "XPMEM can only be used in a non-smp build of Charm++"
+#endif
+
+/***************************************************************************************/
+
+enum entities {SENDER,RECEIVER};
+
+/************************
+ *     Implementation currently assumes that
+ *     1) all nodes have the same number of processors
+ *     2) in the nodelist all processors in a node are listed in sequence
+ *   0 1 2 3      4 5 6 7 
+ *   -------      -------
+ *    node 1       node 2 
+ ************************/
+
+#define NAMESTRLEN 60
+#define PREFIXSTRLEN 50 
+
+#define XPMEMBUFLEN (1024*1024*2)
+#define XPMEMMAXSIZE     (1*1024)
+
+#define SENDQSTARTSIZE    256
+
+
+/// This struct is used as the first portion of a shared memory region, followed by data
+typedef struct {
+       int count; //number of messages
+       int bytes; //number of bytes
+
+#if PXSHM_OSSPINLOCK
+       OSSpinLock lock;
+#endif
+
+#if XPMEM_FENCE
+       volatile int flagSender;
+        CmiMemorySMPSeparation_t pad1;
+       volatile int flagReceiver;
+        CmiMemorySMPSeparation_t pad2;
+       volatile int turn;
+#endif 
+
+} sharedBufHeader;
+
+
+typedef struct {
+#if XPMEM_LOCK
+       sem_t *mutex;
+#endif
+       sharedBufHeader *header;        
+       char *data;
+        __s64  segid;
+} sharedBufData;
+
+typedef struct OutgoingMsgRec
+{
+  char *data;
+  int  *refcount;
+  int   size;
+}
+OutgoingMsgRec;
+
+typedef struct {
+       int size; //total size of data array
+       int begin; //position of first element
+       int end;        //position of next element
+       int numEntries; //number of entries
+
+       OutgoingMsgRec *data;
+
+} XpmemSendQ;
+
+typedef struct {
+       int nodesize;
+       int noderank;
+       int nodestart,nodeend;//proc numbers for the start and end of this node
+       char prefixStr[PREFIXSTRLEN];
+       char **recvBufNames;
+       char **sendBufNames;
+
+       sharedBufData *recvBufs;
+       sharedBufData *sendBufs;
+
+       XpmemSendQ **sendQs;
+
+
+#if PXSHM_STATS
+       int sendCount;
+       int validCheckCount;
+       int lockRecvCount;
+       double validCheckTime;
+       double sendTime;
+       double commServerTime;
+#endif
+
+} XpmemContext;
+
+
+
+XpmemContext *xpmemContext=NULL; //global context
+
+
+void calculateNodeSizeAndRank(char **);
+void setupSharedBuffers();
+void initAllSendQs();
+
+int xpmem_fd;
+
+/******************
+ *     Initialization routine
+ *     currently just testing start up
+ * ****************/
+void CmiInitXpmem(char **argv){
+       MACHSTATE(3,"CmiInitXpmem start");
+       xpmemContext = (XpmemContext *)calloc(1,sizeof(XpmemContext));
+
+#if CMK_NET_VERSION
+       if(Cmi_charmrun_pid <= 0){
+               CmiAbort("xpmem must be run with charmrun");
+       }
+#endif
+       calculateNodeSizeAndRank(argv);
+       if(xpmemContext->nodesize == 1) return;
+       
+       MACHSTATE1(3,"CmiInitXpmem  %d calculateNodeSizeAndRank",xpmemContext->nodesize);
+
+        xpmem_fd = open("/dev/xpmem", O_RDWR);
+        if (xpmem_fd == -1) {
+            CmiAbort("CmiInitXpmem: cannot open /dev/xpmem");
+        }
+
+
+#if CMK_CRAYXE
+        srand(getpid());
+        int Cmi_charmrun_pid = rand();
+        PMI_Bcast(&Cmi_charmrun_pid, sizeof(int));
+#elif !CMK_NET_VERSION
+        #error "need a unique number"
+#endif
+       snprintf(&(xpmemContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
+
+       MACHSTATE2(3,"CmiInitXpmem %s %d pre setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
+
+       setupSharedBuffers();
+
+       MACHSTATE2(3,"CmiInitXpmem %s %d setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
+
+       initAllSendQs();
+       
+       MACHSTATE2(3,"CmiInitXpmem %s %d initAllSendQs",xpmemContext->prefixStr,xpmemContext->nodesize);
+
+       MACHSTATE2(3,"CmiInitXpmem %s %d done",xpmemContext->prefixStr,xpmemContext->nodesize);
+
+};
+
+/**************
+ * shutdown shmem objects and semaphores
+ *
+ * *******************/
+void tearDownSharedBuffers();
+
+void CmiExitXpmem(){
+       int i=0;
+       
+       if(xpmemContext->nodesize != 1){
+               tearDownSharedBuffers();
+       
+               /* the name strings for all peers share a single allocation
+                  (see allocBufNameStrings); the first non-self entry points
+                  at the start of that block, so freeing it frees them all */
+               for(i=0;i<xpmemContext->nodesize;i++){
+                       if(i != xpmemContext->noderank){
+                               break;
+                       }
+               }
+               free(xpmemContext->recvBufNames[i]);
+               free(xpmemContext->sendBufNames[i]);
+
+               free(xpmemContext->recvBufNames);
+               free(xpmemContext->sendBufNames);
+
+               free(xpmemContext->recvBufs);
+               free(xpmemContext->sendBufs);
+
+       }
+#if PXSHM_STATS
+CmiPrintf("[%d] sendCount %d sendTime %.6lf validCheckCount %d validCheckTime %.6lf commServerTime %.6lf lockRecvCount %d \n",_Cmi_mynode,xpmemContext->sendCount,xpmemContext->sendTime,xpmemContext->validCheckCount,xpmemContext->validCheckTime,xpmemContext->commServerTime,xpmemContext->lockRecvCount);
+#endif
+       free(xpmemContext);
+}
+
+/******************
+ * Should this message be sent using Xpmem or not?
+ * ***********************/
+
+inline int CmiValidXpmem(int dst, int size){
+#if PXSHM_STATS
+       xpmemContext->validCheckCount++;
+#endif
+
+/*     if(xpmemContext->nodesize == 1){
+               return 0;
+       }*/
+       //replace by bitmap later
+       //if(ogm->dst >= xpmemContext->nodestart && ogm->dst <= xpmemContext->nodeend && ogm->size < SHMBUFLEN ){
+       if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size > XPMEMMAXSIZE ){
+               return 1;
+       }else{
+               return 0;
+       }
+};
+
+
+inline int XpmemRank(int dst){
+       return dst - xpmemContext->nodestart;
+}
+inline void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount);
+inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ);
+inline int flushSendQ(int dstRank);
+
+inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
+  return sendMessage(omg->data, omg->size, omg->refcount, dstBuf, dstSendQ);
+}
+
+/***************
+ *
+ *Send this message through shared memory
+ *if you cannot get lock, put it in the sendQ
+ *Before sending messages pick them from sendQ
+ *
+ * ****************************/
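+/* Note on the XPMEM_FENCE path below: flagSender, flagReceiver and turn form a
+   Peterson-style try-lock over the shared buffer.  Each side raises its own
+   flag, gives the turn to the other side, and backs off (queueing the message
+   instead of spinning) if it sees the other side's flag raised; the fence
+   macros defined above order the flag/turn updates.  (In CmiSendMessageXpmem
+   and emptyAllRecvBufs the turn test itself is currently commented out.) */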
+
+void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
+{
+
+#if PXSHM_STATS
+       double _startSendTime = CmiWallTimer();
+#endif
+
+        LrtsPrepareEnvelope(msg, size);
+       
+       int dstRank = XpmemRank(dstpe);
+       MEMDEBUG(CmiMemoryCheck());
+  
+       MACHSTATE4(3,"Send Msg Xpmem msg %p size %d dst %d dstRank %d",msg,size,dstpe,dstRank);
+
+       CmiAssert(dstRank >=0 && dstRank != xpmemContext->noderank);
+       
+       sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstRank]);
+
+
+#if PXSHM_OSSPINLOCK
+       if(! OSSpinLockTry(&dstBuf->header->lock)){
+#elif XPMEM_LOCK
+       if(sem_trywait(dstBuf->mutex) < 0){
+#elif XPMEM_FENCE
+       dstBuf->header->flagSender = 1;
+       dstBuf->header->turn = RECEIVER;
+       CmiMemoryReadFence(0,0);
+       CmiMemoryWriteFence(0,0);
+       //if(dstBuf->header->flagReceiver && dstBuf->header->turn == RECEIVER){
+       if(dstBuf->header->flagReceiver){
+               dstBuf->header->flagSender = 0;
+#endif
+               /**failed to get the lock 
+               insert into q and retain the message*/
+
+               pushSendQ(xpmemContext->sendQs[dstRank], msg, size, refcount);
+               (*refcount)++;
+               MEMDEBUG(CmiMemoryCheck());
+               return;
+       }else{
+
+               /***
+                * We got the lock for this buffer
+                * first write all the messages in the sendQ and then write this guy
+                * */
+                if(xpmemContext->sendQs[dstRank]->numEntries == 0){
+                               // send message user event
+                               int ret = sendMessage(msg,size,refcount,dstBuf,xpmemContext->sendQs[dstRank]);
+                               MACHSTATE(3,"Xpmem Send succeeded immediately");
+                }else{
+                               (*refcount)+=2;/*this message should not get deleted when the queue is flushed*/
+                               pushSendQ(xpmemContext->sendQs[dstRank],msg,size,refcount);
+                               MACHSTATE3(3,"Xpmem msg %p pushed to sendQ length %d refcount %d",msg,xpmemContext->sendQs[dstRank]->numEntries,*refcount);
+                               int sent = flushSendQ(dstRank);
+                               (*refcount)--; /*if it has been sent, can be deleted by caller, if not will be deleted when queue is flushed*/
+                               MACHSTATE1(3,"Xpmem flushSendQ sent %d messages",sent);
+                }
+                /* unlock the recvbuffer*/
+
+#if PXSHM_OSSPINLOCK
+                OSSpinLockUnlock(&dstBuf->header->lock);
+#elif XPMEM_LOCK
+                sem_post(dstBuf->mutex);
+#elif XPMEM_FENCE
+                CmiMemoryReadFence(0,0);                       
+                CmiMemoryWriteFence(0,0);
+                dstBuf->header->flagSender = 0;
+#endif
+       }
+#if PXSHM_STATS
+               xpmemContext->sendCount ++;
+               xpmemContext->sendTime += (CmiWallTimer()-_startSendTime);
+#endif
+       MEMDEBUG(CmiMemoryCheck());
+
+};
+
+inline void emptyAllRecvBufs();
+inline void flushAllSendQs();
+
+/**********
+ * Extract all the messages from the recvBuffers you can
+ * Flush all sendQs
+ * ***/
+inline void CommunicationServerXpmem(){
+       
+#if PXSHM_STATS
+       double _startCommServerTime =CmiWallTimer();
+#endif 
+       
+       MEMDEBUG(CmiMemoryCheck());
+       emptyAllRecvBufs();
+       flushAllSendQs();
+
+#if PXSHM_STATS
+       xpmemContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
+#endif
+
+       MEMDEBUG(CmiMemoryCheck());
+};
+
+static void CmiNotifyStillIdleXpmem(CmiIdleState *s){
+       CommunicationServerXpmem();
+}
+
+
+static void CmiNotifyBeginIdleXpmem(CmiIdleState *s)
+{
+       CmiNotifyStillIdle(s);
+}
+
+
+void calculateNodeSizeAndRank(char **argv){
+       xpmemContext->nodesize=1;
+       MACHSTATE(3,"calculateNodeSizeAndRank start");
+       //CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
+       CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node");
+       MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",xpmemContext->nodesize);
+
+       xpmemContext->noderank = _Cmi_mynode % (xpmemContext->nodesize);
+       
+       MACHSTATE1(3,"calculateNodeSizeAndRank noderank %d",xpmemContext->noderank);
+       
+       xpmemContext->nodestart = _Cmi_mynode -xpmemContext->noderank;
+       
+       MACHSTATE(3,"calculateNodeSizeAndRank nodestart ");
+
+       xpmemContext->nodeend = xpmemContext->nodestart + xpmemContext->nodesize -1;
+
+       if(xpmemContext->nodeend >= _Cmi_numnodes){
+               xpmemContext->nodeend = _Cmi_numnodes-1;
+               xpmemContext->nodesize = (xpmemContext->nodeend - xpmemContext->nodestart) +1;
+       }
+       
+       MACHSTATE3(3,"calculateNodeSizeAndRank nodestart %d nodesize %d noderank %d",xpmemContext->nodestart,xpmemContext->nodesize,xpmemContext->noderank);
+}
+
+void allocBufNameStrings(char ***bufName);
+void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames);
+void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames);
+void removeXpmemFiles();
+
+/***************
+ *     calculate the names of the shared objects and semaphores
+ *
+ *     name scheme
+ *     shared memory: charm_pxshm_<pid>_<recver pe>_<sender pe>
+ *     semaphore    : the name passed to sem_open is the same string;
+ *                the POSIX library adds its own semaphore tag (on Linux at
+ *                least; other systems might need more portable code)
+ *
+ *     open these shared objects and semaphores
+ * *********/
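+/* Illustrative example (hypothetical values): with Cmi_charmrun_pid 4321, the
+   buffer in which global rank 2 receives from global rank 5 is named
+   charm_pxshm_4321_2_5; rank 5's send-side name for peer 2 is the same string,
+   and (under XPMEM_LOCK) the semaphore is opened with that same name. */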
+void setupSharedBuffers(){
+       int i=0;
+        
+       allocBufNameStrings(&(xpmemContext->recvBufNames));
+       
+       allocBufNameStrings((&xpmemContext->sendBufNames));
+       
+       for(i=0;i<xpmemContext->nodesize;i++){
+               if(i != xpmemContext->noderank){
+                       snprintf(xpmemContext->recvBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,xpmemContext->noderank+xpmemContext->nodestart,i+xpmemContext->nodestart);
+                       MACHSTATE2(3,"recvBufName %s with rank %d",xpmemContext->recvBufNames[i],i)
+                       snprintf(xpmemContext->sendBufNames[i],NAMESTRLEN-1,"%s_%d_%d",xpmemContext->prefixStr,i+xpmemContext->nodestart,xpmemContext->noderank+xpmemContext->nodestart);
+                       MACHSTATE2(3,"sendBufName %s with rank %d",xpmemContext->sendBufNames[i],i);
+               }
+       }
+       
+       createRecvXpmemAndSems(&(xpmemContext->recvBufs),xpmemContext->recvBufNames);
+        CmiBarrier();
+       createSendXpmemAndSems(&(xpmemContext->sendBufs),xpmemContext->sendBufNames);
+        CmiBarrier();
+        removeXpmemFiles();
+       
+       for(i=0;i<xpmemContext->nodesize;i++){
+               if(i != xpmemContext->noderank){
+                       //CmiAssert(xpmemContext->sendBufs[i].header->count == 0);
+                       xpmemContext->sendBufs[i].header->count = 0;
+                       xpmemContext->sendBufs[i].header->bytes = 0;
+               }
+       }
+}
+
+void allocBufNameStrings(char ***bufName){
+       int i,count;
+       
+       int totalAlloc = sizeof(char)*NAMESTRLEN*(xpmemContext->nodesize-1);
+       char *tmp = malloc(totalAlloc);
+       
+       MACHSTATE2(3,"allocBufNameStrings tmp %p totalAlloc %d",tmp,totalAlloc);
+
+       *bufName = (char **)malloc(sizeof(char *)*xpmemContext->nodesize);
+       
+       for(i=0,count=0;i<xpmemContext->nodesize;i++){
+               if(i != xpmemContext->noderank){
+                       (*bufName)[i] = &(tmp[count*NAMESTRLEN*sizeof(char)]);
+                       count++;
+               }else{
+                       (*bufName)[i] = NULL;
+               }
+       }
+}
+
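+/* createXpmemObject: mmap an anonymous shared region of the given size and
+   export it through the xpmem driver (XPMEM_CMD_MAKE ioctl on /dev/xpmem);
+   returns the segid that other processes on the node pass to
+   attachXpmemObject in order to map the same memory. */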
+__s64 createXpmemObject(int size,char **pPtr)
+{
+        struct xpmem_cmd_make make_info;
+        int ret;
+
+        *pPtr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+        if (*pPtr == MAP_FAILED) {
+            perror("createXpmemObject mmap");
+            CmiAbort("createXpmemObject: mmap failed");
+        }
+        make_info.vaddr = (__u64) *pPtr;
+        make_info.size = size;
+        make_info.permit_type = XPMEM_PERMIT_MODE;
+        make_info.permit_value = (__u64) 0600;
+        ret = ioctl(xpmem_fd, XPMEM_CMD_MAKE, &make_info);
+        if (ret != 0) {
+            perror("xpmem_make");
+            CmiAbort("xpmem_make");
+        }
+        return make_info.segid;
+}
+
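+/* attachXpmemObject: given a peer's segid, get access to the exported segment
+   (XPMEM_CMD_GET) and map it into this address space (XPMEM_CMD_ATTACH);
+   on return *pPtr points at the local mapping of the peer's buffer. */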
+void attachXpmemObject(__s64 segid, int size, char **pPtr)
+{
+       int ret;
+       struct xpmem_cmd_get get_info;
+       struct xpmem_cmd_attach attach_info;
+
+       /* note: /dev/xpmem was already opened in CmiInitXpmem, so the global
+          xpmem_fd is reused here rather than opening the device again */
+
+       get_info.segid = segid;
+       get_info.flags = XPMEM_RDWR;
+       get_info.permit_type = XPMEM_PERMIT_MODE;
+       get_info.permit_value = (__u64) NULL;
+       ret = ioctl(xpmem_fd, XPMEM_CMD_GET, &get_info);
+       if (ret != 0) {
+               CmiAbort("xpmem_get");
+       }
+
+       attach_info.apid = get_info.apid;
+       attach_info.offset = 0;
+       attach_info.size = size;
+       attach_info.vaddr = (__u64) NULL;
+       attach_info.fd = xpmem_fd;
+       attach_info.flags = 0;
+
+       ret = ioctl(xpmem_fd, XPMEM_CMD_ATTACH, &attach_info);
+       if (ret != 0) {
+               CmiAbort("xpmem_attach");
+       }
+
+       *pPtr = (void *)attach_info.vaddr;
+}
+
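+/* Segid exchange: each process creates one receive buffer per on-node peer and
+   writes the resulting segids, indexed by the peer's node-local rank, to a
+   small file .xpmem.<pe> in the working directory.  After the barrier in
+   setupSharedBuffers, every peer reads its entry from the other processes'
+   files and attaches to those segments; removeXpmemFiles then unlinks the
+   files once everyone has attached. */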
+void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames){
+       int i=0;
+        __s64 *segid_arr;
+        int size, pagesize = getpagesize();
+       
+       *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
+        segid_arr = malloc(sizeof(__s64)*xpmemContext->nodesize);
+       
+        size = XPMEMBUFLEN+sizeof(sharedBufHeader);
+        size = ((~(pagesize-1))&(size+pagesize-1));
+
+       for(i=0;i<xpmemContext->nodesize;i++){
+           if(i != xpmemContext->noderank)  {
+                (*bufs)[i].segid = segid_arr[i] = createXpmemObject(size,(char **)&((*bufs)[i].header));
+               //      createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
+                memset(((*bufs)[i].header), 0, size);
+               (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
+#if PXSHM_OSSPINLOCK
+               (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
+#elif XPMEM_LOCK
+               (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
+#endif
+           }else{
+               (*bufs)[i].header = NULL;
+               (*bufs)[i].data = NULL;
+#if XPMEM_LOCK
+               (*bufs)[i].mutex = NULL;
+#endif
+           }
+       }       
+
+        int fd;
+        char fname[128];
+        sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
+        fd = open(fname, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR);
+        if (fd == -1) {
+          CmiAbort("createRecvXpmemAndSems: cannot create segid file");
+        }
+        write(fd, segid_arr, sizeof(__s64)*xpmemContext->nodesize);
+        close(fd);
+        free(segid_arr);
+}
+
+void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames)
+{
+        int i;
+        int size, pagesize;
+
+        pagesize = getpagesize();
+        size = XPMEMBUFLEN+sizeof(sharedBufHeader);
+        size = ((~(pagesize-1))&(size+pagesize-1));
+
+       *bufs = (sharedBufData *)calloc(xpmemContext->nodesize, sizeof(sharedBufData));
+
+       for(i=0;i<xpmemContext->nodesize;i++){
+           if(i != xpmemContext->noderank)  {
+                __s64 segid;
+                 char fname[128];
+                 int fd;
+                 sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+i);
+                 fd = open(fname, O_RDONLY);
+                 if (fd == -1) {
+                     CmiAbort("createSendXpmemAndSems: cannot open segid file");
+                 }
+                lseek(fd, xpmemContext->noderank*sizeof(__s64), SEEK_SET);
+                read(fd, &segid, sizeof(__s64));
+                close(fd);
+                (*bufs)[i].segid = segid;
+                attachXpmemObject(segid, size,(char **)&((*bufs)[i].header));
+                memset(((*bufs)[i].header), 0, XPMEMBUFLEN+sizeof(sharedBufHeader));
+               (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
+#if PXSHM_OSSPINLOCK
+               (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
+#elif XPMEM_LOCK
+               (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
+#endif
+           }else{
+               (*bufs)[i].header = NULL;
+               (*bufs)[i].data = NULL;
+#if XPMEM_LOCK
+               (*bufs)[i].mutex = NULL;
+#endif
+           }
+        }
+}
+
+void removeXpmemFiles()
+{
+        char fname[128];
+        sprintf(fname, ".xpmem.%d", xpmemContext->nodestart+xpmemContext->noderank);
+        unlink(fname);
+}
+
+void tearDownSharedBuffers(){
+       int i;
+       for(i= 0;i<xpmemContext->nodesize;i++){
+               if(i != xpmemContext->noderank){
+#if 0
+                       if(shm_unlink(xpmemContext->recvBufNames[i]) < 0) {
+                               fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
+                       }
+#endif
+
+#if XPMEM_LOCK
+                       sem_close(xpmemContext->recvBufs[i].mutex);
+                       //sem_unlink(xpmemContext->recvBufNames[i]);
+                       sem_close(xpmemContext->sendBufs[i].mutex);
+#endif
+               }
+       }
+};
+
+
+void initSendQ(XpmemSendQ *q,int size);
+
+void initAllSendQs(){
+       int i=0;
+       xpmemContext->sendQs = (XpmemSendQ **) malloc(sizeof(XpmemSendQ *)*xpmemContext->nodesize);
+       for(i=0;i<xpmemContext->nodesize;i++){
+               if(i != xpmemContext->noderank){
+                       (xpmemContext->sendQs)[i] = (XpmemSendQ *)calloc(1, sizeof(XpmemSendQ));
+                       initSendQ((xpmemContext->sendQs)[i],SENDQSTARTSIZE);
+               }else{
+                       (xpmemContext->sendQs)[i] = NULL;
+               }
+       }
+};
+
+
+/****************
+ *copy this message into the sharedBuf;
+ *if it does not succeed, put it into the sendQ
+ *NOTE: This method is called only after obtaining the corresponding mutex
+ * ********/
+int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ){
+
+       if(dstBuf->header->bytes+size <= XPMEMBUFLEN){
+               /**copy  this message to sharedBuf **/
+               dstBuf->header->count++;
+               memcpy(dstBuf->data+dstBuf->header->bytes,msg,size);
+               dstBuf->header->bytes += size;
+               MACHSTATE4(3,"Xpmem send done msg %p size %d dstBuf->header->count %d dstBuf->header->bytes %d",msg,size,dstBuf->header->count,dstBuf->header->bytes);
+                CmiFree(msg);
+               return 1;
+       }
+       /***
+        * Shared Buffer is too full for this message
+        * **/
+       //printf("[%d] send buffer is too full\n", CmiMyPe());
+       pushSendQ(dstSendQ,msg,size,refcount);
+       (*refcount)++;
+       MACHSTATE3(3,"Xpmem send msg %p size %d queued refcount %d",msg,size,*refcount);
+       return 0;
+}
+
+inline OutgoingMsgRec* popSendQ(XpmemSendQ *q);
+
+/****
+ *Try to send all the messages in the sendq to this destination rank
+ *NOTE: This method is called only after obtaining the corresponding mutex
+ * ************/
+
+inline int flushSendQ(int dstRank){
+       sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstRank]);
+       XpmemSendQ *dstSendQ = xpmemContext->sendQs[dstRank];
+       int count=dstSendQ->numEntries;
+       int sent=0;
+       while(count > 0){
+               OutgoingMsgRec *ogm = popSendQ(dstSendQ);
+               (*ogm->refcount)--;
+               MACHSTATE4(3,"Xpmem trysending ogm %p size %d to dstRank %d refcount %d",ogm,ogm->size,dstRank,ogm->refcount);
+               int ret = sendMessageRec(ogm,dstBuf,dstSendQ);
+               if(ret==1){
+                       sent++;
+#if CMK_NET_VERSION
+                        GarbageCollectMsg(ogm);
+#endif
+               }
+               count--;
+       }
+       return sent;
+}
+
+inline void emptyRecvBuf(sharedBufData *recvBuf);
+
+inline void emptyAllRecvBufs(){
+       int i;
+       for(i=0;i<xpmemContext->nodesize;i++){
+               if(i != xpmemContext->noderank){
+                       sharedBufData *recvBuf = &(xpmemContext->recvBufs[i]);
+                       if(recvBuf->header->count > 0){
+
+#if PXSHM_STATS
+                               xpmemContext->lockRecvCount++;
+#endif
+
+
+#if PXSHM_OSSPINLOCK
+                               if(! OSSpinLockTry(&recvBuf->header->lock)){
+#elif XPMEM_LOCK
+                               if(sem_trywait(recvBuf->mutex) < 0){
+#elif XPMEM_FENCE
+                               recvBuf->header->flagReceiver = 1;
+                               recvBuf->header->turn = SENDER;
+                               CmiMemoryReadFence(0,0);
+                               CmiMemoryWriteFence(0,0);
+                               //if((recvBuf->header->flagSender && recvBuf->header->turn == SENDER)){
+                               if((recvBuf->header->flagSender)){
+                                       recvBuf->header->flagReceiver = 0;
+#endif
+                               }else{
+
+
+                                       MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
+                                       emptyRecvBuf(recvBuf);
+
+#if PXSHM_OSSPINLOCK
+                                       OSSpinLockUnlock(&recvBuf->header->lock);
+#elif XPMEM_LOCK
+                                       sem_post(recvBuf->mutex);
+#elif XPMEM_FENCE
+                                       CmiMemoryReadFence(0,0);
+                                       CmiMemoryWriteFence(0,0);
+                                       recvBuf->header->flagReceiver = 0;
+#endif
+
+                               }
+                       
+                       }
+               }
+       }
+};
+
+inline void flushAllSendQs(){
+       int i=0;
+       
+       for(i=0;i<xpmemContext->nodesize;i++){
+               if(i != xpmemContext->noderank && xpmemContext->sendQs[i]->numEntries > 0){
+       
+#if PXSHM_OSSPINLOCK
+                       if(OSSpinLockTry(&xpmemContext->sendBufs[i].header->lock)){
+#elif XPMEM_LOCK
+                       if(sem_trywait(xpmemContext->sendBufs[i].mutex) >= 0){
+#elif XPMEM_FENCE
+                       xpmemContext->sendBufs[i].header->flagSender = 1;
+                       xpmemContext->sendBufs[i].header->turn = RECEIVER;
+                       CmiMemoryReadFence(0,0);                        
+                       CmiMemoryWriteFence(0,0);
+                       if(!(xpmemContext->sendBufs[i].header->flagReceiver && xpmemContext->sendBufs[i].header->turn == RECEIVER)){
+#endif
+
+                               MACHSTATE1(3,"flushSendQ %d",i);
+                               flushSendQ(i);
+
+
+                               
+#if PXSHM_OSSPINLOCK   
+                               OSSpinLockUnlock(&xpmemContext->sendBufs[i].header->lock);
+#elif XPMEM_LOCK
+                               sem_post(xpmemContext->sendBufs[i].mutex);
+#elif XPMEM_FENCE
+                               CmiMemoryReadFence(0,0);                        
+                               CmiMemoryWriteFence(0,0);
+                               xpmemContext->sendBufs[i].header->flagSender = 0;
+#endif
+                       }else{
+
+#if XPMEM_FENCE
+                         xpmemContext->sendBufs[i].header->flagSender = 0;
+#endif                         
+
+                       }
+
+               }        
+       }       
+};
+
+static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot);
+
+void emptyRecvBuf(sharedBufData *recvBuf){
+       int numMessages = recvBuf->header->count;
+       int i=0;
+
+       char *ptr=recvBuf->data;
+
+       for(i=0;i<numMessages;i++){
+               int size;
+               int rank, srcpe, seqno, magic;
+               unsigned int broot;
+               char *msg = ptr;
+               char *newMsg;
+
+#if CMK_NET_VERSION
+               DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
+               size = CmiMsgHeaderGetLength(msg);
+#else
+                size = CmiGetMsgSize(msg);
+#endif
+       
+               newMsg = (char *)CmiAlloc(size);
+               memcpy(newMsg,msg,size);
+
+#if CMK_NET_VERSION
+               handoverPxshmMessage(newMsg,size,rank,broot);
+#else
+                handleOneRecvedMsg(size, newMsg);
+#endif
+               
+               ptr += size;
+
+               MACHSTATE3(3,"message of size %d recvd ends at ptr-data %d total bytes %d",size,ptr-recvBuf->data,recvBuf->header->bytes);
+       }
+#if 1
+  if(ptr - recvBuf->data != recvBuf->header->bytes){
+               CmiPrintf("[%d] ptr - recvBuf->data  %d recvBuf->header->bytes %d numMessages %d \n",_Cmi_mynode, ptr - recvBuf->data, recvBuf->header->bytes,numMessages);
+       }
+#endif
+       CmiAssert(ptr - recvBuf->data == recvBuf->header->bytes);
+       recvBuf->header->count=0;
+       recvBuf->header->bytes=0;
+}
+
+
+#if CMK_NET_VERSION
+static inline void handoverPxshmMessage(char *newmsg,int total_size,int rank,int broot){
+       CmiAssert(rank == 0);
+#if CMK_BROADCAST_SPANNING_TREE
+       if (rank == DGRAM_BROADCAST
+#if CMK_NODE_QUEUE_AVAILABLE
+           || rank == DGRAM_NODEBROADCAST
+#endif
+          ){
+               SendSpanningChildren(NULL, 0, total_size, newmsg, broot, rank);
+       }
+#elif CMK_BROADCAST_HYPERCUBE
+       if (rank == DGRAM_BROADCAST
+#if CMK_NODE_QUEUE_AVAILABLE
+           || rank == DGRAM_NODEBROADCAST
+#endif
+          ){
+               SendHypercube(NULL, 0, total_size, newmsg, broot, rank);
+       }
+#endif
+
+       switch (rank) {
+       case DGRAM_BROADCAST:
+               CmiPushPE(0, newmsg);
+               break;
+       default:
+               CmiPushPE(rank, newmsg);
+       }    /* end of switch */
+}
+#endif
+
+
+/**************************
+ *sendQ helper functions
+ * ****************/
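+/* The sendQ is a circular buffer of OutgoingMsgRec: begin indexes the oldest
+   entry, end the next free slot, and both wrap modulo size.  pushSendQ doubles
+   the array when it fills, copying entries so the oldest lands at index 0;
+   popSendQ hands entries back in FIFO order. */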
+
+void initSendQ(XpmemSendQ *q,int size){
+       q->data = (OutgoingMsgRec *)calloc(size, sizeof(OutgoingMsgRec));
+
+       q->size = size;
+       q->numEntries = 0;
+
+       q->begin = 0;
+       q->end = 0;
+}
+
+void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount){
+       if(q->numEntries == q->size){
+               //need to resize 
+               OutgoingMsgRec *oldData = q->data;
+               int newSize = q->size<<1;
+               q->data = (OutgoingMsgRec *)calloc(newSize, sizeof(OutgoingMsgRec));
+               //copy head to the beginning of the new array
+               CmiAssert(q->begin == q->end);
+
+               CmiAssert(q->begin < q->size);
+               memcpy(&(q->data[0]),&(oldData[q->begin]),sizeof(OutgoingMsgRec)*(q->size - q->begin));
+
+               if(q->end!=0){
+                       memcpy(&(q->data[(q->size - q->begin)]),&(oldData[0]),sizeof(OutgoingMsgRec)*(q->end));
+               }
+               free(oldData);
+               q->begin = 0;
+               q->end = q->size;
+               q->size = newSize;
+       }
+       OutgoingMsgRec *omg = &q->data[q->end];
+        omg->size = size;
+        omg->data = msg;
+        omg->refcount = refcount;
+       (q->end)++;
+       if(q->end >= q->size){
+               q->end -= q->size;
+       }
+       q->numEntries++;
+}
+
+OutgoingMsgRec * popSendQ(XpmemSendQ *q){
+       OutgoingMsgRec * ret;
+       if(0 == q->numEntries){
+               return NULL;
+       }
+
+       ret = &q->data[q->begin];
+       (q->begin)++;
+       if(q->begin >= q->size){
+               q->begin -= q->size;
+       }
+       
+       q->numEntries--;
+       return ret;
+}