move CmiAbort to machine common code, and add a new LrtsAbort for LRTS.
authorGengbin Zheng <gzheng@illinois.edu>
Fri, 3 Feb 2012 06:26:47 +0000 (00:26 -0600)
committerGengbin Zheng <gzheng@illinois.edu>
Fri, 3 Feb 2012 06:26:47 +0000 (00:26 -0600)
clean up shmem and semaphore in pxshm version
add LrtsSendNetwork() which is a wrapper for LrtsSend() with pxshm support.

src/arch/gemini_gni/machine.c
src/arch/util/machine-broadcast.c
src/arch/util/machine-common-core.c
src/arch/util/machine-lrts.h
src/arch/util/machine-pxshm.c
src/arch/util/machine-xpmem.c

index 28bac1fd0f93c4d5804d7ba088dd2e4acbfd8d93..5f18f08330542324e0973c4947bdc21dbad6a8df 100644 (file)
@@ -178,6 +178,7 @@ uint8_t   onesided_hnd, omdh;
 
 #define FMA_PER_CORE  1024
 #define FMA_BUFFER_SIZE 1024
+
 /* If SMSG is used */
 static int  SMSG_MAX_MSG = 1024;
 #define SMSG_MAX_CREDIT 72 
@@ -1001,10 +1002,10 @@ inline void LrtsPrepareEnvelope(char *msg, int size)
 
 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
 {
-
     gni_return_t        status  =   GNI_RC_SUCCESS;
     uint8_t tag;
     CONTROL_MSG         *control_msg_tmp;
+
     LrtsPrepareEnvelope(msg, size);
 
 #if PRINT_SYH
@@ -2010,6 +2011,8 @@ static void _init_static_smsg()
     uint32_t              vmdh_index = -1;
     mdh_addr_t            base_infor;
     mdh_addr_t            *base_addr_vec;
+    char *env;
+
     if(mysize <=512)
     {
         SMSG_MAX_MSG = 1024;
@@ -2023,6 +2026,10 @@ static void _init_static_smsg()
         SMSG_MAX_MSG = 256;
     }
     
+    env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
+    if (env) SMSG_MAX_MSG = atoi(env);
+    CmiAssert(SMSG_MAX_MSG > 0);
+
     smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
     
     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
@@ -2033,7 +2040,7 @@ static void _init_static_smsg()
     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
     CmiAssert(ret == 0);
     bzero(smsg_mailbox_base, smsg_memlen*(mysize));
-    //if (myrank == 0) printf("Charm++> allocates %.2fMB for SMSG. \n", smsg_memlen*mysize/1e6);
+    //if (myrank == 0) printf("Charm++> allocates %.2fMB for SMSG mailbox. \n", smsg_memlen*mysize/1e6);
     
     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
             smsg_memlen*(mysize), smsg_rx_cqh,
@@ -2080,8 +2087,8 @@ static void _init_static_smsg()
     } //end initialization
 
     free(base_addr_vec);
-
     free(smsg_attr);
+
     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
 } 
@@ -2325,6 +2332,8 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     unsigned int            local_addr, *MPID_UGNI_AllAddr;
     int                     first_spawned;
     int                     physicalID;
+    char                   *env;
+
     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
@@ -2350,8 +2359,14 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
 #endif
 
+    env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
+    if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
+
+    env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
+    if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
+
     useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
@@ -2432,9 +2447,9 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
         _init_smsg();
         PMI_Barrier();
     }
+
 #if     USE_LRTS_MEMPOOL
     char *str;
-    //if (CmiGetArgLong(*argv, "+useMemorypoolSize", &_mempool_size))
     if (CmiGetArgStringDesc(*argv,"+useMemorypoolSize",&str,"Set the memory pool size")) 
     {
       if (strpbrk(str,"G")) {
@@ -2563,10 +2578,9 @@ void LrtsDrainResources()
     PMI_Barrier();
 }
 
-void CmiAbort(const char *message) {
-
-    CmiPrintStackTrace(0);
+void LrtsAbort(const char *message) {
     printf("CmiAbort is calling on PE:%d\n", myrank);
+    CmiPrintStackTrace(0);
     PMI_Abort(-1, message);
 }
 
index e6975916fae095f947c666ec227e7f7bfbc3dba5..67b2fb1c9c1aa00c2fff16f21bf224d103f9bf27 100644 (file)
@@ -103,10 +103,10 @@ static void SendSpanningChildren(int size, char *msg, int rankToAssign, int star
         CmiAssert(nd>=0 && nd!=CmiMyNode());
 #if CMK_BROADCAST_USE_CMIREFERENCE
         CmiReference(msg);
-        LrtsSendFunc(nd, size, msg, BCAST_SYNC);
+        LrtsSendNetworkFunc(nd, size, msg, BCAST_SYNC);
 #else
         newmsg = CopyMsg(msg, size);
-        LrtsSendFunc(nd, size, newmsg, BCAST_SYNC);
+        LrtsSendNetworkFunc(nd, size, newmsg, BCAST_SYNC);
 #endif
     }
     CMI_DEST_RANK(msg) = oldRank;
@@ -151,10 +151,10 @@ static void SendHyperCube(int size,  char *msg, int rankToAssign, int startNode)
         CmiAssert(nd>=0 && nd!=CmiMyNode());
 #if CMK_BROADCAST_USE_CMIREFERENCE
         CmiReference(msg);
-        LrtsSendFunc(nd, size, msg, BCAST_SYNC);
+        LrtsSendNetworkFunc(nd, size, msg, BCAST_SYNC);
 #else
         char *newmsg = CopyMsg(msg, size);
-        LrtsSendFunc(nd, size, newmsg, BCAST_SYNC);
+        LrtsSendNetworkFunc(nd, size, newmsg, BCAST_SYNC);
 #endif
     }
     CMI_DEST_RANK(msg) = oldRank;
index 3c3d7557debc2911bfacc1357053fc0b3a9a9eac..ce14ea167198541d609b9c2f92fdb5391defb081 100644 (file)
@@ -462,7 +462,7 @@ void CmiSyncSendFn(int destPE, int size, char *msg) {
 
 #if CMK_USE_PXSHM
 inline int CmiValidPxshm(int dst, int size);
-void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot);
+void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount);
 void CmiInitPxshm(char **argv);
 inline void CommunicationServerPxshm();
 void CmiExitPxshm();
@@ -470,7 +470,7 @@ void CmiExitPxshm();
 
 #if CMK_USE_XPMEM
 inline int CmiValidXpmem(int dst, int size);
-void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot);
+void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount);
 void CmiInitXpmem(char **argv);
 inline void CommunicationServerXpmem();
 void CmiExitXpmem();
@@ -478,6 +478,37 @@ void CmiExitXpmem();
 
 int refcount = 0;
 
+/* a wrapper of LrtsSendFunc */
+#if CMK_C_INLINE
+inline 
+#endif
+CmiCommHandle LrtsSendNetworkFunc(int destNode, int size, char *msg, int mode)
+{
+        int rank;
+#if CMK_USE_PXSHM
+        if (CmiValidPxshm(destNode, size)) {
+          CmiSendMessagePxshm(msg, size, destNode, &refcount);
+          //for (int i=0; i<refcount; i++) CmiReference(msg);
+          return 0;
+        }
+#endif
+#if CMK_USE_XPMEM
+        if (CmiValidXpmem(destNode, size)) {
+          CmiSendMessageXpmem(msg, size, destNode, &refcount);
+          //for (int i=0; i<refcount; i++) CmiReference(msg);
+          return 0;
+        }
+#endif
+
+if (MSG_STATISTIC)
+{
+    int ret_log = _cmi_log2(size);
+    if(ret_log >21) ret_log = 21;
+    msg_histogram[ret_log]++;
+}
+    return LrtsSendFunc(destNode, size, msg, mode);
+}
+
 void CmiFreeSendFn(int destPE, int size, char *msg) {
     CMI_SET_BROADCAST_ROOT(msg, 0);
     CQdCreate(CpvAccess(cQdState), 1);
@@ -488,13 +519,16 @@ void CmiFreeSendFn(int destPE, int size, char *msg) {
 #endif
     } else {
 #if CMK_PERSISTENT_COMM
-        if (phs && size > 8192) {
+        if (phs) {
+          if (size > 8192) {
             CmiAssert(curphs < phsSize);
             LrtsSendPersistentMsg(phs[curphs++], destPE, size, msg);
             return;
+          }
+          else
+            curphs++;
         }
 #endif
-
         int destNode = CmiNodeOf(destPE);
 #if CMK_SMP
         if (CmiMyNode()==destNode) {
@@ -503,35 +537,7 @@ void CmiFreeSendFn(int destPE, int size, char *msg) {
         }
 #endif
         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
-#if CMK_USE_PXSHM
-        int ret=CmiValidPxshm(destPE, size);
-        if (ret) {
-          CmiSendMessagePxshm(msg, size, destPE, &refcount, CmiRankOf(destPE), 0);
-          //for (int i=0; i<refcount; i++) CmiReference(msg);
-#if CMK_PERSISTENT_COMM
-          if (phs) curphs++;
-#endif
-          return;
-        }
-#endif
-#if CMK_USE_XPMEM
-        int ret=CmiValidXpmem(destPE, size);
-        if (ret) {
-          CmiSendMessageXpmem(msg, size, destPE, &refcount, CmiRankOf(destPE), 0);
-          //for (int i=0; i<refcount; i++) CmiReference(msg);
-#if CMK_PERSISTENT_COMM
-          if (phs) curphs++;
-#endif
-          return;
-        }
-#endif
-if (  MSG_STATISTIC)
-{
-    int ret_log = _cmi_log2(size);
-    if(ret_log >21) ret_log = 21;
-    msg_histogram[ret_log]++;
-}
-        LrtsSendFunc(destNode, size, msg, P2P_SYNC);
+        LrtsSendNetworkFunc(destPE, size, msg, P2P_SYNC);
     }
 }
 #endif
@@ -882,6 +888,15 @@ if (MSG_STATISTIC)
 }
 /* ##### End of Functions Related with Machine Running ##### */
 
+void CmiAbort(const char *message) {
+#if CMK_USE_PXSHM
+    CmiExitPxshm();
+#endif
+#if CMK_USE_XPMEM
+    CmiExitXpmem();
+#endif
+    LrtsAbort(message);
+}
 
 /* ##### Beginning of Functions Providing Incoming Network Messages ##### */
 void *CmiGetNonLocal(void) {
index 6135fc2654cbaea43c4830f0637cd5e00f3a5299..bf628b59bb2db68642ead02ce94f6a4831e99401 100644 (file)
@@ -5,6 +5,7 @@ void LrtsPrepareEnvelope(char *msg, int size);
 
 /* The machine-specific send function */
 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode);
+CmiCommHandle LrtsSendNetworkFunc(int destNode, int size, char *msg, int mode);
 
 #if CMK_PERSISTENT_COMM
 void LrtsSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m);
@@ -21,6 +22,7 @@ void LrtsPostCommonInit(int everReturn);
 void LrtsAdvanceCommunication(int whileidle);
 void LrtsDrainResources(); /* used when exit */
 void LrtsExit();
+void LrtsAbort(const char *message);
 /* ### End of Machine-running Related Functions ### */
 void LrtsPostNonLocal();
 
index b3a7106c979466c5e47174a025f43abe461a93ce..59eec21226d3703b1f109adcb748dd0b7124c829 100644 (file)
@@ -255,6 +255,8 @@ void CmiInitPxshm(char **argv){
         signal(SIGABRT, cleanupOnAllSigs);
         signal(SIGQUIT, cleanupOnAllSigs);
         signal(SIGBUS, cleanupOnAllSigs);
+        signal(SIGINT, cleanupOnAllSigs);
+        signal(SIGTRAP, cleanupOnAllSigs);
 
 #if 0
         char name[64];
@@ -270,6 +272,7 @@ void CmiInitPxshm(char **argv){
 void tearDownSharedBuffers();
 
 void CmiExitPxshm(){
+        if (pxshmContext == NULL) return;
        if(pxshmContext->nodesize != 1){
                 int i;
                tearDownSharedBuffers();
@@ -293,13 +296,15 @@ void CmiExitPxshm(){
 CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6lf commServerTime %6lf lockRecvCount %d \n",_Cmi_mynode,pxshmContext->sendCount,pxshmContext->sendTime,pxshmContext->validCheckCount,pxshmContext->validCheckTime,pxshmContext->commServerTime,pxshmContext->lockRecvCount);
 #endif
        free(pxshmContext);
+        pxshmContext = NULL;
 }
 
 /******************
  *Should this message be sent using PxShm or not ?
  * ***********************/
 
-inline int CmiValidPxshm(int dst, int size){
+/* dstNode is node number */
+inline int CmiValidPxshm(int node, int size){
 #if PXSHM_STATS
        pxshmContext->validCheckCount++;
 #endif
@@ -309,17 +314,12 @@ inline int CmiValidPxshm(int dst, int size){
        }*/
        //replace by bitmap later
        //if(ogm->dst >= pxshmContext->nodestart && ogm->dst <= pxshmContext->nodeend && ogm->size < SHMBUFLEN ){
-        int node = CmiNodeOf(dst);
-       if(node >= pxshmContext->nodestart && node <= pxshmContext->nodeend && size < SHMMAXSIZE ){
-               return 1;
-       }else{
-               return 0;
-       }
+       return (node >= pxshmContext->nodestart && node <= pxshmContext->nodeend && size <= SHMMAXSIZE )? 1: 0;
 };
 
 
-inline int PxshmRank(int dst){
-       return CmiNodeOf(dst) - pxshmContext->nodestart;
+inline int PxshmRank(int dstnode){
+       return dstnode - pxshmContext->nodestart;
 }
 
 inline void pushSendQ(PxshmSendQ *q, char *msg, int size, int *refcount);
@@ -338,7 +338,7 @@ inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,PxshmSendQ
  *
  * ****************************/
 
-void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
+void CmiSendMessagePxshm(char *msg, int size, int dstnode, int *refcount)
 {
 
 #if PXSHM_STATS
@@ -347,7 +347,7 @@ void CmiSendMessagePxshm(char *msg, int size, int dstpe, int *refcount, int rank
 
         LrtsPrepareEnvelope(msg, size);
        
-       int dstRank = PxshmRank(dstpe);
+       int dstRank = PxshmRank(dstnode);
        MEMDEBUG(CmiMemoryCheck());
   
        MACHSTATE4(3,"Send Msg Pxshm ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
@@ -600,24 +600,23 @@ void createShmObject(char *name,int size,char **pPtr){
 }
 
 void tearDownSharedBuffers(){
-#if PXSHM_LOCK
        int i;
        for(i= 0;i<pxshmContext->nodesize;i++){
            if(i != pxshmContext->noderank){
-                if (pxshmContext->recvBufs[i].mutex != NULL)
-                {
-                   sem_close(pxshmContext->recvBufs[i].mutex);
-                   if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
-                       fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
-                   }
-                   sem_close(pxshmContext->sendBufs[i].mutex);
-                   sem_unlink(pxshmContext->sendBufNames[i]);
-                    pxshmContext->recvBufs[i].mutex = NULL;
-                    pxshmContext->sendBufs[i].mutex = NULL;
-                }
+               if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
+                   fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
+               }
+               sem_unlink(pxshmContext->sendBufNames[i]);
+#if PXSHM_LOCK
+               sem_close(pxshmContext->recvBufs[i].mutex);
+               sem_close(pxshmContext->sendBufs[i].mutex);
+               sem_unlink(pxshmContext->sendBufNames[i]);
+               sem_unlink(pxshmContext->recvBufNames[i]);
+                pxshmContext->recvBufs[i].mutex = NULL;
+                pxshmContext->sendBufs[i].mutex = NULL;
+#endif
            }
        }
-#endif
 };
 
 
index 7d9881839c94368ef12c93ce9e46343f4b4a91fd..8a414f191b34d8707216b3d50333c8f2677336b6 100644 (file)
@@ -218,7 +218,7 @@ void CmiInitXpmem(char **argv){
 #elif !CMK_NET_VERSION
         #error "need a unique number"
 #endif
-       snprintf(&(xpmemContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_pxshm_%d",Cmi_charmrun_pid);
+       snprintf(&(xpmemContext->prefixStr[0]),PREFIXSTRLEN-1,"charm_xpmem_%d",Cmi_charmrun_pid);
 
        MACHSTATE2(3,"CminitXpmem %s %d pre setupSharedBuffers",xpmemContext->prefixStr,xpmemContext->nodesize);
 
@@ -281,7 +281,8 @@ CmiPrintf("[%d] sendCount %d sendTime %6lf validCheckCount %d validCheckTime %.6
  *Should this message be sent using PxShm or not ?
  * ***********************/
 
-inline int CmiValidXpmem(int dst, int size){
+/* dstNode is node number */
+inline int CmiValidXpmem(int node, int size){
 #if XPMEM_STATS
        xpmemContext->validCheckCount++;
 #endif
@@ -292,16 +293,12 @@ inline int CmiValidXpmem(int dst, int size){
        //replace by bitmap later
        //if(ogm->dst >= xpmemContext->nodestart && ogm->dst <= xpmemContext->nodeend && ogm->size < SHMBUFLEN ){
        //if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size < XPMEMMAXSIZE && size > XPMEMMINSIZE){
-       if(dst >= xpmemContext->nodestart && dst <= xpmemContext->nodeend && size <= XPMEMMAXSIZE){
-               return 1;
-       }else{
-               return 0;
-       }
+       return (node >= xpmemContext->nodestart && node <= xpmemContext->nodeend && size <= XPMEMMAXSIZE )? 1: 0;
 };
 
 
-inline int XpmemRank(int dst){
-       return dst - xpmemContext->nodestart;
+inline int XpmemRank(int dstnode){
+       return dstnode - xpmemContext->nodestart;
 }
 
 inline void pushSendQ(XpmemSendQ *q, char *msg, int size, int *refcount);
@@ -320,7 +317,7 @@ inline int sendMessageRec(OutgoingMsgRec *omg, sharedBufData *dstBuf,XpmemSendQ
  *
  * ****************************/
 
-void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank,unsigned int broot)
+void CmiSendMessageXpmem(char *msg, int size, int dstnode, int *refcount)
 {
 
 #if XPMEM_STATS
@@ -329,7 +326,7 @@ void CmiSendMessageXpmem(char *msg, int size, int dstpe, int *refcount, int rank
 
         LrtsPrepareEnvelope(msg, size);
        
-       int dstRank = XpmemRank(dstpe);
+       int dstRank = XpmemRank(dstnode);
        MEMDEBUG(CmiMemoryCheck());
   
        MACHSTATE4(3,"Send Msg Xpmem ogm %p size %d dst %d dstRank %d",ogm,ogm->size,ogm->dst,dstRank);
@@ -674,24 +671,19 @@ void removeXpmemFiles()
 }
 
 void tearDownSharedBuffers(){
-#if PXSHM_LOCK
        int i;
-       for(i= 0;i<pxshmContext->nodesize;i++){
-           if(i != pxshmContext->noderank){
-                if (pxshmContext->recvBufs[i].mutex != NULL)
-                {
-                   sem_close(pxshmContext->recvBufs[i].mutex);
-                   if(shm_unlink(pxshmContext->recvBufNames[i]) < 0){
-                       fprintf(stderr,"Error from shm_unlink %s \n",strerror(errno));
-                   }
-                   sem_close(pxshmContext->sendBufs[i].mutex);
-                   sem_unlink(pxshmContext->sendBufNames[i]);
-                    pxshmContext->recvBufs[i].mutex = NULL;
-                    pxshmContext->sendBufs[i].mutex = NULL;
-                }
+       for(i= 0;i<xpmemContext->nodesize;i++){
+           if(i != xpmemContext->noderank){
+#if XPMEM_LOCK
+               sem_close(xpmemContext->recvBufs[i].mutex);
+               sem_close(xpmemContext->sendBufs[i].mutex);
+               sem_unlink(xpmemContext->sendBufNames[i]);
+               sem_unlink(xpmemContext->recvBufNames[i]);
+                xpmemContext->recvBufs[i].mutex = NULL;
+                xpmemContext->sendBufs[i].mutex = NULL;
+#endif
            }
        }
-#endif
 };
 
 void initSendQ(XpmemSendQ *q,int size);