Using shmem to provide consistency and allow the un-delivery of a user-specified...
[charm.git] / src / ck-core / debug-charm.C
index f27f5722fd7320fc41ee2e203cb299ab8e306c57..87ce6da4947570bd8f02faffc1ec47f4ece43f6d 100644 (file)
@@ -422,10 +422,14 @@ void CpdPupMessage(PUP::er &p, void *msg)
   p.synchronize(PUP::sync_end_object);
 }
 
   p.synchronize(PUP::sync_end_object);
 }
 
+struct ConditionalList {
+  int count;
+  int msgs[1];
+};
 CkpvStaticDeclare(void *, lastBreakPointMsg);
 int conditionalPipe[2] = {0, 0};
 CkpvDeclare(void*, conditionalQueue);
 CkpvStaticDeclare(void *, lastBreakPointMsg);
 int conditionalPipe[2] = {0, 0};
 CkpvDeclare(void*, conditionalQueue);
-
+ConditionalList * conditionalShm = NULL;
 
 //Cpd Lists for local and scheduler queues
 class CpdList_localQ : public CpdListAccessor {
 
 //Cpd Lists for local and scheduler queues
 class CpdList_localQ : public CpdListAccessor {
@@ -531,17 +535,14 @@ class CpdList_message : public CpdListAccessor {
   }
 };
 
   }
 };
 
-void CpdDeliverMessage(char * msg) {
-  int msgNum;
+static void CpdDeliverMessageInt(int msgNum) {
   void *m;
   void *m;
-  sscanf(msg+CmiMsgHeaderSizeBytes, "%d", &msgNum);
-  //CmiPrintf("received deliver request %d\n",msgNum);
-
   void *debugQ=CkpvAccess(debugQueue);
   CdsFifo_Enqueue(debugQ, (void*)(-1)); // Enqueue a guard
   for (int i=0; i<msgNum; ++i) CdsFifo_Enqueue(debugQ, CdsFifo_Dequeue(debugQ));
   CkpvAccess(skipBreakpoint) = 1;
   char *queuedMsg = (char *)CdsFifo_Dequeue(debugQ);
   void *debugQ=CkpvAccess(debugQueue);
   CdsFifo_Enqueue(debugQ, (void*)(-1)); // Enqueue a guard
   for (int i=0; i<msgNum; ++i) CdsFifo_Enqueue(debugQ, CdsFifo_Dequeue(debugQ));
   CkpvAccess(skipBreakpoint) = 1;
   char *queuedMsg = (char *)CdsFifo_Dequeue(debugQ);
+  if (_conditionalDelivery==1) conditionalShm->msgs[conditionalShm->count++] = msgNum;
   if (_conditionalDelivery) {
     CmiReference(queuedMsg);
     CdsFifo_Enqueue(CkpvAccess(conditionalQueue), queuedMsg);
   if (_conditionalDelivery) {
     CmiReference(queuedMsg);
     CdsFifo_Enqueue(CkpvAccess(conditionalQueue), queuedMsg);
@@ -557,6 +558,13 @@ void CpdDeliverMessage(char * msg) {
   while ((m=CdsFifo_Dequeue(debugQ)) != (void*)(-1)) CdsFifo_Enqueue(debugQ, m);  
 }
 
   while ((m=CdsFifo_Dequeue(debugQ)) != (void*)(-1)) CdsFifo_Enqueue(debugQ, m);  
 }
 
+void CpdDeliverMessage(char * msg) {
+  int msgNum;
+  sscanf(msg+CmiMsgHeaderSizeBytes, "%d", &msgNum);
+  //CmiPrintf("received deliver request %d\n",msgNum);
+  CpdDeliverMessageInt(msgNum);
+}
+
 void *CpdGetNextMessageConditional(CsdSchedulerState_t*) {
   int len;
   read(conditionalPipe[0], &len, 4);
 void *CpdGetNextMessageConditional(CsdSchedulerState_t*) {
   int len;
   read(conditionalPipe[0], &len, 4);
@@ -565,23 +573,23 @@ void *CpdGetNextMessageConditional(CsdSchedulerState_t*) {
   return msg;
 }
 
   return msg;
 }
 
-void CpdEndConditionalDelivery(char *msg) {
-  _exit(0);
-}
+#include <sys/wait.h>
+#include <sys/ipc.h>
+#include <sys/shm.h>
 
 extern "C" void CpdDeliverSingleMessage ();
 
 
 extern "C" void CpdDeliverSingleMessage ();
 
-#include <sys/wait.h>
-void CpdDeliverMessageConditionally(char * msg) {
-  int msgNum;
-  void *m;
-  sscanf(msg+CmiMsgHeaderSizeBytes, "%d", &msgNum);
-  //CmiPrintf("received deliver request %d\n",msgNum);
-
+static pid_t CpdConditional_SetupComm() {
   int pipefd[2][2];
   pipe(pipefd[0]); // parent to child
   pipe(pipefd[1]); // child to parent
   
   int pipefd[2][2];
   pipe(pipefd[0]); // parent to child
   pipe(pipefd[1]); // child to parent
   
+  if (conditionalShm == NULL) {
+    int shmemid = shmget(IPC_PRIVATE, 1024*1024, IPC_CREAT | 0666);
+    conditionalShm = (ConditionalList*)shmat(shmemid, NULL, 0);
+    conditionalShm->count = 0;
+  }
+  
   pid_t pid = fork();
   if (pid > 0) {
     int bytes;
   pid_t pid = fork();
   if (pid > 0) {
     int bytes;
@@ -596,40 +604,68 @@ void CpdDeliverMessageConditionally(char * msg) {
     read(conditionalPipe[0], buf, bytes);
     CcsSendReply(bytes,buf);
     free(buf);
     read(conditionalPipe[0], buf, bytes);
     CcsSendReply(bytes,buf);
     free(buf);
-    return;
+    return pid;
   }
   }
-  
-  //while (true);
-  printf("child\n");
-  int volatile tmp=1;
+
+  //int volatile tmp=1;
   //while (tmp);
   //while (tmp);
-  //_exit(0);
+  printf("child\n");
   _conditionalDelivery = 1;
   close(pipefd[0][1]);
   close(pipefd[1][0]);
   conditionalPipe[0] = pipefd[0][0];
   conditionalPipe[1] = pipefd[1][1];
   CpdGetNextMessage = CpdGetNextMessageConditional;
   _conditionalDelivery = 1;
   close(pipefd[0][1]);
   close(pipefd[1][0]);
   conditionalPipe[0] = pipefd[0][0];
   conditionalPipe[1] = pipefd[1][1];
   CpdGetNextMessage = CpdGetNextMessageConditional;
+  return 0;
+}
 
 
-  if (msgNum == -1) CpdDeliverSingleMessage();
-  else {
-    void *debugQ=CkpvAccess(debugQueue);
-    CdsFifo_Enqueue(debugQ, (void*)(-1)); // Enqueue a guard
-    for (int i=0; i<msgNum; ++i) CdsFifo_Enqueue(debugQ, CdsFifo_Dequeue(debugQ));
-    CkpvAccess(skipBreakpoint) = 1;
-    char *queuedMsg = (char *)CdsFifo_Dequeue(debugQ);
-    CmiReference(queuedMsg);
-    CdsFifo_Enqueue(CkpvAccess(conditionalQueue), queuedMsg);
-#if CMK_BLUEGENE_CHARM
-    stopVTimer();
-    BgProcessMessageDefault(cta(threadinfo), queuedMsg);
-    startVTimer();
-#else
-    CmiHandleMessage(queuedMsg);
-#endif
-    CkpvAccess(skipBreakpoint) = 0;
-    //_exit(0);
-    while ((m=CdsFifo_Dequeue(debugQ)) != (void*)(-1)) CdsFifo_Enqueue(debugQ, m);
+void CpdEndConditionalDelivery(char *msg) {
+  int msgNum;
+  void *m;
+  sscanf(msg+CmiMsgHeaderSizeBytes, "%d", &msgNum);
+  printf("%d messages:\n",conditionalShm->count);
+  for (int i=0; i<conditionalShm->count; ++i)
+    printf("message delivered %d\n",conditionalShm->msgs[i]);
+  conditionalShm->count = msgNum;
+  shmdt(conditionalShm);
+  _exit(0);
+}
+
+extern "C" void CpdEndConditionalDeliver_master() {
+  close(conditionalPipe[0]);
+  conditionalPipe[0] = 0;
+  close(conditionalPipe[1]);
+  conditionalPipe[1] = 0;
+  wait(NULL);
+  if (conditionalShm->count == 0) {
+    CcsSendReply(0,NULL);
+    shmdt(conditionalShm);
+    conditionalShm = NULL;
+  } else {
+    if (CpdConditional_SetupComm()==0) {
+      // We are in the child, deliver again the messages
+      _conditionalDelivery = 2;
+      printf("new child: redelivering %d messages\n",conditionalShm->count);
+      for (int i=0; i<conditionalShm->count; ++i) {
+        int msgNum = conditionalShm->msgs[i];
+        if (msgNum == -1) CpdDeliverSingleMessage();
+        else CpdDeliverMessageInt(msgNum);
+      }
+      _conditionalDelivery = 1;
+      CcsSendReply(0, NULL);
+    }
+  }
+}
+
+void CpdDeliverMessageConditionally(char * msg) {
+  int msgNum;
+  void *m;
+  sscanf(msg+CmiMsgHeaderSizeBytes, "%d", &msgNum);
+  //CmiPrintf("received deliver request %d\n",msgNum);
+
+  if (CpdConditional_SetupComm()==0) {
+    if (msgNum == -1) CpdDeliverSingleMessage();
+    else CpdDeliverMessageInt(msgNum);
   }
 }
 
   }
 }
 
@@ -722,6 +758,7 @@ void CpdDeliverSingleMessage () {
     EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(CkpvAccess(lastBreakPointIndex));
     if (breakPointEntryInfo != NULL) {
       if (_conditionalDelivery) {
     EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(CkpvAccess(lastBreakPointIndex));
     if (breakPointEntryInfo != NULL) {
       if (_conditionalDelivery) {
+        if (_conditionalDelivery==1) conditionalShm->msgs[conditionalShm->count++] = -1;
         void *env = UsrToEnv(CkpvAccess(lastBreakPointMsg));
         CmiReference(env);
         CdsFifo_Enqueue(CkpvAccess(conditionalQueue),env);
         void *env = UsrToEnv(CkpvAccess(lastBreakPointMsg));
         CmiReference(env);
         CdsFifo_Enqueue(CkpvAccess(conditionalQueue),env);
@@ -740,6 +777,7 @@ void CpdDeliverSingleMessage () {
       CkpvAccess(skipBreakpoint) = 1;
       char *queuedMsg = (char *)CdsFifo_Dequeue(CkpvAccess(debugQueue));
       if (_conditionalDelivery) {
       CkpvAccess(skipBreakpoint) = 1;
       char *queuedMsg = (char *)CdsFifo_Dequeue(CkpvAccess(debugQueue));
       if (_conditionalDelivery) {
+        if (_conditionalDelivery==1) conditionalShm->msgs[conditionalShm->count++] = 0;
         CmiReference(queuedMsg);
         CdsFifo_Enqueue(CkpvAccess(conditionalQueue),queuedMsg);
       }
         CmiReference(queuedMsg);
         CdsFifo_Enqueue(CkpvAccess(conditionalQueue),queuedMsg);
       }