Infrastructure for conditional delivery of messages (working).
authorFilippo Gioachin <gioachin@uiuc.edu>
Tue, 1 Jun 2010 16:04:21 +0000 (11:04 -0500)
committerFilippo Gioachin <gioachin@uiuc.edu>
Tue, 1 Jun 2010 16:06:53 +0000 (11:06 -0500)
A child is forked to handle conditional messages, parent forwards CCS requests to child.
Upon exit, child terminates, and parent resumes normal operations.
Now there is a function in CpdFreezeModeScheduler to pull new messages

src/ck-core/debug-charm.C
src/ck-core/debug-charm.h
src/conv-ccs/conv-ccs.c
src/conv-ccs/middle-ccs.C
src/conv-core/debug-conv.c
src/conv-core/debug-conv.h

index c26abb7e6a94add33432d9b9187e81aca2a36191..be587190fe7902a2237ca4a6c806f48fc11ba615 100644 (file)
@@ -423,6 +423,9 @@ void CpdPupMessage(PUP::er &p, void *msg)
 }
 
 CkpvStaticDeclare(void *, lastBreakPointMsg);
+int conditionalPipe[2] = {0, 0};
+CkpvDeclare(void*, conditionalQueue);
+
 
 //Cpd Lists for local and scheduler queues
 class CpdList_localQ : public CpdListAccessor {
@@ -438,10 +441,23 @@ public:
     return x;
   }
   virtual void pup(PUP::er &p, CpdListItemsRequest &req) {
-    int length = CdsFifo_Length((CdsFifo)(CkpvAccess(debugQueue)));
-    void ** messages = CdsFifo_Enumerate(CkpvAccess(debugQueue));
+    int length;
+    void ** messages;
     int curObj=0;
+    void *msg;
 
+    length = CdsFifo_Length((CdsFifo)(CkpvAccess(conditionalQueue)));
+    messages = CdsFifo_Enumerate(CkpvAccess(conditionalQueue));
+    for (curObj=-length; curObj<0; curObj++) {
+      void *msg = messages[length+curObj];
+      pupSingleMessage(p, curObj-1, msg);
+    }
+    delete[] messages;
+    
+    curObj = 0;
+    length = CdsFifo_Length((CdsFifo)(CkpvAccess(debugQueue)));
+    messages = CdsFifo_Enumerate(CkpvAccess(debugQueue));
+    
     if (CkpvAccess(lastBreakPointMsg) != NULL) {
       beginItem(p, -1);
       envelope *env=(envelope *)UsrToEnv(CkpvAccess(lastBreakPointMsg));
@@ -458,35 +474,40 @@ public:
     for(curObj=req.lo; curObj<req.hi; curObj++)
       if ((curObj>=0) && (curObj<length))
       {
-        beginItem(p,curObj);
         void *msg=messages[curObj]; /* converse message */
-        int isCharm=0;
-        const char *type="Converse";
-        p.comment("name");
-        char name[128];
+        pupSingleMessage(p, curObj, msg);
+      }
+    delete[] messages;
+
+  }
+
+  void pupSingleMessage(PUP::er &p, int curObj, void *msg) {
+    beginItem(p,curObj);
+    int isCharm=0;
+    const char *type="Converse";
+    p.comment("name");
+    char name[128];
 #if ! CMK_BLUEGENE_CHARM
-        if (CmiGetHandler(msg)==_charmHandlerIdx) {isCharm=1; type="Local Charm";}
-        if (CmiGetXHandler(msg)==_charmHandlerIdx) {isCharm=1; type="Network Charm";}
+    if (CmiGetHandler(msg)==_charmHandlerIdx) {isCharm=1; type="Local Charm";}
+    if (CmiGetXHandler(msg)==_charmHandlerIdx) {isCharm=1; type="Network Charm";}
 #else
-        isCharm=1; type="BG";
+    isCharm=1; type="BG";
 #endif
-        sprintf(name,"%s %d: %s (%d)","Message",curObj,type,CmiGetHandler(msg));
-        p(name, strlen(name));
-
-        if (isCharm)
-        { /* charm message */
-          p.comment("charmMsg");
-          p.synchronize(PUP::sync_begin_object);
-          envelope *env=(envelope *)msg;
-          CkUnpackMessage(&env);
-          messages[curObj]=env;
-          CpdPupMessage(p, EnvToUsr(env));
-          //CkPupMessage(p, &messages[curObj], 0);
-          p.synchronize(PUP::sync_end_object);
-        }
-      }
-    delete[] messages;
+    if (curObj < 0) type="Conditional";
+    sprintf(name,"%s %d: %s (%d)","Message",curObj,type,CmiGetHandler(msg));
+    p(name, strlen(name));
 
+    if (isCharm)
+    { /* charm message */
+      p.comment("charmMsg");
+      p.synchronize(PUP::sync_begin_object);
+      envelope *env=(envelope *)msg;
+      CkUnpackMessage(&env);
+      //messages[curObj]=env;
+      CpdPupMessage(p, EnvToUsr(env));
+      //CkPupMessage(p, &messages[curObj], 0);
+      p.synchronize(PUP::sync_end_object);
+    }
   }
 };
 
@@ -516,11 +537,15 @@ void CpdDeliverMessage(char * msg) {
   sscanf(msg+CmiMsgHeaderSizeBytes, "%d", &msgNum);
   //CmiPrintf("received deliver request %d\n",msgNum);
 
-  void *debugQ=CpvAccess(debugQueue);
+  void *debugQ=CkpvAccess(debugQueue);
   CdsFifo_Enqueue(debugQ, (void*)(-1)); // Enqueue a guard
   for (int i=0; i<msgNum; ++i) CdsFifo_Enqueue(debugQ, CdsFifo_Dequeue(debugQ));
   CkpvAccess(skipBreakpoint) = 1;
   char *queuedMsg = (char *)CdsFifo_Dequeue(debugQ);
+  if (_conditionalDelivery) {
+    CmiReference(queuedMsg);
+    CdsFifo_Enqueue(CkpvAccess(conditionalQueue), queuedMsg);
+  }  
 #if CMK_BLUEGENE_CHARM
   stopVTimer();
   BgProcessMessageDefault(cta(threadinfo), queuedMsg);
@@ -532,16 +557,16 @@ void CpdDeliverMessage(char * msg) {
   while ((m=CdsFifo_Dequeue(debugQ)) != (void*)(-1)) CdsFifo_Enqueue(debugQ, m);  
 }
 
-void CpdConditionalDeliveryScheduler() {
-  CsdSchedulerState_t state;
-  CsdSchedulerState_new(&state);
-  while (CpvAccess(conditionalDelivery)) {
-#if NODE_0_IS_CONVHOST
-    if (CmiMyPe()==0) CcsServerCheck(); /*Make sure we can get CCS messages*/
-#endif
-    msg = CsdNextMessage(&state);
-    
-  }
+void *CpdGetNextMessageConditional(CsdSchedulerState_t*) {
+  int len;
+  read(conditionalPipe[0], &len, 4);
+  void *msg = CmiAlloc(len);
+  read(conditionalPipe[0], msg, len);
+  return msg;
+}
+
+void CpdEndConditionalDelivery(char *msg) {
+  _exit(0);
 }
 
 #include <sys/wait.h>
@@ -551,25 +576,46 @@ void CpdDeliverMessageConditionally(char * msg) {
   sscanf(msg+CmiMsgHeaderSizeBytes, "%d", &msgNum);
   //CmiPrintf("received deliver request %d\n",msgNum);
 
+  int pipefd[2][2];
+  pipe(pipefd[0]); // parent to child
+  pipe(pipefd[1]); // child to parent
+  
   pid_t pid = fork();
   if (pid > 0) {
+    int bytes;
     CmiPrintf("parent %d\n",pid);
-    CpdConditionalDeliveryScheduler();
-    wait(NULL);
+    close(pipefd[0][0]);
+    close(pipefd[1][1]);
+    conditionalPipe[0] = pipefd[1][0];
+    conditionalPipe[1] = pipefd[0][1];
+    //CpdConditionalDeliveryScheduler(pipefd[1][0], pipefd[0][1]);
+    read(conditionalPipe[0], &bytes, 4);
+    char *buf = (char*)malloc(bytes);
+    read(conditionalPipe[0], buf, bytes);
+    CcsSendReply(bytes,buf);
+    free(buf);
     return;
   }
   
   //while (true);
   printf("child\n");
-  //while (1);
+  int volatile tmp=1;
+  //while (tmp);
   //_exit(0);
-  _replaySystem = 1;
-
-  void *debugQ=CpvAccess(debugQueue);
+  _conditionalDelivery = 1;
+  close(pipefd[0][1]);
+  close(pipefd[1][0]);
+  conditionalPipe[0] = pipefd[0][0];
+  conditionalPipe[1] = pipefd[1][1];
+  CpdGetNextMessage = CpdGetNextMessageConditional;
+
+  void *debugQ=CkpvAccess(debugQueue);
   CdsFifo_Enqueue(debugQ, (void*)(-1)); // Enqueue a guard
   for (int i=0; i<msgNum; ++i) CdsFifo_Enqueue(debugQ, CdsFifo_Dequeue(debugQ));
   CkpvAccess(skipBreakpoint) = 1;
   char *queuedMsg = (char *)CdsFifo_Dequeue(debugQ);
+  CmiReference(queuedMsg);
+  CdsFifo_Enqueue(CkpvAccess(conditionalQueue), queuedMsg);
 #if CMK_BLUEGENE_CHARM
   stopVTimer();
   BgProcessMessageDefault(cta(threadinfo), queuedMsg);
@@ -578,7 +624,7 @@ void CpdDeliverMessageConditionally(char * msg) {
   CmiHandleMessage(queuedMsg);
 #endif
   CkpvAccess(skipBreakpoint) = 0;
-  _exit(0);
+  //_exit(0);
   while ((m=CdsFifo_Dequeue(debugQ)) != (void*)(-1)) CdsFifo_Enqueue(debugQ, m);  
 }
 
@@ -670,6 +716,10 @@ void CpdDeliverSingleMessage () {
   if ( (CkpvAccess(lastBreakPointMsg) != NULL) && (CkpvAccess(lastBreakPointObject) != NULL) ) {
     EntryInfo * breakPointEntryInfo = CpvAccess(breakPointEntryTable)->get(CkpvAccess(lastBreakPointIndex));
     if (breakPointEntryInfo != NULL) {
+      if (_conditionalDelivery) {
+        CmiReference(CkpvAccess(lastBreakPointMsg));
+        CdsFifo_Enqueue(CkpvAccess(conditionalQueue),CkpvAccess(lastBreakPointMsg));
+      }
       breakPointEntryInfo->call(CkpvAccess(lastBreakPointMsg), CkpvAccess(lastBreakPointObject));
     }
     CkpvAccess(lastBreakPointMsg) = NULL;
@@ -683,6 +733,10 @@ void CpdDeliverSingleMessage () {
     if (!CdsFifo_Empty(CkpvAccess(debugQueue))) {
       CkpvAccess(skipBreakpoint) = 1;
       char *queuedMsg = (char *)CdsFifo_Dequeue(CkpvAccess(debugQueue));
+      if (_conditionalDelivery) {
+        CmiReference(queuedMsg);
+        CdsFifo_Enqueue(CkpvAccess(conditionalQueue),queuedMsg);
+      }
 #if CMK_BLUEGENE_CHARM
       stopVTimer();
       BgProcessMessageDefault(cta(threadinfo), queuedMsg);
@@ -906,12 +960,14 @@ void CpdCharmInit()
   CpdListRegister(new CpdList_localQ());
   CcsRegisterHandler("deliverMessage",(CmiHandler)CpdDeliverMessage);
   CcsRegisterHandler("deliverConditional",(CmiHandler)CpdDeliverMessageConditionally);
+  CcsRegisterHandler("endConditional",(CmiHandler)CpdEndConditionalDelivery);
   CpdListRegister(new CpdList_arrayElementNames());
   CpdListRegister(new CpdList_arrayElements());
   CpdListRegister(new CpdList_objectNames());
   CpdListRegister(new CpdList_object());
   CpdListRegister(new CpdList_message());
   CpdListRegister(new CpdList_msgStack());
+  CpdGetNextMessage = CsdNextMessage;
   CpdIsDebugMessage = CpdIsCharmDebugMessage;
 #if CMK_BLUEGENE_CHARM
   CpdIsDebugMessage = CpdIsBgCharmDebugMessage;
index 047e042ba8799e83285026d12f5354c9e7bbe7bb..e63803b27d0e8be56ff1181bab6186a3194fe57c 100644 (file)
@@ -15,7 +15,7 @@
 #include "cklists.h"
 
 #define CHARMDEBUG_MAJOR   10
-#define CHARMDEBUG_MINOR    2
+#define CHARMDEBUG_MINOR    3
 
 void *CpdGetCurrentObject();
 void *CpdGetCurrentMsg();
index 05aec6082c5b8f3b5a8727f4619b1e6b5db8b53e..ab82582ab2aaa4d6fe0f9fad6826195ef3efed24 100644 (file)
@@ -236,11 +236,33 @@ void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
 
 /* Call the handler */
   CpvAccess(ccsReq)=hdr;
-  callHandlerRec(fn,reqLen,reqData);
+  if (conditionalPipe[1]!=0 && _conditionalDelivery==0) {
+    /* We are conditionally delivering, send the message to the child and wait for its response */
+    int bytes = reqLen+((int)(reqData-((char*)hdr)))+CmiReservedHeaderSize;
+    write(conditionalPipe[1], &bytes, 4);
+    write(conditionalPipe[1], ((char*)hdr)-CmiReservedHeaderSize, bytes);
+    if (4==read(conditionalPipe[0], &bytes, 4)) {
+      char *buf = malloc(bytes);
+      read(conditionalPipe[0], buf, bytes);
+      CcsSendReply(bytes,buf);
+      free(buf);
+    } else {
+      /* the pipe has been closed */
+      close(conditionalPipe[0]);
+      conditionalPipe[0] = 0;
+      close(conditionalPipe[1]);
+      conditionalPipe[1] = 0;
+      wait(NULL);
+      CcsSendReply(0,NULL);
+    }
+  }
+  else {
+    callHandlerRec(fn,reqLen,reqData);
   
 /*Check if a reply was sent*/
-  if (CpvAccess(ccsReq)!=NULL)
-    CcsSendReply(0,NULL);/*Send an empty reply if not*/
+    if (CpvAccess(ccsReq)!=NULL)
+      CcsSendReply(0,NULL);/*Send an empty reply if not*/
+  }
 }
 
 #if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
index ebd6fc3b58fa6698a959ad5ce2c183317626139e..6c00e730a0be304178082fee3d964093f35db565 100644 (file)
@@ -73,7 +73,12 @@ extern "C" int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
       CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
     }
   } else {
-    CcsImpl_reply(rep, repLen, repData);
+    if (_conditionalDelivery == 0) CcsImpl_reply(rep, repLen, repData);
+    else {
+      /* We are the child of a conditional delivery, write to the parent the reply */
+      write(conditionalPipe[1], &repLen, 4);
+      write(conditionalPipe[1], repData, repLen);
+    }
   }
   return 0;
 }
index df77adbea9a7880d9c61d3d1409539332ee5891b..50daa8d50c2b353ac7f1dfa40c17a5876dd28fbc 100644 (file)
@@ -16,12 +16,14 @@ CpvExtern(int, freezeModeFlag);
 CpvStaticDeclare(int, continueFlag);
 CpvStaticDeclare(int, stepFlag);
 CpvExtern(void *, debugQueue);
+CpvExtern(void *, conditionalQueue);
 int _debugHandlerIdx;
 
 char ** memoryBackup;
 
 /** Specify if we are replaying the processor from message logs, thus disable delivering of messages */
 int _replaySystem = 0;
+int _conditionalDelivery = 0;
 
 #undef ConverseDeliver
 int ConverseDeliver(int pe) {
@@ -251,6 +253,7 @@ freeze mode-- only executes CCS requests.
 void CcsServerCheck(void);
 extern int _isCcsHandlerIdx(int idx);
 int (*CpdIsDebugMessage)(void *);
+void * (*CpdGetNextMessage)(CsdSchedulerState_t*);
 
 void CpdFreezeModeScheduler(void)
 {
@@ -268,7 +271,7 @@ void CpdFreezeModeScheduler(void)
 #if NODE_0_IS_CONVHOST
       if (CmiMyPe()==0) CcsServerCheck(); /*Make sure we can get CCS messages*/
 #endif
-      msg = CsdNextMessage(&state);
+      msg = CpdGetNextMessage(&state);
 
       if (msg!=NULL) {
         /*int hIdx=CmiGetHandler(msg);*/
@@ -283,8 +286,9 @@ void CpdFreezeModeScheduler(void)
            / * Debug messages should be handled immediately * /
            CmiHandleMessage(msg);
          } else */
-         if (CpdIsDebugMessage(msg)) {
-           CmiHandleMessage(msg);
+        
+      if (CpdIsDebugMessage(msg)) {
+        CmiHandleMessage(msg);
          }
          else
          /*An ordinary charm++ message-- queue it up*/
@@ -312,6 +316,9 @@ void CpdInit(void)
 
   CpvInitialize(void *, debugQueue);
   CpvAccess(debugQueue) = CdsFifo_Create();
+
+  CpvInitialize(void *, conditionalQueue);
+  CpvAccess(conditionalQueue) = CdsFifo_Create();
 #endif
   
   CcsRegisterHandler("ccs_debug", (CmiHandler)CpdDebugHandler);
index e733023acef43889a9f866edf9a75546b117816f..4007265b379ce647701df180ed68a6993ddcc77e 100644 (file)
@@ -35,6 +35,9 @@ void CpdStartGdb(void);
 void Cpd_CmiHandleMessage(void *msg);
 
 extern int (*CpdIsDebugMessage)(void*);
+extern void * (*CpdGetNextMessage)(CsdSchedulerState_t*);
+extern int _conditionalDelivery;
+extern int conditionalPipe[2];
 
 enum {
   CPD_ERROR = 0,