Changing CkMessageWatcher. Moving the file descriptor to the parent class and adding...
authorFilippo Gioachin <gioachin@illinois.edu>
Tue, 10 Nov 2009 23:51:25 +0000 (23:51 +0000)
committerFilippo Gioachin <gioachin@illinois.edu>
Tue, 10 Nov 2009 23:51:25 +0000 (23:51 +0000)
src/ck-core/ck.C
src/ck-core/ck.h

index 9f58c982ef96e9a7ed4523cab3ecf7a6e0e19fa4..2291a4b6194c5afc851bb2f16e2a205f7990d412 100644 (file)
@@ -1831,33 +1831,56 @@ printf("[%d] DELETE!\n", CkMyPe());
 
 //------------------- Message Watcher (record/replay) ----------------
 
+#include "crc32.h"
 CkMessageWatcher::~CkMessageWatcher() {}
 
 class CkMessageRecorder : public CkMessageWatcher {
-       FILE *f;
 public:
-       CkMessageRecorder(FILE *f_) :f(f_) {}
-       ~CkMessageRecorder() {
-               fprintf(f,"-1 -1 -1");
-               fclose(f);
-       }
+  CkMessageRecorder(FILE *f_) { f=f_; }
+  ~CkMessageRecorder() {
+    fprintf(f,"-1 -1 -1");
+    fclose(f);
+  }
 
-       virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
-                if (env->getEvent())
-                    fprintf(f,"%d %d %d %d\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg);
-               return CmiTrue;
-       }
+private:
+  virtual CmiBool process(envelope *env,CkCoreState *ck) {
+    if (env->getEvent()) {
+      unsigned int crc = crc32(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
+      fprintf(f,"%d %d %d %d %x\n",env->getSrcPe(),env->getTotalsize(),env->getEvent(), env->getMsgtype()==NodeBocInitMsg || env->getMsgtype()==ForNodeBocMsg, crc);
+    }
+    return CmiTrue;
+  }
+};
+
+class CkMessageDetailRecorder : public CkMessageWatcher {
+public:
+  CkMessageDetailRecorder(FILE *f_) {
+    f=f_;
+    /* The file starts with "x 0" if it is little endian, "0 x" if big endian.
+     * The value of 'x' is the pointer size.
+     */
+    CmiUInt2 little = sizeof(void*);
+    fwrite(&little, 2, 1, f);
+  }
+  ~CkMessageDetailRecorder() {fclose(f);}
+private:
+  virtual CmiBool process(envelope *env, CkCoreState *ck) {
+    CmiUInt4 size = env->getTotalsize();
+    fwrite(&size, 4, 1, f);
+    fwrite(env, env->getTotalsize(), 1, f);
+    return CmiTrue;
+  }
 };
 
 //#define REPLAYDEBUG(args) ckout<<"["<<CkMyPe()<<"] "<< args <<endl;
 #define REPLAYDEBUG(args) /* empty */
 
 class CkMessageReplay : public CkMessageWatcher {
-       FILE *f;
        int nextPE, nextSize, nextEvent, nexttype; //Properties of next message we need:
+       unsigned int crc;
        /// Read the next message we need from the file:
        void getNext(void) {
-               if (4!=fscanf(f,"%d%d%d%d", &nextPE,&nextSize,&nextEvent,&nexttype)) {
+               if (5!=fscanf(f,"%d%d%d%d%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc)) {
                        // CkAbort("CkMessageReplay> Syntax error reading replay file");
                        nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
                }
@@ -1868,9 +1891,13 @@ class CkMessageReplay : public CkMessageWatcher {
                if (nextEvent!=env->getEvent()) return CmiFalse;
                if (nextSize!=env->getTotalsize())
                 {
-                       CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
+                       CkPrintf("CkMessageReplay> Message size changed during replay org: [%d %d %d] got: [%d %d %d]\n", nextPE, nextEvent, nextSize, env->getSrcPe(), env->getEvent(), env->getTotalsize());
                         return CmiFalse;
                 }
+               unsigned int crcnew = crc32(((unsigned char*)env)+CmiMsgHeaderSizeBytes, env->getTotalsize()-CmiMsgHeaderSizeBytes);
+               if (crcnew != crc) {
+                 CkPrintf("CkMessageReplay> Message CRC changed during replay org: [0x%x] got: [0x%x]\n",crc,crcnew);
+               }
                return CmiTrue;
        }
 
@@ -1897,12 +1924,15 @@ class CkMessageReplay : public CkMessageWatcher {
        }
 
 public:
-       CkMessageReplay(FILE *f_) :f(f_) { getNext();
-       REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
-                   }
+       CkMessageReplay(FILE *f_) {
+         f=f_;
+         getNext();
+         REPLAYDEBUG("Constructing ckMessageReplay: "<< nextPE <<" "<< nextSize <<" "<<nextEvent);
+       }
        ~CkMessageReplay() {fclose(f);}
 
-       virtual CmiBool processMessage(envelope *env,CkCoreState *ck) {
+private:
+       virtual CmiBool process(envelope *env,CkCoreState *ck) {
          REPLAYDEBUG("ProcessMessage message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent() <<" " <<env->getMsgtype() <<" " <<env->getMsgIdx());
                 if (env->getEvent() == 0) return CmiTrue;
                if (isNext(env)) { /* This is the message we were expecting */
@@ -1932,10 +1962,14 @@ public:
        }
 };
 
-static FILE *openReplayFile(const char *permissions) {
+#include "trace-common.h" /* For traceRoot and traceRootBaseLength */
+
+static FILE *openReplayFile(const char *prefix, const char *suffix, const char *permissions) {
 
-       char fName[200];
-       sprintf(fName,"ckreplay_%06d.log",CkMyPe());
+       int i;
+       char *fName = new char[CkpvAccess(traceRootBaseLength)+strlen(prefix)+strlen(suffix)+7];
+       strncpy(fName, CkpvAccess(traceRoot), CkpvAccess(traceRootBaseLength));
+       sprintf(fName+CkpvAccess(traceRootBaseLength), "%s%06d%s",prefix,CkMyPe(),suffix);
        FILE *f=fopen(fName,permissions);
        REPLAYDEBUG("openReplayfile "<<fName);
        if (f==NULL) {
@@ -1946,12 +1980,25 @@ static FILE *openReplayFile(const char *permissions) {
        return f;
 }
 
+#include "ckliststring.h"
 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
+    char *procs = NULL;
        REPLAYDEBUG("CkMessageWaterInit ");
-       if (CmiGetArgFlagDesc(argv,"+record","Record message processing order"))
-               ck->watcher=new CkMessageRecorder(openReplayFile("w"));
-       if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream"))
-               ck->watcher=new CkMessageReplay(openReplayFile("r"));
+    if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
+        CkListString list(procs);
+        if (list.includes(CkMyPe())) {
+          ck->addWatcher(new CkMessageDetailRecorder(openReplayFile("ckreplay_",".detail","w")));
+        }
+    }
+       if (CmiGetArgFlagDesc(argv,"+record","Record message processing order")) {
+               ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
+       }
+       if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream")) {
+               ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
+       }
+       if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Re-play the specified processors from recorded message content")) {
+         /*Nothing yet*/
+       }
 }
 
 extern "C"
index 4c0bf801193f9e8814407dee7ece9ffb2f081f0d..f33cf43973180ba597dffe0993a4d550254867f4 100644 (file)
@@ -78,13 +78,27 @@ class CkCoreState;
 
 /// Message watcher: for record/replay support
 class CkMessageWatcher {
+protected:
+  FILE *f;
+  CkMessageWatcher *next;
 public:
-       virtual ~CkMessageWatcher();
+    CkMessageWatcher() : next(NULL) { }
+    virtual ~CkMessageWatcher();
        /**
         * This message is about to be processed by Charm.
         * If this function returns false, the message will not be processed.
+        * The message is processed by the watcher starting from the innermost one
+        * up to the outermost
         */
-       virtual CmiBool processMessage(envelope *env,CkCoreState *ck) =0;
+       inline CmiBool processMessage(envelope *env,CkCoreState *ck) {
+         if (next != NULL) next->processMessage(env, ck);
+         return process(env, ck);
+       }
+protected:
+    /** This is used internally by this class to call the correct subclass method */
+       virtual CmiBool process(envelope *env,CkCoreState *ck) =0;
+public:
+    inline void setNext(CkMessageWatcher *w) { next = w; }
 };
 
 /// All the state that's useful to have on the receive side in the Charm Core (ck.C)
@@ -93,6 +107,11 @@ class CkCoreState {
        QdState *qd;
 public:
        CkMessageWatcher *watcher;
+       /** Adds an extra watcher (which wrap the previously existing one) */
+       inline void addWatcher(CkMessageWatcher *w) {
+         w->setNext(watcher);
+         watcher = w;
+       }
        
        CkCoreState() 
                :groupTable(CkpvAccess(_groupTable)),