Merge branch 'charm' into development
authorFilippo Gioachin <gioachin@uiuc.edu>
Fri, 5 Feb 2010 02:07:01 +0000 (20:07 -0600)
committerFilippo Gioachin <gioachin@uiuc.edu>
Fri, 5 Feb 2010 02:07:01 +0000 (20:07 -0600)
20 files changed:
src/arch/bluegenel/bglmachine.C
src/arch/bluegenel/machine.c
src/arch/bluegenep/machine.c
src/arch/elan/machine.c
src/arch/portals-crayxt3/machine.c
src/ck-core/ck.C
src/ck-core/ck.h
src/ck-core/ckcheckpoint.C
src/ck-core/ckcheckpoint.h
src/ck-core/ckobjQ.C
src/ck-core/ckobjQ.h
src/ck-core/envelope.h
src/ck-core/init.C
src/ck-core/middle-conv.h
src/ck-perf/trace-converse.c
src/conv-core/convcore.c
src/conv-core/converse.h
src/conv-core/debug-conv.c
src/conv-core/threads.c
src/util/cklists.h

index 6f98d13788954289ff0643c750586a24e124dc03..4de1adf4f3b8dda13e8f751121f65542c4cc8032 100644 (file)
@@ -155,7 +155,7 @@ static void CmiStartThreads(char **argv)
 int received_immediate = 0;
 
 /*Add a message to this processor's receive queue, pe is a rank */
-static void CmiPushPE(int pe,void *msg)
+void CmiPushPE(int pe,void *msg)
 {
   CmiState cs = CmiGetStateN(pe);
   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
index fd50117c5456643de3ba1e47d60ad10f3bd6887c..e92b163922d2567fd83ba9b768bc6e28e7be2194 100644 (file)
@@ -351,7 +351,7 @@ static void CmiStartThreads(char **argv)
 #endif /* non smp */
 
 /*Add a message to this processor's receive queue, pe is a rank */
-static void CmiPushPE(int pe,void *msg)
+void CmiPushPE(int pe,void *msg)
 {
   CmiState cs = CmiGetStateN(pe);
   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
index 26f4ec466ba9515a4477573348f4396342843ae3..d42eafd6a26531c85f56d1fb6cdc89ce3ad0a0b5 100644 (file)
@@ -165,7 +165,7 @@ static void CmiStartThreads(char **argv) {
 //int received_broadcast;
 
 /*Add a message to this processor's receive queue, pe is a rank */
-static void CmiPushPE(int pe,void *msg) {
+void CmiPushPE(int pe,void *msg) {
     CmiState cs = CmiGetStateN(pe);
     MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
 #if CMK_IMMEDIATE_MSG
index 28a581850aaf091d1319b6bcb363286a2b1b1a65..d115164ed60ee128aca4ad7113f3b2f177ce32ea 100644 (file)
@@ -270,7 +270,7 @@ static void CmiStartThreads(char **argv)
 }      
 
 /*Add a message to this processor's receive queue */
-static void CmiPushPE(int pe,void *msg)
+void CmiPushPE(int pe,void *msg)
 {
   CmiState cs=CmiGetStateN(pe);
   MACHSTATE1(2,"Pushing message into %d's queue",pe);
index 86a3bf5b170c0bd7102a1d4b99067093827769ee..7154ac75624b067bef0d40329595cf0c32f33c44 100644 (file)
@@ -254,7 +254,7 @@ static void CmiStartThreads(char **argv)
 #endif /* non smp */
 
 /*Add a message to this processor's receive queue, pe is a rank */
-static void CmiPushPE(int pe,void *msg)
+void CmiPushPE(int pe,void *msg)
 {
   CmiState cs = CmiGetStateN(pe);
   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
index 7afcef5b82dfc24bddb4c670108f4aa11cc568a0..9304fe9c7547bf0538146733d58c9b8ec1de38fe 100644 (file)
@@ -1193,7 +1193,7 @@ void CkUnpackMessage(envelope **pEnv)
 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
 // Thus these accellerated versions of the Cld calls.
 
-int index_objectQHandler;
+static int index_objectQHandler;
 int index_tokenHandler;
 static int index_skipCldHandler;
 
@@ -1871,7 +1871,7 @@ class CkMessageRecorder : public CkMessageWatcher {
 public:
   CkMessageRecorder(FILE *f_) { f=f_; }
   ~CkMessageRecorder() {
-    fprintf(f,"-1 -1 -1");
+    fprintf(f,"-1 -1 -1 ");
     fclose(f);
   }
 
@@ -1888,6 +1888,10 @@ private:
     }
     return CmiTrue;
   }
+  virtual int process(CthThreadToken *token,CkCoreState *ck) {
+    fprintf(f, "%d %d %d\n",CkMyPe(), -2, token->serialNo);
+    return 1;
+  }
 };
 
 class CkMessageDetailRecorder : public CkMessageWatcher {
@@ -1924,10 +1928,25 @@ class CkMessageReplay : public CkMessageWatcher {
        unsigned int crc1, crc2;
        /// Read the next message we need from the file:
        void getNext(void) {
+         if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
+         if (nextSize > 0) {
+           // We are reading a regular message
+           if (3!=fscanf(f,"%d%x%x", &nexttype,&crc1,&crc2)) {
+             CkAbort("CkMessageReplay> Syntax error reading replay file");
+           }
+         } else if (nextSize == -2) {
+           // We are reading a special message (right now only thread awaken)
+           getNext(); // For now simply skip that
+         } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
+           CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
+           CkAbort("CkMessageReplay> Unrecognized input");
+         }
+           /*
                if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
-                       // CkAbort("CkMessageReplay> Syntax error reading replay file");
+                       CkAbort("CkMessageReplay> Syntax error reading replay file");
                        nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
                }
+               */
                counter++;
        }
        /// If this is the next message we need, advance and return CmiTrue.
@@ -1964,7 +1983,8 @@ class CkMessageReplay : public CkMessageWatcher {
                        envelope *env=delayed.deq();
                        if (isNext(env)) { /* this is the next message: process it */
                                REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
-                               CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
+                               //CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
+                               CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
                                return;
                        }
                        else /* Not ready yet-- put it back in the
@@ -2015,12 +2035,56 @@ private:
                        return CmiFalse;
                }
        }
+       virtual int process(CthThreadToken *token, CkCoreState *ck) {
+         return 1;
+       }
+};
+
+class CkMessageDetailReplay : public CkMessageWatcher {
+  void *getNext() {
+    CmiUInt4 size; size_t nread;
+    if ((nread=fread(&size, 4, 1, f)) < 1) {
+      if (feof(f)) return NULL;
+      CkPrintf("Broken record file (metadata) got %d\n",nread);
+      CkAbort("");
+    }
+    void *env = CmiAlloc(size);
+    if ((nread=fread(env, size, 1, f)) < 1) {
+      CkPrintf("Broken record file (data) expecting %d, got %d\n",size,nread);
+      CkAbort("");
+    }
+    return env;
+  }
+public:
+  CkMessageDetailReplay(FILE *f_) {
+    f=f_;
+    /* This must match what CkMessageDetailRecorder did */
+    CmiUInt2 little;
+    fread(&little, 2, 1, f);
+    if (little != sizeof(void*)) {
+      CkAbort("Replaying on a different architecture from which recording was done!");
+    }
+
+    CmiPushPE(CkMyPe(), getNext());
+  }
+  virtual CmiBool process(envelope *env,CkCoreState *ck) {
+    CmiPushPE(CkMyPe(), getNext());
+    return CmiTrue;
+  }
 };
 
 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
   CkMessageReplay *replay = (CkMessageReplay*)rep;
-  
+  //CmiStartQD(CkMessageReplayQuiescence, replay);
+}
+
+extern "C" int CmiExecuteThreadResume(CthThreadToken *token) {
+  CkCoreState *ck = CkpvAccess(_coreState);
+  if (ck->watcher!=NULL) {
+    return ck->watcher->processThread(token,ck);
+  }
+  return 1;
 }
 
 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
@@ -2044,7 +2108,8 @@ static FILE *openReplayFile(const char *prefix, const char *suffix, const char *
 #include "ckliststring.h"
 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
     char *procs = NULL;
-       REPLAYDEBUG("CkMessageWaterInit ");
+    replaySystem = 0;
+       REPLAYDEBUG("CkMessageWatcherInit ");
     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
         CkListString list(procs);
         if (list.includes(CkMyPe())) {
@@ -2056,14 +2121,24 @@ void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
            CpdSetInitializeMemory(1);
                ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
        }
-       if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream")) {
-           CpdSetInitializeMemory(1);
-               ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
-       }
-       if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Re-play the specified processors from recorded message content")) {
+       if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
            CpdSetInitializeMemory(1);
-         /*Nothing yet*/
+           // Set the parameters of the processor
+#if CMK_SHARED_VARS_UNAVAILABLE
+           _Cmi_mype = atoi(procs);
+           while (procs[0]!='/') procs++;
+           procs++;
+           _Cmi_numpes = atoi(procs);
+#else
+           CkAbort("+replay-detail available only for non-SMP build");
+#endif
+           replaySystem = 1;
+           ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
        }
+    if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream")) {
+        CpdSetInitializeMemory(1);
+        ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
+    }
 }
 
 extern "C"
index 0fdb3cc197cf1e9bcbfd3109eafd67cd1e41de17..d412c8dc4532e328e87dba407ab754a58eb35987 100644 (file)
@@ -25,6 +25,9 @@
 #define _CHECK_VALID(p, msg) do { } while(0)
 #endif
 
+// Flag that tells the system if we are replaying using Record/Replay
+extern int replaySystem;
+
 /// A set of "Virtual ChareID"'s
 class VidBlock {
     enum VidState {FILLED, UNFILLED};
@@ -92,12 +95,21 @@ public:
         * up to the outermost
         */
        inline CmiBool processMessage(envelope *env,CkCoreState *ck) {
-         if (next != NULL) next->processMessage(env, ck);
-         return process(env, ck);
+         CmiBool result = CmiTrue;
+         if (next != NULL) result &= next->processMessage(env, ck);
+         result &= process(env, ck);
+         return result;
+       }
+       inline int processThread(CthThreadToken *token, CkCoreState *ck) {
+          int result = 1;
+          if (next != NULL) result &= next->processThread(token, ck);
+          result &= process(token, ck);
+          return result;
        }
 protected:
-    /** This is used internally by this class to call the correct subclass method */
+    /** These are used internally by this class to call the correct subclass method */
        virtual CmiBool process(envelope *env,CkCoreState *ck) =0;
+       virtual int process(CthThreadToken *token, CkCoreState *ck) {return 1;}
 public:
     inline void setNext(CkMessageWatcher *w) { next = w; }
 };
index c2b6cc4ce6bbcdaf00132ce330de11225b8a577b..e789ccd8dbc911b590736896570ba6524dcebb83 100644 (file)
@@ -431,33 +431,6 @@ int  CkCountArrayElements(){
 }
 #endif
 
-void CkPupProcessorData(PUP::er &p)
-{
-    // save readonlys, and callback BTW
-    if(CkMyRank()==0) {
-        CkPupROData(p);
-    }
-
-    // save mainchares into MainChares.dat
-    if(CkMyPe()==0) {
-      CkPupMainChareData(p, NULL);
-    }
-       
-    // save non-migratable chare
-    CkPupChareData(p);
-
-    // save groups 
-    CkPupGroupData(p);
-
-    // save nodegroups
-    if(CkMyRank()==0) {
-        CkPupNodeGroupData(p);
-    }
-
-    // pup array elements
-    CkPupArrayElementsData(p);
-}
-
 // called only on pe 0
 static void checkpointOne(const char* dirname, CkCallback& cb){
        CmiAssert(CkMyPe()==0);
index a46fbff0ed93ff858d5bad68ca852b131486d397..f290fe9715afafefc4d66a51efffc73b369f1c67 100644 (file)
@@ -63,7 +63,6 @@ void CkPupChareData(PUP::er &p);
 void CkPupGroupData(PUP::er &p);
 void CkPupNodeGroupData(PUP::er &p);
 void CkPupArrayElementsData(PUP::er &p, int notifyListeners=1);
-void CkPupProcessorData(PUP::er &p);
 void CkRemoveArrayElements();
 //void CkTestArrayElements();
 
index 0ff5d04b1cc7d9a1c011c208fccdc38bbc818090..a9efa7a163bb4c3de0193fb2a8f08c5361d1fd5b 100644 (file)
@@ -22,7 +22,6 @@
 CkpvDeclare(TokenPool*, _tokenPool);
 
 extern int index_tokenHandler;
-extern int index_skipCldHandler;
 
 extern CkMigratable * CkArrayMessageObjectPtr(envelope *env);
 
index 7d9b5575c86f64994a6c7bf8fda1c5c61721c2a1..1960e4c019db4a15ed7f2cead8ed16f8bc7cc87a 100644 (file)
@@ -40,9 +40,6 @@ public:
 
 CkpvExtern(TokenPool*, _tokenPool);
 
-extern int index_objectQHandler;
-extern int index_tokenHandler;
-
 Chare * CkFindObjectPtr(envelope *);
 void _enqObjQueue(Chare *obj, envelope *env);
 void _ObjectQHandler(void *converseMsg);
index 21e74c045e60450044620051f5236a4a483caf6a..5e0026f7bbdda856c705346160c8cad8137ae1dc 100644 (file)
@@ -268,6 +268,9 @@ private:
 
       return env;
     }
+    void reset() {
+      setEvent(++CkpvAccess(envelopeEventID));
+    }
     UShort getEpIdx(void) const { return epIdx; }
     void   setEpIdx(const UShort idx) { epIdx = idx; }
     UInt   getSrcPe(void) const { return pe; }
@@ -372,6 +375,10 @@ inline void *_allocMsg(const int msgtype, const int size, const int prio=0) {
   return EnvToUsr(envelope::alloc(msgtype,size,prio));
 }
 
+inline void _resetEnv(envelope *env) {
+  env->reset();
+}
+
 /** @} */
 
 extern UChar   _defaultQueueing;
@@ -388,8 +395,12 @@ private:
       env->setMsgIdx(0);
       return EnvToUsr(env);
     }
+    static void _reset(void* m) {
+      register envelope *env = UsrToEnv(m);
+      _resetEnv(env);
+    }
 public:
-    MsgPool():SafePool<void*>(_alloc, CkFreeMsg) {}
+    MsgPool():SafePool<void*>(_alloc, CkFreeMsg, _reset) {}
 #ifdef _FAULT_MLOG_
         void *get(void){
             return allocfn();
index 695e3410d11ab777e47b317604b9b33932c66c52..e587c1d2400e0c3b35e2cd6ee7b2e65cbe7a5e3f 100644 (file)
@@ -650,10 +650,15 @@ static void _roRestartHandler(void *msg)
  * together with all the other regular messages by _bufferHandler (and will be flushed
  * after all the initialization messages have been processed).
  */
-static void _initHandler(void *msg)
+static void _initHandler(void *msg, CkCoreState *ck)
 {
   CkAssert(CkMyPe()!=0);
   register envelope *env = (envelope *) msg;
+  
+  if (ck->watcher!=NULL) {
+    if (!ck->watcher->processMessage(env,ck)) return;
+  }
+  
   switch (env->getMsgtype()) {
     case BocInitMsg:
       if (env->getGroupEpoch()==0) {
@@ -916,9 +921,11 @@ void _initCharm(int unused_argc, char **argv)
 
        _charmHandlerIdx = CkRegisterHandler((CmiHandler)_bufferHandler);
        _initHandlerIdx = CkRegisterHandler((CmiHandler)_initHandler);
+       CkNumberHandlerEx(_initHandlerIdx, (CmiHandlerEx)_initHandler, CkpvAccess(_coreState));
        _roRestartHandlerIdx = CkRegisterHandler((CmiHandler)_roRestartHandler);
        _exitHandlerIdx = CkRegisterHandler((CmiHandler)_exitHandler);
        _bocHandlerIdx = CkRegisterHandler((CmiHandler)_initHandler);
+       CkNumberHandlerEx(_bocHandlerIdx, (CmiHandlerEx)_initHandler, CkpvAccess(_coreState));
        _infoIdx = CldRegisterInfoFn((CldInfoFn)_infoFn);
        _triggerHandlerIdx = CkRegisterHandler((CmiHandler)_triggerHandler);
        _ckModuleInit();
@@ -1089,7 +1096,7 @@ void _initCharm(int unused_argc, char **argv)
                }*/
        }       
        
-        if (faultFunc == NULL) {         // this is not restart
+        if (faultFunc == NULL && !replaySystem) {         // this is not restart
             // these two are blocking calls for non-bigsim
 #if ! CMK_BLUEGENE_CHARM
           CmiInitCPUAffinity(argv);
index ea027d3084b94d2e8a3f950f5af6fa6237d70fb5..082b49c9a38716a13804081f2f796e5b9097ef35 100644 (file)
@@ -47,27 +47,27 @@ static inline int CkNumPes() { return CmiNumPes(); }
 
 static inline void CmiSyncSend(int x, int y, char *z) 
 {
-  CmiSyncSendFn(x, y, z);
+  if (ConverseDeliver()) CmiSyncSendFn(x, y, z);
 }
 static inline void CmiSyncSendAndFree(int x, int y, char *z)
 {
-  CmiFreeSendFn(x, y, z);
+  if (ConverseDeliver()) CmiFreeSendFn(x, y, z);
 }
 static inline void CmiSyncBroadcast(int x, char *y)
 {
-  CmiSyncBroadcastFn(x, y);
+  if (ConverseDeliver()) CmiSyncBroadcastFn(x, y);
 }
 static inline void CmiSyncBroadcastAndFree(int x, char *y)
 {
-  CmiFreeBroadcastFn(x, y);
+  if (ConverseDeliver()) CmiFreeBroadcastFn(x, y);
 }
 static inline void CmiSyncBroadcastAll(int x, char *y)
 {
-  CmiSyncBroadcastAllFn(x, y);
+  if (ConverseDeliver()) CmiSyncBroadcastAllFn(x, y);
 }
 static inline void CmiSyncBroadcastAllAndFree(int x, char *y)
 {
-  CmiFreeBroadcastAllFn(x, y);
+  if (ConverseDeliver()) CmiFreeBroadcastAllFn(x, y);
 }
 
 #if 0
index 20facf9b5cbbcc75a39e07e3d9569626923ed71e..25c520058aacc9a3221a23f4800f91f5e8a78b94 100644 (file)
@@ -49,5 +49,6 @@ void traceEndFuncProj(char *name){}
 void traceUserSuppliedNote(char *note) {}
 
 /* This routine, included in Charm++ programs from init.C, needs to be present in converse as well.
-   Here is a place where it gets included only in converse, and not in Charm++ (thus not generating conflicts. */
+   Here is a place where it gets included only in converse, and not in Charm++ (thus not generating conflicts). */
 void EmergencyExit(void) {}
+int CmiExecuteThreadResume(CthThreadToken *token) {}
index 21122161e67a26a57bef5c4a1fd4c839b03f49bb..e90ec004c4d95deaea0ec0d8fefdbfcb40103f59 100644 (file)
@@ -1716,12 +1716,16 @@ void CthResumeNormalThread(CthThreadToken* token)
                resumeTraceCore();*/
 #endif
 #endif
-
+  
   /* BIGSIM_OOC DEBUGGING
   CmiPrintf("In CthResumeNormalThread:   ");
   CthPrintThdMagic(t);
   */
-  CthResume(t);
+
+  /* For Record/Replay debugging: need to notify the upper layer that we are resuming a thread */
+  if (CmiExecuteThreadResume(token)) {
+    CthResume(t);
+  }
 }
 
 void CthResumeSchedulingThread(CthThreadToken  *token)
index a0e07824f37aa3a6858b6fd38f0a664e3016f344..b3ae641b23436ee0fb77c12976453f9ca49d28d2 100644 (file)
@@ -737,46 +737,46 @@ int      CmiTimerIsSynchronized();
 
 #define CsdNodeEnqueueGeneral(x,s,i,p) do { \
           CmiLock(CsvAccess(CsdNodeQueueLock));\
-          CqsEnqueueGeneral(CsvAccess(CsdNodeQueue),(x),(s),(i),(p)); \
+          CqsEnqueueGeneral((Queue)CsvAccess(CsdNodeQueue),(x),(s),(i),(p)); \
           CmiUnlock(CsvAccess(CsdNodeQueueLock)); \
         } while(0)
 #define CsdNodeEnqueueFifo(x)     do { \
           CmiLock(CsvAccess(CsdNodeQueueLock));\
-          CqsEnqueueFifo(CsvAccess(CsdNodeQueue),(x)); \
+          CqsEnqueueFifo((Queue)CsvAccess(CsdNodeQueue),(x)); \
           CmiUnlock(CsvAccess(CsdNodeQueueLock)); \
         } while(0)
 #define CsdNodeEnqueueLifo(x)     do { \
           CmiLock(CsvAccess(CsdNodeQueueLock));\
-          CqsEnqueueLifo(CsvAccess(CsdNodeQueue),(x))); \
+          CqsEnqueueLifo((Queue)CsvAccess(CsdNodeQueue),(x))); \
           CmiUnlock(CsvAccess(CsdNodeQueueLock)); \
         } while(0)
 #define CsdNodeEnqueue(x)     do { \
           CmiLock(CsvAccess(CsdNodeQueueLock));\
-          CqsEnqueueFifo(CsvAccess(CsdNodeQueue),(x));\
+          CqsEnqueueFifo((Queue)CsvAccess(CsdNodeQueue),(x));\
           CmiUnlock(CsvAccess(CsdNodeQueueLock)); \
         } while(0)
 
-#define CsdNodeEmpty()            (CqsEmpty(CpvAccess(CsdNodeQueue)))
-#define CsdNodeLength()           (CqsLength(CpvAccess(CsdNodeQueue)))
+#define CsdNodeEmpty()            (CqsEmpty(C(Queue)pvAccess(CsdNodeQueue)))
+#define CsdNodeLength()           (CqsLength((Queue)CpvAccess(CsdNodeQueue)))
 
 #else
 
 #define CsdNodeEnqueueGeneral(x,s,i,p) (CsdEnqueueGeneral(x,s,i,p))
-#define CsdNodeEnqueueFifo(x) (CqsEnqueueFifo(CpvAccess(CsdSchedQueue),(x)))
-#define CsdNodeEnqueueLifo(x) (CqsEnqueueLifo(CpvAccess(CsdSchedQueue),(x)))
+#define CsdNodeEnqueueFifo(x) (CqsEnqueueFifo((Queue)CpvAccess(CsdSchedQueue),(x)))
+#define CsdNodeEnqueueLifo(x) (CqsEnqueueLifo((Queue)CpvAccess(CsdSchedQueue),(x)))
 #define CsdNodeEnqueue(x)     (CsdEnqueue(x))
-#define CsdNodeEmpty()        (CqsEmpty(CpvAccess(CsdSchedQueue)))
-#define CsdNodeLength()       (CqsLength(CpvAccess(CsdSchedQueue)))
+#define CsdNodeEmpty()        (CqsEmpty((Queue)CpvAccess(CsdSchedQueue)))
+#define CsdNodeLength()       (CqsLength((Queue)CpvAccess(CsdSchedQueue)))
 
 #endif
 
 #define CsdEnqueueGeneral(x,s,i,p)\
-    (CqsEnqueueGeneral(CpvAccess(CsdSchedQueue),(x),(s),(i),(p)))
-#define CsdEnqueueFifo(x)     (CqsEnqueueFifo(CpvAccess(CsdSchedQueue),(x)))
-#define CsdEnqueueLifo(x)     (CqsEnqueueLifo(CpvAccess(CsdSchedQueue),(x)))
-#define CsdEnqueue(x)         (CqsEnqueueFifo(CpvAccess(CsdSchedQueue),(x)))
-#define CsdEmpty()            (CqsEmpty(CpvAccess(CsdSchedQueue)))
-#define CsdLength()           (CqsLength(CpvAccess(CsdSchedQueue)))
+    (CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),(x),(s),(i),(p)))
+#define CsdEnqueueFifo(x)     (CqsEnqueueFifo((Queue)CpvAccess(CsdSchedQueue),(x)))
+#define CsdEnqueueLifo(x)     (CqsEnqueueLifo((Queue)CpvAccess(CsdSchedQueue),(x)))
+#define CsdEnqueue(x)         (CqsEnqueueFifo((Queue)CpvAccess(CsdSchedQueue),(x)))
+#define CsdEmpty()            (CqsEmpty((Queue)CpvAccess(CsdSchedQueue)))
+#define CsdLength()           (CqsLength((Queue)CpvAccess(CsdSchedQueue)))
 
 #if CMK_CMIPRINTF_IS_A_BUILTIN /* these are implemented in machine.c */
 void  CmiPrintf(const char *, ...);
@@ -962,6 +962,8 @@ void     CmiLookupGroup(CmiGroup grp, int *npes, int **pes);
   }\
 }
 
+void CmiPushPE(int, void*);
+
 void          CmiSyncSendFn(int, int, char *);
 CmiCommHandle CmiAsyncSendFn(int, int, char *);
 void          CmiFreeSendFn(int, int, char *);
@@ -1049,25 +1051,31 @@ void          CmiMultipleIsend(unsigned int, int, int *, char **);
 int           CmiAsyncMsgSent(CmiCommHandle);
 void          CmiReleaseCommHandle(CmiCommHandle);
 
-#define CmiSyncSend(p,s,m)              (CmiSyncSendFn((p),(s),(char *)(m)))
-#define CmiAsyncSend(p,s,m)             (CmiAsyncSendFn((p),(s),(char *)(m)))
-#define CmiSyncSendAndFree(p,s,m)       (CmiFreeSendFn((p),(s),(char *)(m)))
+#ifdef CMK_OPTIMIZE
+#define ConverseDeliver()   1
+#else
+int ConverseDeliver();
+#endif
+
+#define CmiSyncSend(p,s,m)              if (ConverseDeliver()) (CmiSyncSendFn((p),(s),(char *)(m)))
+#define CmiAsyncSend(p,s,m)             if (ConverseDeliver()) (CmiAsyncSendFn((p),(s),(char *)(m)))
+#define CmiSyncSendAndFree(p,s,m)       if (ConverseDeliver()) (CmiFreeSendFn((p),(s),(char *)(m)))
 
-#define CmiSyncBroadcast(s,m)           (CmiSyncBroadcastFn((s),(char *)(m)))
-#define CmiAsyncBroadcast(s,m)          (CmiAsyncBroadcastFn((s),(char *)(m)))
-#define CmiSyncBroadcastAndFree(s,m)    (CmiFreeBroadcastFn((s),(char *)(m)))
+#define CmiSyncBroadcast(s,m)           if (ConverseDeliver()) (CmiSyncBroadcastFn((s),(char *)(m)))
+#define CmiAsyncBroadcast(s,m)          if (ConverseDeliver()) (CmiAsyncBroadcastFn((s),(char *)(m)))
+#define CmiSyncBroadcastAndFree(s,m)    if (ConverseDeliver()) (CmiFreeBroadcastFn((s),(char *)(m)))
 
-#define CmiSyncBroadcastAll(s,m)        (CmiSyncBroadcastAllFn((s),(char *)(m)))
-#define CmiAsyncBroadcastAll(s,m)       (CmiAsyncBroadcastAllFn((s),(char *)(m)))
-#define CmiSyncBroadcastAllAndFree(s,m) (CmiFreeBroadcastAllFn((s),(char *)(m)))
+#define CmiSyncBroadcastAll(s,m)        if (ConverseDeliver()) (CmiSyncBroadcastAllFn((s),(char *)(m)))
+#define CmiAsyncBroadcastAll(s,m)       if (ConverseDeliver()) (CmiAsyncBroadcastAllFn((s),(char *)(m)))
+#define CmiSyncBroadcastAllAndFree(s,m) if (ConverseDeliver()) (CmiFreeBroadcastAllFn((s),(char *)(m)))
 
-#define CmiSyncListSend(n,l,s,m)        (CmiSyncListSendFn((n),(l),(s),(char *)(m)))
-#define CmiAsyncListSend(n,l,s,m)       (CmiAsyncListSendFn((n),(l),(s),(char *)(m)))
-#define CmiSyncListSendAndFree(n,l,s,m) (CmiFreeListSendFn((n),(l),(s),(char *)(m)))
+#define CmiSyncListSend(n,l,s,m)        if (ConverseDeliver()) (CmiSyncListSendFn((n),(l),(s),(char *)(m)))
+#define CmiAsyncListSend(n,l,s,m)       if (ConverseDeliver()) (CmiAsyncListSendFn((n),(l),(s),(char *)(m)))
+#define CmiSyncListSendAndFree(n,l,s,m) if (ConverseDeliver()) (CmiFreeListSendFn((n),(l),(s),(char *)(m)))
 
-#define CmiSyncMulticast(g,s,m)         (CmiSyncMulticastFn((g),(s),(char*)(m)))
-#define CmiAsyncMulticast(g,s,m)        (CmiAsyncMulticastFn((g),(s),(char*)(m)))
-#define CmiSyncMulticastAndFree(g,s,m)  (CmiFreeMulticastFn((g),(s),(char*)(m)))
+#define CmiSyncMulticast(g,s,m)         if (ConverseDeliver()) (CmiSyncMulticastFn((g),(s),(char*)(m)))
+#define CmiAsyncMulticast(g,s,m)        if (ConverseDeliver()) (CmiAsyncMulticastFn((g),(s),(char*)(m)))
+#define CmiSyncMulticastAndFree(g,s,m)  if (ConverseDeliver()) (CmiFreeMulticastFn((g),(s),(char*)(m)))
 
 #if CMK_NODE_QUEUE_AVAILABLE
 void          CmiSyncNodeSendFn(int, int, char *);
@@ -1181,6 +1189,7 @@ typedef struct {
   */
   char cmicore[CmiReservedHeaderSize];
   CthThread thread;
+  int serialNo;
 } CthThreadToken;
 
 CthThreadToken *CthGetToken(CthThread);
index a730efed3b704399104982c1666f58966bf4901f..110c831afa3688641ded2d4ef4358f073eb2bf15 100644 (file)
@@ -30,6 +30,13 @@ uint32_t ntohl(uint32_t netlong) {
 }
 #endif
 
+/** Specify if we are replaying the processor from message logs, thus disable delivering of messages */
+int replaySystem;
+
+int ConverseDeliver() {
+  return !replaySystem;
+}
+
 /***************************************************
   The CCS interface to the debugger
 */
index 23b023ff1445f70d7659228d2218d842eacbead8..a9e9021038a722593ad4e67b9a9312c91dc61d40 100644 (file)
@@ -192,6 +192,8 @@ CthThreadToken *CthGetToken(CthThread t){
        return B(t)->token;
 }
 
+CpvStaticDeclare(int, Cth_serialNo);
+
 /*********************** Stack Aliasing *********************
   Stack aliasing: instead of consuming virtual address space
   with isomalloc, map all stacks to the same virtual addresses
@@ -383,6 +385,7 @@ static void CthThreadBaseInit(CthThreadBase *th)
   static int serialno = 1;
   th->token = (CthThreadToken *)malloc(sizeof(CthThreadToken));
   th->token->thread = S(th);
+  th->token->serialNo = CpvAccess(Cth_serialNo)++;
   th->scheduled = 0;
 
   th->awakenfn = 0;
@@ -483,6 +486,9 @@ static void CthBaseInit(char **argv)
   
   CthCpvAccess(CthData)=0;
   CthCpvAccess(CthDatasize)=0;
+  
+  CpvInitialize(int, Cth_serialNo);
+  CpvAccess(Cth_serialNo) = 1;
 }
 
 int CthImplemented() { return 1; } 
@@ -515,6 +521,7 @@ void CthPupBase(pup_er p,CthThreadBase *t,int useMigratable)
                if(BgOutOfCoreFlag==0){
                    t->token = (CthThreadToken *)malloc(sizeof(CthThreadToken));
                    t->token->thread = S(t);
+                   t->token->serialNo = CpvAccess(Cth_serialNo)++;
                    /*For normal runs where this pup is needed,
                    set scheduled to 0 in the unpacking period since the thread has
                    not been scheduled */
@@ -530,6 +537,7 @@ void CthPupBase(pup_er p,CthThreadBase *t,int useMigratable)
                        t->token = (CthThreadToken *)malloc(sizeof(CthThreadToken));
                    }
                    t->token->thread = S(t);
+            t->token->serialNo = CpvAccess(Cth_serialNo)++;
                }
        }
        
index 45d7849bda989bb9ecc9028e065774707dd97ce0..2fc6938360931eebf2bb2ff287f3276f58d02a4b 100644 (file)
@@ -528,10 +528,12 @@ class SafePool {
     T msgs[MAXMSGS];
     typedef T (*allocFn)();
     typedef void (*freeFn)(T);
+    typedef void (*resetFn)(T);
     allocFn allocfn;
     freeFn  freefn;
+    resetFn resetfn;
   public:
-    SafePool(allocFn _afn, freeFn _ffn): allocfn(_afn), freefn(_ffn) {
+    SafePool(allocFn _afn, freeFn _ffn, resetFn _rfn=NULL): allocfn(_afn), freefn(_ffn), resetfn(_rfn) {
       for(int i=0;i<MAXMSGS;i++)
         msgs[i] = allocfn();
       num = MAXMSGS;
@@ -544,8 +546,10 @@ class SafePool {
     void put(T m) {
       if (num==MAXMSGS || CmiImmIsRunning())
         freefn(m);
-      else
+      else {
+        if (resetfn!=NULL) resetfn(m);
         msgs[num++] = m;
+      }
     }
 };