Merge branch 'charm' into development
authorFilippo Gioachin <gioachin@uiuc.edu>
Fri, 5 Feb 2010 02:07:01 +0000 (20:07 -0600)
committerFilippo Gioachin <gioachin@uiuc.edu>
Fri, 5 Feb 2010 02:07:01 +0000 (20:07 -0600)
1  2 
src/ck-core/ck.C
src/ck-core/envelope.h
src/ck-core/init.C

diff --combined src/ck-core/ck.C
index 3616919e59bbad218103c34b8ac5664ef188aff4,7afcef5b82dfc24bddb4c670108f4aa11cc568a0..9304fe9c7547bf0538146733d58c9b8ec1de38fe
@@@ -887,11 -887,15 +887,15 @@@ static inline void _processForVidMsg(Ck
  /************** Receive: Groups ****************/
  
  /**
-  This message is sent to this groupID--prepare to
-  handle this message by looking up the group,
-  and possibly stashing the message.
+  Return a pointer to the local BOC of "groupID".
+  The message "env" passed in has some known dependency on this groupID
+  (either it is to be delivered to this BOC, or it depends on this BOC being there).
+  Therefore, if the return value is NULL, this function buffers the massage so that
+  it will be re-sent (by CkCreateLocalBranch) when this groupID is eventually constructed.
+  The message passed in must have its handlers correctly set so that it can be
+  scheduled again.
  */
IrrGroup *_lookupGroup(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
static inline IrrGroup *_lookupGroupAndBufferIfNotThere(CkCoreState *ck,envelope *env,const CkGroupID &groupID)
  {
  
        CmiImmediateLock(CkpvAccess(_groupTableImmLock));
@@@ -928,7 -932,7 +932,7 @@@ static inline void _deliverForBocMsg(Ck
  static inline void _processForBocMsg(CkCoreState *ck,envelope *env)
  {
    register CkGroupID groupID =  env->getGroupNum();
-   register IrrGroup *obj = _lookupGroup(ck,env,env->getGroupNum());
+   register IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,env->getGroupNum());
    if(obj) {
      _deliverForBocMsg(ck,env->getEpIdx(),env,obj);
    }
@@@ -980,6 -984,13 +984,13 @@@ void _processBocInitMsg(CkCoreState *ck
  {
    register CkGroupID groupID = env->getGroupNum();
    register int epIdx = env->getEpIdx();
+   if (!env->getGroupDep().isZero()) {      // dependence
+     CkGroupID dep = env->getGroupDep();
+     IrrGroup *obj = _lookupGroupAndBufferIfNotThere(ck,env,dep);
+     if (obj == NULL) return;
+   }
+   else
+     ck->process();
    CkCreateLocalGroup(groupID, epIdx, env);
  }
  
@@@ -993,14 -1004,14 +1004,14 @@@ void _processNodeBocInitMsg(CkCoreStat
  /************** Receive: Arrays *************/
  
  static void _processArrayEltInitMsg(CkCoreState *ck,envelope *env) {
-   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
+   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
    if (mgr) {
      _SET_USED(env, 0);
      mgr->insertElement((CkMessage *)EnvToUsr(env));
    }
  }
  static void _processArrayEltMsg(CkCoreState *ck,envelope *env) {
-   CkArray *mgr=(CkArray *)_lookupGroup(ck,env,env->getsetArrayMgr());
+   CkArray *mgr=(CkArray *)_lookupGroupAndBufferIfNotThere(ck,env,env->getsetArrayMgr());
    if (mgr) {
      _SET_USED(env, 0);
      mgr->getLocMgr()->deliverInline((CkMessage *)EnvToUsr(env));
@@@ -1050,7 -1061,9 +1061,9 @@@ void _processHandler(void *converseMsg,
  // Group support
      case BocInitMsg :
        TELLMSGTYPE(CkPrintf("proc[%d]: _processHandler with msg type: BocInitMsg\n", CkMyPe());)
-       ck->process(); if(env->isPacked()) CkUnpackMessage(&env);
+       // QD processing moved inside _processBocInitMsg because it is conditional
+       //ck->process(); 
+       if(env->isPacked()) CkUnpackMessage(&env);
        _processBocInitMsg(ck,env);
        break;
      case NodeBocInitMsg :
@@@ -1180,7 -1193,7 +1193,7 @@@ void CkUnpackMessage(envelope **pEnv
  // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
  // Thus these accellerated versions of the Cld calls.
  
 -int index_objectQHandler;
 +static int index_objectQHandler;
  int index_tokenHandler;
  static int index_skipCldHandler;
  
@@@ -1438,7 -1451,7 +1451,7 @@@ void CkSendMsgInline(int entryIndex, vo
    }
    else {
      //No way to inline a cross-processor message:
-     CkSendMsg(entryIndex,msg,pCid,opts&!CK_MSG_INLINE);
+     CkSendMsg(entryIndex, msg, pCid, opts & (~CK_MSG_INLINE));
    }
  }
  
@@@ -1551,7 -1564,7 +1564,7 @@@ void CkSendMsgBranchInline(int eIdx, vo
      }
    }
    //Can't inline-- send the usual way, clear CK_MSG_INLINE
-   CkSendMsgBranch(eIdx,msg,destPE,gID,opts&!CK_MSG_INLINE);
+   CkSendMsgBranch(eIdx, msg, destPE, gID, opts & (~CK_MSG_INLINE));
  }
  
  extern "C"
@@@ -1686,7 -1699,7 +1699,7 @@@ void CkSendMsgNodeBranchInline(int eIdx
      }
    }
    //Can't inline-- send the usual way
-   CkSendMsgNodeBranch(eIdx,msg,node,gID,opts&!CK_MSG_INLINE);
+   CkSendMsgNodeBranch(eIdx, msg, node, gID, opts & ~(CK_MSG_INLINE));
  }
  
  extern "C"
@@@ -1858,7 -1871,7 +1871,7 @@@ class CkMessageRecorder : public CkMess
  public:
    CkMessageRecorder(FILE *f_) { f=f_; }
    ~CkMessageRecorder() {
 -    fprintf(f,"-1 -1 -1");
 +    fprintf(f,"-1 -1 -1 ");
      fclose(f);
    }
  
@@@ -1875,10 -1888,6 +1888,10 @@@ private
      }
      return CmiTrue;
    }
 +  virtual int process(CthThreadToken *token,CkCoreState *ck) {
 +    fprintf(f, "%d %d %d\n",CkMyPe(), -2, token->serialNo);
 +    return 1;
 +  }
  };
  
  class CkMessageDetailRecorder : public CkMessageWatcher {
@@@ -1915,25 -1924,10 +1928,25 @@@ class CkMessageReplay : public CkMessag
        unsigned int crc1, crc2;
        /// Read the next message we need from the file:
        void getNext(void) {
 +        if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
 +        if (nextSize > 0) {
 +          // We are reading a regular message
 +          if (3!=fscanf(f,"%d%x%x", &nexttype,&crc1,&crc2)) {
 +            CkAbort("CkMessageReplay> Syntax error reading replay file");
 +          }
 +        } else if (nextSize == -2) {
 +          // We are reading a special message (right now only thread awaken)
 +          getNext(); // For now simply skip that
 +        } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
 +          CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
 +          CkAbort("CkMessageReplay> Unrecognized input");
 +        }
 +          /*
                if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
 -                      // CkAbort("CkMessageReplay> Syntax error reading replay file");
 +                      CkAbort("CkMessageReplay> Syntax error reading replay file");
                        nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
                }
 +              */
                counter++;
        }
        /// If this is the next message we need, advance and return CmiTrue.
                        envelope *env=delayed.deq();
                        if (isNext(env)) { /* this is the next message: process it */
                                REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
 -                              CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
 +                              //CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
 +                              CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
                                return;
                        }
                        else /* Not ready yet-- put it back in the
@@@ -2022,56 -2015,12 +2035,56 @@@ private
                        return CmiFalse;
                }
        }
 +      virtual int process(CthThreadToken *token, CkCoreState *ck) {
 +        return 1;
 +      }
 +};
 +
 +class CkMessageDetailReplay : public CkMessageWatcher {
 +  void *getNext() {
 +    CmiUInt4 size; size_t nread;
 +    if ((nread=fread(&size, 4, 1, f)) < 1) {
 +      if (feof(f)) return NULL;
 +      CkPrintf("Broken record file (metadata) got %d\n",nread);
 +      CkAbort("");
 +    }
 +    void *env = CmiAlloc(size);
 +    if ((nread=fread(env, size, 1, f)) < 1) {
 +      CkPrintf("Broken record file (data) expecting %d, got %d\n",size,nread);
 +      CkAbort("");
 +    }
 +    return env;
 +  }
 +public:
 +  CkMessageDetailReplay(FILE *f_) {
 +    f=f_;
 +    /* This must match what CkMessageDetailRecorder did */
 +    CmiUInt2 little;
 +    fread(&little, 2, 1, f);
 +    if (little != sizeof(void*)) {
 +      CkAbort("Replaying on a different architecture from which recording was done!");
 +    }
 +
 +    CmiPushPE(CkMyPe(), getNext());
 +  }
 +  virtual CmiBool process(envelope *env,CkCoreState *ck) {
 +    CmiPushPE(CkMyPe(), getNext());
 +    return CmiTrue;
 +  }
  };
  
  extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
    CkPrintf("[%d] Quiescence detected\n",CkMyPe());
    CkMessageReplay *replay = (CkMessageReplay*)rep;
 -  
 +  //CmiStartQD(CkMessageReplayQuiescence, replay);
 +}
 +
 +extern "C" int CmiExecuteThreadResume(CthThreadToken *token) {
 +  CkCoreState *ck = CkpvAccess(_coreState);
 +  if (ck->watcher!=NULL) {
 +    return ck->watcher->processThread(token,ck);
 +  }
 +  return 1;
  }
  
  #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
@@@ -2095,8 -2044,7 +2108,8 @@@ static FILE *openReplayFile(const char 
  #include "ckliststring.h"
  void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
      char *procs = NULL;
 -      REPLAYDEBUG("CkMessageWaterInit ");
 +    replaySystem = 0;
 +      REPLAYDEBUG("CkMessageWatcherInit ");
      if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
          CkListString list(procs);
          if (list.includes(CkMyPe())) {
            CpdSetInitializeMemory(1);
                ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
        }
 -      if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream")) {
 -          CpdSetInitializeMemory(1);
 -              ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
 -      }
 -      if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Re-play the specified processors from recorded message content")) {
 +      if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
            CpdSetInitializeMemory(1);
 -        /*Nothing yet*/
 +          // Set the parameters of the processor
 +#if CMK_SHARED_VARS_UNAVAILABLE
 +          _Cmi_mype = atoi(procs);
 +          while (procs[0]!='/') procs++;
 +          procs++;
 +          _Cmi_numpes = atoi(procs);
 +#else
 +          CkAbort("+replay-detail available only for non-SMP build");
 +#endif
 +          replaySystem = 1;
 +          ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
        }
 +    if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream")) {
 +        CpdSetInitializeMemory(1);
 +        ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
 +    }
  }
  
  extern "C"
diff --combined src/ck-core/envelope.h
index d2251077cd18a0207677bfc2ada5a06ed58749b8,21e74c045e60450044620051f5236a4a483caf6a..5e0026f7bbdda856c705346160c8cad8137ae1dc
@@@ -155,6 -155,7 +155,7 @@@ public
        struct s_group {         // NodeBocInitMsg, BocInitMsg, ForNodeBocMsg, ForBocMsg
          CkGroupID g;           ///< GroupID
          CkNodeGroupID rednMgr; ///< Reduction manager for this group (constructor only!)
+         CkGroupID dep;         ///< create after dep is created (constructor only!)
          int epoch;             ///< "epoch" this group was created during (0--mainchare, 1--later)
          UShort arrayEp;        ///< Used only for array broadcasts
        } group;
@@@ -247,6 -248,7 +248,7 @@@ private
        env->totalsize = tsize;
        env->priobits = prio;
        env->setPacked(0);
+       env->type.group.dep.setZero();
        _SET_USED(env, 0);
        //for record-replay
        env->setEvent(++CkpvAccess(envelopeEventID));
  
        return env;
      }
 +    void reset() {
 +      setEvent(++CkpvAccess(envelopeEventID));
 +    }
      UShort getEpIdx(void) const { return epIdx; }
      void   setEpIdx(const UShort idx) { epIdx = idx; }
      UInt   getSrcPe(void) const { return pe; }
      int getGroupEpoch(void) { return type.group.epoch; }
      void setRednMgr(CkNodeGroupID r){ type.group.rednMgr = r; }
      CkNodeGroupID getRednMgr(){ return type.group.rednMgr; }
+     CkGroupID getGroupDep(){ return type.group.dep; }
+     void setGroupDep(const CkGroupID &r){ type.group.dep = r; }
  
  // Array-specific fields
      CkGroupID &getsetArrayMgr(void) {return type.array.arr;}
@@@ -371,10 -372,6 +375,10 @@@ inline void *_allocMsg(const int msgtyp
    return EnvToUsr(envelope::alloc(msgtype,size,prio));
  }
  
 +inline void _resetEnv(envelope *env) {
 +  env->reset();
 +}
 +
  /** @} */
  
  extern UChar   _defaultQueueing;
@@@ -391,12 -388,8 +395,12 @@@ private
        env->setMsgIdx(0);
        return EnvToUsr(env);
      }
 +    static void _reset(void* m) {
 +      register envelope *env = UsrToEnv(m);
 +      _resetEnv(env);
 +    }
  public:
 -    MsgPool():SafePool<void*>(_alloc, CkFreeMsg) {}
 +    MsgPool():SafePool<void*>(_alloc, CkFreeMsg, _reset) {}
  #ifdef _FAULT_MLOG_
          void *get(void){
              return allocfn();
diff --combined src/ck-core/init.C
index c44a5dbe9ea172a72c657004ea25c86e213ea44a,695e3410d11ab777e47b317604b9b33932c66c52..e587c1d2400e0c3b35e2cd6ee7b2e65cbe7a5e3f
@@@ -527,9 -527,9 +527,9 @@@ static inline void _processBufferedMsgs
        if(env->isForAnyPE())
          CldEnqueue(CLD_ANYWHERE, env, _infoIdx);
        else
-         CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
+         _processHandler((void *)env, CkpvAccess(_coreState));
      } else {
-       CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
+       _processHandler((void *)env, CkpvAccess(_coreState));
      }
    }
  }
@@@ -650,20 -650,16 +650,21 @@@ static void _roRestartHandler(void *msg
   * together with all the other regular messages by _bufferHandler (and will be flushed
   * after all the initialization messages have been processed).
   */
 -static void _initHandler(void *msg)
 +static void _initHandler(void *msg, CkCoreState *ck)
  {
    CkAssert(CkMyPe()!=0);
    register envelope *env = (envelope *) msg;
 +  
 +  if (ck->watcher!=NULL) {
 +    if (!ck->watcher->processMessage(env,ck)) return;
 +  }
 +  
    switch (env->getMsgtype()) {
      case BocInitMsg:
        if (env->getGroupEpoch()==0) {
          CkpvAccess(_numInitsRecd)++;
-         CpvAccess(_qd)->process();
+       // _processBocInitMsg already handles QD
+         //CpvAccess(_qd)->process();
          CkpvAccess(_bocInitVec)->insert(env->getGroupNum().idx, env);
        } else _bufferHandler(msg);
        break;
@@@ -920,11 -916,9 +921,11 @@@ void _initCharm(int unused_argc, char *
  
        _charmHandlerIdx = CkRegisterHandler((CmiHandler)_bufferHandler);
        _initHandlerIdx = CkRegisterHandler((CmiHandler)_initHandler);
 +      CkNumberHandlerEx(_initHandlerIdx, (CmiHandlerEx)_initHandler, CkpvAccess(_coreState));
        _roRestartHandlerIdx = CkRegisterHandler((CmiHandler)_roRestartHandler);
        _exitHandlerIdx = CkRegisterHandler((CmiHandler)_exitHandler);
        _bocHandlerIdx = CkRegisterHandler((CmiHandler)_initHandler);
 +      CkNumberHandlerEx(_bocHandlerIdx, (CmiHandlerEx)_initHandler, CkpvAccess(_coreState));
        _infoIdx = CldRegisterInfoFn((CldInfoFn)_infoFn);
        _triggerHandlerIdx = CkRegisterHandler((CmiHandler)_triggerHandler);
        _ckModuleInit();
                }*/
        }       
        
 -        if (faultFunc == NULL) {         // this is not restart
 +        if (faultFunc == NULL && !replaySystem) {         // this is not restart
              // these two are blocking calls for non-bigsim
  #if ! CMK_BLUEGENE_CHARM
            CmiInitCPUAffinity(argv);