Merge branch 'charm' of charmgit:charm into charm-mpi-interop
author Nikhil Jain <nikhil@illinois.edu>
Sat, 31 Mar 2012 17:06:35 +0000 (12:06 -0500)
committer Nikhil Jain <nikhil@illinois.edu>
Sat, 31 Mar 2012 17:06:35 +0000 (12:06 -0500)
Conflicts:
src/conv-core/converse.h

1  2 
src/arch/util/machine-common-core.c
src/ck-core/init.C
src/conv-core/converse.h

index 3dad0f0d1079e4aeb6042a850044cccd6bd1056c,c2727170e8d2823e94703b1ebd992402c209d0f9..59cfbd6fd4cfdb1edac87e9da80b1a2d045faa20
@@@ -151,7 -151,11 +151,11 @@@ CpvDeclare(void*, CmiLocalQueue)
  
  enum MACHINE_SMP_MODE {
      INVALID_MODE,
+ #if CMK_BLUEGENEQ
+     COMM_THREAD_SEND_RECV = 1,
+ #else 
      COMM_THREAD_SEND_RECV = 0,
+ #endif
      COMM_THREAD_ONLY_RECV, /* work threads will do the send */
      COMM_WORK_THREADS_SEND_RECV, /* work and comm threads do the both send/recv */
      COMM_THREAD_NOT_EXIST /* work threads will do both send and recv */
@@@ -374,7 -378,7 +378,7 @@@ void CmiPushPE(int rank,void *msg) 
      }
  #endif
  
 -    PCQueuePush(cs->recv,msg);
 +    PCQueuePush(cs->recv,(char*)msg);
  
  #if CMK_SHARED_VARS_POSIX_THREADS_SMP
    if (_Cmi_noprocforcommthread)
@@@ -473,7 -477,7 +477,7 @@@ void CmiSyncSendFn(int destPE, int size
  #include "machine-xpmem.c"
  #endif
  
- int refcount = 0;
+ static int refcount = 0;
  
  #if CMK_USE_OOB
  CpvExtern(int, _urgentSend);
  #if CMK_C_INLINE
  inline 
  #endif
- CmiCommHandle LrtsSendNetworkFunc(int destNode, int size, char *msg, int mode)
+ CmiCommHandle CmiSendNetworkFunc(int destNode, int size, char *msg, int mode)
  {
          int rank;
  #if CMK_USE_PXSHM
            return 0;
          }
  #endif
+ #if CMK_PERSISTENT_COMM
+         if (CpvAccess(phs)) {
+           if (size > PERSIST_MIN_SIZE) {
+             CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
+             LrtsSendPersistentMsg(CpvAccess(phs)[CpvAccess(curphs)], destNode, size, msg);
+             return 0;
+           }
+         }
+ #endif
  
  #if CMK_WITH_STATS
  if (MSG_STATISTIC)
@@@ -523,29 -536,24 +536,24 @@@ void CmiFreeSendFn(int destPE, int size
  #if CMK_PERSISTENT_COMM
          if (CpvAccess(phs)) CpvAccess(curphs)++;
  #endif
-     } else {
- #if CMK_PERSISTENT_COMM
-         if (CpvAccess(phs)) {
-           if (size > 8192) {
-             CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
-             int destNode = CmiNodeOf(destPE);
-             CMI_DEST_RANK(msg) = CmiRankOf(destPE);
-             LrtsSendPersistentMsg(CpvAccess(phs)[CpvAccess(curphs)++], destNode, size, msg);
-             return;
-           }
-           else
-             CpvAccess(curphs)++;
-         }
- #endif
+     } 
+     else {
          int destNode = CmiNodeOf(destPE);
+         int destRank = CmiRankOf(destPE);
  #if CMK_SMP
          if (CmiMyNode()==destNode) {
-             CmiPushPE(CmiRankOf(destPE), msg);
+             CmiPushPE(destRank, msg);
+ #if CMK_PERSISTENT_COMM
+             if (CpvAccess(phs)) CpvAccess(curphs)++;
+ #endif
              return;
          }
  #endif
-         CMI_DEST_RANK(msg) = CmiRankOf(destPE);
-         LrtsSendNetworkFunc(destNode, size, msg, P2P_SYNC);
+         CMI_DEST_RANK(msg) = destRank;
+         CmiSendNetworkFunc(destNode, size, msg, P2P_SYNC);
+ #if CMK_PERSISTENT_COMM
+         if (CpvAccess(phs)) CpvAccess(curphs)++;
+ #endif
      }
  }
  #endif
@@@ -792,23 -800,16 +800,23 @@@ static void ConverseRunPE(int everRetur
         node barrier previously should take care of the node synchronization */
      _immediateReady = 1;
  
 -    /* communication thread */
 -    if (CmiMyRank() == CmiMyNodeSize()) {
 +    if(CharmLibInterOperate) {
 +      /* !!! Not considering SMP mode now */
 +      /* TODO: make interoperability working in SMP!!! */
 +      Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
 +      CsdScheduler(-1);
 +    } else {
 +      /* communication thread */
 +      if (CmiMyRank() == CmiMyNodeSize()) {
          Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
          while (1) CommunicationServerThread(5);
 -    } else { /* worker thread */
 +      } else { /* worker thread */
          if (!everReturn) {
 -            Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
 -            if (Cmi_usrsched==0) CsdScheduler(-1);
 -            ConverseExit();
 +          Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
 +          if (Cmi_usrsched==0) CsdScheduler(-1);
 +          ConverseExit();
          }
 +      }
      }
  }
  /* ##### End of Functions Related with Machine Startup ##### */
@@@ -848,7 -849,7 +856,7 @@@ extern void ConverseCommonExit()
  
  static void CommunicationServer(int sleepTime) {
  #if CMK_SMP
-     AdvanceCommunication(0);
+     AdvanceCommunication(1);
  
      if (commThdExit == CmiMyNodeSize()) {
          MACHSTATE(2, "CommunicationServer exiting {");
diff --combined src/ck-core/init.C
index abb70e769bf3b35dcb361ccaf3ffce7f1cd79512,0e7ebc04fb1011ba577326b3ba131627ab24c1b5..61e2d302e09e5447af3352a90c06e6f978a97998
@@@ -67,7 -67,6 +67,7 @@@ never be excluded..
  #include "trace.h"
  
  void CkRestartMain(const char* dirname);
 +int storeInterOperateStatus = 0;
  
  #define  DEBUGF(x)     //CmiPrintf x;
  
@@@ -193,7 -192,8 +193,8 @@@ static char* _restartDir
  #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
  int teamSize=1;
  int chkptPeriod=1000;
- bool parallelRestart=false;
+ bool fastRecovery = false;
+ int parallelRecovery = 1;
  extern int BUFFER_TIME; //time spent waiting for buffered messages
  #endif
  
@@@ -294,9 -294,12 +295,12 @@@ static inline void _parseCommandLineOpt
      if(!CmiGetArgIntDesc(argv,"+chkptPeriod",&chkptPeriod,"Set the checkpoint period for the message logging fault tolerance algorithm in seconds")){
          chkptPeriod = 100;
      }
-     if(CmiGetArgFlagDesc(argv,"+Parallelrestart", "Parallel Restart with message logging protocol")){
-         parallelRestart = true;
+       if(CmiGetArgIntDesc(argv,"+fastRecovery", &parallelRecovery, "Parallel recovery with message logging protocol")){
+         fastRecovery = true;
      }
+ #endif
+ #if defined(_FAULT_MLOG_)
      if(!CmiGetArgIntDesc(argv,"+mlog_local_buffer",&_maxBufferedMessages,"# of local messages buffered in the message logging protoocl")){
          _maxBufferedMessages = 2;
      }
      if(!CmiGetArgIntDesc(argv,"+mlog_buffer_time",&BUFFER_TIME,"# Time spent waiting for messages to be buffered in the message logging protoocl")){
          BUFFER_TIME = 2;
      }
  #endif        
        /* Anytime migration flag */
        _isAnytimeMigration = true;
@@@ -468,12 -470,10 +471,12 @@@ static void _exitHandler(envelope *env
    switch(env->getMsgtype()) {
      case StartExitMsg:
        CkAssert(CkMyPe()==0);
 -      if (!_CkExitFnVec.isEmpty()) {
 -        CkExitFn fn = _CkExitFnVec.deq();
 -        fn();
 -        break;
 +      if(!CharmLibInterOperate) {
 +        if (!_CkExitFnVec.isEmpty()) {
 +          CkExitFn fn = _CkExitFnVec.deq();
 +          fn();
 +          break;
 +        }
        }
        // else goto next
      case ExitMsg:
          return;
        }
        _exitStarted = 1;
 -      CkNumberHandler(_charmHandlerIdx,(CmiHandler)_discardHandler);
 -      CkNumberHandler(_bocHandlerIdx, (CmiHandler)_discardHandler);
 +      if(!CharmLibInterOperate) {
 +        CkNumberHandler(_charmHandlerIdx,(CmiHandler)_discardHandler);
 +        CkNumberHandler(_bocHandlerIdx, (CmiHandler)_discardHandler);
 +      }
        env->setMsgtype(ReqStatMsg);
        env->setSrcPe(CkMyPe());
        // if exit in ring, instead of broadcasting, send in ring
        }       
        break;
      case ReqStatMsg:
 +    if(!CharmLibInterOperate) {
  #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
          _messageLoggingExit();
  #endif
        DEBUGF(("ReqStatMsg on %d\n", CkMyPe()));
        CkNumberHandler(_charmHandlerIdx,(CmiHandler)_discardHandler);
        CkNumberHandler(_bocHandlerIdx, (CmiHandler)_discardHandler);
 -      /*FAULT_EVAC*/
 +      /*FAULT_EVAC*/
        if(CmiNodeAlive(CkMyPe())){
           _sendStats();
 -      }       
 +      }
        _mainDone = 1; // This is needed because the destructors for
                       // readonly variables will be called when the program
                     // exits. If the destructor is called while _mainDone
  #if CMK_TRACE_ENABLED
        if (_ringexit) traceClose();
  #endif
 +    }
        if (_ringexit) {
          int stride = CkNumPes()/_ringtoken;
          int pe = CkMyPe()+1;
        }
        else
          CmiFree(env);
 -      if(CkMyPe()){
 -      DEBUGF(("[%d] Calling converse exit \n",CkMyPe()));
 -        ConverseExit();
 -      }       
 +      //everyone exits here - there may be issues with leftover messages in the queue
 +      if(CharmLibInterOperate) {
 +        DEBUGF(("[%d] Calling converse exit \n",CkMyPe()));
 +        _exitStarted = 0;
 +        CpvAccess(charmLibExitFlag) = 1;
 +      } else {
 +        if(CkMyPe()){
 +          DEBUGF(("[%d] Calling converse exit \n",CkMyPe()));
 +          CharmLibInterOperate = storeInterOperateStatus;
 +          CpvAccess(charmLibExitFlag) = 1;
 +          ConverseExit();
 +        }
 +      }
        break;
      case StatMsg:
 +// shouldn't reach here in interoperate mode
        CkAssert(CkMyPe()==0);
  #if CMK_WITH_STATS
        _allStats[env->getSrcPe()] = (Stats*) EnvToUsr(env);
                        /*FAULT_EVAC*/
        if(_numStatsRecd==CkNumValidPes()) {
          _printStats();
 -      DEBUGF(("[%d] Calling converse exit \n",CkMyPe()));
 +        DEBUGF(("[%d] Calling converse exit \n",CkMyPe()));
 +        CharmLibInterOperate = storeInterOperateStatus;
 +        CpvAccess(charmLibExitFlag) = 1;
          ConverseExit();
        }
        break;
@@@ -847,14 -831,11 +850,14 @@@ void CkExit(void
    CmiSetHandler(env, _exitHandlerIdx);
    CmiSyncSendAndFree(0, env->getTotalsize(), (char *)env);
  
 +  if(!CharmLibInterOperate) {
  #if ! CMK_BIGSIM_THREAD
 -  _TRACE_END_EXECUTE();
 -  //Wait for stats, which will call ConverseExit when finished:
 -  CsdScheduler(-1);
 +    _TRACE_END_EXECUTE();
 +    //Wait for stats, which will call ConverseExit when finished:
 +    if(!storeInterOperateStatus)
 +      CsdScheduler(-1);
  #endif
 +  }
  }
  
  /* This is a routine called in case the application is closing due to a signal.
@@@ -960,6 -941,13 +963,13 @@@ void _initCharm(int unused_argc, char *
  { 
        int inCommThread = (CmiMyRank() == CmiMyNodeSize());
  
+       if(CmiMyNode() == 0 && CmiMyRank() == 0) {
+     if(CmiGetArgFlag(argv, "+printTopo")) {
+                       TopoManager tmgr;
+                       tmgr.printAllocation();
+               }
+       }
        DEBUGF(("[%d,%.6lf ] _initCharm started\n",CmiMyPe(),CmiWallTimer()));
  
        CkpvInitialize(size_t *, _offsets);
@@@ -1413,21 -1401,4 +1423,21 @@@ void registerExitFn(CkExitFn fn
    _CkExitFnVec.enq(fn);
  }
  
 +void CharmLibInit(int peid, int numpes, int argc, char **argv){
 +    //note CmiNumNodes and CmiMyNode should just be macros
 +    _Cmi_numnodes = numpes;
 +    _Cmi_mynode = peid;
 +
 +  CharmLibInterOperate = 1;
 +  ConverseInit(argc, argv, (CmiStartFn)_initCharm, 1, 0);
 +}
 +
 +void CharmLibExit() {
 +  storeInterOperateStatus = 1;
 +  CharmLibInterOperate = 0;
 +  if(CkMyPe() == 0) {
 +    CkExit();
 +  }
 +  CsdScheduler(-1);
 +}
  /*@}*/
diff --combined src/conv-core/converse.h
index df4b7c16c9507723bd93167d6885b8e2f1eeeab5,7d322c78cee1c695a3397cdef6bd182b181316da..34673d222043d7587bf03ab98ff8be8f32b71126
@@@ -130,8 -130,6 +130,8 @@@ extern int CmiMyRank_()
  extern int _Cmi_mype;
  extern int _Cmi_numpes;
  extern int _Cmi_myrank; /* Normally zero; only 1 during SIGIO handling */
 +extern int _Cmi_mynode;
 +extern int _Cmi_numnodes;
  
  #define CmiMyPe()           _Cmi_mype
  #define CmiMyRank()         0
@@@ -204,11 -202,18 +204,18 @@@ extern void CmiNodeBarrier(void)
  extern void CmiNodeAllBarrier(void);
  #define CmiSvAlloc CmiAlloc
  
+ #if CMK_HAS_SPINLOCK && CMK_USE_SPINLOCK
+ typedef pthread_spinlock_t *CmiNodeLock;
+ #define CmiLock(lock) (pthread_spin_lock(lock))
+ #define CmiUnlock(lock) (pthread_spin_unlock(lock))
+ #define CmiTryLock(lock) (pthread_spin_trylock(lock))
+ #else
  typedef pthread_mutex_t *CmiNodeLock;
- extern CmiNodeLock CmiCreateLock();
  #define CmiLock(lock) (pthread_mutex_lock(lock))
  #define CmiUnlock(lock) (pthread_mutex_unlock(lock))
  #define CmiTryLock(lock) (pthread_mutex_trylock(lock))
+ #endif
+ extern CmiNodeLock CmiCreateLock();
  extern void CmiDestroyLock(CmiNodeLock lock);
  
  extern CmiNodeLock CmiMemLock_lock;
@@@ -1831,6 -1836,14 +1838,14 @@@ extern int *memCriticalEntries
  
  double CmiReadSize(const char *str);
  
+ #if  CMK_CONVERSE_GEMINI_UGNI
+ void CmiTurnOnStats();
+ void CmiTurnOffStats();
+ #else
+ #define CmiTurnOnStats()
+ #define CmiTurnOffStats()
+ #endif
  #if defined(__cplusplus)
  }                                         /* end of extern "C"  */
  #endif
@@@ -1906,11 -1919,4 +1921,10 @@@ EXTERN void CmiNotifyCommThd(CmiNotifyC
  CpvCExtern(int, _urgentSend);
  #define CmiEnableUrgentSend(yn)   CpvAccess(_urgentSend)=(yn)
  
 +/* CharmLibInterOperate should be a global variable as it will be
 + * set only once by MPI ranks respectively.
 + */
 +extern int CharmLibInterOperate;
 +CpvExtern(int,charmLibExitFlag);
 +
  #endif /* CONVERSE_H */