add xpmem into machine common core.
authorGengbin Zheng <gzheng@illinois.edu>
Wed, 1 Feb 2012 07:41:49 +0000 (01:41 -0600)
committerGengbin Zheng <gzheng@illinois.edu>
Wed, 1 Feb 2012 07:41:49 +0000 (01:41 -0600)
src/arch/gemini_gni-crayxe/conv-mach-xpmem.h
src/arch/gemini_gni/Makefile.machine
src/arch/util/machine-common-core.c
src/arch/util/machine-xpmem.c

index 95265f3fc76e3357d3f99d560669fc8a086c6141..39cc4d8765678943216c473d0646d7827cf06c9e 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef _CONV_MACH_XPMEM_
 #define  _CONV_MACH_XPMEM
 
+#undef CMK_USE_PXSHM
 #undef CMK_USE_XPMEM
 #define CMK_USE_XPMEM                  1
 
index d08ea07f59a0d2ad9f30648500e2133e594fc5df..84664f7cd3e93d869d43e2da7337ba080db1cf05 100644 (file)
@@ -1,2 +1,2 @@
-$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h machine-pxshm.c machine-persistent.c machine-commthd-util.c
+$(L)/libconv-cplus-n.a: machine.c machine-common-core.c machine-broadcast.c machine-lrts.h machine-pxshm.c machine-xpmem.c machine-persistent.c machine-commthd-util.c
 
index 2f219044ea537a82db91b49ed0803a203eecee44..3c3d7557debc2911bfacc1357053fc0b3a9a9eac 100644 (file)
@@ -468,6 +468,14 @@ inline void CommunicationServerPxshm();
 void CmiExitPxshm();
 #endif
 
+#if CMK_USE_XPMEM
+inline int CmiValidXpmem(int dst, int size);
+void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot);
+void CmiInitXpmem(char **argv);
+inline void CommunicationServerXpmem();
+void CmiExitXpmem();
+#endif
+
 int refcount = 0;
 
 void CmiFreeSendFn(int destPE, int size, char *msg) {
@@ -506,6 +514,17 @@ void CmiFreeSendFn(int destPE, int size, char *msg) {
           return;
         }
 #endif
+#if CMK_USE_XPMEM
+        int ret=CmiValidXpmem(destPE, size);
+        if (ret) {
+          CmiSendMessageXpmem(msg, size, destPE, &refcount, CmiRankOf(destPE), 0);
+          //for (int i=0; i<refcount; i++) CmiReference(msg);
+#if CMK_PERSISTENT_COMM
+          if (phs) curphs++;
+#endif
+          return;
+        }
+#endif
 if (  MSG_STATISTIC)
 {
     int ret_log = _cmi_log2(size);
@@ -650,6 +669,9 @@ if (  MSG_STATISTIC)
 #if CMK_USE_PXSHM
     CmiInitPxshm(argv);
 #endif
+#if CMK_USE_XPMEM
+    CmiInitXpmem(argv);
+#endif
 
     /* CmiTimerInit(); */
 #if CMK_BROADCAST_HYPERCUBE
@@ -762,6 +784,9 @@ static INLINE_KEYWORD void AdvanceCommunication(int whenidle) {
 #if CMK_USE_PXSHM
     CommunicationServerPxshm();
 #endif
+#if CMK_USE_XPMEM
+    CommunicationServerXpmem();
+#endif
 
     LrtsAdvanceCommunication(whenidle);
 
@@ -796,6 +821,9 @@ static void CommunicationServer(int sleepTime) {
 
 #if CMK_USE_PXSHM
         CmiExitPxshm();
+#endif
+#if CMK_USE_XPMEM
+        CmiExitXpmem();
 #endif
         LrtsExit();
     }
@@ -838,6 +866,9 @@ if (MSG_STATISTIC)
 #if !CMK_SMP
 #if CMK_USE_PXSHM
     CmiExitPxshm();
+#endif
+#if CMK_USE_XPMEM
+    CmiExitXpmem();
 #endif
     LrtsExit();
 #else
@@ -968,5 +999,8 @@ static char *CopyMsg(char *msg, int len) {
 #if CMK_USE_PXSHM
 #include "machine-pxshm.c"
 #endif
+#if CMK_USE_XPMEM
+#include "machine-xpmem.c"
+#endif
 
 
index 85687e02ca0777de4269f081dc9ecde80a8feb01..3b79551bf3a30575c422393223c5edec50d0521c 100644 (file)
@@ -3,7 +3,7 @@
 
 There are three options here for synchronization:
       XPMEM_FENCE is the default. It uses memory fences
-      PXSHM_OSSPINLOCK will cause OSSpinLock's to be used (available on OSX)
+      XPMEM_OSSPINLOCK will cause OSSpinLock's to be used (available on OSX)
       XPMEM_LOCK will cause POSIX semaphores to be used
 
   created by 
@@ -21,13 +21,14 @@ There are three options here for synchronization:
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <errno.h>
+#include <signal.h>
 
 #include "xpmem.h"
 
 /************** 
    Determine which type of synchronization to use 
 */
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
 #include <libkern/OSAtomic.h>
 #elif XPMEM_LOCK
 #include <semaphore.h>
@@ -36,10 +37,9 @@ There are three options here for synchronization:
 #define XPMEM_FENCE 1
 #endif
 
-
 #define MEMDEBUG(x) //x
 
-#define PXSHM_STATS 0
+#define XPMEM_STATS 0
 
 
 /*** The following code was copied verbatim from pcqueue.h file ***/
@@ -88,8 +88,9 @@ enum entities {SENDER,RECEIVER};
 #define NAMESTRLEN 60
 #define PREFIXSTRLEN 50 
 
-#define XPMEMBUFLEN (1024*1024*2)
-#define XPMEMMAXSIZE     (1*1024)
+#define XPMEMBUFLEN      (1024*1024*4)
+#define XPMEMMINSIZE     (1*1024)
+#define XPMEMMAXSIZE     (1024*1024)
 
 #define SENDQSTARTSIZE    256
 
@@ -99,7 +100,7 @@ typedef struct {
        int count; //number of messages
        int bytes; //number of bytes
 
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
        OSSpinLock lock;
 #endif
 
@@ -155,7 +156,7 @@ typedef struct {
        XpmemSendQ **sendQs;
 
 
-#if PXSHM_STATS
+#if XPMEM_STATS
        int sendCount;
        int validCheckCount;
        int lockRecvCount;
@@ -175,6 +176,13 @@ void calculateNodeSizeAndRank(char **);
 void setupSharedBuffers();
 void initAllSendQs();
 
+void CmiExitXpmem();
+
+static void cleanupOnAllSigs(int signo)
+{
+    CmiExitXpmem();
+}
+
 int xpmem_fd;
 
 /******************
@@ -224,6 +232,13 @@ void CmiInitXpmem(char **argv){
 
        MACHSTATE2(3,"CminitXpmem %s %d done",xpmemContext->prefixStr,xpmemContext->nodesize);
 
+        signal(SIGSEGV, cleanupOnAllSigs);
+        signal(SIGFPE, cleanupOnAllSigs);
+        signal(SIGILL, cleanupOnAllSigs);
+        signal(SIGTERM, cleanupOnAllSigs);
+        signal(SIGABRT, cleanupOnAllSigs);
+        signal(SIGQUIT, cleanupOnAllSigs);
+        signal(SIGBUS, cleanupOnAllSigs);
 };
 
 /**************
@@ -235,6 +250,8 @@ void tearDownSharedBuffers();
 void CmiExitXpmem(){
        int i=0;
        
+        if (xpmemContext == NULL) return;
+
        if(xpmemContext->nodesize != 1){
                tearDownSharedBuffers();
        
@@ -253,10 +270,11 @@ void CmiExitXpmem(){
                free(xpmemContext->sendBufs);
 
        }
-#if PXSHM_STATS
+#if XPMEM_STATS
 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,xpmemContext->sendCount,xpmemContext->sendTime,xpmemContext->validCheckCount,xpmemContext->validCheckTime,xpmemContext->commServerTime,xpmemContext->lockRecvCount);
 #endif
        free(xpmemContext);
+        xpmemContext = NULL;
 }
 
 /******************
@@ -264,7 +282,7 @@ CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6
  * ***********************/
 
 inline int CmiValidXpmem(int dst, int size){
-#if PXSHM_STATS
+#if XPMEM_STATS
        xpmemContext->validCheckCount++;
 #endif
 
@@ -273,7 +291,8 @@ inline int CmiValidXpmem(int dst, int size){
        }*/
        //replace by bitmap later
        //if(ogm->dst >= xpmemContext->nodestart && ogm->dst <= xpmemContext->nodeend && ogm->size < SHMBUFLEN ){
-       if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size > XPMEMMAXSIZE ){
+       //if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size < XPMEMMAXSIZE && size > XPMEMMINSIZE){
+       if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size <= XPMEMMAXSIZE){
                return 1;
        }else{
                return 0;
@@ -284,6 +303,7 @@ inline int CmiValidXpmem(int dst, int size){
 inline int XpmemRank(int dst){
        return dst - xpmemContext->nodestart;
 }
+
 inline void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount);
 inline int sendMessage(char *msg, int size, int *refcount, sharedBufData *dstBuf,XpmemSendQ *dstSendQ);
 inline int flushSendQ(int dstRank);
@@ -303,7 +323,7 @@ inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,XpmemSendQ
 void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
 {
 
-#if PXSHM_STATS
+#if XPMEM_STATS
        double _startSendTime = CmiWallTimer();
 #endif
 
@@ -320,7 +340,7 @@ void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank
        sharedBufData *dstBuf = &(xpmemContext->sendBufs[dstRank]);
 
 
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
        if(! OSSpinLockTry(&dstBuf->header->lock)){
 #elif XPMEM_LOCK
        if(sem_trywait(dstBuf->mutex) < 0){
@@ -360,7 +380,7 @@ void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank
                 }
                 /* unlock the recvbuffer*/
 
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
                 OSSpinLockUnlock(&dstBuf->header->lock);
 #elif XPMEM_LOCK
                 sem_post(dstBuf->mutex);
@@ -370,7 +390,7 @@ void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank
                 dstBuf->header->flagSender = 0;
 #endif
        }
-#if PXSHM_STATS
+#if XPMEM_STATS
                xpmemContext->sendCount ++;
                xpmemContext->sendTime += (CmiWallTimer()-_startSendTime);
 #endif
@@ -387,7 +407,7 @@ inline void flushAllSendQs();
  * ***/
 inline void CommunicationServerXpmem(){
        
-#if PXSHM_STATS
+#if XPMEM_STATS
        double _startCommServerTime =CmiWallTimer();
 #endif 
        
@@ -395,7 +415,7 @@ inline void CommunicationServerXpmem(){
        emptyAllRecvBufs();
        flushAllSendQs();
 
-#if PXSHM_STATS
+#if XPMEM_STATS
        xpmemContext->commServerTime += (CmiWallTimer()-_startCommServerTime);
 #endif
 
@@ -418,6 +438,8 @@ void calculateNodeSizeAndRank(char **argv){
        MACHSTATE(3,"calculateNodeSizeAndRank start");
        //CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node (for non-smp case).Used by the shared memory communication layer");
        CmiGetArgIntDesc(argv, "+nodesize", &(xpmemContext->nodesize),"Number of cores in this node");
+        if (_Cmi_mynode == 0 && xpmemContext->nodesize > 1)
+         CmiPrintf("Charm++> xpmem enabled: %d cores per node\n", xpmemContext->nodesize);
        MACHSTATE1(3,"calculateNodeSizeAndRank argintdesc %d",xpmemContext->nodesize);
 
        xpmemContext->noderank = _Cmi_mynode % (xpmemContext->nodesize);
@@ -582,7 +604,7 @@ void createRecvXpmemAndSems(sharedBufData **bufs,char **bufNames){
                //      createShmObject(bufNames[i],SHMBUFLEN+sizeof(sharedBufHeader),(char **)&((*bufs)[i].header));
                 memset(((*bufs)[i].header), 0, size);
                (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
                (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
 #elif XPMEM_LOCK
                (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
@@ -636,7 +658,7 @@ void createSendXpmemAndSems(sharedBufData **bufs,char **bufNames)
                 attachXpmemObject(segid, size,(char **)&((*bufs)[i].header));
                 memset(((*bufs)[i].header), 0, XPMEMBUFLEN+sizeof(sharedBufHeader));
                (*bufs)[i].data = ((char *)((*bufs)[i].header))+sizeof(sharedBufHeader);
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
                (*bufs)[i].header->lock = 0; // by convention(see man page) 0 means unlocked
 #elif XPMEM_LOCK
                (*bufs)[i].mutex = sem_open(bufNames[i],O_CREAT, S_IRUSR | S_IWUSR,1);
@@ -758,12 +780,12 @@ inline void emptyAllRecvBufs(){
                        sharedBufData *recvBuf = &(xpmemContext->recvBufs[i]);
                        if(recvBuf->header->count > 0){
 
-#if PXSHM_STATS
+#if XPMEM_STATS
                                xpmemContext->lockRecvCount++;
 #endif
 
 
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
                                if(! OSSpinLockTry(&recvBuf->header->lock)){
 #elif XPMEM_LOCK
                                if(sem_trywait(recvBuf->mutex) < 0){
@@ -782,7 +804,7 @@ inline void emptyAllRecvBufs(){
                                        MACHSTATE1(3,"emptyRecvBuf to be called for rank %d",i);                        
                                        emptyRecvBuf(recvBuf);
 
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
                                        OSSpinLockUnlock(&recvBuf->header->lock);
 #elif XPMEM_LOCK
                                        sem_post(recvBuf->mutex);
@@ -805,7 +827,7 @@ inline void flushAllSendQs(){
        for(i=0;i<xpmemContext->nodesize;i++){
                if(i != xpmemContext->noderank && xpmemContext->sendQs[i]->numEntries > 0){
        
-#if PXSHM_OSSPINLOCK
+#if XPMEM_OSSPINLOCK
                        if(OSSpinLockTry(&xpmemContext->sendBufs[i].header->lock)){
 #elif XPMEM_LOCK
                        if(sem_trywait(xpmemContext->sendBufs[i].mutex) >= 0){
@@ -822,7 +844,7 @@ inline void flushAllSendQs(){
 
 
                                
-#if PXSHM_OSSPINLOCK   
+#if XPMEM_OSSPINLOCK   
                                OSSpinLockUnlock(&xpmemContext->sendBufs[i].header->lock);
 #elif XPMEM_LOCK
                                sem_post(xpmemContext->sendBufs[i].mutex);