dcfeebd068a61d99f61b3efe0df7a5c4e4d6039a
[charm.git] / src / arch / net / machine.c
1
2 /******************************************************************************
3  *
4  * THE DATAGRAM STREAM
5  *
6  * Messages are sent using UDP datagrams.  The sender allocates a
7  * struct for each datagram to be sent.  These structs stick around
8  * until slightly after the datagram is acknowledged.
9  *
10  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
11  * Each node has an OtherNode struct for every other node in the
12  * system.  The OtherNode struct contains:
13  *
14  *   send_queue   (all datagram-structs not yet transmitted)
15  *   send_window  (all datagram-structs transmitted but not ack'd)
16  *
17  * When an acknowledgement comes in, all packets in the send-window
18  * are either marked as acknowledged or pushed back into the send
19  * queue for retransmission.
20  *
21  * THE OUTGOING MESSAGE
22  *
23  * When you send or broadcast a message, the first thing the system
24  * does is system creates an OutgoingMsg struct to represent the
25  * operation.  The OutgoingMsg contains a very direct expression
26  * of what you want to do:
27  *
28  * OutgoingMsg:
29  *
30  *   size      --- size of message in bytes
31  *   data      --- pointer to the buffer containing the message
32  *   src       --- processor which sent the message
33  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
34  *   freemode  --- see below.
35  *   refcount  --- see below.
36  *
37  * The OutgoingMsg is kept around until the transmission is done, then
38  * it is garbage collected --- the refcount and freemode fields are
39  * to assist garbage collection.
40  *
41  * The freemode indicates which kind of buffer-management policy was
42  * used (sync, async, or freeing).  The sync policy is handled
43  * superficially by immediately converting sync sends into freeing
44  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
45  * (freeing).  If the freemode is 'F', then garbage collection
46  * involves freeing the data and the OutgoingMsg structure itself.  If
47  * the freemode is 'A', then the only cleanup is to change the
48  * freemode to 'X', a condition which is then detectable by
49  * CmiAsyncMsgSent.  In this case, the actual freeing of the
50  * OutgoingMsg is done by CmiReleaseCommHandle.
51  *
52  * When the transmission is initiated, the system computes how many
53  * datagrams need to be sent, total.  This number is stored in the
54  * refcount field.  Each time a datagram is delivered, the refcount
55  * is decremented, when it reaches zero, cleanup is performed.  There
56  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
57  * is a send (not a broadcast) and can be performed with shared
58  * memory, the entire datagram system is bypassed, the message is
59  * simply delivered and freed, not using the refcount mechanism at
60  * all.  Exception 2: If the message is a broadcast, then part of the
61  * broadcast that can be done via shared memory is performed prior to
62  * initiating the datagram/refcount system.
63  *
64  * DATAGRAM FORMATS AND MESSAGE FORMATS
65  *
66  * Datagrams have this format:
67  *
68  *   srcpe   (16 bits) --- source processor number.
69  *   magic   (16 bits) --- magic number to make sure DG is good.
70  *   dstrank ( 8 bits) --- destination processor rank.
71  *   seqno   (24 bits) --- packet sequence number.
72  *   data    (XX byte) --- user data.
73  *
74  * The only reason the srcpe is in there is because the receiver needs
75  * to know which receive window to use.  The dstrank field is needed
76  * because transmission is node-to-node.  Once the message is
77  * assembled by the node, it must be delivered to the appropriate PE.
78  * The dstrank field is used to encode certain special-case scenarios.
79  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
80  * and should be delivered to all processors in the node.  If the dstrank
81  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
82  * which case the srcpe is the number of the acknowledger, the seqno is
83  * always zero, and the user data is a list of the seqno's being
84  * acknowledged.  There may be other dstrank codes for special functions.
85  *
86  * To send a message, one chops it up into datagrams and stores those
87  * datagrams in a send-queue.  These outgoing datagrams aren't stored
88  * in the explicit format shown above.  Instead, they are stored as
89  * ImplicitDgrams, which contain the datagram header and a pointer to
90  * the user data (which is in the user message buffer, which is in the
91  * OutgoingMsg).  At transmission time these are combined together.
92
93  * The combination of the datagram header with the user's data is
94  * performed right in the user's message buffer.  Note that the
95  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
96  * of the user's message with a datagram header, sends the datagram
97  * straight from the user's message buffer, then restores the user's
98  * buffer to its original state.  There is a small problem with the
99  * first datagram of the message: one needs 64 bits of space to store
100  * the datagram header.  To make sure this space is there, we added a
101  * 64-bit unused space to the front of the Cmi message header.  In
102  * addition to this, we also add 32 bits to the Cmi message header
103  * to make room for a length-field, making it possible to identify
104  * message boundaries.
105  *
106  * CONCURRENCY CONTROL
107  *
108  * This has changed recently.
109  *
110  * EFFICIENCY NOTES
111  *
112  * The sender-side does little copying.  The async and freeing send
113  * routines do no copying at all.  The sync send routines copy the
114  * message, then use the freeing-send routines.  The other alternative
115  * is to not copy the message, and use the async send mechanism
116  * combined with a blocking wait.  Blocking wait seems like a bad
117  * idea, since it could take a VERY long time to get all those
118  * datagrams out the door.
119  *
120  * The receiver side, unfortunately, must copy.  To avoid copying,
121  * it would have to receive directly into a preallocated message buffer.
122  * Unfortunately, this can't work: there's no way to know how much
123  * memory to preallocate, and there's no way to know which datagram
124  * is coming next.  Thus, we receive into fixed-size (large) datagram
125  * buffers.  These are then inspected, and the messages extracted from
126  * them.
127  *
128  * Note that we are allocating a large number of structs: OutgoingMsg's,
129  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
130  * is a fixed-size structure.  Thus, we can do memory allocation by
131  * simply keeping a linked-list of unused structs around.  The only
132  * place where expensive memory allocation is performed is in the
133  * sync routines.
134  *
135  * Since the datagrams from one node to another are fully ordered,
136  * there is slightly more ordering than is needed: in theory, the
137  * datagrams of one message don't need to be ordered relative to the
138  * datagrams of another.  This was done to simplify the sequencing
139  * mechanisms: implementing a fully-ordered stream is much simpler
140  * than a partially-ordered one.  It also makes it possible to
141  * modularize, layering the message transmitter on top of the
142  * datagram-sequencer.  In other words, it was just easier this way.
143  * Hopefully, this won't cause serious degradation: LAN's rarely get
144  * datagrams out of order anyway.
145  *
146  * A potential efficiency problem is the lack of message-combining.
147  * One datagram could conceivably contain several messages.  This
148  * might be more efficient, it's not clear how much overhead is
149  * involved in sending a short datagram.  Message-combining isn't
150  * really ``integrated'' into the design of this software, but you
151  * could fudge it as follows.  Whenever you pull a short datagram from
152  * the send-queue, check the next one to see if it's also a short
153  * datagram.  If so, pack them together into a ``combined'' datagram.
154  * At the receive side, simply check for ``combined'' datagrams, and
155  * treat them as if they were simply two datagrams.  This would
156  * require extra copying.  I have no idea if this would be worthwhile.
157  *
158  *****************************************************************************/
159
160 /*****************************************************************************
161  *
162  * Include Files
163  *
164  ****************************************************************************/
165
166 /*
167  * I_Hate_C because the ansi prototype for a varargs function is incompatible
168  * with the K&R definition of that varargs function.  Eg, this doesn't compile:
169  *
170  * void CmiPrintf(char *, ...);
171  *
172  * void CmiPrintf(va_alist) va_dcl
173  * {
174  *    ...
175  * }
176  *
177  * I can't define the function in an ANSI way, because our stupid SUNs dont
178  * yet have stdarg.h, even though they have gcc (which is ANSI).  So I have
179  * to leave the definition of CmiPrintf as a K&R form, but I have to
180  * deactivate the protos or the compiler barfs.  That's why I_Hate_C.
181  *
182  */
183
184 #define CmiPrintf I_Hate_C_1
185 #define CmiError  I_Hate_C_2
186 #define CmiScanf  I_Hate_C_3
187 #include "converse.h"
188 #undef CmiPrintf
189 #undef CmiError
190 #undef CmiScanf
191 void CmiPrintf();
192 void CmiError();
193 int  CmiScanf();
194
195 #include <sys/types.h>
196 #include <stdio.h>
197 #include <ctype.h>
198 #include <fcntl.h>
199 #include <netinet/in.h>
200 #include <arpa/inet.h>
201 #include <netdb.h>
202 #include <rpc/rpc.h>
203 #include <errno.h>
204 #include <setjmp.h>
205 #include <pwd.h>
206 #include <stdlib.h>
207 #include <signal.h>
208 #include <varargs.h>
209 #include <unistd.h>
210 #include <sys/file.h>
211 #include <sys/param.h>
212 #include <sys/resource.h>
213 #include <sys/socket.h>
214 #include <sys/stat.h>
215 #include <sys/time.h>
216 #include <varargs.h>
217
218 #if CMK_STRINGS_USE_STRINGS_H
219 #include <strings.h>
220 #endif
221
222 #if CMK_STRINGS_USE_STRING_H
223 #include <string.h>
224 #endif
225
226 #if CMK_STRINGS_USE_OWN_DECLARATIONS
227 char *strchr(), *strrchr(), *strdup();
228 #endif
229
230 #if CMK_RSH_IS_A_COMMAND
231 #define RSH_CMD "rsh"
232 #endif
233 #if CMK_RSH_USE_REMSH
234 #define RSH_CMD "remsh"
235 #endif
236
237 static void KillEveryone();
238 static void KillEveryoneCode();
239 static void CommunicationServer();
240 extern int CmemInsideMem();
241 extern void CmemCallWhenMemAvail();
242 void ConverseInitPE(void);
243 void *FIFO_Create(void);
244 int   FIFO_Fill(void *);
245 void *FIFO_Peek(void *);
246 void  FIFO_Pop(void *);
247 void  FIFO_EnQueue(void *, void *);
248 void  FIFO_EnQueue_Front(void *, void *);
249 void CmiYield();
250
251 /*****************************************************************************
252  *
253  *     Utility routines for network machine interface.
254  *
255  *
256  * zap_newline(char *s)
257  *
258  *   - Remove the '\n' from the end of a string.
259  *
260  * char *skipblanks(char *s)
261  *
262  *   - advance pointer over blank characters
263  *
264  * char *skipstuff(char *s)
265  *
266  *   - advance pointer over nonblank characters
267  *
268  * char *strdupl(char *s)
269  *
270  *   - return a freshly-allocated duplicate of a string
271  *
272  *****************************************************************************/
273
274 static void zap_newline(s) char *s;
275 {
276   char *p;
277   p = s + strlen(s)-1;
278   if (*p == '\n') *p = '\0';
279 }
280
281 static char *skipblanks(p) char *p;
282 {
283   while ((*p==' ')||(*p=='\t')||(*p=='\n')) p++;
284   return p;
285 }
286
287 static char *skipstuff(p) char *p;
288 {
289   while ((*p)&&(*p!=' ')&&(*p!='\t')) p++;
290   return p;
291 }
292
293 static char *strdupl(s) char *s;
294 {
295   int len = strlen(s);
296   char *res = (char *)malloc(len+1);
297   strcpy(res, s);
298   return res;
299 }
300
301 double GetClock()
302 {
303   struct timeval tv; int ok;
304   ok = gettimeofday(&tv, NULL);
305   if (ok<0) KillEveryoneCode(9343112);
306   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
307 }
308
309 void jmemcpy(char *dst, char *src, int len)
310 {
311   char *sdend = (char *)(((int)(src + len)) & ~(sizeof(double)-1));
312   while (src != sdend) {
313     *((double*)dst) = *((double*)src);
314     dst+=sizeof(double); src+=sizeof(double);
315   }
316   len &= (sizeof(double)-1);
317   while (len) { *dst++ = *src++; len--; }
318 }
319
320 char *CopyMsg(char *msg, int len)
321 {
322   char *copy = (char *)CmiAlloc(len);
323   jmemcpy(copy, msg, len);
324   return copy;
325 }
326
327 static char *parseint(p, value) char *p; int *value;
328 {
329   int val = 0;
330   while (((*p)==' ')||((*p)=='.')) p++;
331   if (((*p)<'0')||((*p)>'9')) KillEveryone("badly-formed number");
332   while ((*p>='0')&&(*p<='9')) { val*=10; val+=(*p)-'0'; p++; }
333   *value = val;
334   return p;
335 }
336
337 static char *DeleteArg(argv)
338 char **argv;
339 {
340   char *res = argv[0];
341   if (res==0) KillEveryone("Illegal Arglist");
342   while (*argv) { argv[0]=argv[1]; argv++; }
343   return res;
344 }
345
346 static int CountArgs(char **argv)
347 {
348   int count=0;
349   while (argv[0]) { count++; argv++; }
350   return count;
351 }
352
353
354 static void jsleep(int sec, int usec)
355 {
356   int ntimes,i;
357   struct timeval tm;
358
359   ntimes = sec*200 + usec/5000;
360   for(i=0;i<ntimes;i++) {
361     tm.tv_sec = 0;
362     tm.tv_usec = 5000;
363     while(1) {
364       if (select(0,NULL,NULL,NULL,&tm)==0) break;
365       if ((errno!=EBADF)&&(errno!=EINTR)) return;
366     }
367   }
368 }
369
370 static void writeall(int fd, char *buf, int size)
371 {
372   int ok;
373   while (size) {
374     retry:
375     CmiYield();
376     ok = write(fd, buf, size);
377     if ((ok<0)&&(errno==EBADF)) goto retry;
378     if (ok<=0) KillEveryone("write on tcp socket failed.");
379     size-=ok; buf+=ok;
380   }
381 }
382
383 static int wait_readable(fd, sec) int fd; int sec;
384 {
385   fd_set rfds;
386   struct timeval tmo;
387   int begin, nreadable;
388   
389   begin = time(0);
390   FD_ZERO(&rfds);
391   FD_SET(fd, &rfds);
392   while(1) {
393     tmo.tv_sec = (time(0) - begin) + sec;
394     tmo.tv_usec = 0;
395     nreadable = select(FD_SETSIZE, &rfds, NULL, NULL, &tmo);
396     if ((nreadable<0)&&((errno==EINTR)||(errno==EBADF))) continue;
397     if (nreadable == 0) { errno=ETIMEDOUT; return -1; }
398     return 0;
399   }
400 }
401
402 /**************************************************************************
403  *
404  * SKT - socket routines
405  *
406  *
407  * void skt_server(unsigned int *ppo, unsigned int *pfd)
408  *
409  *   - create a tcp server socket.  Performs the whole socket/bind/listen
410  *     procedure.  Returns the the port of the socket and the file descriptor.
411  *
412  * void skt_datagram(unsigned int *ppo, unsigned int *pfd, int bufsize)
413  *
414  *   - creates a UDP datagram socket.  Performs the whole socket/bind/
415  *     getsockname procedure.  Returns the port of the socket and
416  *     the file descriptor.  Bufsize, if nonzero, controls the amount
417  *     of buffer space the kernel sets aside for the socket.
418  *
419  * void skt_accept(int src,
420  *                 unsigned int *pip, unsigned int *ppo, unsigned int *pfd)
421  *
422  *   - accepts a connection to the specified socket.  Returns the
423  *     IP of the caller, the port number of the caller, and the file
424  *     descriptor to talk to the caller.
425  *
426  * int skt_connect(unsigned int ip, int port, int timeout)
427  *
428  *   - Opens a connection to the specified server.  Returns a socket for
429  *     communication.
430  *
431  *
432  **************************************************************************/
433
434 static void skt_server(ppo, pfd)
435 unsigned int *ppo;
436 unsigned int *pfd;
437 {
438   int fd= -1;
439   int ok, len;
440   struct sockaddr_in addr;
441   
442 retry:
443   CmiYield();
444   fd = socket(PF_INET, SOCK_STREAM, 0);
445   if ((fd<0)&&((errno==EINTR)||(errno==EBADF))) goto retry;
446   if (fd < 0) { perror("socket 1"); KillEveryoneCode(93483); }
447   memset(&addr, 0, sizeof(addr));
448   addr.sin_family = AF_INET;
449   ok = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
450   if (ok < 0) { perror("bind"); KillEveryoneCode(22933); }
451   ok = listen(fd,5);
452   if (ok < 0) { perror("listen"); KillEveryoneCode(3948); }
453   len = sizeof(addr);
454   ok = getsockname(fd, (struct sockaddr *)&addr, &len);
455   if (ok < 0) { perror("getsockname"); KillEveryoneCode(93583); }
456
457   *pfd = fd;
458   *ppo = ntohs(addr.sin_port);
459 }
460
461 static void skt_datagram(ppo, pfd, bufsize)
462 unsigned int *ppo;
463 unsigned int *pfd;
464 unsigned int  bufsize;
465 {
466   struct sockaddr_in name;
467   int length, ok, skt;
468   
469   /* Create data socket */
470 retry:
471   CmiYield();
472   skt = socket(AF_INET,SOCK_DGRAM,0);
473   if ((skt<0)&&((errno==EINTR)||(errno==EBADF))) goto retry;
474   if (skt < 0)
475     { perror("socket 2"); KillEveryoneCode(8934); }
476   name.sin_family = AF_INET;
477   name.sin_port = 0;
478   name.sin_addr.s_addr = htonl(INADDR_ANY);
479   if (bind(skt, (struct sockaddr *)&name, sizeof(name)) == -1)
480     { perror("binding data socket"); KillEveryoneCode(2983); }
481   length = sizeof(name);
482   if (getsockname(skt, (struct sockaddr *)&name , &length))
483     { perror("getting socket name"); KillEveryoneCode(39483); }
484
485   if (bufsize) {
486     int len = sizeof(int);
487     ok = setsockopt(skt, SOL_SOCKET , SO_RCVBUF , (char *)&bufsize, len);
488     if (ok < 0) KillEveryoneCode(35782);
489     ok = setsockopt(skt, SOL_SOCKET , SO_SNDBUF , (char *)&bufsize, len);
490     if (ok < 0) KillEveryoneCode(35783);
491   }     
492
493   *pfd = skt;
494   *ppo = htons(name.sin_port);
495 }
496
497 static void skt_accept(src, pip, ppo, pfd)
498 int src;
499 unsigned int *pip;
500 unsigned int *ppo;
501 unsigned int *pfd;
502 {
503   int i, fd, ok;
504   struct sockaddr_in remote;
505   i = sizeof(remote);
506  acc:
507   CmiYield();
508   fd = accept(src, (struct sockaddr *)&remote, &i);
509   if ((fd<0)&&((errno==EINTR)||(errno==EBADF)||(errno==EPROTO))) goto acc;
510   if (fd<0) { perror("accept"); KillEveryoneCode(39489); }
511   *pip=htonl(remote.sin_addr.s_addr);
512   *ppo=htons(remote.sin_port);
513   *pfd=fd;
514 }
515
516 static int skt_connect(ip, port, seconds)
517 unsigned int ip; int port; int seconds;
518 {
519   struct sockaddr_in remote; short sport=port;
520   int fd, ok, len, retry, begin;
521     
522   /* create an address structure for the server */
523   memset(&remote, 0, sizeof(remote));
524   remote.sin_family = AF_INET;
525   remote.sin_port = htons(sport);
526   remote.sin_addr.s_addr = htonl(ip);
527     
528   begin = time(0); ok= -1;
529   while (time(0)-begin < seconds) {
530   sock:
531     CmiYield();
532     fd = socket(AF_INET, SOCK_STREAM, 0);
533     if ((fd<0)&&((errno==EINTR)||(errno==EBADF))) goto sock;
534     if (fd < 0) { perror("socket 3"); exit(1); }
535     
536   conn:
537     ok = connect(fd, (struct sockaddr *)&(remote), sizeof(remote));
538     if (ok>=0) break;
539     close(fd);
540     switch (errno) {
541     case EINTR: case EBADF: break;
542     case ECONNREFUSED: jsleep(1,0); break;
543     case EADDRINUSE: jsleep(1,0); break;
544     case EADDRNOTAVAIL: jsleep(5,0); break;
545     default: KillEveryone(strerror(errno));
546     }
547   }
548   if (ok<0) {
549     KillEveryone(strerror(errno)); exit(1);
550   }
551   return fd;
552 }
553
554 /*****************************************************************************
555  *
556  * Producer-Consumer Queues
557  *
558  * This queue implementation enables a producer and a consumer to
559  * communicate via a queue.  The queues are optimized for this situation,
560  * they don't require any operating system locks (they do require 32-bit
561  * reads and writes to be atomic.)  Cautions: there can only be one
562  * producer, and one consumer.  These queues cannot store null pointers.
563  *
564  ****************************************************************************/
565
566 #define PCQueueSize 0x100
567
568 typedef struct CircQueueStruct
569 {
570   struct CircQueueStruct *next;
571   int push;
572   int pull;
573   char *data[PCQueueSize];
574 }
575 *CircQueue;
576
577 typedef struct PCQueueStruct
578 {
579   CircQueue head;
580   CircQueue tail;
581 }
582 *PCQueue;
583
584 PCQueue PCQueueCreate()
585 {
586   CircQueue circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
587   PCQueue Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
588   Q->head = circ;
589   Q->tail = circ;
590   return Q;
591 }
592
593 char *PCQueuePop(PCQueue Q)
594 {
595   CircQueue circ; int pull; char *data;
596
597   while (1) {
598     circ = Q->head;
599     pull = circ->pull;
600     data = circ->data[pull];
601     if (data) {
602       circ->pull = (pull + 1) & (PCQueueSize-1);
603       circ->data[pull] = 0;
604       return data;
605     }
606     if (Q->tail == circ)
607       return 0;
608     Q->head = circ->next;
609     free(circ);
610   }
611 }
612
613 void PCQueuePush(PCQueue Q, char *data)
614 {
615   CircQueue circ; int push;
616   
617   circ = Q->tail;
618   push = circ->push;
619   if (circ->data[push] == 0) {
620     circ->data[push] = data;
621     circ->push = (push + 1) & (PCQueueSize-1);
622     return;
623   }
624   circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
625   circ->push = 1;
626   circ->data[0] = data;
627   Q->tail->next = circ;
628   Q->tail = circ;
629 }
630
631 /*****************************************************************************
632  *
633  * CmiAlloc, CmiSize, and CmiFree
634  *
635  * Note: this allocator is only used for messages.  Everything else
636  * is allocated with the less-expensive ``malloc''.
637  *
638  *****************************************************************************/
639
640 void *CmiAlloc(size)
641 int size;
642 {
643   char *res;
644   res =(char *)malloc(size+8);
645   if (res==0) KillEveryone("Memory allocation failed.");
646   ((int *)res)[0]=size;
647   return (void *)(res+8);
648 }
649
650 int CmiSize(blk)
651 void *blk;
652 {
653   return ((int *)(((char *)blk)-8))[0];
654 }
655
656 void CmiFree(blk)
657 void *blk;
658 {
659   free(((char *)blk)-8);
660 }
661
662
663 /*****************************************************************************
664  *                                                                           
665  * Neighbour-Lookup functions.                                               
666  *                                                                           
667  * the neighbour information is computed dynamically.  It imposes a
668  * (maybe partial) hypercube on the machine.
669  *                                                                           
670  *****************************************************************************/
671  
672 long CmiNumNeighbours(node)
673 int node;
674 {
675   int bit, count=0;
676   bit = 1;
677   while (1) {
678     int neighbour = node ^ bit;
679     if (neighbour < CmiNumPes()) count++;
680     bit = bit<<1; 
681     if (bit > CmiNumPes()) break;
682   }
683   return count;
684 }
685  
686 int CmiGetNodeNeighbours(node, neighbours)
687 int node, *neighbours;
688 {
689   int bit, count=0;
690   bit = 1;
691   while (1) {
692     int neighbour = node ^ bit;
693     if (neighbour < CmiNumPes()) neighbours[count++] = neighbour;
694     bit = bit<<1; 
695     if (bit > CmiNumPes()) break;
696   }
697   return count;
698 }
699  
700 int CmiNeighboursIndex(node, nbr)
701 int node, nbr;
702 {
703   int bit, count=0;
704   bit = 1;
705   while (1) {
706     int neighbour = node ^ bit;
707     if (neighbour < CmiNumPes()) { if (nbr==neighbour) return count; count++; }
708     bit = bit<<=1; 
709     if (bit > CmiNumPes()) break;
710   }
711   return(-1);
712 }
713
714 /*****************************************************************************
715  *
716  * Communication Structures
717  *
718  *****************************************************************************/
719
720 #define DGRAM_HEADER_SIZE 8
721
722 #define CmiMsgHeaderSetLength(msg, len) (((int*)(msg))[2] = (len))
723 #define CmiMsgHeaderGetLength(msg)      (((int*)(msg))[2])
724 #define CmiMsgNext(msg) (*((void**)(msg)))
725
726 #define DGRAM_SRCPE_MASK    (0xFFFF)
727 #define DGRAM_MAGIC_MASK    (0xFFFF)
728 #define DGRAM_SEQNO_MASK    (0xFFFFFF)
729
730 #define DGRAM_DSTRANK_MAX   (0xFC)
731 #define DGRAM_SIMPLEKILL    (0xFD)
732 #define DGRAM_BROADCAST     (0xFE)
733 #define DGRAM_ACKNOWLEDGE   (0xFF)
734
735
736
737 typedef struct { char data[DGRAM_HEADER_SIZE]; } DgramHeader;
738
739 typedef struct { DgramHeader head; char window[1024]; } DgramAck;
740
741 #define DgramHeaderMake(ptr, dstrank, srcpe, magic, seqno) { \
742    ((unsigned short *)ptr)[0] = srcpe; \
743    ((unsigned short *)ptr)[1] = magic; \
744    ((unsigned int *)ptr)[1] = (seqno<<8) | dstrank; \
745 }
746
747 #define DgramHeaderBreak(ptr, dstrank, srcpe, magic, seqno) { \
748    unsigned int tmp; \
749    srcpe = ((unsigned short *)ptr)[0]; \
750    magic = ((unsigned short *)ptr)[1]; \
751    tmp = ((unsigned int *)ptr)[1]; \
752    dstrank = (tmp&0xFF); seqno = (tmp>>8); \
753 }
754
755 #define PE_BROADCAST_OTHERS (-1)
756 #define PE_BROADCAST_ALL    (-2)
757
758
759 typedef struct OutgoingMsgStruct
760 {
761   struct OutgoingMsgStruct *next;
762   int   src, dst;
763   int   size;
764   char *data;
765   int   refcount;
766   int   freemode;
767 }
768 *OutgoingMsg;
769
770 typedef struct ExplicitDgramStruct
771 {
772   struct ExplicitDgramStruct *next;
773   int  srcpe, rank, seqno;
774   unsigned int len, dummy; /* dummy to fix bug in rs6k alignment */
775   double data[1];
776 }
777 *ExplicitDgram;
778
779 typedef struct ImplicitDgramStruct
780 {
781   struct ImplicitDgramStruct *next;
782   struct OtherNodeStruct *dest;
783   int srcpe, rank, seqno;
784   char  *dataptr;
785   int    datalen;
786   OutgoingMsg ogm;
787 }
788 *ImplicitDgram;
789
790 typedef struct OtherNodeStruct
791 {
792   int nodestart, nodesize;
793   unsigned int IP, dataport, ctrlport;
794   struct sockaddr_in addr;
795   
796   double                   send_primer;    /* time to send primer packet */
797   unsigned int             send_last;      /* seqno of last dgram sent */
798   ImplicitDgram           *send_window;    /* datagrams sent, not acked */
799   ImplicitDgram            send_queue_h;   /* head of send queue */
800   ImplicitDgram            send_queue_t;   /* tail of send queue */
801   unsigned int             send_next;      /* next seqno to go into queue */
802   
803   int                      asm_rank;
804   int                      asm_total;
805   int                      asm_fill;
806   char                    *asm_msg;
807   
808   int                      recv_ack_cnt; /* number of unacked dgrams */
809   double                   recv_ack_time;/* time when ack should be sent */
810   unsigned int             recv_expect;  /* next dgram to expect */
811   ExplicitDgram           *recv_window;  /* Packets received, not integrated */
812   int                      recv_winsz;   /* Number of packets in recv window */
813   unsigned int             recv_next;    /* Seqno of first missing packet */
814 }
815 *OtherNode;
816
817 typedef struct CmiStateStruct
818 {
819   int pe, rank;
820   PCQueue recv;
821   void *localqueue;
822 }
823 *CmiState;
824
825 void CmiStateInit(int pe, int rank, CmiState state)
826 {
827   state->pe = pe;
828   state->rank = rank;
829   state->recv = PCQueueCreate();
830   state->localqueue = FIFO_Create();
831 }
832
833
834 static ImplicitDgram Cmi_freelist_implicit;
835 static ExplicitDgram Cmi_freelist_explicit;
836 static OutgoingMsg   Cmi_freelist_outgoing;
837
838 #define FreeImplicitDgram(dg) {\
839   ImplicitDgram d=(dg);\
840   d->next = Cmi_freelist_implicit;\
841   Cmi_freelist_implicit = d;\
842 }
843
844 #define MallocImplicitDgram(dg) {\
845   ImplicitDgram d = Cmi_freelist_implicit;\
846   if (d==0) d = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\
847   else Cmi_freelist_implicit = d->next;\
848   dg = d;\
849 }
850
851 #define FreeExplicitDgram(dg) {\
852   ExplicitDgram d=(dg);\
853   d->next = Cmi_freelist_explicit;\
854   Cmi_freelist_explicit = d;\
855 }
856
857 #define MallocExplicitDgram(dg) {\
858   ExplicitDgram d = Cmi_freelist_explicit;\
859   if (d==0) d = ((ExplicitDgram)malloc \
860                    (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\
861   else Cmi_freelist_explicit = d->next;\
862   dg = d;\
863 }
864
865 /* Careful with these next two, need concurrency control */
866
867 #define FreeOutgoingMsg(m) (free(m))
868 #define MallocOutgoingMsg(m)\
869    (m=(OutgoingMsg)malloc(sizeof(struct OutgoingMsgStruct)))
870
871 /******************************************************************************
872  *
873  * Configuration Data
874  *
875  * This data is all read in from the NETSTART variable (provided by the
876  * host) and from the command-line arguments.  Once read in, it is never
877  * modified.
878  *
879  *****************************************************************************/
880
881
882 int               Cmi_numpes;    /* Total number of processors */
883 int               Cmi_mynodesize;/* Number of processors in my address space */
884 static int        Cmi_mynode;    /* Which address space am I */
885 static int        Cmi_numnodes;  /* Total number of address spaces */
886 static int        Cmi_nodestart; /* First processor in this address space */
887 static CmiStartFn Cmi_startfn;   /* The start function */
888 static int        Cmi_usrsched;  /* Continue after start function finishes? */
889 static char     **Cmi_argv;
890 static int        Cmi_host_IP;
891 static int        Cmi_self_IP;
892 static int        Cmi_host_port;
893 static int        Cmi_host_pid;
894 static char       Cmi_host_IP_str[16];
895 static char       Cmi_self_IP_str[16];
896
897 static int    Cmi_max_dgram_size;
898 static int    Cmi_os_buffer_size;
899 static int    Cmi_window_size;
900 static int    Cmi_half_window;
901 static double Cmi_delay_retransmit;
902 static double Cmi_ack_delay;
903 static int    Cmi_dgram_max_data;
904 static int    Cmi_tickspeed;
905
906 static void setspeed_atm()
907 {
908   Cmi_max_dgram_size   = 2048;
909   Cmi_os_buffer_size   = 50000;
910   Cmi_window_size      = 20;
911   Cmi_delay_retransmit = 0.0150;
912   Cmi_ack_delay        = 0.0035;
913   Cmi_tickspeed        = 10000;
914 }
915
916 static void setspeed_eth()
917 {
918   Cmi_max_dgram_size   = 2048;
919   Cmi_os_buffer_size   = 50000;
920   Cmi_window_size      = 20;
921   Cmi_delay_retransmit = 0.0400;
922   Cmi_ack_delay        = 0.0100;
923   Cmi_tickspeed        = 10000;
924 }
925
926 static void parse_netstart()
927 {
928   char *ns;
929   int nread;
930   ns = getenv("NETSTART");
931   if (ns==0) goto abort;
932   nread = sscanf(ns, "%d%d%d%d%d%d%d%d",
933                  &Cmi_numnodes, &Cmi_mynode,
934                  &Cmi_nodestart, &Cmi_mynodesize, &Cmi_numpes,
935                  &Cmi_self_IP, &Cmi_host_IP, &Cmi_host_port, &Cmi_host_pid);
936   if (nread!=8) goto abort;
937   sprintf(Cmi_self_IP_str,"%d.%d.%d.%d",
938           (Cmi_self_IP>>24)&0xFF,(Cmi_self_IP>>16)&0xFF,
939           (Cmi_self_IP>>8)&0xFF,Cmi_self_IP&0xFF);
940   sprintf(Cmi_host_IP_str,"%d.%d.%d.%d",
941           (Cmi_host_IP>>24)&0xFF,(Cmi_host_IP>>16)&0xFF,
942           (Cmi_host_IP>>8)&0xFF,Cmi_host_IP&0xFF);
943   return;
944  abort:
945   KillEveryone("program not started using 'conv-host' utility. aborting.\n");
946   exit(1);
947 }
948
949 static void extract_args(argv)
950 char **argv;
951 {
952   setspeed_eth();
953   while (*argv) {
954     if (strcmp(*argv,"++atm")==0) {
955       setspeed_atm();
956       DeleteArg(argv);
957     } else if (strcmp(*argv,"++eth")==0) {
958       setspeed_eth();
959       DeleteArg(argv);
960     } else argv++;
961   }
962   Cmi_dgram_max_data = Cmi_max_dgram_size - DGRAM_HEADER_SIZE;
963   Cmi_half_window = Cmi_window_size >> 1;
964   if ((Cmi_window_size * Cmi_max_dgram_size) > Cmi_os_buffer_size)
965     KillEveryone("Window size too big for OS buffer.");
966 }
967
968 /******************************************************************************
969  *
970  * Packet Performance Logging
971  *
972  * This module is designed to give a detailed log of the packets and their
973  * acknowledgements, for performance tuning.  It can be disabled.
974  *
975  *****************************************************************************/
976
977 #define LOGGING 0
978
979 #if LOGGING
980
981 typedef struct logent {
982   double time;
983   int seqno;
984   int srcpe;
985   int dstpe;
986   int kind;
987 } *logent;
988
989
990 logent log;
991 int    log_pos;
992 int    log_wrap;
993
994 static void log_init()
995 {
996   log = (logent)malloc(50000 * sizeof(struct logent));
997   log_pos = 0;
998   log_wrap = 0;
999 }
1000
1001 static void log_done()
1002 {
1003   char logname[100]; FILE *f; int i, size;
1004   sprintf(logname, "log.%d", Cmi_mynode);
1005   f = fopen(logname, "w");
1006   if (f==0) { perror("fopen"); exit(1); }
1007   if (log_wrap) size = 50000; else size=log_pos;
1008   for (i=0; i<size; i++) {
1009     logent ent = log+i;
1010     fprintf(f, "%1.4f %d %c %d %d\n",
1011             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
1012   }
1013   fclose(f);
1014 }
1015
1016 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
1017
1018 #endif
1019
1020
1021 #if !LOGGING
1022
1023 #define log_init() 0
1024 #define log_done() 0
1025 #define LOG(t,s,k,d,q) 0
1026
1027 #endif
1028
1029 /******************************************************************************
1030  *
1031  * Node state
1032  *
1033  *****************************************************************************/
1034
1035 static int        ctrlport, dataport, ctrlskt, dataskt;
1036
1037 static OtherNode *nodes_by_pe;  /* OtherNodes indexed by processor number */
1038 static OtherNode  nodes;        /* Indexed only by ``node number'' */
1039
1040 static volatile int          Cmi_shutdown_initiated;
1041 static CmiNodeLock  Cmi_scanf_mutex;
1042 static volatile char        *Cmi_scanf_data;
1043 static double       Cmi_clock;
1044
1045 /****************************************************************************
1046  *                                                                          
1047  * CheckSocketsReady
1048  *
1049  * Checks both sockets to see which are readable and which are writeable.
1050  * We check all these things at the same time since this can be done for
1051  * free with ``select.'' The result is stored in global variables, since
1052  * this is essentially global state information and several routines need it.
1053  *
1054  ***************************************************************************/
1055
1056 static int ctrlskt_ready_read;
1057 static int ctrlskt_ready_write;
1058 static int dataskt_ready_read;
1059 static int dataskt_ready_write;
1060
1061 void CheckSocketsReady()
1062 {
1063   static fd_set rfds; 
1064   static fd_set wfds; 
1065   struct timeval tmo;
1066   int nreadable;
1067   
1068   FD_SET(dataskt, &rfds);
1069   FD_SET(ctrlskt, &rfds);
1070   FD_SET(dataskt, &wfds);
1071   FD_SET(ctrlskt, &wfds);
1072   tmo.tv_sec = 0;
1073   tmo.tv_usec = 0;
1074   nreadable = select(FD_SETSIZE, &rfds, &wfds, NULL, &tmo);
1075   if (nreadable <= 0) {
1076     ctrlskt_ready_read = 0;
1077     ctrlskt_ready_write = 0;
1078     dataskt_ready_read = 0;
1079     dataskt_ready_write = 0;
1080     return;
1081   }
1082   ctrlskt_ready_read = (FD_ISSET(ctrlskt, &rfds));
1083   dataskt_ready_read = (FD_ISSET(dataskt, &rfds));
1084   ctrlskt_ready_write = (FD_ISSET(ctrlskt, &wfds));
1085   dataskt_ready_write = (FD_ISSET(dataskt, &wfds));
1086 }
1087
1088 /******************************************************************************
1089  *
1090  * OS Threads
1091  *
1092  * This version of converse is for multiple-processor workstations,
1093  * and we assume that the OS provides threads to gain access to those
1094  * multiple processors.  This section contains an interface layer for
1095  * the OS specific threads package.  It contains routines to start
1096  * the threads, routines to access their thread-specific state, and
1097  * routines to control mutual exclusion between them.
1098  *
1099  * In addition, we wish to support nonthreaded operation.  To do this,
1100  * we provide a version of these functions that uses the main/only thread
1101  * as a single PE, and simulates a communication thread using interrupts.
1102  *
1103  *
1104  * CmiStartThreads()
1105  *
1106  *    Allocates one CmiState structure per PE.  Initializes all of
1107  *    the CmiState structures using the function CmiStateInit.
1108  *    Starts processor threads 1..N (not 0, that's the one
1109  *    that calls CmiStartThreads), as well as the communication
1110  *    thread.  Each processor thread (other than 0) must call ConverseInitPE
1111  *    followed by Cmi_startfn.  The communication thread must be an infinite
1112  *    loop that calls the function CommunicationServer over and over,
1113  *    with a short delay between each call (ideally, the delay should end
1114  *    when a datagram arrives or when somebody notifies: see below).
1115  *
1116  * CmiGetState()
1117  *
1118  *    When called by a PE-thread, returns the processor-specific state
1119  *    structure for that PE.
1120  *
1121  * CmiGetStateN(int n)
1122  *
1123  *    returns processor-specific state structure for the PE of rank n.
1124  *
1125  * CmiMemLock() and CmiMemUnlock()
1126  *
1127  *    The memory module calls these functions to obtain mutual exclusion
1128  *    in the memory routines, and to keep interrupts from reentering malloc.
1129  *
1130  * CmiCommLock() and CmiCommUnlock()
1131  *
1132  *    These functions lock a mutex that insures mutual exclusion in the
1133  *    communication routines.
1134  *
1135  * CmiMyPe() and CmiMyRank()
1136  *
1137  *    The usual.  Implemented here, since a highly-optimized version
1138  *    is possible in the nonthreaded case.
1139  *
1140  *****************************************************************************/
1141
1142 #if CMK_SHARED_VARS_SUN_THREADS
1143
1144 static mutex_t memmutex;
1145 void CmiMemLock() { mutex_lock(&memmutex); }
1146 void CmiMemUnlock() { mutex_unlock(&memmutex); }
1147
1148 static thread_key_t Cmi_state_key;
1149 static CmiState     Cmi_state_vector;
1150
1151 CmiState CmiGetState()
1152 {
1153   CmiState result = 0;
1154   thr_getspecific(Cmi_state_key, (void **)&result);
1155   return result;
1156 }
1157
1158 int CmiMyPe()
1159 {
1160   CmiState result = 0;
1161   thr_getspecific(Cmi_state_key, (void **)&result);
1162   return result->pe;
1163 }
1164
1165 int CmiMyRank()
1166 {
1167   CmiState result = 0;
1168   thr_getspecific(Cmi_state_key, (void **)&result);
1169   return result->rank;
1170 }
1171
1172 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1173 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1174 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1175 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1176
1177 CmiNodeLock CmiCreateLock()
1178 {
1179   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(mutex_t));
1180   mutex_init(lk,0,0);
1181 }
1182
1183 void CmiDestroyLock(CmiNodeLock lk)
1184 {
1185   mutex_destroy(lk);
1186   free(lk);
1187 }
1188
1189 void CmiYield() { thr_yield(); }
1190
1191 #define CmiGetStateN(n) (Cmi_state_vector+(n))
1192
1193 static mutex_t comm_mutex;
1194 #define CmiCommLock() (mutex_lock(&comm_mutex))
1195 #define CmiCommUnlock() (mutex_unlock(&comm_mutex))
1196
1197 static void comm_thread(void)
1198 {
1199   struct timeval tmo; fd_set rfds;
1200   while (Cmi_shutdown_initiated == 0) {
1201     CmiCommLock();
1202     CommunicationServer();
1203     CmiCommUnlock();
1204     tmo.tv_sec = 0;
1205     tmo.tv_usec = Cmi_tickspeed;
1206     FD_ZERO(&rfds);
1207     FD_SET(dataskt, &rfds);
1208     FD_SET(ctrlskt, &rfds);
1209     select(FD_SETSIZE, &rfds, 0, 0, &tmo);
1210   }
1211   thr_exit(0);
1212 }
1213
1214 static void *call_startfn(void *vindex)
1215 {
1216   int index = (int)vindex;
1217   CmiState state = Cmi_state_vector + index;
1218   thr_setspecific(Cmi_state_key, state);
1219   ConverseInitPE();
1220   Cmi_startfn(CountArgs(Cmi_argv), Cmi_argv);
1221   if (Cmi_usrsched == 0) CsdScheduler(-1);
1222   ConverseExit();
1223   thr_exit(0);
1224 }
1225
1226 static void CmiStartThreads()
1227 {
1228   int i, pid, ok;
1229   
1230   thr_setconcurrency(Cmi_mynodesize);
1231   thr_keycreate(&Cmi_state_key, 0);
1232   Cmi_state_vector =
1233     (CmiState)calloc(Cmi_mynodesize, sizeof(struct CmiStateStruct));
1234   for (i=0; i<Cmi_mynodesize; i++)
1235     CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
1236   for (i=1; i<Cmi_mynodesize; i++) {
1237     ok = thr_create(0, 256000, call_startfn, (void *)i, THR_BOUND, &pid);
1238     if (ok<0) { perror("thr_create"); exit(1); }
1239   }
1240   thr_setspecific(Cmi_state_key, Cmi_state_vector);
1241   ok = thr_create(0, 256000, (void *(*)(void *))comm_thread, 0, 0, &pid);
1242   if (ok<0) { perror("thr_create"); exit(1); }
1243 }
1244
1245 static void CmiJoinThreads()
1246 {
1247   int i, ok; void *status;
1248   for(i=0; i<Cmi_mynodesize; i++) {
1249     ok = thr_join(0, 0, &status);
1250     if (ok<0) perror("thr_join");
1251   }
1252 }
1253
1254 #endif
1255
1256 #if CMK_SHARED_VARS_UNAVAILABLE
1257
1258 static int memflag;
1259 void CmiMemLock() { memflag=1; }
1260 void CmiMemUnlock() { memflag=0; }
1261
1262 static struct CmiStateStruct Cmi_state;
1263 int Cmi_mype;
1264 int Cmi_myrank;
1265 #define CmiGetState() (&Cmi_state)
1266 #define CmiGetStateN(n) (&Cmi_state)
1267
1268 static int comm_flag;
1269 #define CmiCommLock() (comm_flag=1)
1270 #define CmiCommUnlock() (comm_flag=0)
1271
1272 void CmiYield() { jsleep(0,100); }
1273
1274 static void CommunicationInterrupt()
1275 {
1276   if (comm_flag) return;
1277   if (memflag) return;
1278   CommunicationServer();
1279 }
1280
1281 static void CmiStartThreads()
1282 {
1283   struct itimerval i;
1284   
1285   if ((Cmi_numpes != Cmi_numnodes) || (Cmi_mynodesize != 1))
1286     KillEveryone
1287       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
1288   
1289   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
1290   Cmi_mype = Cmi_nodestart;
1291   Cmi_myrank = 0;
1292   
1293 #if CMK_ASYNC_NOT_NEEDED
1294   CmiSignal(SIGALRM, 0, 0, CommunicationInterrupt);
1295 #else
1296   CmiSignal(SIGALRM, SIGIO, 0, CommunicationInterrupt);
1297   CmiEnableAsyncIO(dataskt);
1298   CmiEnableAsyncIO(ctrlskt);
1299 #endif
1300
1301   i.it_interval.tv_sec = 0;
1302   i.it_interval.tv_usec = Cmi_tickspeed;
1303   i.it_value.tv_sec = 0;
1304   i.it_value.tv_usec = Cmi_tickspeed;
1305   setitimer(ITIMER_REAL, &i, NULL);
1306 }
1307
1308 static void CmiJoinThreads() {}
1309
1310 #endif
1311
1312 CpvDeclare(void *, CmiLocalQueue);
1313
1314 /****************************************************************************
1315  *                                                                          
1316  * Fast shutdown                                                            
1317  *                                                                          
1318  ****************************************************************************/
1319
1320 static void KillIndividual(int ip,int port,int timeout,char *cmd,char *flag)
1321 {
1322   int fd;
1323   if (*flag) return;
1324   fd = skt_connect(ip, port, timeout);
1325   if (fd>=0) {
1326     writeall(fd, cmd, strlen(cmd));
1327     close(fd);
1328     *flag = 1;
1329   }
1330 }
1331
1332 static void KillEveryone(msg)
1333 char *msg;
1334 {
1335   char cmd[1024]; char *killed; char host=0; int i;
1336   if (nodes == 0) {
1337     fprintf(stderr,"%s\n", msg);
1338     exit(1);
1339   }
1340   sprintf(cmd,"die %s",msg);
1341   killed = (char *)calloc(1, Cmi_numnodes);
1342   KillIndividual(Cmi_host_IP, Cmi_host_port, 2, cmd, &host);
1343   for (i=0; i<Cmi_numnodes; i++)
1344     KillIndividual(nodes[i].IP, nodes[i].ctrlport, 2, cmd, killed+i);
1345   KillIndividual(Cmi_host_IP, Cmi_host_port, 10, cmd, &host);
1346   for (i=0; i<Cmi_numnodes; i++)
1347     KillIndividual(nodes[i].IP, nodes[i].ctrlport, 10, cmd, killed+i);
1348   KillIndividual(Cmi_host_IP, Cmi_host_port, 30, cmd, &host);
1349   for (i=0; i<Cmi_numnodes; i++)
1350     KillIndividual(nodes[i].IP, nodes[i].ctrlport, 30, cmd, killed+i);
1351   log_done();
1352   ConverseCommonExit();
1353   exit(1);
1354 }
1355
1356 static void KillEveryoneCode(n)
1357 int n;
1358 {
1359   char buffer[1024];
1360   sprintf(buffer,"Internal error #%d (node %d)\n(Contact CHARM developers)\n",
1361           n,CmiMyPe());
1362   KillEveryone(buffer);
1363 }
1364
1365 static void KillOnSegv()
1366 {
1367   char buffer[1024];
1368   sprintf(buffer, "Node %d: Segmentation Fault.\n",CmiMyPe());
1369   KillEveryone(buffer);
1370 }
1371
1372 static void KillOnBus()
1373 {
1374   char buffer[1024];
1375   sprintf(buffer, "Node %d: Bus Error.\n",CmiMyPe());
1376   KillEveryone(buffer);
1377 }
1378
1379 static void KillOnIntr()
1380 {
1381   char buffer[1024];
1382   sprintf(buffer, "Node %d: Interrupted.\n",CmiMyPe());
1383   KillEveryone(buffer);
1384 }
1385
1386 static void KillOnCrash()
1387 {
1388   char buffer[1024];
1389   sprintf(buffer, "Node %d: Crashed.\n",CmiMyPe());
1390   KillEveryone(buffer);
1391 }
1392
1393 static void KillInit()
1394 {
1395   CmiSignal(SIGSEGV, 0, 0, KillOnSegv);
1396   CmiSignal(SIGBUS,  0, 0, KillOnBus);
1397   CmiSignal(SIGILL,  0, 0, KillOnCrash);
1398   CmiSignal(SIGABRT, 0, 0, KillOnCrash);
1399   CmiSignal(SIGFPE,  0, 0, KillOnCrash);
1400   CmiSignal(SIGPIPE, 0, 0, KillOnCrash);
1401   CmiSignal(SIGURG,  0, 0, KillOnCrash);
1402
1403 #ifdef SIGSYS
1404   CmiSignal(SIGSYS,  0, 0, KillOnCrash);
1405 #endif
1406
1407   CmiSignal(SIGTERM, 0, 0, KillOnIntr);
1408   CmiSignal(SIGQUIT, 0, 0, KillOnIntr);
1409   CmiSignal(SIGINT,  0, 0, KillOnIntr);
1410 }
1411
1412 /****************************************************************************
1413  *
1414  * ctrl_sendone
1415  *
1416  * Any thread can call this.  There's no need for concurrency control.
1417  *
1418  ****************************************************************************/
1419
1420 static void ctrl_sendone(va_alist) va_dcl
1421 {
1422   char buffer[1024];
1423   char *f; int fd, delay; char c;
1424   va_list p;
1425   va_start(p);
1426   delay = va_arg(p, int);
1427   f = va_arg(p, char *);
1428   vsprintf(buffer, f, p);
1429   fd = skt_connect(Cmi_host_IP, Cmi_host_port, delay);
1430   if (fd<0) KillEveryone("cannot contact host");
1431   writeall(fd, buffer, strlen(buffer));
1432 #if CMK_SYNCHRONIZE_ON_TCP_CLOSE
1433   shutdown(fd, 1);
1434   while (read(fd, &c, 1)==EINTR);
1435   close(fd);
1436 #else
1437   close(fd);
1438 #endif
1439 }
1440
1441 /****************************************************************************
1442  *
1443  * ctrl_getone
1444  *
1445  * This is handled only by the communication interrupt.
1446  *
1447  ****************************************************************************/
1448
1449 static void node_addresses_store();
1450
1451 static void ctrl_getone()
1452 {
1453   char line[10000];
1454   int ok, ip, port, fd;  FILE *f;
1455   skt_accept(ctrlskt, &ip, &port, &fd);
1456   f = fdopen(fd,"r");
1457   while (fgets(line, 9999, f)) {
1458     if (strncmp(line,"aval addr ",10)==0) {
1459       node_addresses_store(line);
1460     }
1461     else if (strncmp(line,"aval done ",10)==0) {
1462       Cmi_shutdown_initiated = 1;
1463     }
1464     else if (strncmp(line,"scanf-data ",11)==0) {
1465       Cmi_scanf_data=strdupl(line+11);
1466     } else if (strncmp(line,"die ",4)==0) {
1467       fprintf(stderr,"aborting: %s\n",line+4);
1468       log_done();
1469       ConverseCommonExit();
1470       exit(0);
1471     }
1472     else KillEveryoneCode(2932);
1473   }
1474   fclose(f);
1475   close(fd);
1476 }
1477
1478 /*****************************************************************************
1479  *
1480  * node_addresses
1481  *
1482  *  These two functions fill the node-table.
1483  *
1484  *
1485  *   This node, like all others, first sends its own address to the host
1486  *   using this command:
1487  *
1488  *     aset addr <my-nodeno> <ip-addr>.<ctrlport>.<dataport>.<nodestart>.<nodesize>
1489  *
1490  *   Then requests all addresses from the host using this command:
1491  *
1492  *     aget <my-ip-addr> <my-ctrlport> addr 0 <numnodes>
1493  *
1494  *   when the host has all the addresses, he sends a table to me:
1495  *
1496  *     aval addr <ip-addr-0>.<ctrlport-0>.<dataport-0> ...
1497  *
1498  *****************************************************************************/
1499
1500 static void node_addresses_obtain()
1501 {
1502   ctrl_sendone(120, "aset addr %d %s.%d.%d.%d.%d\n",
1503                Cmi_mynode, Cmi_self_IP_str, ctrlport, dataport,
1504                Cmi_nodestart, Cmi_mynodesize);
1505   ctrl_sendone(120, "aget %s %d addr 0 %d\n",
1506                Cmi_self_IP_str,ctrlport,Cmi_numnodes-1);
1507   while (nodes == 0) {
1508     if (wait_readable(ctrlskt, 300)<0)
1509       { perror("waiting for data"); KillEveryoneCode(21323); }
1510     ctrl_getone();
1511   }
1512 }
1513
1514 static void node_addresses_store(addrs) char *addrs;
1515 {
1516   char *p, *e; int i, j, lo, hi;
1517   OtherNode ntab, *bype;
1518   if (strncmp(addrs,"aval addr ",10)!=0) KillEveryoneCode(83473);
1519   ntab = (OtherNode)calloc(Cmi_numnodes, sizeof(struct OtherNodeStruct));
1520   p = skipblanks(addrs+10);
1521   p = parseint(p,&lo);
1522   p = parseint(p,&hi);
1523   if ((lo!=0)||(hi!=Cmi_numnodes - 1)) KillEveryoneCode(824793);
1524   for (i=0; i<Cmi_numnodes; i++) {
1525     unsigned int ip0,ip1,ip2,ip3,cport,dport,nodestart,nodesize,ip;
1526     p = parseint(p,&ip0);
1527     p = parseint(p,&ip1);
1528     p = parseint(p,&ip2);
1529     p = parseint(p,&ip3);
1530     p = parseint(p,&cport);
1531     p = parseint(p,&dport);
1532     p = parseint(p,&nodestart);
1533     p = parseint(p,&nodesize);
1534     ip = (ip0<<24)+(ip1<<16)+(ip2<<8)+ip3;
1535     ntab[i].nodestart = nodestart;
1536     ntab[i].nodesize  = nodesize;
1537     ntab[i].IP = ip;
1538     ntab[i].ctrlport = cport;
1539     ntab[i].dataport = dport;
1540     ntab[i].addr.sin_family      = AF_INET;
1541     ntab[i].addr.sin_port        = htons(dport);
1542     ntab[i].addr.sin_addr.s_addr = htonl(ip);
1543   }
1544   p = skipblanks(p);
1545   if (*p!=0) KillEveryoneCode(82283);
1546   bype = (OtherNode*)malloc(Cmi_numpes * sizeof(OtherNode));
1547   for (i=0; i<Cmi_numnodes; i++) {
1548     OtherNode node = ntab + i;
1549     node->send_window =
1550       (ImplicitDgram*)calloc(Cmi_window_size, sizeof(ImplicitDgram));
1551     node->recv_window =
1552       (ExplicitDgram*)calloc(Cmi_window_size, sizeof(ExplicitDgram));
1553     for (j=0; j<node->nodesize; j++)
1554       bype[j + node->nodestart] = node;
1555   }
1556   nodes_by_pe = bype;
1557   nodes = ntab;
1558 }
1559
1560 /*****************************************************************************
1561  *
1562  * CmiPrintf, CmiError, CmiScanf
1563  *
1564  *****************************************************************************/
1565
1566 static void InternalPrintf(f, l) char *f; va_list l;
1567 {
1568   char *p, *buf;
1569   char buffer[8192];
1570   vsprintf(buffer, f, l);
1571   buf = buffer;
1572   while (*buf) {
1573     p = strchr(buf, '\n');
1574     if (p) {
1575       *p=0; ctrl_sendone(120, "print %s\n", buf);
1576       *p='\n'; buf=p+1;
1577     } else {
1578       ctrl_sendone(120, "princ %s\n", buf);
1579       break;
1580     }
1581   }
1582 }
1583
1584 static void InternalError(f, l) char *f; va_list l;
1585 {
1586   char *p, *buf;
1587   char buffer[8192];
1588   vsprintf(buffer, f, l);
1589   buf = buffer;
1590   while (*buf) {
1591     p = strchr(buf, '\n');
1592     if (p) {
1593       *p=0; ctrl_sendone(120, "printerr %s\n", buf);
1594       *p='\n'; buf = p+1;
1595     } else {
1596       ctrl_sendone(120, "princerr %s\n", buf);
1597       break;
1598     }
1599   }
1600 }
1601
1602 static int InternalScanf(fmt, l)
1603     char *fmt;
1604     va_list l;
1605 {
1606   char *ptr[20];
1607   char *p; int nargs, i;
1608   nargs=0;
1609   p=fmt;
1610   while (*p) {
1611     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1612     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1613     if (p[0]=='%') { nargs++; p++; continue; }
1614     if (*p=='\n') *p=' '; p++;
1615   }
1616   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1617   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1618   CmiLock(Cmi_scanf_mutex);
1619   ctrl_sendone(120, "scanf %s %d %s", Cmi_self_IP_str, ctrlport, fmt);
1620   while (Cmi_scanf_data == 0) jsleep(0, 250000);
1621   i = sscanf(Cmi_scanf_data, fmt,
1622              ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1623              ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1624              ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1625   free(Cmi_scanf_data);
1626   Cmi_scanf_data=0;
1627   CmiUnlock(Cmi_scanf_mutex);
1628   return i;
1629 }
1630
1631 void CmiPrintf(va_alist) va_dcl
1632 {
1633   va_list p; char *f; va_start(p); f = va_arg(p, char *);
1634   InternalPrintf(f, p);
1635 }
1636
1637 void CmiError(va_alist) va_dcl
1638 {
1639   va_list p; char *f; va_start(p); f = va_arg(p, char *);
1640   InternalError(f, p);
1641 }
1642
1643 int CmiScanf(va_alist) va_dcl
1644 {
1645   va_list p; char *f; va_start(p); f = va_arg(p, char *);
1646   return InternalScanf(f, p);
1647 }
1648
1649
1650 /******************************************************************************
1651  *
1652  * Transmission Code
1653  *
1654  *****************************************************************************/
1655
1656 void DiscardImplicitDgram(ImplicitDgram dg)
1657 {
1658   OutgoingMsg ogm;
1659   ogm = dg->ogm;
1660   ogm->refcount--;
1661   if (ogm->refcount == 0) {
1662     if (ogm->freemode == 'A') {
1663       ogm->freemode = 'X';
1664     } else {
1665       CmiFree(ogm->data);
1666       FreeOutgoingMsg(ogm);
1667     }
1668   }
1669   FreeImplicitDgram(dg);
1670 }
1671
1672 int TransmitAckDatagram(OtherNode node)
1673 {
1674   DgramAck ack; unsigned int i, seqno, slot; ExplicitDgram dg;
1675   
1676   seqno = node->recv_next;
1677   DgramHeaderMake(&ack, DGRAM_ACKNOWLEDGE, Cmi_nodestart, Cmi_host_pid, seqno);
1678   LOG(Cmi_clock, Cmi_nodestart, 'A', node->nodestart, seqno);
1679   for (i=0; i<Cmi_window_size; i++) {
1680     slot = seqno % Cmi_window_size;
1681     dg = node->recv_window[slot];
1682     ack.window[i] = (dg && (dg->seqno == seqno));
1683     seqno = ((seqno+1) & DGRAM_SEQNO_MASK);
1684   }
1685   sendto(dataskt, (char *)&ack,
1686          DGRAM_HEADER_SIZE + Cmi_window_size, 0,
1687          (struct sockaddr *)&(node->addr),
1688          sizeof(struct sockaddr_in));
1689 }
1690
1691 void TransmitImplicitDgram(ImplicitDgram dg)
1692 {
1693   char *data; DgramHeader *head; int len; DgramHeader temp;
1694   OtherNode dest;
1695
1696   len = dg->datalen;
1697   data = dg->dataptr;
1698   head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
1699   temp = *head;
1700   dest = dg->dest;
1701   DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_host_pid, dg->seqno);
1702   LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
1703   sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
1704               (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
1705   *head = temp;
1706 }
1707
1708 void TransmitImplicitDgram1(ImplicitDgram dg)
1709 {
1710   char *data; DgramHeader *head; int len; DgramHeader temp;
1711   OtherNode dest;
1712
1713   len = dg->datalen;
1714   data = dg->dataptr;
1715   head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
1716   temp = *head;
1717   dest = dg->dest;
1718   DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_host_pid, dg->seqno);
1719   LOG(Cmi_clock, Cmi_nodestart, 'P', dest->nodestart, dg->seqno);
1720   sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
1721               (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
1722   *head = temp;
1723 }
1724
1725 int TransmitAcknowledgement()
1726 {
1727   int skip; static int nextnode=0; OtherNode node;
1728   
1729   for (skip=0; skip<Cmi_numnodes; skip++) {
1730     node = nodes+nextnode;
1731     nextnode = (nextnode + 1) % Cmi_numnodes;
1732     if (node->recv_ack_cnt) {
1733       if ((node->recv_ack_cnt > Cmi_half_window) ||
1734           (Cmi_clock >= node->recv_ack_time)) {
1735         TransmitAckDatagram(node);
1736         if (node->recv_winsz) {
1737           node->recv_ack_cnt  = 1;
1738           node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
1739         } else {
1740           node->recv_ack_cnt  = 0;
1741           node->recv_ack_time = 0.0;
1742         }
1743         return 1;
1744       }
1745     }
1746   }
1747   return 0;
1748 }
1749
1750 int TransmitDatagram()
1751 {
1752   ImplicitDgram dg; OtherNode node;
1753   static int nextnode=0; unsigned int skip, count, slot, seqno;
1754   
1755   for (skip=0; skip<Cmi_numnodes; skip++) {
1756     node = nodes+nextnode;
1757     nextnode = (nextnode + 1) % Cmi_numnodes;
1758     dg = node->send_queue_h;
1759     if (dg) {
1760       seqno = dg->seqno;
1761       slot = seqno % Cmi_window_size;
1762       if (node->send_window[slot] == 0) {
1763         node->send_queue_h = dg->next;
1764         node->send_window[slot] = dg;
1765         TransmitImplicitDgram(dg);
1766         if (seqno == ((node->send_last+1)&DGRAM_SEQNO_MASK))
1767           node->send_last = seqno;
1768         node->send_primer = Cmi_clock + Cmi_delay_retransmit;
1769         return 1;
1770       }
1771     }
1772     if (Cmi_clock > node->send_primer) {
1773       slot = (node->send_last % Cmi_window_size);
1774       for (count=0; count<Cmi_window_size; count++) {
1775         dg = node->send_window[slot];
1776         if (dg) break;
1777         slot = ((slot-1) % Cmi_window_size);
1778       }
1779       if (dg) {
1780         TransmitImplicitDgram1(node->send_window[slot]);
1781         node->send_primer = Cmi_clock + Cmi_delay_retransmit;
1782         return 1;
1783       }
1784     }
1785   }
1786   return 0;
1787 }
1788
1789 void EnqueueOutgoingDgram
1790         (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank)
1791 {
1792   unsigned int seqno, slot; int dst, src, dstrank; ImplicitDgram dg;
1793   src = ogm->src;
1794   dst = ogm->dst;
1795   seqno = node->send_next;
1796   node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
1797   MallocImplicitDgram(dg);
1798   dg->dest = node;
1799   dg->srcpe = src;
1800   dg->rank = rank;
1801   dg->seqno = seqno;
1802   dg->dataptr = ptr;
1803   dg->datalen = len;
1804   dg->ogm = ogm;
1805   ogm->refcount++;
1806   dg->next = 0;
1807   if (node->send_queue_h == 0) {
1808     node->send_queue_h = dg;
1809     node->send_queue_t = dg;
1810   } else {
1811     node->send_queue_t->next = dg;
1812     node->send_queue_t = dg;
1813   }
1814 }
1815
1816 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank)
1817 {
1818   int size, seqno; char *data;
1819   
1820   size = ogm->size - DGRAM_HEADER_SIZE;
1821   data = ogm->data + DGRAM_HEADER_SIZE;
1822   while (size > Cmi_dgram_max_data) {
1823     EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank);
1824     data += Cmi_dgram_max_data;
1825     size -= Cmi_dgram_max_data;
1826   }
1827   EnqueueOutgoingDgram(ogm, data, size, node, rank);
1828 }
1829
1830 void DeliverOutgoingMessage(OutgoingMsg ogm)
1831 {
1832   int i, rank, dst; OtherNode node;
1833   
1834   dst = ogm->dst;
1835   switch (dst) {
1836   case PE_BROADCAST_ALL:
1837     for (rank = 0; rank<Cmi_mynodesize; rank++)
1838       PCQueuePush(CmiGetStateN(rank)->recv,CopyMsg(ogm->data,ogm->size));
1839     for (i=0; i<Cmi_numnodes; i++)
1840       if (i!=Cmi_mynode)
1841         DeliverViaNetwork(ogm, nodes + i, DGRAM_BROADCAST);
1842     break;
1843   case PE_BROADCAST_OTHERS:
1844     for (rank = 0; rank<Cmi_mynodesize; rank++)
1845       if (rank + Cmi_nodestart != ogm->src)
1846         PCQueuePush(CmiGetStateN(rank)->recv,CopyMsg(ogm->data,ogm->size));
1847     for (i = 0; i<Cmi_numnodes; i++)
1848       if (i!=Cmi_mynode)
1849         DeliverViaNetwork(ogm, nodes + i, DGRAM_BROADCAST);
1850     break;
1851   default:
1852     node = nodes_by_pe[dst];
1853     rank = dst - node->nodestart;
1854     if (node->nodestart != Cmi_nodestart)
1855       DeliverViaNetwork(ogm, node, rank);
1856     else {
1857       if (ogm->freemode == 'A') {
1858         PCQueuePush(CmiGetStateN(rank)->recv,CopyMsg(ogm->data,ogm->size));
1859         ogm->freemode = 'X';
1860       } else {
1861         PCQueuePush(CmiGetStateN(rank)->recv, ogm->data);
1862         FreeOutgoingMsg(ogm);
1863       }
1864     }
1865   }
1866 }
1867
1868 void AssembleDatagram(OtherNode node, ExplicitDgram dg)
1869 {
1870   int i, size; char *msg;
1871   
1872   msg = node->asm_msg;
1873   if (msg == 0) {
1874     size = CmiMsgHeaderGetLength(dg->data);
1875     msg = (char *)CmiAlloc(size);
1876     if (size < dg->len) KillEveryoneCode(4559312);
1877     jmemcpy(msg, (char*)(dg->data), dg->len);
1878     node->asm_rank = dg->rank;
1879     node->asm_total = size;
1880     node->asm_fill = dg->len;
1881     node->asm_msg = msg;
1882   } else {
1883     size = dg->len - DGRAM_HEADER_SIZE;
1884     jmemcpy(msg + node->asm_fill, ((char*)(dg->data))+DGRAM_HEADER_SIZE, size);
1885     node->asm_fill += size;
1886   }
1887   if (node->asm_fill == node->asm_total) {
1888     if (node->asm_rank == DGRAM_BROADCAST) {
1889       int len = node->asm_total;
1890       for (i=1; i<Cmi_mynodesize; i++)
1891         PCQueuePush(CmiGetStateN(i)->recv, CopyMsg(msg, len));
1892       PCQueuePush(CmiGetStateN(0)->recv, msg);
1893     } else PCQueuePush(CmiGetStateN(node->asm_rank)->recv, msg);
1894     node->asm_msg = 0;
1895   }
1896   FreeExplicitDgram(dg);
1897 }
1898
1899 void AssembleReceivedDatagrams(OtherNode node)
1900 {
1901   unsigned int next, slot; ExplicitDgram dg;
1902   next = node->recv_next;
1903   while (1) {
1904     slot = (next % Cmi_window_size);
1905     dg = node->recv_window[slot];
1906     if (dg == 0) break;
1907     AssembleDatagram(node, dg);
1908     node->recv_window[slot] = 0;
1909     node->recv_winsz--;
1910     next = ((next + 1) & DGRAM_SEQNO_MASK);
1911   }
1912   node->recv_next = next;
1913 }
1914
1915 void IntegrateMessageDatagram(ExplicitDgram dg)
1916 {
1917   unsigned int seqno, slot; OtherNode node;
1918
1919   LOG(Cmi_clock, Cmi_nodestart, 'M', dg->srcpe, dg->seqno);
1920   node = nodes_by_pe[dg->srcpe];
1921   seqno = dg->seqno;
1922   node->recv_ack_cnt++;
1923   if (node->recv_ack_time == 0.0)
1924     node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
1925   if (((seqno - node->recv_next) & DGRAM_SEQNO_MASK) < Cmi_window_size) {
1926     slot = (seqno % Cmi_window_size);
1927     if (node->recv_window[slot] == 0) {
1928       node->recv_window[slot] = dg;
1929       node->recv_winsz++;
1930       if (seqno == node->recv_next)
1931         AssembleReceivedDatagrams(node);
1932       if (seqno > node->recv_expect)
1933         node->recv_ack_time = 0.0;
1934       if (seqno >= node->recv_expect)
1935         node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
1936       return;
1937     }
1938   }
1939   FreeExplicitDgram(dg);
1940 }
1941
1942 void IntegrateAckDatagram(ExplicitDgram dg)
1943 {
1944   OtherNode node; DgramAck *ack; ImplicitDgram idg;
1945   int i; unsigned int slot, rxing, dgseqno, seqno;
1946   
1947   node = nodes_by_pe[dg->srcpe];
1948   ack = ((DgramAck*)(dg->data));
1949   dgseqno = dg->seqno;
1950   seqno = (dgseqno + Cmi_window_size) & DGRAM_SEQNO_MASK;
1951   slot = seqno % Cmi_window_size;
1952   rxing = 0;
1953   LOG(Cmi_clock, Cmi_nodestart, 'R', node->nodestart, dg->seqno);
1954   for (i=Cmi_window_size-1; i>=0; i--) {
1955     slot--; if (slot== ((unsigned int)-1)) slot+=Cmi_window_size;
1956     seqno = (seqno-1) & DGRAM_SEQNO_MASK;
1957     idg = node->send_window[slot];
1958     if (idg) {
1959       if (idg->seqno == seqno) {
1960         if (ack->window[i]) {
1961           /*
1962           LOG(Cmi_clock, Cmi_nodestart, 'r', node->nodestart, seqno);
1963           */
1964           node->send_window[slot] = 0;
1965           DiscardImplicitDgram(idg);
1966           rxing = 1;
1967         } else if (rxing) {
1968           node->send_window[slot] = 0;
1969           idg->next = node->send_queue_h;
1970           if (node->send_queue_h == 0) {
1971             node->send_queue_t = idg;
1972           }
1973           node->send_queue_h = idg;
1974         }
1975       } else if (((idg->seqno - dgseqno) & DGRAM_SEQNO_MASK)>=Cmi_window_size){
1976         /*
1977         LOG(Cmi_clock, Cmi_nodestart, 'r', node->nodestart, idg->seqno);
1978         */
1979         node->send_window[slot] = 0;
1980         DiscardImplicitDgram(idg);
1981       }
1982     }
1983   }
1984   FreeExplicitDgram(dg);  
1985 }
1986
1987 void ReceiveDatagram()
1988 {
1989   ExplicitDgram dg; int ok, magic;
1990   MallocExplicitDgram(dg);
1991   ok = recv(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0);
1992   if (ok<0) KillEveryoneCode(37489437);
1993   dg->len = ok;
1994   if (ok >= DGRAM_HEADER_SIZE) {
1995     DgramHeaderBreak(dg->data, dg->rank, dg->srcpe, magic, dg->seqno);
1996     if (magic == Cmi_host_pid) {
1997       if (dg->rank == DGRAM_ACKNOWLEDGE)
1998         IntegrateAckDatagram(dg);
1999       else IntegrateMessageDatagram(dg);
2000     } else FreeExplicitDgram(dg);
2001   } else FreeExplicitDgram(dg);
2002 }
2003
2004 static void CommunicationServer()
2005 {
2006   LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
2007   while (1) {
2008     Cmi_clock = GetClock();
2009     CheckSocketsReady();
2010     if (ctrlskt_ready_read) { ctrl_getone(); continue; }
2011     if (dataskt_ready_write) { if (TransmitAcknowledgement()) continue; }
2012     if (dataskt_ready_read) { ReceiveDatagram(); continue; }
2013     if (dataskt_ready_write) { if (TransmitDatagram()) continue; }
2014     break;
2015   }
2016 }
2017
2018 /******************************************************************************
2019  *
2020  * CmiGetNonLocal
2021  *
2022  * The design of this system is that the communication thread does all the
2023  * work, to eliminate as many locking issues as possible.  This is the only
2024  * part of the code that happens in the receiver-thread.
2025  *
2026  * This operation is fairly cheap, it might be worthwhile to inline
2027  * the code into CmiDeliverMsgs to reduce function call overhead.
2028  *
2029  *****************************************************************************/
2030
2031 char *CmiGetNonLocal()
2032 {
2033   CmiState cs = CmiGetState();
2034   void *result = PCQueuePop(cs->recv);
2035   return result;
2036 }
2037
2038 /******************************************************************************
2039  *
2040  * CmiNotifyIdle()
2041  *
2042  *****************************************************************************/
2043
2044 void CmiNotifyIdle()
2045 {
2046 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2047   struct timeval tv;
2048   tv.tv_sec=0; tv.tv_usec=5000;
2049   select(0,0,0,0,&tv);
2050 #else
2051   CmiCommLock();
2052   CommunicationServer();
2053   CmiCommUnlock();
2054 #endif
2055 }
2056
2057 /******************************************************************************
2058  *
2059  * CmiGeneralSend
2060  *
2061  *****************************************************************************/
2062
2063 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
2064 {
2065   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2066
2067   if (freemode == 'S') {
2068     char *copy = (char *)CmiAlloc(size);
2069     memcpy(copy, data, size);
2070     data = copy; freemode = 'F';
2071   }
2072
2073   if (pe == cs->pe) {
2074     FIFO_EnQueue(cs->localqueue, data);
2075     if (freemode == 'A') {
2076       MallocOutgoingMsg(ogm);
2077       ogm->freemode = 'X';
2078       return ogm;
2079     } else return 0;
2080   }
2081   
2082   MallocOutgoingMsg(ogm);
2083   CmiMsgHeaderSetLength(data, size);
2084   ogm->size = size;
2085   ogm->data = data;
2086   ogm->src = cs->pe;
2087   ogm->dst = pe;
2088   ogm->freemode = freemode;
2089   ogm->refcount = 0;
2090   CmiCommLock();
2091   DeliverOutgoingMessage(ogm);
2092   CommunicationServer();
2093   CmiCommUnlock();
2094   return (CmiCommHandle)ogm;
2095 }
2096
2097 void CmiSyncSendFn(int p, int s, char *m)
2098 { CmiGeneralSend(p,s,'S',m); }
2099
2100 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
2101 { return CmiGeneralSend(p,s,'A',m); }
2102
2103 void CmiFreeSendFn(int p, int s, char *m)
2104 { CmiGeneralSend(p,s,'F',m); }
2105
2106 void CmiSyncBroadcastFn(int s, char *m)
2107 { CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); }
2108
2109 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
2110 { return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); }
2111
2112 void CmiFreeBroadcastFn(int s, char *m)
2113 { CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); }
2114
2115 void CmiSyncBroadcastAllFn(int s, char *m)
2116 { CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); }
2117
2118 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
2119 { return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); }
2120
2121 void CmiFreeBroadcastAllFn(int s, char *m)
2122 { CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); }
2123
2124 /******************************************************************************
2125  *
2126  * Comm Handle manipulation.
2127  *
2128  *****************************************************************************/
2129
2130 int CmiAsyncMsgSent(CmiCommHandle handle)
2131 {
2132   return (((OutgoingMsg)handle)->freemode == 'X');
2133 }
2134
2135 void CmiReleaseCommHandle(CmiCommHandle handle)
2136 {
2137   FreeOutgoingMsg(((OutgoingMsg)handle));
2138 }
2139
2140 /******************************************************************************
2141  *
2142  * Main code, Init, and Exit
2143  *
2144  *****************************************************************************/
2145
2146 void ConverseInitPE()
2147 {
2148   CmiState cs = CmiGetState();
2149   CthInit(Cmi_argv);
2150   ConverseCommonInit(Cmi_argv);
2151   CpvInitialize(void *,CmiLocalQueue);
2152   CpvAccess(CmiLocalQueue) = cs->localqueue;
2153 }
2154
2155 void ConverseExit()
2156 {
2157   ctrl_sendone(120,"aset done %d TRUE\n",CmiMyPe());
2158   while (Cmi_shutdown_initiated == 0) CmiYield();
2159   ctrl_sendone(120,"ending\n");
2160   if (CmiMyRank()==0) {
2161     log_done();
2162     ConverseCommonExit();
2163     CmiJoinThreads();
2164   }
2165 }
2166
2167 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int ret)
2168 {
2169 #if CMK_USE_HP_MAIN_FIX
2170 #if FOR_CPLUS
2171   _main(argc,argv);
2172 #endif
2173 #endif
2174   Cmi_argv = argv;
2175   Cmi_startfn = fn;
2176   Cmi_usrsched = usc;
2177   parse_netstart();
2178   extract_args(argv);
2179   log_init();
2180   skt_datagram(&dataport, &dataskt, Cmi_os_buffer_size);
2181   skt_server(&ctrlport, &ctrlskt);
2182   KillInit();
2183   ctrl_sendone(120,"notify-die %s %d\n",Cmi_self_IP_str,ctrlport);
2184   ctrl_sendone(120,"aget %s %d done 0 %d\n",
2185                Cmi_self_IP_str,ctrlport,Cmi_numpes-1);
2186   node_addresses_obtain();
2187   Cmi_scanf_mutex = CmiCreateLock();
2188   CmiTimerInit();
2189   CmiStartThreads();
2190   ConverseInitPE();
2191   if (ret==0) {
2192     fn(CountArgs(argv), argv);
2193     if (usc==0) CsdScheduler(-1);
2194     ConverseExit();
2195   }
2196 }