Changed CCS to support Converse-level broadcasts and multicasts (i.e same handler...
authorFilippo Gioachin <gioachin@illinois.edu>
Thu, 20 Aug 2009 01:51:29 +0000 (01:51 +0000)
committerFilippo Gioachin <gioachin@illinois.edu>
Thu, 20 Aug 2009 01:51:29 +0000 (01:51 +0000)
CpdDebug now uses the new CCS broadcast to reply to queries.
Changed how notifications are sent to CharmDebug: a new generic function "CpdNotify" is introduced.
Enabled "EmergencyExit" on net and mpi layers. This function is called whenever CmiAbort or a signal is received. In the MPI layer, signals are now registered, and the shutdown process has a barrier to ensure all processors have a chance of calling EmergencyExit.

src/arch/mpi/machine.c
src/arch/net/charmrun/charmrun.c
src/arch/net/machine.c
src/conv-ccs/ccs-server.c
src/conv-ccs/conv-ccs.c
src/conv-ccs/conv-ccs.h
src/conv-core/debug-conv.c
src/conv-core/debug-conv.h

index 2e5ab2a5fe5710d4efd8a2f50f748e9478f1428c..90106a032b1ec9b09699737a277dcede4caff8e4 100644 (file)
@@ -447,7 +447,7 @@ static void CmiStartThreads(char **argv)
 #endif /* non smp */
 
 /*Add a message to this processor's receive queue, pe is a rank */
-static void CmiPushPE(int pe,void *msg)
+void CmiPushPE(int pe,void *msg)
 {
   CmiState cs = CmiGetStateN(pe);
   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
@@ -1580,6 +1580,36 @@ static void CmiNotifyStillIdle(CmiIdleState *s)
 FILE *debugLog = NULL;
 #endif
 
+static int machine_exit_idx;
+static void machine_exit(char *m) {
+  EmergencyExit();
+  /*printf("--> %d: machine_exit\n",CmiMyPe());*/
+  fflush(stdout);
+  CmiNodeBarrier();
+  if (CmiMyRank() == 0) {
+    MPI_Barrier(MPI_COMM_WORLD);
+    /*printf("==> %d: passed barrier\n",CmiMyPe());*/
+    MPI_Abort(MPI_COMM_WORLD, 1);
+  } else {
+    while (1) CmiYield();
+  }
+}
+
+#include <signal.h>
+static void KillOnAllSigs(int sigNo) {
+  static int already_in_signal_handler = 0;
+  if (already_in_signal_handler) MPI_Abort(1);
+  already_in_signal_handler = 1;
+  if (CpvAccess(cmiArgDebugFlag)) {
+    CpdNotify(CPD_SIGNAL, sigNo);
+    CpdFreeze();
+  }
+  char *m = CmiAlloc(CmiMsgHeaderSizeBytes);
+  CmiSetHandler(m, machine_exit_idx);
+  CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
+  machine_exit(m);
+}
+
 static void ConverseRunPE(int everReturn)
 {
   CmiIdleState *s=CmiNotifyGetState();
@@ -1600,6 +1630,7 @@ static void ConverseRunPE(int everReturn)
   CthInit(CmiMyArgv);
 
   ConverseCommonInit(CmiMyArgv);
+  machine_exit_idx = CmiRegisterHandler(machine_exit);
 
 #if CMI_MPI_TRACE_USEREVENTS
 #ifndef CMK_OPTIMIZE
@@ -1723,6 +1754,22 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
     printf("Charm++: Running in idle blocking mode.\n");
   }
 
+  /* setup signal handlers */
+  signal(SIGSEGV, KillOnAllSigs);
+  signal(SIGFPE, KillOnAllSigs);
+  signal(SIGILL, KillOnAllSigs);
+  signal(SIGINT, KillOnAllSigs);
+  signal(SIGTERM, KillOnAllSigs);
+  signal(SIGABRT, KillOnAllSigs);
+#   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
+  signal(SIGQUIT, KillOnAllSigs);
+  signal(SIGBUS, KillOnAllSigs);
+/*#     if CMK_HANDLE_SIGUSR
+  signal(SIGUSR1, HandleUserSignals);
+  signal(SIGUSR2, HandleUserSignals);
+#     endif*/
+#   endif /*UNIX*/
+  
 #if CMK_NO_OUTSTANDING_SENDS
   no_outstanding_sends=1;
 #endif
@@ -1843,10 +1890,21 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
 
 void CmiAbort(const char *message)
 {
+  /* if CharmDebug is attached simply try to send a message to it */
+  if (CpvAccess(cmiArgDebugFlag)) {
+    CpdNotify(CPD_ABORT, message);
+    CpdFreeze();
+  }
+  
   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
         "Reason: %s\n",CmiMyPe(),message);
  /*  CmiError(message); */
   CmiPrintStackTrace(0);
+  char *m = CmiAlloc(CmiMsgHeaderSizeBytes);
+  CmiSetHandler(m, machine_exit_idx);
+  CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
+  machine_exit(m);
+  /* Program never reaches here */
   MPI_Abort(MPI_COMM_WORLD, 1);
 }
 
index 32f212cba4055d96993b8d352ebc3c7c4609c2af..0cdfc1ae58501974d1efed049534145fe9a3131b 100644 (file)
@@ -1381,11 +1381,21 @@ void req_ccs_connect(void)
   pe=ChMessageInt(h.hdr.pe);
   reqBytes=ChMessageInt(h.hdr.len);
 
-  if (pe<0 || pe>=nodetab_size) {
+  if (pe<=-nodetab_size || pe>=nodetab_size) {
+    /*Treat out of bound values as value 0*/
        pe=0;
        h.hdr.pe=ChMessageInt_new(pe);
   }
-
+  else if (pe == -1) {
+    /*Treat -1 as broadcast and sent to 0 as root of the spanning tree*/
+    pe = 0;
+  }
+  else if (pe < -1) {
+    /*Treat negative values as multicast to a number of processors specified by -pe.
+      The pes to multicast to follows sits at the beginning of reqData*/
+    pe = *(int*)reqData;
+  }
+  
   if (! check_stdio_header(&h.hdr)) {
 
 #define LOOPBACK 0
index 32c52a947d1a07c45d9a3f25d44522aa5c171145..c79ff7ecee519b2c3434dab64eb0edfeed77fe98 100644 (file)
@@ -343,6 +343,7 @@ static void machine_exit(int status)
   machine_initiated_shutdown=1;
 
   CmiDestoryLocks();           /* destory locks to prevent dead locking */
+  EmergencyExit();
 
 #if CMK_USE_GM
   if (gmport) { 
@@ -383,7 +384,7 @@ static void KillOnAllSigs(int sigNo)
   already_in_signal_handler=1;
 
   if (CpvAccess(cmiArgDebugFlag)) {
-    CmiPrintf("CPD: Signal received on processor %d: %d\n",CmiMyPe(),sigNo);
+    CpdNotify(CPD_SIGNAL,sigNo);
     CpdFreeze();
   }
   
@@ -391,7 +392,7 @@ static void KillOnAllSigs(int sigNo)
 
   if (sigNo==SIGSEGV) {
      sig="segmentation violation";
-     suggestion="Try running with '++debug', or linking with '-memory paranoid'.\n";
+     suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).\n";
   }
   if (sigNo==SIGFPE) {
      sig="floating point exception";
@@ -581,7 +582,7 @@ void CmiAbort(const char *message)
 
   /* if CharmDebug is attached simply try to send a message to it */
   if (CpvAccess(cmiArgDebugFlag)) {
-    CmiPrintf("CPD: CmiAbort called on processor %d\n",CmiMyPe());
+    CpdNotify(CPD_ABORT, message);
     CpdFreeze();
   }
   
@@ -1233,7 +1234,7 @@ void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
 {
   MACHSTATE(2,"Outgoing CCS reply");
   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
-                      repData,repLen);
+      repData,repLen);
   MACHSTATE(1,"Outgoing CCS reply away");
 }
 #endif
@@ -1325,29 +1326,35 @@ static int InternalScanf(char *fmt, va_list l)
 /*New stdarg.h declarations*/
 void CmiPrintf(const char *fmt, ...)
 {
+  CpdSystemEnter();
   va_list p; va_start(p, fmt);
   if (Cmi_charmrun_fd!=-1)
     InternalPrintf(fmt, p);
   else
     vfprintf(stdout,fmt,p);
   va_end(p);
+  CpdSystemExit();
 }
 
 void CmiError(const char *fmt, ...)
 {
+  CpdSystemEnter();
   va_list p; va_start (p, fmt);
   if (Cmi_charmrun_fd!=-1)
     InternalError(fmt, p);
   else
     vfprintf(stderr,fmt,p);
   va_end(p);
+  CpdSystemExit();
 }
 
 int CmiScanf(const char *fmt, ...)
 {
+  CpdSystemEnter();
   va_list p; int i; va_start(p, fmt);
   i = InternalScanf((char *)fmt, p);
   va_end(p);
+  CpdSystemExit();
   return i;
 }
 
index 44e3d17796a7834adf70019ea6cc12e052c6b0e0..fd97c5cf305f088ebc95074f041090feb10f2b0a 100644 (file)
@@ -394,8 +394,13 @@ static int CcsServer_recvRequestData(SOCKET fd,
   hdr->len=req.len;
   hdr->replyFd=ChMessageInt_new(fd);
 
+  /*Is it a multicast?*/
+  int numPes = 0;
+  int destPE = ChMessageInt(hdr->pe);
+  if (destPE < -1) numPes = -destPE;
+  
   /*Grab the user data portion of the message*/
-  reqBytes=ChMessageInt(req.len);
+  reqBytes=ChMessageInt(req.len) + numPes*sizeof(int);
   *reqData=(char *)malloc(reqBytes);
   if (-1==skt_recvN(fd,*reqData,reqBytes)) {
     fprintf(stdout,"CCS ERROR> Retrieving %d message bytes\n",reqBytes);
index 92052356ab2ff739da9abdabae3a5a1b356c2b36..5aaa089d7bafd7eb2feeee42bb1ee9281c3cf673 100644 (file)
@@ -32,6 +32,7 @@ typedef struct CcsHandlerRec {
        CmiHandler fnOld; /*Old converse-style handler, or NULL if new-style*/
        CcsHandlerFn fn; /*New-style handler function, or NULL if old-style*/
        void *userPtr;
+       CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
        int nCalls; /* Number of times handler has been executed*/
 } CcsHandlerRec;
 
@@ -42,6 +43,7 @@ static void initHandlerRec(CcsHandlerRec *c,const char *name) {
   c->fn=NULL;
   c->fnOld=NULL;
   c->userPtr=NULL;
+  c->mergeFn=NULL;
   c->nCalls=0;
 }
 
@@ -67,21 +69,26 @@ CpvStaticDeclare(CcsHandlerTable, ccsTab);
 
 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
 
-void CcsRegisterHandler(const char *name, CmiHandler fn)
-{
+void CcsRegisterHandler(const char *name, CmiHandler fn) {
   CcsHandlerRec cp;
   initHandlerRec(&cp,name);
   cp.fnOld=fn;
   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
 }
-void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr)
-{
+void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
   CcsHandlerRec cp;
   initHandlerRec(&cp,name);
   cp.fn=fn;
   cp.userPtr=ptr;
   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
 }
+void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
+  CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
+  if (rec==NULL) {
+    CmiAbort("CCS: Unknown CCS handler name.\n");
+  }
+  rec->mergeFn=newMerge;
+}
 
 int CcsEnabled(void)
 {
@@ -99,11 +106,46 @@ void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
 }
 
+static int rep_fw_handler_idx;
+
+/**
+ * Decide if the reply is ready to be forwarded to the waiting client,
+ * or if combination is required (for broadcast/multicast CCS requests.
+ */
+int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
+  int repPE = (int)ChMessageInt(rep->pe);
+  if (repPE <= -1) {
+    /* Reduce the message to get the final reply */
+    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+repLen;
+    char *msg=CmiAlloc(len);
+    char *r=msg+CmiMsgHeaderSizeBytes;
+    rep->len = ChMessageInt_new(repLen);
+    *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
+    memcpy(r,repData,repLen);
+    CmiSetHandler(msg,rep_fw_handler_idx);
+    char *handlerStr=rep->handler;
+    CcsHandlerRec *fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
+    if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
+    if (repPE <= -1) {
+      /* CCS Broadcast */
+      CmiReduce(msg, len, fn->mergeFn);
+    } else {
+      /* CCS Multicast */
+      CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn);
+    }
+  } else {
+    CcsImpl_reply(rep, repLen, repData);
+  }
+}
+
 CcsDelayedReply CcsDelayReply(void)
 {
   CcsDelayedReply ret;
-  ret.attr=CpvAccess(ccsReq)->attr;
-  ret.replyFd=CpvAccess(ccsReq)->replyFd;
+  int len = sizeof(CcsImplHeader);
+  if (ChMessageInt(CpvAccess(ccsReq)->pe) < -1)
+    len += ChMessageInt(CpvAccess(ccsReq)->pe) * sizeof(int);
+  ret.hdr = (CcsImplHeader*)malloc(len);
+  memcpy(ret.hdr, CpvAccess(ccsReq), len);
   CpvAccess(ccsReq)=NULL;
   return ret;
 }
@@ -113,34 +155,32 @@ void CcsSendReply(int replyLen, const void *replyData)
   if (CpvAccess(ccsReq)==NULL)
     CmiAbort("CcsSendReply: reply already sent!\n");
   CpvAccess(ccsReq)->len = ChMessageInt_new(1);
-  CcsImpl_reply(CpvAccess(ccsReq),replyLen,replyData);
+  CcsReply(CpvAccess(ccsReq),replyLen,replyData);
   CpvAccess(ccsReq) = NULL;
 }
 
 void CcsSendDelayedReply(CcsDelayedReply d,int replyLen, const void *replyData)
 {
-  CcsImplHeader h;
-  h.attr=d.attr;
-  h.replyFd=d.replyFd;
-  h.len=ChMessageInt_new(1);
-  CcsImpl_reply(&h,replyLen,replyData);
+  CcsImplHeader *h = d.hdr;
+  h->len=ChMessageInt_new(1);
+  CcsReply(h,replyLen,replyData);
+  free(h);
 }
 
 void CcsNoReply()
 {
   if (CpvAccess(ccsReq)==NULL) return;
   CpvAccess(ccsReq)->len = ChMessageInt_new(0);
-  CcsImpl_reply(CpvAccess(ccsReq),0,NULL);
+  CcsReply(CpvAccess(ccsReq),0,NULL);
   CpvAccess(ccsReq) = NULL;
 }
 
 void CcsNoDelayedReply(CcsDelayedReply d)
 {
-  CcsImplHeader h;
-  h.attr=d.attr;
-  h.replyFd=d.replyFd;
-  h.len = ChMessageInt_new(0);
-  CcsImpl_reply(&h,0,NULL);
+  CcsImplHeader *h = d.hdr;
+  h->len = ChMessageInt_new(0);
+  CcsReply(h,0,NULL);
+  free(h);
 }
 
 
@@ -183,9 +223,31 @@ static void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
 int _ccsHandlerIdx;/*Converse handler index of below routine*/
 static void req_fw_handler(char *msg)
 {
-  CcsHandleRequest((CcsImplHeader *)(msg+CmiMsgHeaderSizeBytes),
-                  msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader));
-  CmiFree(msg);  
+  CcsImplHeader *hdr = (CcsImplHeader *)(msg+CmiMsgHeaderSizeBytes);
+  int destPE = (int)ChMessageInt(hdr->pe);
+  if (CmiMyPe() == 0 && destPE == -1) {
+    /* Broadcast message to all other processors */
+    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len);
+    CmiSyncBroadcast(len, msg);
+  }
+  else if (destPE < -1) {
+    /* Multicast the message to your children */
+    int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+ChMessageInt(hdr->len)-destPE*sizeof(int);
+    int index;
+    int *pes = (int*)(msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader));
+    for (index=0; index<-destPE; ++index) {
+      if (pes[index] == CmiMyPe()) break;
+    }
+    int child = (index << 2) + 1;
+    int i;
+    for (i=0; i<4; ++i) {
+      if (child+i < -destPE) {
+        CmiSyncSend(pes[child+i], len, msg);
+      }
+    }
+  }
+  CcsHandleRequest(hdr, msg+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader));
+  CmiFree(msg);
 }
 
 /*Convert CCS header & message data into a converse message 
@@ -193,6 +255,8 @@ static void req_fw_handler(char *msg)
 char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_len)
 {
   int reqLen=ChMessageInt(hdr->len);
+  int destPE = ChMessageInt(hdr->pe);
+  if (destPE < -1) reqLen += destPE*sizeof(int);
   int len=CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader)+reqLen;
   char *msg=(char *)CmiAlloc(len);
   memcpy(msg+CmiMsgHeaderSizeBytes,hdr,sizeof(CcsImplHeader));
@@ -202,22 +266,19 @@ char *CcsImpl_ccs2converse(const CcsImplHeader *hdr,const void *data,int *ret_le
   return msg;
 }
 
-/*Forward this request to the appropriate PE*/
-void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
+/*Receives reply messages passed up from
+converse to node 0.*/
+static void rep_fw_handler(char *msg)
 {
-  int len,repPE=ChMessageInt(hdr->pe);
-  if (repPE<0 && repPE>=CmiNumPes()) {
-       repPE=0;
-       hdr->pe=ChMessageInt_new(repPE);
-  }
-
-  {
-    char *msg=CcsImpl_ccs2converse(hdr,reqData,&len);
-    CmiSyncSendAndFree(repPE,len,msg);
-  }
+  int len;
+  char *r=msg+CmiMsgHeaderSizeBytes;
+  CcsImplHeader *hdr=(CcsImplHeader *)r; 
+  r+=sizeof(CcsImplHeader);
+  len=ChMessageInt(hdr->len);
+  CcsImpl_reply(hdr,len,r);
+  CmiFree(msg);
 }
 
-
 #if NODE_0_IS_CONVHOST
 /************** NODE_0_IS_CONVHOST ***********
 Non net- versions of charm++ are run without a 
@@ -250,7 +311,7 @@ which sends the reply back to the original requestor,
 on the (still-open) request socket.
  */
 
-/*
+/**
 Send a Ccs reply back to the requestor, down the given socket.
 Since there is no conv-host, node 0 does all the CCS 
 communication-- this means all requests come to node 0
@@ -258,8 +319,6 @@ and are forwarded out; all replies are forwarded back to node 0.
 
 Note: on Net- versions, CcsImpl_reply is implemented in machine.c
 */
-static int rep_fw_handler_idx;
-
 void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
 {
   const int repPE=0;
@@ -270,7 +329,7 @@ void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
   } else {
     /*Forward data & socket # to the replyPE*/
     int len=CmiMsgHeaderSizeBytes+
-              sizeof(CcsImplHeader)+repLen;
+           sizeof(CcsImplHeader)+repLen;
     char *msg=CmiAlloc(len);
     char *r=msg+CmiMsgHeaderSizeBytes;
     *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
@@ -279,26 +338,41 @@ void CcsImpl_reply(CcsImplHeader *rep,int repLen,const void *repData)
     CmiSyncSendAndFree(repPE,len,msg);
   }
 }
-/*Receives reply messages passed up from
-converse to node 0.*/
-static void rep_fw_handler(char *msg)
-{
-  int len;
-  char *r=msg+CmiMsgHeaderSizeBytes;
-  CcsImplHeader *hdr=(CcsImplHeader *)r; 
-  r+=sizeof(CcsImplHeader);
-  len=ChMessageInt(hdr->len);
-  CcsImpl_reply(hdr,len,r);
-  CmiFree(msg);
-}
 
 /*No request will be sent through this socket.
 Closes it.
 */
-void CcsImpl_noReply(CcsImplHeader *hdr)
+/*void CcsImpl_noReply(CcsImplHeader *hdr)
 {
   int fd=ChMessageInt(hdr->replyFd);
   skt_close(fd);
+}*/
+
+/**
+ * This is the entrance point of a CCS request into the server.
+ * It is executed only on proc 0, and it forwards the request to the appropriate PE.
+ */
+void CcsImpl_netRequest(CcsImplHeader *hdr,const void *reqData)
+{
+  int len,repPE=ChMessageInt(hdr->pe);
+  if (repPE<=-CmiNumPes() || repPE>=CmiNumPes()) {
+    repPE=0;
+    hdr->pe=ChMessageInt_new(repPE);
+  }
+
+  {
+    char *msg=CcsImpl_ccs2converse(hdr,reqData,&len);
+    if (repPE >= 0) {
+      CmiSyncSendAndFree(repPE,len,msg);
+    } else if (repPE == -1) {
+      /* Broadcast to all processors */
+      CmiPushPE(0, msg);
+    } else {
+      /* Multicast to -repPE processors, specified right at the beginning of reqData (as a list of pes) */
+      int firstPE = *(int*)reqData;
+      CmiSyncSendAndFree(firstPE,len,msg);
+    }
+  }
 }
 
 /*
@@ -333,9 +407,7 @@ void CcsServerCheck(void)
 
 int _isCcsHandlerIdx(int hIdx) {
   if (hIdx==_ccsHandlerIdx) return 1;
-#if NODE_0_IS_CONVHOST 
   if (hIdx==rep_fw_handler_idx) return 1;
-#endif
   return 0;
 }
 
@@ -358,8 +430,8 @@ void CcsInit(char **argv)
 
   CcsBuiltinsInit(argv);
 
-#if NODE_0_IS_CONVHOST
   rep_fw_handler_idx = CmiRegisterHandler((CmiHandler)rep_fw_handler);
+#if NODE_0_IS_CONVHOST
 #if ! CMK_CMIPRINTF_IS_A_BUILTIN
   print_fw_handler_idx = CmiRegisterHandler((CmiHandler)print_fw_handler);
 #endif
index 660bcb861a784d38204760d9a6af718958a527f6..427caa996436d83c8312c4f4576c8ad194049f8d 100644 (file)
@@ -30,8 +30,7 @@ extern int _ccsHandlerIdx;
 #if CMK_CCS_AVAILABLE
 
 typedef struct CcsDelayedReply_struct {
-       CcsSecAttr     attr; /*Source information*/
-       ChMessageInt_t replyFd;/*Send reply back here*/
+  CcsImplHeader *hdr;
 } CcsDelayedReply;
 
 /**
@@ -48,6 +47,11 @@ void CcsRegisterHandler(const char *ccs_handlername, CmiHandler fn);
 typedef void (*CcsHandlerFn)(void *userPtr,int reqLen,const void *reqData);
 void CcsRegisterHandlerFn(const char *ccs_handlername, CcsHandlerFn fn, void *userPtr);
 
+/**
+ * Set the merging function for this CCS handler to newMerge.
+ */
+void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge);
+
 void CcsInit(char **argv);
 int CcsEnabled(void);
 int CcsIsRemoteRequest(void);
index fae14d19a2c7d8f218942fd1d9b920bba402711b..5e997a006d3d75ac03de56183136bfcd8498ecf7 100644 (file)
@@ -85,7 +85,7 @@ void CpdSearchLeaks(char * msg) {
 void * (*CpdDebugGetAllocationTree)(int *);
 void (*CpdDebug_pupAllocationPoint)(pup_er p, void *data);
 void (*CpdDebug_deleteAllocationPoint)(void *ptr);
-void * (*CpdDebug_MergeAllocationTree)(void *data, void **remoteData, int numRemote);
+void * (*CpdDebug_MergeAllocationTree)(int *size, void *data, void **remoteData, int numRemote);
 CpvDeclare(int, CpdDebugCallAllocationTree_Index);
 CpvStaticDeclare(CcsDelayedReply, allocationTreeDelayedReply);
 
@@ -131,7 +131,7 @@ static void CpdDebugCallAllocationTree(char *msg)
 void * (*CpdDebugGetMemStat)(void);
 void (*CpdDebug_pupMemStat)(pup_er p, void *data);
 void (*CpdDebug_deleteMemStat)(void *ptr);
-void * (*CpdDebug_mergeMemStat)(void *data, void **remoteData, int numRemote);
+void * (*CpdDebug_mergeMemStat)(int *size, void *data, void **remoteData, int numRemote);
 CpvDeclare(int, CpdDebugCallMemStat_Index);
 CpvStaticDeclare(CcsDelayedReply, memStatDelayedReply);
 
@@ -179,6 +179,28 @@ static void CpdDebugCallMemStat(char *msg) {
   CmiFree(msg);
 }
 
+static void * CpdDebugMerge(int *size,void *local,void **remote,int n) {
+  CcsImplHeader *hdr;
+  int total = *size;
+  int i;
+  for (i=0; i<n; ++i) {
+    hdr = (CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes);
+    total += ChMessageInt(hdr->len);
+  }
+  void *reply = CmiAlloc(total);
+  memcpy(reply, local, *size);
+  ((CcsImplHeader*)(((char*)reply)+CmiMsgHeaderSizeBytes))->len = ChMessageInt_new(total-CmiMsgHeaderSizeBytes-sizeof(CcsImplHeader));
+  CmiFree(local);
+  char *ptr = ((char*)reply)+*size;
+  for (i=0; i<n; ++i) {
+    int len = ChMessageInt(((CcsImplHeader*)(((char*)remote[i])+CmiMsgHeaderSizeBytes))->len);
+    memcpy(ptr, ((char*)remote[i])+CmiMsgHeaderSizeBytes+sizeof(CcsImplHeader), len);
+    ptr += len;
+  }
+  *size = total;
+  return reply;
+}
+
 static void CpdDebugHandler(char *msg)
 {
     char name[128];
@@ -191,18 +213,20 @@ static void CpdDebugHandler(char *msg)
       CpdUnFreeze();
     }
     else if (strncmp(name, "step", strlen("step")) == 0){
-      CmiPrintf("step received\n");
+      /*CmiPrintf("step received\n");*/
       CpvAccess(stepFlag) = 1;
       CpdUnFreeze();
     }
     else if (strncmp(name, "continue", strlen("continue")) == 0){
-      CmiPrintf("continue received\n");
+      /*CmiPrintf("continue received\n");*/
       CpvAccess(continueFlag) = 1;
       CpdUnFreeze();
     }
     else if (strncmp(name, "status", strlen("status")) == 0) {
-      char reply = CpdIsFrozen() ? 0 : 1;
-      CcsSendReply(1, &reply);
+      ChMessageInt_t reply[2];
+      reply[0] = ChMessageInt_new(CmiMyPe());
+      reply[1] = ChMessageInt_new(CpdIsFrozen() ? 0 : 1);
+      CcsSendReply(2*sizeof(ChMessageInt_t), reply);
     }
 #if 0
     else if (strncmp(name, "setBreakPoint", strlen("setBreakPoint")) == 0){
@@ -215,6 +239,7 @@ static void CpdDebugHandler(char *msg)
     else{
       CmiPrintf("bad debugger command:%s received,len=%ld\n",name,strlen(name));
     }
+    CmiFree(msg);
 }
 
 
@@ -224,7 +249,7 @@ static void CpdDebugHandler(char *msg)
  */
 void CpdFreeze(void)
 {
-  CmiPrintf("CPD: Frozen processor %d\n",CmiMyPe());
+  CpdNotify(CPD_FREEZE,getpid());
   if (CpvAccess(freezeModeFlag)) return; /*Already frozen*/
   CpvAccess(freezeModeFlag) = 1;
   CpdFreezeModeScheduler();
@@ -315,6 +340,7 @@ void CpdInit(void)
   CpvAccess(debugQueue) = CdsFifo_Create();
 
   CcsRegisterHandler("ccs_debug", (CmiHandler)CpdDebugHandler);
+  CcsSetMergeFn("ccs_debug", CpdDebugMerge);
 
   CcsRegisterHandler("ccs_debug_allocationTree", (CmiHandler)CpdDebugCallAllocationTree);
   CpvInitialize(int, CpdDebugCallAllocationTree_Index);
@@ -343,19 +369,22 @@ void CpdInit(void)
 
 }
 
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+void CpdNotify(int type, ...) {
+  va_list list;
+  va_start(list, type);
+  switch (type) {
+  case CPD_ABORT:
+    CmiPrintf("CPD: %d Abort %s\n",CmiMyPe(), va_arg(list, char*));
+    break;
+  case CPD_SIGNAL:
+    CmiPrintf("CPD: %d Signal %d\n",CmiMyPe(), va_arg(list, int));
+    break;
+  case CPD_FREEZE:
+    CmiPrintf("CPD: %d Freeze %d\n",CmiMyPe(),getpid());
+    break;
+  case CPD_BREAKPOINT:
+    CmiPrintf("CPD: %d BP %s\n",CmiMyPe(), va_arg(list, char*));
+    break;
+  }
+  va_end(list);
+}
index 5f743eb9b3105522b25890b6661c6d6c27d9d6c8..6075046e3fbd0d5bef3149f7879e44ba1a34c9b3 100644 (file)
@@ -14,12 +14,12 @@ extern "C" {
 extern void * (*CpdDebugGetAllocationTree)(int*);
 extern void (*CpdDebug_pupAllocationPoint)(pup_er p, void *data);
 extern void (*CpdDebug_deleteAllocationPoint)(void *ptr);
-extern void * (*CpdDebug_MergeAllocationTree)(void *data, void **remoteData, int numRemote);
+extern void * (*CpdDebug_MergeAllocationTree)(int *size, void *data, void **remoteData, int numRemote);
 
 extern void * (*CpdDebugGetMemStat)(void);
 extern void (*CpdDebug_pupMemStat)(pup_er p, void *data);
 extern void (*CpdDebug_deleteMemStat)(void *ptr);
-extern void * (*CpdDebug_mergeMemStat)(void *data, void **remoteData, int numRemote);
+extern void * (*CpdDebug_mergeMemStat)(int *size, void *data, void **remoteData, int numRemote);
 
 CpvExtern(int, cmiArgDebugFlag);
 extern char ** memoryBackup;
@@ -36,6 +36,15 @@ void Cpd_CmiHandleMessage(void *msg);
 
 extern int (*CpdIsDebugMessage)(void*);
 
+enum {
+  CPD_ERROR = 0,
+  CPD_SIGNAL = 1,
+  CPD_ABORT = 2,
+  CPD_FREEZE = 3,
+  CPD_BREAKPOINT = 4
+};
+extern void CpdNotify(int type, ...);
+
 typedef struct LeakSearchInfo {
   char *begin_data, *end_data;
   char *begin_bss, *end_bss;