1e95413b13660e49211d3358ad39ac8983547874
[charm.git] / src / util / sockRoutines.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include "sockRoutines.h"
9
10 #ifndef CMK_NO_SOCKETS /*<- for ASCI Red*/
11
12 #include <stdio.h>
13 #include <errno.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <signal.h>
17 #include <time.h>
18 #include <ctype.h>
19
20 #if CMK_BPROC
21 #include <sys/bproc.h>
22 #endif
23
24 #if CMK_USE_CONVERSE
25 #  include "converse.h" /* use real CmiTmpAlloc/Free */
26 #else /* fake CmiTmpAlloc/Free via malloc */
27 #  define CMI_TMP_SKIP
28 #  define CmiTmpAlloc(size) malloc(size)
29 #  define CmiTmpFree(ptr) free(ptr)
30 #endif
31
32 #if !CMK_HAS_SOCKLEN
33 typedef int socklen_t;
34 #endif
35
36 /*Just print out error message and exit*/
37 static int default_skt_abort(int code,const char *msg)
38 {
39   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
40   exit(1);
41   return -1;
42 }
43
44 static skt_idleFn idleFunc=NULL;
45 static skt_abortFn skt_abort=default_skt_abort;
46 void skt_set_idle(skt_idleFn f) {idleFunc=f;}
47 skt_abortFn skt_set_abort(skt_abortFn f) 
48 {
49         skt_abortFn old=skt_abort;
50         skt_abort=f;
51         return old;
52 }
53
54 /* These little flags are used to ignore the SIGPIPE signal
55  * while we're inside one of our socket calls.
56  * This lets us only handle SIGPIPEs we generated. */
57 static int skt_ignore_SIGPIPE=0;
58
59 #if defined(_WIN32) && !defined(__CYGWIN__) /*Windows systems:*/
60 static void doCleanup(void)
61 { WSACleanup();}
62 static int skt_inited=0;
63 /*Initialization routine (Windows only)*/
64 void skt_init(void)
65 {
66   WSADATA WSAData;
67   WORD version=0x0002;
68   if (skt_inited) return;
69   skt_inited=1;
70   WSAStartup(version, &WSAData);
71   atexit(doCleanup);
72 }
73
74 void skt_close(SOCKET fd)
75 {
76         closesocket(fd);
77 }
78 #else /*UNIX Systems:*/
79
80 typedef void (*skt_signal_handler_fn)(int sig);
81 static skt_signal_handler_fn skt_fallback_SIGPIPE=NULL;
82 static void skt_SIGPIPE_handler(int sig) {
83         if (skt_ignore_SIGPIPE) {
84                 fprintf(stderr,"Caught SIGPIPE.\n");
85                 signal(SIGPIPE,skt_SIGPIPE_handler);
86         }
87         else
88                 skt_fallback_SIGPIPE(sig);
89 }
90
91 void skt_init(void)
92 {
93         /* Install a SIGPIPE signal handler.
94           This prevents us from dying when one of our network
95           connections goes down
96         */
97         skt_fallback_SIGPIPE=signal(SIGPIPE,skt_SIGPIPE_handler);
98 }
99 void skt_close(SOCKET fd)
100 {
101         skt_ignore_SIGPIPE=1;
102         close(fd);
103         skt_ignore_SIGPIPE=0;
104 }
105 #endif
106
107 #ifndef SKT_HAS_BUFFER_BEGIN
108 void skt_buffer_begin(SOCKET sk) {}
109 void skt_buffer_end(SOCKET sk) {}
110 #endif
111
112
113 /*Called when a socket or select routine returns
114 an error-- determines how to respond.
115 Return 1 if the last call was interrupted
116 by, e.g., an alarm and should be retried.
117 */
118 static int skt_should_retry(void)
119 {
120         int isinterrupt=0,istransient=0;
121 #if defined(_WIN32) && !defined(__CYGWIN__) /*Windows systems-- check Windows Sockets Error*/
122         int err=WSAGetLastError();
123         if (err==WSAEINTR) isinterrupt=1;
124         if (err==WSATRY_AGAIN||err==WSAECONNREFUSED)
125                 istransient=1;
126 #else /*UNIX systems-- check errno*/
127         int err=errno;
128         if (err==EINTR) isinterrupt=1;
129         if (err==EAGAIN||err==ECONNREFUSED||err==EWOULDBLOCK||err==ECONNRESET||err==ENOBUFS)
130                 istransient=1;
131 #endif
132         if (isinterrupt) {
133                 /*We were interrupted by an alarm.  Schedule, then retry.*/
134                 if (idleFunc!=NULL) idleFunc();
135         }
136         else if (istransient)
137         { /*A transient error-- idle a while, then try again later.*/
138                 if (idleFunc!=NULL) idleFunc();
139                 else sleep(1);
140         }
141         else 
142                 return 0; /*Some unrecognized problem-- abort!*/
143         return 1;/*Otherwise, we recognized it*/
144 }
145
146 /*Sleep on given read socket until msec or readable*/
147 int skt_select1(SOCKET fd,int msec)
148 {
149   int sec=msec/1000;
150   fd_set  rfds;
151   struct timeval tmo;
152   int  secLeft=sec;
153   int            begin, nreadable;
154   
155   FD_ZERO(&rfds);
156   FD_SET(fd, &rfds);
157
158   if (msec>0) begin = time(0);
159   do
160   {
161     tmo.tv_sec=secLeft;
162     tmo.tv_usec = (msec-1000*sec)*1000;
163     skt_ignore_SIGPIPE=1;
164     nreadable = select(1+fd, &rfds, NULL, NULL, &tmo);
165     skt_ignore_SIGPIPE=0;
166     
167     if (nreadable < 0) {
168                 if (skt_should_retry()) continue;
169                 else skt_abort(93200,"Fatal error in select");
170         }
171     if (nreadable >0) return 1; /*We gotta good socket*/
172   }
173   while(msec>0 && ((secLeft = sec - (time(0) - begin))>0));
174
175   return 0;/*Timed out*/
176 }
177
178
179 /******* DNS *********/
180 skt_ip_t _skt_invalid_ip={{0}};
181
182 skt_ip_t skt_my_ip(void)
183 {
184   char hostname[1000];
185   
186   if (gethostname(hostname, 999)==0)
187       return skt_lookup_ip(hostname);
188
189   return _skt_invalid_ip;
190 }
191
192 static int skt_parse_dotted(const char *str,skt_ip_t *ret)
193 {
194   int i,v;
195   *ret=_skt_invalid_ip;
196   for (i=0;i<sizeof(skt_ip_t);i++) {
197     if (1!=sscanf(str,"%d",&v)) return 0;
198     if (v<0 || v>255) return 0;
199     while (isdigit(*str)) str++; /* Advance over number */
200     if (i!=sizeof(skt_ip_t)-1) { /*Not last time:*/
201       if (*str!='.') return 0; /*Check for dot*/
202     } else { /*Last time:*/
203       if (*str!=0) return 0; /*Check for end-of-string*/
204     }
205     str++;
206     ret->data[i]=(unsigned char)v;
207   }
208   return 1;
209 }
210
211 skt_ip_t skt_lookup_ip(const char *name)
212 {
213   skt_ip_t ret=_skt_invalid_ip;
214   /*First try to parse the name as dotted decimal*/
215   if (skt_parse_dotted(name,&ret))
216     return ret;
217   else {/*Try a DNS lookup*/
218     struct hostent *h = gethostbyname(name);
219     if (h==0) return _skt_invalid_ip;
220     memcpy(&ret,h->h_addr_list[0],h->h_length);
221     return ret;
222   }
223 }
224
225 /* these 2 functions will return the inner node IP, special for
226    Linux Scyld.  G. Zheng 
227 */
228 skt_ip_t skt_innode_my_ip(void)
229 {  
230 #if CMK_BPROC
231   /* on Scyld, the hostname is just the node number */
232   char hostname[200];
233   sprintf(hostname, "%d", bproc_currnode());
234   return skt_innode_lookup_ip(hostname);
235 #else
236   return skt_my_ip();
237 #endif
238 }
239
240 skt_ip_t skt_innode_lookup_ip(const char *name)
241 {
242 #if CMK_BPROC
243   struct sockaddr_in addr;
244   int len = sizeof(struct sockaddr_in);
245   if (-1 == bproc_nodeaddr(atoi(name), &addr, &len)) {
246     return _skt_invalid_ip;
247   }
248   else {
249     skt_ip_t ret;
250     memcpy(&ret,&addr.sin_addr.s_addr,sizeof(ret));
251     return ret;
252   }
253 #else
254   return skt_lookup_ip(name);
255 #endif
256 }
257
258 /*Write as dotted decimal*/
259 char *skt_print_ip(char *dest,skt_ip_t addr)
260 {
261   char *o=dest;
262   int i;
263   for (i=0;i<sizeof(addr);i++) {
264     const char *trail=".";
265     if (i==sizeof(addr)-1) trail=""; /*No trailing separator dot*/
266     sprintf(o,"%d%s",(int)addr.data[i],trail);
267     o+=strlen(o);
268   }
269   return dest;
270 }
271 int skt_ip_match(skt_ip_t a,skt_ip_t b)
272 {
273   return 0==memcmp(&a,&b,sizeof(a));
274 }
275 struct sockaddr_in skt_build_addr(skt_ip_t IP,int port)
276 {
277   struct sockaddr_in ret={0};
278   ret.sin_family=AF_INET;
279   ret.sin_port = htons((short)port);
280   memcpy(&ret.sin_addr,&IP,sizeof(IP));
281   return ret;  
282 }
283
284 SOCKET skt_datagram(unsigned int *port, int bufsize)
285 {  
286   int connPort=(port==NULL)?0:*port;
287   struct sockaddr_in addr=skt_build_addr(_skt_invalid_ip,connPort);
288   socklen_t          len;
289   SOCKET             ret;
290   
291 retry:
292   ret = socket(AF_INET,SOCK_DGRAM,0);
293   if (ret == SOCKET_ERROR) {
294     if (skt_should_retry()) goto retry;  
295     return skt_abort(93490,"Error creating datagram socket.");
296   }
297   if (bind(ret, (struct sockaddr *)&addr, sizeof(addr)) == SOCKET_ERROR)
298           return skt_abort(93491,"Error binding datagram socket.");
299   
300   len = sizeof(addr);
301   if (getsockname(ret, (struct sockaddr *)&addr , &len))
302           return skt_abort(93492,"Error getting address on datagram socket.");
303
304   if (bufsize) 
305   {
306     len = sizeof(int);
307     if (setsockopt(ret, SOL_SOCKET , SO_RCVBUF , (char *)&bufsize, len) == SOCKET_ERROR) 
308                 return skt_abort(93495,"Error on RCVBUF sockopt for datagram socket.");
309     if (setsockopt(ret, SOL_SOCKET , SO_SNDBUF , (char *)&bufsize, len) == SOCKET_ERROR) 
310                 return skt_abort(93496,"Error on SNDBUF sockopt for datagram socket.");
311   }
312   
313   if (port!=NULL) *port = (int)ntohs(addr.sin_port);
314   return ret;
315 }
316 SOCKET skt_server(unsigned int *port)
317 {
318   return skt_server_ip(port,NULL);
319 }
320
321 SOCKET skt_server_ip(unsigned int *port,skt_ip_t *ip)
322 {
323   SOCKET             ret;
324   socklen_t          len;
325   int on = 1; /* for setsockopt */
326   int connPort=(port==NULL)?0:*port;
327   struct sockaddr_in addr=skt_build_addr((ip==NULL)?_skt_invalid_ip:*ip,connPort);
328   
329 retry:
330   ret = socket(PF_INET, SOCK_STREAM, 0);
331   
332   if (ret == SOCKET_ERROR) {
333     if (skt_should_retry()) goto retry;
334     else return skt_abort(93483,"Error creating server socket.");
335   }
336   setsockopt(ret, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on));
337   
338   if (bind(ret, (struct sockaddr *)&addr, sizeof(addr)) == SOCKET_ERROR) 
339           return skt_abort(93484,"Error binding server socket.");
340   if (listen(ret,5) == SOCKET_ERROR) 
341           return skt_abort(93485,"Error listening on server socket.");
342   len = sizeof(addr);
343   if (getsockname(ret, (struct sockaddr *)&addr, &len) == SOCKET_ERROR) 
344           return skt_abort(93486,"Error getting name on server socket.");
345
346   if (port!=NULL) *port = (int)ntohs(addr.sin_port);
347   if (ip!=NULL) memcpy(ip, &addr.sin_addr, sizeof(*ip));
348   return ret;
349 }
350
351 SOCKET skt_accept(SOCKET src_fd,skt_ip_t *pip, unsigned int *port)
352 {
353   socklen_t len;
354   struct sockaddr_in addr={0};
355   SOCKET ret;
356   len = sizeof(addr);
357 retry:
358   ret = accept(src_fd, (struct sockaddr *)&addr, &len);
359   if (ret == SOCKET_ERROR) {
360     if (skt_should_retry()) goto retry;
361     else return skt_abort(93523,"Error in accept.");
362   }
363   
364   if (port!=NULL) *port=ntohs(addr.sin_port);
365   if (pip!=NULL) memcpy(pip,&addr.sin_addr,sizeof(*pip));
366   return ret;
367 }
368
369
370 SOCKET skt_connect(skt_ip_t ip, int port, int timeout)
371 {
372   struct sockaddr_in addr=skt_build_addr(ip,port);
373   int                ok, begin;
374   SOCKET             ret;
375   
376   begin = time(0);
377   while (time(0)-begin < timeout) 
378   {
379     ret = socket(AF_INET, SOCK_STREAM, 0);
380     if (ret==SOCKET_ERROR) 
381     {
382           if (skt_should_retry()) continue;  
383       else return skt_abort(93512,"Error creating socket");
384     }
385     ok = connect(ret, (struct sockaddr *)&(addr), sizeof(addr));
386     if (ok != SOCKET_ERROR) 
387           return ret;/*Good connect*/
388         else { /*Bad connect*/
389           skt_close(ret);
390           if (skt_should_retry()) continue;
391           else return skt_abort(93515,"Error connecting to socket\n");
392     }
393   }
394   /*Timeout*/
395   if (timeout==60)
396      return skt_abort(93517,"Timeout in socket connect\n");
397   return INVALID_SOCKET;
398 }
399
400 void skt_setSockBuf(SOCKET skt, int bufsize)
401 {
402   int len = sizeof(int);
403   if (setsockopt(skt, SOL_SOCKET , SO_SNDBUF , (char *)&bufsize, len) == SOCKET_ERROR)
404         skt_abort(93496,"Error on SNDBUF sockopt for datagram socket.");
405   if (setsockopt(skt, SOL_SOCKET , SO_RCVBUF , (char *)&bufsize, len) == SOCKET_ERROR)
406         skt_abort(93496,"Error on RCVBUF sockopt for datagram socket.");
407 }
408
409 int skt_recvN(SOCKET hSocket,void *buff,int nBytes)
410 {
411   int nLeft,nRead;
412   char *pBuff=(char *)buff;
413
414   nLeft = nBytes;
415   while (0 < nLeft)
416   {
417     if (0==skt_select1(hSocket,60*1000))
418         return skt_abort(93610,"Timeout on socket recv!");
419     skt_ignore_SIGPIPE=1;
420     nRead = recv(hSocket,pBuff,nLeft,0);
421     skt_ignore_SIGPIPE=0;
422     if (nRead<=0)
423     {
424        if (nRead==0) return skt_abort(93620,"Socket closed before recv.");
425        if (skt_should_retry()) continue;/*Try again*/
426        else return skt_abort(93650+hSocket,"Error on socket recv!");
427     }
428     else
429     {
430       nLeft -= nRead;
431       pBuff += nRead;
432     }
433   }
434   return 0;
435 }
436
437 int skt_sendN(SOCKET hSocket,const void *buff,int nBytes)
438 {
439   int nLeft,nWritten;
440   const char *pBuff=(const char *)buff;
441   
442   nLeft = nBytes;
443   while (0 < nLeft)
444   {
445     skt_ignore_SIGPIPE=1;
446     nWritten = send(hSocket,pBuff,nLeft,0);
447     skt_ignore_SIGPIPE=0;
448     if (nWritten<=0)
449     {
450           if (nWritten==0) return skt_abort(93720,"Socket closed before send.");
451           if (skt_should_retry()) continue;/*Try again*/
452           else return skt_abort(93700+hSocket,"Error on socket send!");
453     }
454     else
455     {
456       nLeft -= nWritten;
457       pBuff += nWritten;
458     }
459   }
460   return 0;
461 }
462
463 /*Cheezy vector send: 
464   really should use writev on machines where it's available. 
465 */
466 #define skt_sendV_max (16*1024)
467
468 int skt_sendV(SOCKET fd,int nBuffers,const void **bufs,int *lens)
469 {
470         int b,len=0;
471         for (b=0;b<nBuffers;b++) len+=lens[b];
472         if (len<=skt_sendV_max) { /*Short message: Copy and do one big send*/
473                 char *buf=(char *)CmiTmpAlloc(skt_sendV_max);
474                 char *dest=buf;
475                 int ret;
476                 for (b=0;b<nBuffers;b++) {
477                         memcpy(dest,bufs[b],lens[b]);
478                         dest+=lens[b];
479                 }
480                 ret=skt_sendN(fd,buf,len);
481                 CmiTmpFree(buf);
482                 return ret;
483         }
484         else { /*Big message: Just send one-by-one as usual*/
485                 int ret;
486                 for (b=0;b<nBuffers;b++) 
487                         if (0!=(ret=skt_sendN(fd,bufs[b],lens[b])))
488                                 return ret;
489                 return 0;
490         }
491 }
492
493
494
495 /***********************************************
496   Routines for manipulating simple binary messages,
497  e.g., to/from conv-host.
498
499  A conv-host message with header has the following format 
500 on the network: 
501 ChMessage---------------------------------------------
502  /--ChMessageHeader----------------------------     ^
503  |12 bytes  |   Message type field           ^      |
504  |          |   (ASCII, Null-terminated)     |      |
505  +-----------------------------------     24 bytes  |
506  | 4 bytes  |   Message data length d        |      |
507  |          |   (big-endian binary integer)  |      |
508  +-----------------------------------        |      |
509  | 4 bytes  |   Return IP address            |      |
510  | 4 bytes  |   Return TCP port number       |      |
511  |          |   (big-endian binary integers) v  24+d bytes
512  \---------------------------------------------     |
513  d bytes  |   User data                             v
514 ------------------------------------------------------
515
516 For completeness, a big-endian (network byte order) 4 byte 
517 integer has this format on the network:
518 ChMessageInt---------------------------------
519   1 byte | Most significant byte  (&0xff000000; <<24)
520   1 byte | More significant byte  (&0x00ff0000; <<16)
521   1 byte | Less significant byte  (&0x0000ff00; <<8)
522   1 byte | Least significant byte (&0x000000ff; <<0)
523 ----------------------------------------------
524 */
525
526 ChMessageInt_t ChMessageInt_new(unsigned int src)
527 { /*Convert integer to bytes*/
528   int i; ChMessageInt_t ret;
529   for (i=0;i<4;i++) ret.data[i]=(unsigned char)(src>>(8*(3-i)));
530   return ret;
531 }
532 unsigned int ChMessageInt(ChMessageInt_t src)
533 { /*Convert bytes to integer*/
534   int i; unsigned int ret=0;
535   for (i=0;i<4;i++) {ret<<=8;ret+=src.data[i];}
536   return (int)ret;
537 }
538
539 ChMessageLong_t ChMessageLong_new(CMK_TYPEDEF_UINT8 src)
540 { /*Convert long integer to bytes*/
541   int i; ChMessageLong_t ret;
542   for (i=0;i<8;i++) ret.data[i]=(unsigned char)(src>>(8*(7-i)));
543   return ret;
544 }
545 CMK_TYPEDEF_UINT8 ChMessageLong(ChMessageLong_t src)
546 { /*Convert bytes to long integer*/
547   int i; CMK_TYPEDEF_UINT8 ret=0;
548   for (i=0;i<8;i++) {ret<<=8;ret+=src.data[i];}
549   return (CMK_TYPEDEF_UINT8)ret;
550 }
551
552 int ChMessage_recv(SOCKET fd,ChMessage *dst)
553 {
554   /*Get the binary header*/
555   if (0!=ChMessageHeader_recv(fd,dst)) return -1;
556   if (0!=ChMessageData_recv(fd,dst)) return -1;
557   return 0;
558 }
559
560 int ChMessageHeader_recv(SOCKET fd,ChMessage *dst)
561 {
562   /*Get the binary header*/
563   if (0!=skt_recvN(fd,(char *)&dst->header,sizeof(dst->header))) return -1;
564   /*Allocate a recieve buffer*/
565   dst->len=ChMessageInt(dst->header.len);
566   dst->data=0;
567   return 0;
568 }
569 int ChMessageData_recv(SOCKET fd,ChMessage *dst)
570 {
571   dst->data=(char *)malloc(dst->len);
572   /*Get the actual data*/
573   if (0!=skt_recvN(fd,dst->data,dst->len)) return -1;
574   return 0;
575 }
576
577 void ChMessage_free(ChMessage *doomed)
578 {
579   free(doomed->data);
580   strncpy(doomed->header.type,"Free'd",CH_TYPELEN);
581   doomed->data=NULL;
582   doomed->len=-1234;
583 }
584 void ChMessageHeader_new(const char *type,int len,ChMessageHeader *dst)
585 {
586   dst->len=ChMessageInt_new(len);
587   if (type==NULL) type="default";
588   strncpy(dst->type,type,CH_TYPELEN);
589 }
590 void ChMessage_new(const char *type,int len,ChMessage *dst)
591 {
592   ChMessageHeader_new(type,len,&dst->header);
593   dst->len=len;
594   dst->data=(char *)malloc(dst->len);
595 }
596 int ChMessage_send(SOCKET fd,const ChMessage *src)
597 {
598   const void *bufs[2]; int lens[2];
599   bufs[0]=&src->header; lens[0]=sizeof(src->header);
600   bufs[1]=src->data; lens[1]=src->len;
601   return skt_sendV(fd,2,bufs,lens);
602 } /*You must free after send*/
603
604 #endif /*!CMK_NO_SOCKETS*/
605
606
607