Merge branch 'charm' into development
[charm.git] / src / ck-core / ck.C
index 7afcef5b82dfc24bddb4c670108f4aa11cc568a0..9304fe9c7547bf0538146733d58c9b8ec1de38fe 100644 (file)
@@ -1193,7 +1193,7 @@ void CkUnpackMessage(envelope **pEnv)
 // the PE can never be CLD_ANYWHERE; wasting _infoFn calls.
 // Thus these accellerated versions of the Cld calls.
 
-int index_objectQHandler;
+static int index_objectQHandler;
 int index_tokenHandler;
 static int index_skipCldHandler;
 
@@ -1871,7 +1871,7 @@ class CkMessageRecorder : public CkMessageWatcher {
 public:
   CkMessageRecorder(FILE *f_) { f=f_; }
   ~CkMessageRecorder() {
-    fprintf(f,"-1 -1 -1");
+    fprintf(f,"-1 -1 -1 ");
     fclose(f);
   }
 
@@ -1888,6 +1888,10 @@ private:
     }
     return CmiTrue;
   }
+  virtual int process(CthThreadToken *token,CkCoreState *ck) {
+    fprintf(f, "%d %d %d\n",CkMyPe(), -2, token->serialNo);
+    return 1;
+  }
 };
 
 class CkMessageDetailRecorder : public CkMessageWatcher {
@@ -1924,10 +1928,25 @@ class CkMessageReplay : public CkMessageWatcher {
        unsigned int crc1, crc2;
        /// Read the next message we need from the file:
        void getNext(void) {
+         if (3!=fscanf(f,"%d%d%d", &nextPE,&nextSize,&nextEvent)) CkAbort("CkMessageReplay> Syntax error reading replay file");
+         if (nextSize > 0) {
+           // We are reading a regular message
+           if (3!=fscanf(f,"%d%x%x", &nexttype,&crc1,&crc2)) {
+             CkAbort("CkMessageReplay> Syntax error reading replay file");
+           }
+         } else if (nextSize == -2) {
+           // We are reading a special message (right now only thread awaken)
+           getNext(); // For now simply skip that
+         } else if (nextPE!=-1 || nextSize!=-1 || nextEvent!=-1) {
+           CkPrintf("Read from file item %d %d %d\n",nextPE,nextSize,nextEvent);
+           CkAbort("CkMessageReplay> Unrecognized input");
+         }
+           /*
                if (6!=fscanf(f,"%d%d%d%d%x%x", &nextPE,&nextSize,&nextEvent,&nexttype,&crc1,&crc2)) {
-                       // CkAbort("CkMessageReplay> Syntax error reading replay file");
+                       CkAbort("CkMessageReplay> Syntax error reading replay file");
                        nextPE=nextSize=nextEvent=nexttype=-1; //No destructor->record file just ends in the middle!
                }
+               */
                counter++;
        }
        /// If this is the next message we need, advance and return CmiTrue.
@@ -1964,7 +1983,8 @@ class CkMessageReplay : public CkMessageWatcher {
                        envelope *env=delayed.deq();
                        if (isNext(env)) { /* this is the next message: process it */
                                REPLAYDEBUG("Dequeueing message: "<<env->getSrcPe()<<" "<<env->getTotalsize()<<" "<<env->getEvent())
-                               CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
+                               //CmiSyncSendAndFree(CkMyPe(),env->getTotalsize(),(char *)env);
+                               CsdEnqueueLifo((void*)env); // Make it at the beginning since this is the one we want next
                                return;
                        }
                        else /* Not ready yet-- put it back in the
@@ -2015,12 +2035,56 @@ private:
                        return CmiFalse;
                }
        }
+       virtual int process(CthThreadToken *token, CkCoreState *ck) {
+         return 1;
+       }
+};
+
+class CkMessageDetailReplay : public CkMessageWatcher {
+  void *getNext() {
+    CmiUInt4 size; size_t nread;
+    if ((nread=fread(&size, 4, 1, f)) < 1) {
+      if (feof(f)) return NULL;
+      CkPrintf("Broken record file (metadata) got %d\n",nread);
+      CkAbort("");
+    }
+    void *env = CmiAlloc(size);
+    if ((nread=fread(env, size, 1, f)) < 1) {
+      CkPrintf("Broken record file (data) expecting %d, got %d\n",size,nread);
+      CkAbort("");
+    }
+    return env;
+  }
+public:
+  CkMessageDetailReplay(FILE *f_) {
+    f=f_;
+    /* This must match what CkMessageDetailRecorder did */
+    CmiUInt2 little;
+    fread(&little, 2, 1, f);
+    if (little != sizeof(void*)) {
+      CkAbort("Replaying on a different architecture from which recording was done!");
+    }
+
+    CmiPushPE(CkMyPe(), getNext());
+  }
+  virtual CmiBool process(envelope *env,CkCoreState *ck) {
+    CmiPushPE(CkMyPe(), getNext());
+    return CmiTrue;
+  }
 };
 
 extern "C" void CkMessageReplayQuiescence(void *rep, double time) {
   CkPrintf("[%d] Quiescence detected\n",CkMyPe());
   CkMessageReplay *replay = (CkMessageReplay*)rep;
-  
+  //CmiStartQD(CkMessageReplayQuiescence, replay);
+}
+
+extern "C" int CmiExecuteThreadResume(CthThreadToken *token) {
+  CkCoreState *ck = CkpvAccess(_coreState);
+  if (ck->watcher!=NULL) {
+    return ck->watcher->processThread(token,ck);
+  }
+  return 1;
 }
 
 #include "trace-common.h" /* For traceRoot and traceRootBaseLength */
@@ -2044,7 +2108,8 @@ static FILE *openReplayFile(const char *prefix, const char *suffix, const char *
 #include "ckliststring.h"
 void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
     char *procs = NULL;
-       REPLAYDEBUG("CkMessageWaterInit ");
+    replaySystem = 0;
+       REPLAYDEBUG("CkMessageWatcherInit ");
     if (CmiGetArgStringDesc(argv,"+record-detail",&procs,"Record full message content for the specified processors")) {
         CkListString list(procs);
         if (list.includes(CkMyPe())) {
@@ -2056,14 +2121,24 @@ void CkMessageWatcherInit(char **argv,CkCoreState *ck) {
            CpdSetInitializeMemory(1);
                ck->addWatcher(new CkMessageRecorder(openReplayFile("ckreplay_",".log","w")));
        }
-       if (CmiGetArgFlagDesc(argv,"+replay","Re-play recorded message stream")) {
-           CpdSetInitializeMemory(1);
-               ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
-       }
-       if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Re-play the specified processors from recorded message content")) {
+       if (CmiGetArgStringDesc(argv,"+replay-detail",&procs,"Replay the specified processors from recorded message content")) {
            CpdSetInitializeMemory(1);
-         /*Nothing yet*/
+           // Set the parameters of the processor
+#if CMK_SHARED_VARS_UNAVAILABLE
+           _Cmi_mype = atoi(procs);
+           while (procs[0]!='/') procs++;
+           procs++;
+           _Cmi_numpes = atoi(procs);
+#else
+           CkAbort("+replay-detail available only for non-SMP build");
+#endif
+           replaySystem = 1;
+           ck->addWatcher(new CkMessageDetailReplay(openReplayFile("ckreplay_",".detail","r")));
        }
+    if (CmiGetArgFlagDesc(argv,"+replay","Replay recorded message stream")) {
+        CpdSetInitializeMemory(1);
+        ck->addWatcher(new CkMessageReplay(openReplayFile("ckreplay_",".log","r")));
+    }
 }
 
 extern "C"