Updated for new CCS protocol.
authorOrion Lawlor <olawlor@acm.org>
Thu, 21 Sep 2000 23:23:58 +0000 (23:23 +0000)
committerOrion Lawlor <olawlor@acm.org>
Thu, 21 Sep 2000 23:23:58 +0000 (23:23 +0000)
src/conv-ccs/ccs-client.c
src/conv-ccs/ccs-client.h

index 1bf9c0d427ae45076565a14eb9bec1c74f4c462e..2d03f58158a1453e0ec06c3fd5e616a52e9ca1bf 100644 (file)
-#include <sys/types.h>
-#include <sys/socket.h>
-#include "ccs.h"
-#include <stdio.h>
-#include <sys/time.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <errno.h>
-
-
-static void zap_newline(char *s)
-{
-  char *p;
-  p = s + strlen(s)-1;
-  if (*p == '\n') *p = '\0';
-}
-
+/*****************************************************************************
+ * $Source$
+ * $Author$
+ * $Date$
+ * $Revision$
+ *****************************************************************************/
 /*
- * return IP address for hostname. If hostname=0, return self IP
- */
-static unsigned int skt_ip(char *hostname)
-{
-  unsigned int ip;
-  struct hostent *hostent;
-  if(strcmp(hostname, "") == 0)
-    hostent = gethostent();
-  else {
-    hostent = gethostbyname(hostname);
-  }
-  if (hostent == 0) return 0x7f000001;
-  ip = htonl(*((int *)(hostent->h_addr_list[0])));
-
-  /*Debugging*/
-  printf("hostname = %s, IP address = %u\n", hostname, ip);
-
-  return ip;
-}
+Converse Client-Server:
+  Lets you execute, from an arbitrary program on the network, 
+pre-registered "handlers" in a running converse program.  
+Also allows you to recv replies from the handlers.  
+All requests and replies consist of user-defined binary data.
+
+  This file provides the client interface.
+
+CCS Protocol spec:
+
+ A CCS request message asks a running Converse program to
+execute a pre-registered "handler" routine.  You send the
+request directly to conv-host's CCS server port.
+The request, with header, has the following format on the
+network: 
+Ccs Message----------------------------------------------
+ /--CcsMessageHeader---------------------------       ^
+ | 4 bytes  |   Message data length d         ^       |
+ | 4 bytes  |   Dest. processor number        |       |
+ |          |   (big-endian binary integers)  |   40+d bytes
+ +-----------------------------------      40 bytes   |
+ |32 bytes  |   CCS Handler name              |       |
+ |          |   (ASCII, Null-terminated)      v       |
+ \---------------------------------------------       |
+    d bytes |   User data (passed to handler)         v
+-------------------------------------------------------
+
+ A CCS reply message (if any) comes back on the request socket,
+and has only a length header:
+CCS Reply ----------------------------------
+ | 4 bytes  |   Message data length d        
+ |          |   (big-endian binary integer)  
+ +----------------------------------------- 
+ | d bytes  |   User data                   
+--------------------------------------------
 
-static void jsleep(int sec, int usec)
-{
-  int ntimes,i;
-  struct timeval tm;
-
-  ntimes = sec*200 + usec/5000;
-  for(i=0;i<ntimes;i++) {
-    tm.tv_sec = 0;
-    tm.tv_usec = 5000;
-    while(1) {
-      if (select(0,NULL,NULL,NULL,&tm)==0) break;
-      if ((errno!=EBADF)&&(errno!=EINTR)) return;
-    }
-  }
-}
-
-/*
- * Create a socket connected to <ip> at port <port>
- */
-static int skt_connect(ip, port, seconds)
-unsigned int ip; int port; int seconds;
-{
-  struct sockaddr_in remote; short sport=port;
-  int fd, ok, len, retry, begin;
-    
-  /* create an address structure for the server */
-  memset(&remote, 0, sizeof(remote));
-  remote.sin_family = AF_INET;
-  remote.sin_port = htons(sport);
-  remote.sin_addr.s_addr = htonl(ip);
-    
-  begin = time(0); ok= -1;
-  while (time(0)-begin < seconds) {
-  sock:
-    fd = socket(AF_INET, SOCK_STREAM, 0);
-    if ((fd<0)&&((errno==EINTR)||(errno==EBADF))) goto sock;
-    if (fd < 0) { perror("socket 3"); exit(1); }
-    
-  conn:
-    ok = connect(fd, (struct sockaddr *)&(remote), sizeof(remote));
-    if (ok>=0) break;
-    close(fd);
-    switch (errno) {
-    case EINTR: case EBADF: case EALREADY: break;
-    case ECONNREFUSED: jsleep(1,0); break;
-    case EADDRINUSE: jsleep(1,0); break;
-    case EADDRNOTAVAIL: jsleep(5,0); break;
-    default: return -1;
-    }
-  }
-  if (ok<0) return -1;
-  return fd;
-}
-
-/*
- * Create a server socket
  */
-static void skt_server(CcsServer *svr)
-{
-  int fd= -1;
-  int ok, len;
-  struct sockaddr_in addr;
-  char hostname[100];
-  fd = socket(PF_INET, SOCK_STREAM, 0);
-  if (fd < 0) { perror("socket"); exit(1); }
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = AF_INET;
-  ok = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
-  if (ok < 0) { perror("bind"); exit(1); }
-  ok = listen(fd,5);
-  if (ok < 0) { perror("listen"); exit(1); }
-  len = sizeof(addr);
-  ok = getsockname(fd, (struct sockaddr *)&addr, &len);
-  if (ok < 0) { perror("getsockname"); exit(1); }
+#include "ccs.h"
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
 
-  if (gethostname(hostname, 99) < 0) strcpy(hostname, "");
-  
-  /*Debugging*/
-  printf("hostname = %s, %d\n", hostname, strlen(hostname));
+/*Include the socket and message interface routines
+  here *whole*, which keeps client linking simple.*/
+#include "sockRoutines.c"
 
-  svr->myFd = fd;
-  svr->myIP = skt_ip(hostname);
-  svr->myPort = ntohs(addr.sin_port);
-}
+#define DEBUGF(x) printf x
 
-static int skt_accept(CcsServer *svr)
+/*Parse list of nodes given to us by conv-host.
+*/
+static void parseInfo(CcsServer *svr,const char *data)
 {
-  int i, fd, ok;
-  struct sockaddr_in remote;
-  i = sizeof(remote);
- acc:
-  fd = accept(svr->myFd, (struct sockaddr *)&remote, &i);
-  if ((fd<0)&&(errno==EINTR)) goto acc;
-  if ((fd<0)&&(errno==EMFILE)) goto acc;
-  if ((fd<0)&&(errno==EPROTO)) goto acc;
-  if (fd<0) { perror("accept"); exit(1); }
-  return fd;
-}
+  /*Data conv-host sends us is just a big list of integers*/
+  const ChMessageInt_t *d=(const ChMessageInt_t *)data;
+  int i,index=0; /*Current offset in above array*/
 
-static char *skipstuff(char *line)
-{
-  while (*line != ' ') line++;
-  return line;
-}
-
-static char *skipblanks(char *line)
-{
-  while (*line == ' ') line++;
-  return line;
-}
-
-static void parseInfo(CcsServer *svr, char *line)
-{
-  char ans[32];
-  int num, i;
-  line = skipblanks(line);
-  sscanf(line, "%s", ans);
-  line = skipstuff(line); line = skipblanks(line);
-  sscanf(line, "%d", &(svr->numNodes));
-  line = skipstuff(line); line = skipblanks(line);
+  svr->numNodes=ChMessageInt(d[index++]);
   svr->numProcs = (int *) malloc(svr->numNodes * sizeof(int));
-  svr->nodeIPs = (int *) malloc(svr->numNodes * sizeof(int));
-  svr->nodePorts = (int *) malloc(svr->numNodes * sizeof(int));
   svr->numPes = 0;
-  for(i=0;i<svr->numNodes;i++) {
-    sscanf(line, "%d", &(svr->numProcs[i]));
-    line = skipstuff(line); line= skipblanks(line);
-    svr->numPes += svr->numProcs[i];
-  }
-  for(i=0;i<svr->numNodes;i++) {
-    sscanf(line, "%d", &(svr->nodeIPs[i]));
-    line = skipstuff(line); line= skipblanks(line);
-  }
-  for(i=0;i<svr->numNodes;i++) {
-    sscanf(line, "%d", &(svr->nodePorts[i]));
-    line = skipstuff(line); line= skipblanks(line);
-  }
+  for(i=0;i<svr->numNodes;i++)
+    svr->numPes+=svr->numProcs[i]=ChMessageInt(d[index++]);
 }
 
 static void printSvr(CcsServer *svr)
 {
   int i;
-  printf("hostIP: %d\n", svr->hostIP);
-  printf("hostPort: %d\n", svr->hostPort);
-  printf("myIP: %d\n", svr->myIP);
-  printf("myPort: %d\n", svr->myPort);
-  printf("myFd: %d\n", svr->myFd);
-  printf("numNodes: %d\n", svr->numNodes);
-  printf("numPes: %d\n", svr->numPes);
+  DEBUGF(("hostIP: %d\n", svr->hostIP));
+  DEBUGF(("hostPort: %d\n", svr->hostPort));
+  DEBUGF(("replyFd: %d\n", svr->replyFd));
+  DEBUGF(("numNodes: %d\n", svr->numNodes));
+  DEBUGF(("numPes: %d\n", svr->numPes));
   for(i=0;i<svr->numNodes;i++) {
-    printf("Node[%d] has %d processors at IP=%d, port=%d\n",
-            i, svr->numProcs[i], svr->nodeIPs[i], svr->nodePorts[i]);
+    DEBUGF(("Node[%d] has %d processors\n",i, svr->numProcs[i]));
   }
 }
 
 /**
  * Converse Client-Server Module: Client Side
  */
-int CcsConnect(CcsServer *svr, char *host, int port)
+void CcsConnect(CcsServer *svr, char *host, int port)
+{
+  skt_init();
+  CcsConnectIp(svr,skt_lookup_ip(host),port);
+}
+void CcsConnectIp(CcsServer *svr, int ip, int port)
 {
-  int fd;
-  char ans[32];
-  char line[1024];
-  FILE *f;
-  strcpy(svr->hostAddr, host);
+  unsigned int msg_len;char *msg_data;/*Reply message*/
+  skt_init();
+  svr->hostIP = ip;
   svr->hostPort = port;
-  svr->hostIP = skt_ip(host);
-  skt_server(svr);
-  fd = skt_connect(svr->hostIP, svr->hostPort, 120);
-  if(fd == (-1)) {
-    fprintf(stderr, "Cannot connect to server\n");
-    exit(1);
-  }
-  write(fd, "getinfo ", strlen("getinfo "));
-  sprintf(ans, "%d %d\n", svr->myIP, svr->myPort);
-  write(fd, ans, strlen(ans));
-  close(fd);
-  fd = skt_accept(svr);
-  f = fdopen(fd, "r+");
-  line[0] = 0;
-  fgets(line, 1023, f);
-  fclose(f);
-  close(fd);
-  zap_newline(line);
-  parseInfo(svr, line);
+  svr->replyFd=INVALID_SOCKET;
+
+  /*Request the parallel machine's node info*/
+  CcsSendRequest(svr,"ccs_getinfo",0,0,NULL);
+  
+  /*Wait for conv-host to get back to us*/
+  DEBUGF(("Waiting for conv-host to call us back...\n"));
+  CcsRecvResponseMsg(svr,&msg_len,&msg_data,60);
+  parseInfo(svr,msg_data);
+  free(msg_data);
+  
+  /**/ printSvr(svr);/**/
 }
 
 int CcsNumNodes(CcsServer *svr)
@@ -248,55 +133,66 @@ int CcsNodeSize(CcsServer *svr,int node)
   return svr->numProcs[node];
 }
 
-int CcsSendRequest(CcsServer *svr, char *hdlrID, int pe, uint size, void *msg)
+void CcsSendRequest(CcsServer *svr, char *hdlrID, int pe, unsigned int size, const char *msg)
 {
-  int startpe=0, endpe=0, i;
-  int fd;
-  char line[1024];
-  for(i=0;i<svr->numNodes;i++) {
-    endpe += svr->numProcs[i];
-    if(pe >= startpe && pe < endpe)
-      break;
-    startpe = endpe;
-  }
-  pe -= startpe;
-  fd = skt_connect(svr->nodeIPs[i], svr->nodePorts[i], 120);
-  sprintf(line, "req %d %d %d %d %s\n", pe, size, svr->myIP, svr->myPort, 
-                                        hdlrID);
-  write(fd, line, strlen(line));
-  write(fd, msg, size);
-  close(fd);
+  CcsMessageHeader hdr;/*CCS request header*/
+
+  /*Close the old connection (if any)*/
+  if (svr->replyFd!=-1) {skt_close(svr->replyFd);svr->replyFd=-1;}
+
+  /*Connect to conv-host, and send the message */
+  svr->replyFd=skt_connect(svr->hostIP, svr->hostPort,120);
+
+  hdr.len=ChMessageInt_new(size);
+  hdr.pe=ChMessageInt_new(pe);
+  strncpy(hdr.handler,hdlrID,CCS_HANDLERLEN);
+  skt_sendN(svr->replyFd, (char *)&hdr, sizeof(hdr));
+  skt_sendN(svr->replyFd, msg, size);
+  /*Leave socket open for reply*/
 }
 
-int CcsRecvResponse(CcsServer *svr, uint maxsize, void *recvBuffer)
+/*Receive data back from the server. (Arbitrary length response)
+*/
+int CcsRecvResponseMsg(CcsServer *svr, unsigned int *size,char **newBuf, int timeout)
 {
-  char line[1024], ans[16];
-  int size, fd;
-  FILE *f;
-  fd = skt_accept(svr);
-  f = fdopen(fd, "r+");
-  line[0] = 0;
-  fgets(line, 1023, f);
-  zap_newline(line);
-  sscanf(line, "%s%d", ans, &size);
-  fread(recvBuffer, 1, size, f);
-  fclose(f); close(fd);
+  ChMessageInt_t netLen;
+  unsigned int len;  
+  SOCKET fd=svr->replyFd;
+  skt_recvN(fd,(char *)&netLen,sizeof(netLen));
+  *size=len=ChMessageInt(netLen);
+  *newBuf=(char *)malloc(len);
+  skt_recvN(fd,(char *)*newBuf,len);
+  return len;
 }
 
-int CcsProbe(CcsServer *svr)
+/*Receive data from the server. (In-place receive)
+*/
+int CcsRecvResponse(CcsServer *svr,  unsigned int maxsize, char *recvBuffer,int timeout)
 {
-  fprintf(stderr, "CcsProbe not implemented.\n");
-  exit(1);
+  ChMessageInt_t netLen;
+  unsigned int len;
+  SOCKET fd=svr->replyFd;
+  skt_recvN(fd,(char *)&netLen,sizeof(netLen));
+  len=ChMessageInt(netLen);
+  if (len>maxsize) 
+    {skt_close(fd);return -1;/*Buffer too small*/}
+  skt_recvN(fd,(char *)recvBuffer,len);
+  return len;
 }
 
-int CcsResponseHandler(CcsServer *svr, CcsHandlerFn fn)
+int CcsProbe(CcsServer *svr)
 {
-  svr->callback = fn;
-  fprintf(stderr, "CcsResponseHandler not implemented.\n");
+  fprintf(stderr, "CcsProbe not implemented.\n");
   exit(1);
+  return 1;
 }
 
-int CcsFinalize(CcsServer *svr)
+void CcsFinalize(CcsServer *svr)
 {
-  close(svr->myFd);
+  if (svr->replyFd!=-1) skt_close(svr->replyFd);
 }
+
+
+
+
+
index 7c492b3d5cc6a790470d7b5b08ac6c370a166f1c..768529c5af36fbc8ebad7f6d6f54afdcdfdb33a3 100644 (file)
@@ -1,3 +1,10 @@
+/*****************************************************************************
+ * $Source$
+ * $Author$
+ * $Date$
+ * $Revision$
+ *****************************************************************************/
+
 /**
  * Converse Client-Server Module: Client Side
  */
 #ifndef _CCS_H_
 #define _CCS_H_
 
-typedef int (*CcsHandlerFn)(int, void *);
+#include "sockRoutines.h"
 
 typedef struct CcsServer {
+  /*Conv-host:*/
   char hostAddr[128];
   unsigned int hostIP;
   unsigned int hostPort;
-  unsigned int myIP;
-  unsigned int myPort;
-  int myFd;
+  /*Parallel machine:*/
   int numNodes;
   int numPes;
-  int *numProcs;
-  int *nodeIPs;
-  int *nodePorts;
-  CcsHandlerFn callback;
+  int *numProcs; /*# of processors for each node*/
+  /*Current State:*/
+  SOCKET replyFd;/*Socket for replies*/
 } CcsServer;
 
-int CcsConnect(CcsServer *svr, char *host, int port);
+void CcsConnect(CcsServer *svr, char *host, int port);
+void CcsConnectIp(CcsServer *svr, int ip, int port);
 int CcsNumNodes(CcsServer *svr);
 int CcsNumPes(CcsServer *svr);
 int CcsNodeFirst(CcsServer *svr, int node);
 int CcsNodeSize(CcsServer *svr,int node);
-int CcsSendRequest(CcsServer *svr, char *hdlrID, int pe, unsigned int size, void *msg);
-int CcsRecvResponse(CcsServer *svr, unsigned int maxsize, void *recvBuffer);
+void CcsSendRequest(CcsServer *svr, char *hdlrID, int pe, 
+                   unsigned int size, const char *msg);
+int CcsRecvResponse(CcsServer *svr, 
+                   unsigned int maxsize, char *recvBuffer, int timeout);
+int CcsRecvResponseMsg(CcsServer *svr, 
+                   unsigned int *retSize,char **newBuf, int timeout);
 int CcsProbe(CcsServer *svr);
-int CcsResponseHandler(CcsServer *svr, CcsHandlerFn fn);
-int CcsFinalize(CcsServer *svr);
+void CcsFinalize(CcsServer *svr);
 
 #endif