*** empty log message ***
[charm.git] / src / arch / net / machine.c
1
2 /******************************************************************************
3  *
4  * THE DATAGRAM STREAM
5  *
6  * Messages are sent using UDP datagrams.  The sender allocates a
7  * struct for each datagram to be sent.  These structs stick around
8  * until slightly after the datagram is acknowledged.
9  *
10  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
11  * Each node has an OtherNode struct for every other node in the
12  * system.  The OtherNode struct contains:
13  *
14  *   send_queue   (all datagram-structs not yet transmitted)
15  *   send_window  (all datagram-structs transmitted but not ack'd)
16  *
17  * When an acknowledgement comes in, all packets in the send-window
18  * are either marked as acknowledged or pushed back into the send
19  * queue for retransmission.
20  *
21  * THE OUTGOING MESSAGE
22  *
23  * When you send or broadcast a message, the first thing the system
24  * does is system creates an OutgoingMsg struct to represent the
25  * operation.  The OutgoingMsg contains a very direct expression
26  * of what you want to do:
27  *
28  * OutgoingMsg:
29  *
30  *   size      --- size of message in bytes
31  *   data      --- pointer to the buffer containing the message
32  *   src       --- processor which sent the message
33  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
34  *   freemode  --- see below.
35  *   refcount  --- see below.
36  *
37  * The OutgoingMsg is kept around until the transmission is done, then
38  * it is garbage collected --- the refcount and freemode fields are
39  * to assist garbage collection.
40  *
41  * The freemode indicates which kind of buffer-management policy was
42  * used (sync, async, or freeing).  The sync policy is handled
43  * superficially by immediately converting sync sends into freeing
44  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
45  * (freeing).  If the freemode is 'F', then garbage collection
46  * involves freeing the data and the OutgoingMsg structure itself.  If
47  * the freemode is 'A', then the only cleanup is to change the
48  * freemode to 'X', a condition which is then detectable by
49  * CmiAsyncMsgSent.  In this case, the actual freeing of the
50  * OutgoingMsg is done by CmiReleaseCommHandle.
51  *
52  * When the transmission is initiated, the system computes how many
53  * datagrams need to be sent, total.  This number is stored in the
54  * refcount field.  Each time a datagram is delivered, the refcount
55  * is decremented, when it reaches zero, cleanup is performed.  There
56  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
57  * is a send (not a broadcast) and can be performed with shared
58  * memory, the entire datagram system is bypassed, the message is
59  * simply delivered and freed, not using the refcount mechanism at
60  * all.  Exception 2: If the message is a broadcast, then part of the
61  * broadcast that can be done via shared memory is performed prior to
62  * initiating the datagram/refcount system.
63  *
64  * DATAGRAM FORMATS AND MESSAGE FORMATS
65  *
66  * Datagrams have this format:
67  *
68  *   srcpe   (16 bits) --- source processor number.
69  *   magic   (16 bits) --- magic number to make sure DG is good.
70  *   dstrank ( 8 bits) --- destination processor rank.
71  *   seqno   (24 bits) --- packet sequence number.
72  *   data    (XX byte) --- user data.
73  *
74  * The only reason the srcpe is in there is because the receiver needs
75  * to know which receive window to use.  The dstrank field is needed
76  * because transmission is node-to-node.  Once the message is
77  * assembled by the node, it must be delivered to the appropriate PE.
78  * The dstrank field is used to encode certain special-case scenarios.
79  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
80  * and should be delivered to all processors in the node.  If the dstrank
81  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
82  * which case the srcpe is the number of the acknowledger, the seqno is
83  * always zero, and the user data is a list of the seqno's being
84  * acknowledged.  There may be other dstrank codes for special functions.
85  *
86  * To send a message, one chops it up into datagrams and stores those
87  * datagrams in a send-queue.  These outgoing datagrams aren't stored
88  * in the explicit format shown above.  Instead, they are stored as
89  * ImplicitDgrams, which contain the datagram header and a pointer to
90  * the user data (which is in the user message buffer, which is in the
91  * OutgoingMsg).  At transmission time these are combined together.
92
93  * The combination of the datagram header with the user's data is
94  * performed right in the user's message buffer.  Note that the
95  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
96  * of the user's message with a datagram header, sends the datagram
97  * straight from the user's message buffer, then restores the user's
98  * buffer to its original state.  There is a small problem with the
99  * first datagram of the message: one needs 64 bits of space to store
100  * the datagram header.  To make sure this space is there, we added a
101  * 64-bit unused space to the front of the Cmi message header.  In
102  * addition to this, we also add 32 bits to the Cmi message header
103  * to make room for a length-field, making it possible to identify
104  * message boundaries.
105  *
106  * CONCURRENCY CONTROL
107  *
108  * This has changed recently.
109  *
110  * EFFICIENCY NOTES
111  *
112  * The sender-side does little copying.  The async and freeing send
113  * routines do no copying at all.  The sync send routines copy the
114  * message, then use the freeing-send routines.  The other alternative
115  * is to not copy the message, and use the async send mechanism
116  * combined with a blocking wait.  Blocking wait seems like a bad
117  * idea, since it could take a VERY long time to get all those
118  * datagrams out the door.
119  *
120  * The receiver side, unfortunately, must copy.  To avoid copying,
121  * it would have to receive directly into a preallocated message buffer.
122  * Unfortunately, this can't work: there's no way to know how much
123  * memory to preallocate, and there's no way to know which datagram
124  * is coming next.  Thus, we receive into fixed-size (large) datagram
125  * buffers.  These are then inspected, and the messages extracted from
126  * them.
127  *
128  * Note that we are allocating a large number of structs: OutgoingMsg's,
129  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
130  * is a fixed-size structure.  Thus, we can do memory allocation by
131  * simply keeping a linked-list of unused structs around.  The only
132  * place where expensive memory allocation is performed is in the
133  * sync routines.
134  *
135  * Since the datagrams from one node to another are fully ordered,
136  * there is slightly more ordering than is needed: in theory, the
137  * datagrams of one message don't need to be ordered relative to the
138  * datagrams of another.  This was done to simplify the sequencing
139  * mechanisms: implementing a fully-ordered stream is much simpler
140  * than a partially-ordered one.  It also makes it possible to
141  * modularize, layering the message transmitter on top of the
142  * datagram-sequencer.  In other words, it was just easier this way.
143  * Hopefully, this won't cause serious degradation: LAN's rarely get
144  * datagrams out of order anyway.
145  *
146  * A potential efficiency problem is the lack of message-combining.
147  * One datagram could conceivably contain several messages.  This
148  * might be more efficient, it's not clear how much overhead is
149  * involved in sending a short datagram.  Message-combining isn't
150  * really ``integrated'' into the design of this software, but you
151  * could fudge it as follows.  Whenever you pull a short datagram from
152  * the send-queue, check the next one to see if it's also a short
153  * datagram.  If so, pack them together into a ``combined'' datagram.
154  * At the receive side, simply check for ``combined'' datagrams, and
155  * treat them as if they were simply two datagrams.  This would
156  * require extra copying.  I have no idea if this would be worthwhile.
157  *
158  *****************************************************************************/
159
160 /*****************************************************************************
161  *
162  * Include Files
163  *
164  ****************************************************************************/
165
166 /*
167  * I_Hate_C because the ansi prototype for a varargs function is incompatible
168  * with the K&R definition of that varargs function.  Eg, this doesn't compile:
169  *
170  * void CmiPrintf(char *, ...);
171  *
172  * void CmiPrintf(va_alist) va_dcl
173  * {
174  *    ...
175  * }
176  *
177  * I can't define the function in an ANSI way, because our stupid SUNs dont
178  * yet have stdarg.h, even though they have gcc (which is ANSI).  So I have
179  * to leave the definition of CmiPrintf as a K&R form, but I have to
180  * deactivate the protos or the compiler barfs.  That's why I_Hate_C.
181  *
182  */
183
184 #define CmiPrintf I_Hate_C_1
185 #define CmiError  I_Hate_C_2
186 #define CmiScanf  I_Hate_C_3
187 #include "converse.h"
188 #undef CmiPrintf
189 #undef CmiError
190 #undef CmiScanf
191 void CmiPrintf();
192 void CmiError();
193 int  CmiScanf();
194
195 #include <sys/types.h>
196 #include <stdio.h>
197 #include <ctype.h>
198 #include <fcntl.h>
199 #include <netinet/in.h>
200 #include <arpa/inet.h>
201 #include <netdb.h>
202 #include <rpc/rpc.h>
203 #include <errno.h>
204 #include <setjmp.h>
205 #include <pwd.h>
206 #include <stdlib.h>
207 #include <signal.h>
208 #include <varargs.h>
209 #include <unistd.h>
210 #include <sys/file.h>
211 #include <sys/param.h>
212 #include <sys/resource.h>
213 #include <sys/socket.h>
214 #include <sys/stat.h>
215 #include <sys/time.h>
216 #include <varargs.h>
217
218 #if CMK_STRINGS_USE_STRINGS_H
219 #include <strings.h>
220 #endif
221
222 #if CMK_STRINGS_USE_STRING_H
223 #include <string.h>
224 #endif
225
226 #if CMK_STRINGS_USE_OWN_DECLARATIONS
227 char *strchr(), *strrchr(), *strdup();
228 #endif
229
230 #if CMK_RSH_IS_A_COMMAND
231 #define RSH_CMD "rsh"
232 #endif
233 #if CMK_RSH_USE_REMSH
234 #define RSH_CMD "remsh"
235 #endif
236
237 static void KillEveryone();
238 static void KillEveryoneCode();
239 static void CommunicationServer();
240 extern int CmemInsideMem();
241 extern void CmemCallWhenMemAvail();
242 void ConverseInitPE(void);
243 void *FIFO_Create(void);
244 int   FIFO_Fill(void *);
245 void *FIFO_Peek(void *);
246 void  FIFO_Pop(void *);
247 void  FIFO_EnQueue(void *, void *);
248 void  FIFO_EnQueue_Front(void *, void *);
249
250 /*****************************************************************************
251  *
252  *     Utility routines for network machine interface.
253  *
254  *
255  * zap_newline(char *s)
256  *
257  *   - Remove the '\n' from the end of a string.
258  *
259  * char *skipblanks(char *s)
260  *
261  *   - advance pointer over blank characters
262  *
263  * char *skipstuff(char *s)
264  *
265  *   - advance pointer over nonblank characters
266  *
267  * char *strdupl(char *s)
268  *
269  *   - return a freshly-allocated duplicate of a string
270  *
271  *****************************************************************************/
272
273 static void zap_newline(s) char *s;
274 {
275   char *p;
276   p = s + strlen(s)-1;
277   if (*p == '\n') *p = '\0';
278 }
279
280 static char *skipblanks(p) char *p;
281 {
282   while ((*p==' ')||(*p=='\t')||(*p=='\n')) p++;
283   return p;
284 }
285
286 static char *skipstuff(p) char *p;
287 {
288   while ((*p)&&(*p!=' ')&&(*p!='\t')) p++;
289   return p;
290 }
291
292 static char *strdupl(s) char *s;
293 {
294   int len = strlen(s);
295   char *res = (char *)malloc(len+1);
296   strcpy(res, s);
297   return res;
298 }
299
300 double GetClock()
301 {
302   struct timeval tv; int ok;
303   ok = gettimeofday(&tv, NULL);
304   if (ok<0) KillEveryoneCode(9343112);
305   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
306 }
307
308 void jmemcpy(char *dst, char *src, int len)
309 {
310   char *sdend = (char *)(((int)(src + len)) & ~(sizeof(double)-1));
311   while (src != sdend) {
312     *((double*)dst) = *((double*)src);
313     dst+=sizeof(double); src+=sizeof(double);
314   }
315   len &= (sizeof(double)-1);
316   while (len) { *dst++ = *src++; len--; }
317 }
318
319 char *CopyMsg(char *msg, int len)
320 {
321   char *copy = (char *)CmiAlloc(len);
322   jmemcpy(copy, msg, len);
323   return copy;
324 }
325
326 static char *parseint(p, value) char *p; int *value;
327 {
328   int val = 0;
329   while (((*p)==' ')||((*p)=='.')) p++;
330   if (((*p)<'0')||((*p)>'9')) KillEveryone("badly-formed number");
331   while ((*p>='0')&&(*p<='9')) { val*=10; val+=(*p)-'0'; p++; }
332   *value = val;
333   return p;
334 }
335
336 static char *DeleteArg(argv)
337 char **argv;
338 {
339   char *res = argv[0];
340   if (res==0) KillEveryone("Illegal Arglist");
341   while (*argv) { argv[0]=argv[1]; argv++; }
342   return res;
343 }
344
345 static int CountArgs(char **argv)
346 {
347   int count=0;
348   while (argv[0]) { count++; argv++; }
349   return count;
350 }
351
352
353 static void jsleep(int sec, int usec)
354 {
355   int ntimes,i;
356   struct timeval tm;
357
358   ntimes = sec*200 + usec/5000;
359   for(i=0;i<ntimes;i++) {
360     tm.tv_sec = 0;
361     tm.tv_usec = 5000;
362     while(1) {
363       if (select(0,NULL,NULL,NULL,&tm)==0) break;
364       if (errno!=EINTR) return;
365     }
366   }
367 }
368
369 static void writeall(int fd, char *buf, int size)
370 {
371   int ok;
372   while (size) {
373     ok = write(fd, buf, size);
374     if (ok<=0) KillEveryone("write on tcp socket failed.");
375     size-=ok; buf+=ok;
376   }
377 }
378
379 static int wait_readable(fd, sec) int fd; int sec;
380 {
381   fd_set rfds;
382   struct timeval tmo;
383   int begin, nreadable;
384   
385   begin = time(0);
386   FD_ZERO(&rfds);
387   FD_SET(fd, &rfds);
388   while(1) {
389     tmo.tv_sec = (time(0) - begin) + sec;
390     tmo.tv_usec = 0;
391     nreadable = select(FD_SETSIZE, &rfds, NULL, NULL, &tmo);
392     if ((nreadable<0)&&(errno==EINTR)) continue;
393     if (nreadable == 0) { errno=ETIMEDOUT; return -1; }
394     return 0;
395   }
396 }
397
398 /**************************************************************************
399  *
400  * SKT - socket routines
401  *
402  *
403  * void skt_server(unsigned int *ppo, unsigned int *pfd)
404  *
405  *   - create a tcp server socket.  Performs the whole socket/bind/listen
406  *     procedure.  Returns the the port of the socket and the file descriptor.
407  *
408  * void skt_datagram(unsigned int *ppo, unsigned int *pfd, int bufsize)
409  *
410  *   - creates a UDP datagram socket.  Performs the whole socket/bind/
411  *     getsockname procedure.  Returns the port of the socket and
412  *     the file descriptor.  Bufsize, if nonzero, controls the amount
413  *     of buffer space the kernel sets aside for the socket.
414  *
415  * void skt_accept(int src,
416  *                 unsigned int *pip, unsigned int *ppo, unsigned int *pfd)
417  *
418  *   - accepts a connection to the specified socket.  Returns the
419  *     IP of the caller, the port number of the caller, and the file
420  *     descriptor to talk to the caller.
421  *
422  * int skt_connect(unsigned int ip, int port, int timeout)
423  *
424  *   - Opens a connection to the specified server.  Returns a socket for
425  *     communication.
426  *
427  *
428  **************************************************************************/
429
430 static void skt_server(ppo, pfd)
431 unsigned int *ppo;
432 unsigned int *pfd;
433 {
434   int fd= -1;
435   int ok, len;
436   struct sockaddr_in addr;
437   
438   retry: fd = socket(PF_INET, SOCK_STREAM, 0);
439   if ((fd<0)&&(errno==EINTR)) goto retry;
440   if (fd < 0) { perror("socket"); KillEveryoneCode(93483); }
441   memset(&addr, 0, sizeof(addr));
442   addr.sin_family = AF_INET;
443   ok = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
444   if (ok < 0) { perror("bind"); KillEveryoneCode(22933); }
445   ok = listen(fd,5);
446   if (ok < 0) { perror("listen"); KillEveryoneCode(3948); }
447   len = sizeof(addr);
448   ok = getsockname(fd, (struct sockaddr *)&addr, &len);
449   if (ok < 0) { perror("getsockname"); KillEveryoneCode(93583); }
450
451   *pfd = fd;
452   *ppo = ntohs(addr.sin_port);
453 }
454
455 static void skt_datagram(ppo, pfd, bufsize)
456 unsigned int *ppo;
457 unsigned int *pfd;
458 unsigned int  bufsize;
459 {
460   struct sockaddr_in name;
461   int length, ok, skt;
462   
463   /* Create data socket */
464   retry: skt = socket(AF_INET,SOCK_DGRAM,0);
465   if ((skt<0)&&(errno==EINTR)) goto retry;
466   if (skt < 0)
467     { perror("socket"); KillEveryoneCode(8934); }
468   name.sin_family = AF_INET;
469   name.sin_port = 0;
470   name.sin_addr.s_addr = htonl(INADDR_ANY);
471   if (bind(skt, (struct sockaddr *)&name, sizeof(name)) == -1)
472     { perror("binding data socket"); KillEveryoneCode(2983); }
473   length = sizeof(name);
474   if (getsockname(skt, (struct sockaddr *)&name , &length))
475     { perror("getting socket name"); KillEveryoneCode(39483); }
476
477   if (bufsize) {
478     int len = sizeof(int);
479     ok = setsockopt(skt, SOL_SOCKET , SO_RCVBUF , (char *)&bufsize, len);
480     if (ok < 0) KillEveryoneCode(35782);
481     ok = setsockopt(skt, SOL_SOCKET , SO_SNDBUF , (char *)&bufsize, len);
482     if (ok < 0) KillEveryoneCode(35783);
483   }     
484
485   *pfd = skt;
486   *ppo = htons(name.sin_port);
487 }
488
489 static void skt_accept(src, pip, ppo, pfd)
490 int src;
491 unsigned int *pip;
492 unsigned int *ppo;
493 unsigned int *pfd;
494 {
495   int i, fd, ok;
496   struct sockaddr_in remote;
497   i = sizeof(remote);
498  acc:
499   fd = accept(src, (struct sockaddr *)&remote, &i);
500   if ((fd<0)&&(errno==EINTR)) goto acc;
501   if (fd<0) { perror("accept"); KillEveryoneCode(39489); }
502   *pip=htonl(remote.sin_addr.s_addr);
503   *ppo=htons(remote.sin_port);
504   *pfd=fd;
505 }
506
507 static int skt_connect(ip, port, seconds)
508 unsigned int ip; int port; int seconds;
509 {
510   struct sockaddr_in remote; short sport=port;
511   int fd, ok, len, retry, begin;
512     
513   /* create an address structure for the server */
514   memset(&remote, 0, sizeof(remote));
515   remote.sin_family = AF_INET;
516   remote.sin_port = htons(sport);
517   remote.sin_addr.s_addr = htonl(ip);
518     
519   begin = time(0); ok= -1;
520   while (time(0)-begin < seconds) {
521   sock:
522     fd = socket(AF_INET, SOCK_STREAM, 0);
523     if ((fd<0)&&(errno==EINTR)) goto sock;
524     if (fd < 0) { perror("socket"); exit(1); }
525     
526   conn:
527     ok = connect(fd, (struct sockaddr *)&(remote), sizeof(remote));
528     if (ok>=0) break;
529     close(fd);
530     switch (errno) {
531     case EINTR: break;
532     case ECONNREFUSED: jsleep(1,0); break;
533     case EADDRINUSE: jsleep(1,0); break;
534     case EADDRNOTAVAIL: jsleep(5,0); break;
535     default: KillEveryone(strerror(errno));
536     }
537   }
538   if (ok<0) {
539     KillEveryone(strerror(errno)); exit(1);
540   }
541   return fd;
542 }
543
544 /*****************************************************************************
545  *
546  * Producer-Consumer Queues
547  *
548  * This queue implementation enables a producer and a consumer to
549  * communicate via a queue.  The queues are optimized for this situation,
550  * they don't require any operating system locks (they do require 32-bit
551  * reads and writes to be atomic.)  Cautions: there can only be one
552  * producer, and one consumer.  These queues cannot store null pointers.
553  *
554  ****************************************************************************/
555
556 #define PCQueueSize 0x100
557
558 typedef struct CircQueueStruct
559 {
560   struct CircQueueStruct *next;
561   int push;
562   int pull;
563   char *data[PCQueueSize];
564 }
565 *CircQueue;
566
567 typedef struct PCQueueStruct
568 {
569   CircQueue head;
570   CircQueue tail;
571 }
572 *PCQueue;
573
574 PCQueue PCQueueCreate()
575 {
576   CircQueue circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
577   PCQueue Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
578   Q->head = circ;
579   Q->tail = circ;
580   return Q;
581 }
582
583 char *PCQueuePop(PCQueue Q)
584 {
585   CircQueue circ; int pull; char *data;
586
587   while (1) {
588     circ = Q->head;
589     pull = circ->pull;
590     data = circ->data[pull];
591     if (data) {
592       circ->pull = (pull + 1) & (PCQueueSize-1);
593       circ->data[pull] = 0;
594       return data;
595     }
596     if (Q->tail == circ)
597       return 0;
598     Q->head = circ->next;
599     free(circ);
600   }
601 }
602
603 void PCQueuePush(PCQueue Q, char *data)
604 {
605   CircQueue circ; int push;
606   
607   circ = Q->tail;
608   push = circ->push;
609   if (circ->data[push] == 0) {
610     circ->data[push] = data;
611     circ->push = (push + 1) & (PCQueueSize-1);
612     return;
613   }
614   circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
615   circ->push = 1;
616   circ->data[0] = data;
617   Q->tail->next = circ;
618   Q->tail = circ;
619 }
620
621 /*****************************************************************************
622  *
623  * CmiAlloc, CmiSize, and CmiFree
624  *
625  * Note: this allocator is only used for messages.  Everything else
626  * is allocated with the less-expensive ``malloc''.
627  *
628  *****************************************************************************/
629
630 void *CmiAlloc(size)
631 int size;
632 {
633   char *res;
634   res =(char *)malloc(size+8);
635   if (res==0) KillEveryone("Memory allocation failed.");
636   ((int *)res)[0]=size;
637   return (void *)(res+8);
638 }
639
640 int CmiSize(blk)
641 void *blk;
642 {
643   return ((int *)(((char *)blk)-8))[0];
644 }
645
646 void CmiFree(blk)
647 void *blk;
648 {
649   free(((char *)blk)-8);
650 }
651
652
653 /*****************************************************************************
654  *                                                                           
655  * Neighbour-Lookup functions.                                               
656  *                                                                           
657  * the neighbour information is computed dynamically.  It imposes a
658  * (maybe partial) hypercube on the machine.
659  *                                                                           
660  *****************************************************************************/
661  
662 long CmiNumNeighbours(node)
663 int node;
664 {
665   int bit, count=0;
666   bit = 1;
667   while (1) {
668     int neighbour = node ^ bit;
669     if (neighbour < CmiNumPes()) count++;
670     bit = bit<<1; 
671     if (bit > CmiNumPes()) break;
672   }
673   return count;
674 }
675  
676 int CmiGetNodeNeighbours(node, neighbours)
677 int node, *neighbours;
678 {
679   int bit, count=0;
680   bit = 1;
681   while (1) {
682     int neighbour = node ^ bit;
683     if (neighbour < CmiNumPes()) neighbours[count++] = neighbour;
684     bit = bit<<1; 
685     if (bit > CmiNumPes()) break;
686   }
687   return count;
688 }
689  
690 int CmiNeighboursIndex(node, nbr)
691 int node, nbr;
692 {
693   int bit, count=0;
694   bit = 1;
695   while (1) {
696     int neighbour = node ^ bit;
697     if (neighbour < CmiNumPes()) { if (nbr==neighbour) return count; count++; }
698     bit = bit<<=1; 
699     if (bit > CmiNumPes()) break;
700   }
701   return(-1);
702 }
703
704 /*****************************************************************************
705  *
706  * Communication Structures
707  *
708  *****************************************************************************/
709
710 #define DGRAM_HEADER_SIZE 8
711
712 #define CmiMsgHeaderSetLength(msg, len) (((int*)(msg))[2] = (len))
713 #define CmiMsgHeaderGetLength(msg)      (((int*)(msg))[2])
714 #define CmiMsgNext(msg) (*((void**)(msg)))
715
716 #define DGRAM_SRCPE_MASK    (0xFFFF)
717 #define DGRAM_MAGIC_MASK    (0xFFFF)
718 #define DGRAM_SEQNO_MASK    (0xFFFFFF)
719
720 #define DGRAM_DSTRANK_MAX   (0xFC)
721 #define DGRAM_SIMPLEKILL    (0xFD)
722 #define DGRAM_BROADCAST     (0xFE)
723 #define DGRAM_ACKNOWLEDGE   (0xFF)
724
725
726
727 typedef struct { char data[DGRAM_HEADER_SIZE]; } DgramHeader;
728
729 typedef struct { DgramHeader head; char window[1024]; } DgramAck;
730
731 #define DgramHeaderMake(ptr, dstrank, srcpe, magic, seqno) { \
732    ((unsigned short *)ptr)[0] = srcpe; \
733    ((unsigned short *)ptr)[1] = magic; \
734    ((unsigned int *)ptr)[1] = (seqno<<8) | dstrank; \
735 }
736
737 #define DgramHeaderBreak(ptr, dstrank, srcpe, magic, seqno) { \
738    unsigned int tmp; \
739    srcpe = ((unsigned short *)ptr)[0]; \
740    magic = ((unsigned short *)ptr)[1]; \
741    tmp = ((unsigned int *)ptr)[1]; \
742    dstrank = (tmp&0xFF); seqno = (tmp>>8); \
743 }
744
745 #define PE_BROADCAST_OTHERS (-1)
746 #define PE_BROADCAST_ALL    (-2)
747
748
749 typedef struct OutgoingMsgStruct
750 {
751   struct OutgoingMsgStruct *next;
752   int   src, dst;
753   int   size;
754   char *data;
755   int   refcount;
756   int   freemode;
757 }
758 *OutgoingMsg;
759
760 typedef struct ExplicitDgramStruct
761 {
762   struct ExplicitDgramStruct *next;
763   int  srcpe, rank, seqno;
764   unsigned int len, dummy; /* dummy to fix bug in rs6k alignment */
765   double data[1];
766 }
767 *ExplicitDgram;
768
769 typedef struct ImplicitDgramStruct
770 {
771   struct ImplicitDgramStruct *next;
772   struct OtherNodeStruct *dest;
773   int srcpe, rank, seqno;
774   char  *dataptr;
775   int    datalen;
776   OutgoingMsg ogm;
777 }
778 *ImplicitDgram;
779
780 typedef struct OtherNodeStruct
781 {
782   int nodestart, nodesize;
783   unsigned int IP, dataport, ctrlport;
784   struct sockaddr_in addr;
785   
786   double                   send_primer;    /* time to send primer packet */
787   unsigned int             send_last;      /* seqno of last dgram sent */
788   ImplicitDgram           *send_window;    /* datagrams sent, not acked */
789   ImplicitDgram            send_queue_h;   /* head of send queue */
790   ImplicitDgram            send_queue_t;   /* tail of send queue */
791   unsigned int             send_next;      /* next seqno to go into queue */
792   
793   int                      asm_rank;
794   int                      asm_total;
795   int                      asm_fill;
796   char                    *asm_msg;
797   
798   int                      recv_ack_cnt; /* number of unacked dgrams */
799   double                   recv_ack_time;/* time when ack should be sent */
800   unsigned int             recv_expect;  /* next dgram to expect */
801   ExplicitDgram           *recv_window;  /* Packets received, not integrated */
802   int                      recv_winsz;   /* Number of packets in recv window */
803   unsigned int             recv_next;    /* Seqno of first missing packet */
804 }
805 *OtherNode;
806
807 typedef struct CmiStateStruct
808 {
809   int pe, rank;
810   PCQueue recv;
811   void *localqueue;
812 }
813 *CmiState;
814
815 void CmiStateInit(int pe, int rank, CmiState state)
816 {
817   state->pe = pe;
818   state->rank = rank;
819   state->recv = PCQueueCreate();
820   state->localqueue = FIFO_Create();
821 }
822
823
824 static ImplicitDgram Cmi_freelist_implicit;
825 static ExplicitDgram Cmi_freelist_explicit;
826 static OutgoingMsg   Cmi_freelist_outgoing;
827
828 #define FreeImplicitDgram(dg) {\
829   ImplicitDgram d=(dg);\
830   d->next = Cmi_freelist_implicit;\
831   Cmi_freelist_implicit = d;\
832 }
833
834 #define MallocImplicitDgram(dg) {\
835   ImplicitDgram d = Cmi_freelist_implicit;\
836   if (d==0) d = ((ImplicitDgram)malloc(sizeof(struct ImplicitDgramStruct)));\
837   else Cmi_freelist_implicit = d->next;\
838   dg = d;\
839 }
840
841 #define FreeExplicitDgram(dg) {\
842   ExplicitDgram d=(dg);\
843   d->next = Cmi_freelist_explicit;\
844   Cmi_freelist_explicit = d;\
845 }
846
847 #define MallocExplicitDgram(dg) {\
848   ExplicitDgram d = Cmi_freelist_explicit;\
849   if (d==0) d = ((ExplicitDgram)malloc \
850                    (sizeof(struct ExplicitDgramStruct) + Cmi_max_dgram_size));\
851   else Cmi_freelist_explicit = d->next;\
852   dg = d;\
853 }
854
855 /* Careful with these next two, need concurrency control */
856
857 #define FreeOutgoingMsg(m) (free(m))
858 #define MallocOutgoingMsg(m)\
859    (m=(OutgoingMsg)malloc(sizeof(struct OutgoingMsgStruct)))
860
861 /******************************************************************************
862  *
863  * Configuration Data
864  *
865  * This data is all read in from the NETSTART variable (provided by the
866  * host) and from the command-line arguments.  Once read in, it is never
867  * modified.
868  *
869  *****************************************************************************/
870
871
872 int               Cmi_numpes;    /* Total number of processors */
873 int               Cmi_mynodesize;/* Number of processors in my address space */
874 static int        Cmi_mynode;    /* Which address space am I */
875 static int        Cmi_numnodes;  /* Total number of address spaces */
876 static int        Cmi_nodestart; /* First processor in this address space */
877 static CmiStartFn Cmi_startfn;   /* The start function */
878 static int        Cmi_usrsched;  /* Continue after start function finishes? */
879 static char     **Cmi_argv;
880 static int        Cmi_host_IP;
881 static int        Cmi_self_IP;
882 static int        Cmi_host_port;
883 static int        Cmi_host_pid;
884 static char       Cmi_host_IP_str[16];
885 static char       Cmi_self_IP_str[16];
886
887 static int    Cmi_max_dgram_size;
888 static int    Cmi_os_buffer_size;
889 static int    Cmi_window_size;
890 static int    Cmi_half_window;
891 static double Cmi_delay_retransmit;
892 static double Cmi_ack_delay;
893 static int    Cmi_dgram_max_data;
894 static int    Cmi_tickspeed;
895
896 static void setspeed_atm()
897 {
898   Cmi_max_dgram_size   = 2048;
899   Cmi_os_buffer_size   = 50000;
900   Cmi_window_size      = 20;
901   Cmi_delay_retransmit = 0.0150;
902   Cmi_ack_delay        = 0.0035;
903   Cmi_tickspeed        = 10000;
904 }
905
906 static void setspeed_eth()
907 {
908   Cmi_max_dgram_size   = 2048;
909   Cmi_os_buffer_size   = 50000;
910   Cmi_window_size      = 20;
911   Cmi_delay_retransmit = 0.0400;
912   Cmi_ack_delay        = 0.0100;
913   Cmi_tickspeed        = 10000;
914 }
915
916 static void parse_netstart()
917 {
918   char *ns;
919   int nread;
920   ns = getenv("NETSTART");
921   if (ns==0) goto abort;
922   nread = sscanf(ns, "%d%d%d%d%d%d%d%d",
923                  &Cmi_numnodes, &Cmi_mynode,
924                  &Cmi_nodestart, &Cmi_mynodesize, &Cmi_numpes,
925                  &Cmi_self_IP, &Cmi_host_IP, &Cmi_host_port, &Cmi_host_pid);
926   if (nread!=8) goto abort;
927   sprintf(Cmi_self_IP_str,"%d.%d.%d.%d",
928           (Cmi_self_IP>>24)&0xFF,(Cmi_self_IP>>16)&0xFF,
929           (Cmi_self_IP>>8)&0xFF,Cmi_self_IP&0xFF);
930   sprintf(Cmi_host_IP_str,"%d.%d.%d.%d",
931           (Cmi_host_IP>>24)&0xFF,(Cmi_host_IP>>16)&0xFF,
932           (Cmi_host_IP>>8)&0xFF,Cmi_host_IP&0xFF);
933   return;
934  abort:
935   KillEveryone("program not started using 'conv-host' utility. aborting.\n");
936   exit(1);
937 }
938
939 static void extract_args(argv)
940 char **argv;
941 {
942   setspeed_eth();
943   while (*argv) {
944     if (strcmp(*argv,"++atm")==0) {
945       setspeed_atm();
946       DeleteArg(argv);
947     } else if (strcmp(*argv,"++eth")==0) {
948       setspeed_eth();
949       DeleteArg(argv);
950     } else argv++;
951   }
952   Cmi_dgram_max_data = Cmi_max_dgram_size - DGRAM_HEADER_SIZE;
953   Cmi_half_window = Cmi_window_size >> 1;
954   if ((Cmi_window_size * Cmi_max_dgram_size) > Cmi_os_buffer_size)
955     KillEveryone("Window size too big for OS buffer.");
956 }
957
958 /******************************************************************************
959  *
960  * Packet Performance Logging
961  *
962  * This module is designed to give a detailed log of the packets and their
963  * acknowledgements, for performance tuning.  It can be disabled.
964  *
965  *****************************************************************************/
966
967 #define LOGGING 0
968
969 #if LOGGING
970
971 typedef struct logent {
972   double time;
973   int seqno;
974   int srcpe;
975   int dstpe;
976   int kind;
977 } *logent;
978
979
980 logent log;
981 int    log_pos;
982 int    log_wrap;
983
984 static void log_init()
985 {
986   log = (logent)malloc(50000 * sizeof(struct logent));
987   log_pos = 0;
988   log_wrap = 0;
989 }
990
991 static void log_done()
992 {
993   char logname[100]; FILE *f; int i, size;
994   sprintf(logname, "log.%d", Cmi_mynode);
995   f = fopen(logname, "w");
996   if (f==0) { perror("fopen"); exit(1); }
997   if (log_wrap) size = 50000; else size=log_pos;
998   for (i=0; i<size; i++) {
999     logent ent = log+i;
1000     fprintf(f, "%1.4f %d %c %d %d\n",
1001             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
1002   }
1003   fclose(f);
1004 }
1005
1006 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
1007
1008 #endif
1009
1010
1011 #if !LOGGING
1012
1013 #define log_init() 0
1014 #define log_done() 0
1015 #define LOG(t,s,k,d,q) 0
1016
1017 #endif
1018
1019 /******************************************************************************
1020  *
1021  * Node state
1022  *
1023  *****************************************************************************/
1024
1025 static int        ctrlport, dataport, ctrlskt, dataskt;
1026
1027 static OtherNode *nodes_by_pe;  /* OtherNodes indexed by processor number */
1028 static OtherNode  nodes;        /* Indexed only by ``node number'' */
1029
1030 static int          Cmi_shutdown_done;
1031 static CmiNodeLock  Cmi_scanf_mutex;
1032 static char        *Cmi_scanf_data;
1033 static double       Cmi_clock;
1034
1035 /****************************************************************************
1036  *                                                                          
1037  * CheckSocketsReady
1038  *
1039  * Checks both sockets to see which are readable and which are writeable.
1040  * We check all these things at the same time since this can be done for
1041  * free with ``select.'' The result is stored in global variables, since
1042  * this is essentially global state information and several routines need it.
1043  *
1044  ***************************************************************************/
1045
1046 static int ctrlskt_ready_read;
1047 static int ctrlskt_ready_write;
1048 static int dataskt_ready_read;
1049 static int dataskt_ready_write;
1050
1051 void CheckSocketsReady()
1052 {
1053   static fd_set rfds; 
1054   static fd_set wfds; 
1055   struct timeval tmo;
1056   int nreadable;
1057   
1058   FD_SET(dataskt, &rfds);
1059   FD_SET(ctrlskt, &rfds);
1060   FD_SET(dataskt, &wfds);
1061   FD_SET(ctrlskt, &wfds);
1062   tmo.tv_sec = 0;
1063   tmo.tv_usec = 0;
1064   nreadable = select(FD_SETSIZE, &rfds, &wfds, NULL, &tmo);
1065   if (nreadable <= 0) {
1066     ctrlskt_ready_read = 0;
1067     ctrlskt_ready_write = 0;
1068     dataskt_ready_read = 0;
1069     dataskt_ready_write = 0;
1070     return;
1071   }
1072   ctrlskt_ready_read = (FD_ISSET(ctrlskt, &rfds));
1073   dataskt_ready_read = (FD_ISSET(dataskt, &rfds));
1074   ctrlskt_ready_write = (FD_ISSET(ctrlskt, &wfds));
1075   dataskt_ready_write = (FD_ISSET(dataskt, &wfds));
1076 }
1077
1078 /******************************************************************************
1079  *
1080  * OS Threads
1081  *
1082  * This version of converse is for multiple-processor workstations,
1083  * and we assume that the OS provides threads to gain access to those
1084  * multiple processors.  This section contains an interface layer for
1085  * the OS specific threads package.  It contains routines to start
1086  * the threads, routines to access their thread-specific state, and
1087  * routines to control mutual exclusion between them.
1088  *
1089  * In addition, we wish to support nonthreaded operation.  To do this,
1090  * we provide a version of these functions that uses the main/only thread
1091  * as a single PE, and simulates a communication thread using interrupts.
1092  *
1093  *
1094  * CmiStartThreads()
1095  *
1096  *    Allocates one CmiState structure per PE.  Initializes all of
1097  *    the CmiState structures using the function CmiStateInit.
1098  *    Starts processor threads 1..N (not 0, that's the one
1099  *    that calls CmiStartThreads), as well as the communication
1100  *    thread.  Each processor thread (other than 0) must call ConverseInitPE
1101  *    followed by Cmi_startfn.  The communication thread must be an infinite
1102  *    loop that calls the function CommunicationServer over and over,
1103  *    with a short delay between each call (ideally, the delay should end
1104  *    when a datagram arrives or when somebody notifies: see below).
1105  *
1106  * CmiGetState()
1107  *
1108  *    When called by a PE-thread, returns the processor-specific state
1109  *    structure for that PE.
1110  *
1111  * CmiGetStateN(int n)
1112  *
1113  *    returns processor-specific state structure for the PE of rank n.
1114  *
1115  * CmiMemLock() and CmiMemUnlock()
1116  *
1117  *    The memory module calls these functions to obtain mutual exclusion
1118  *    in the memory routines, and to keep interrupts from reentering malloc.
1119  *
1120  * CmiCommLock() and CmiCommUnlock()
1121  *
1122  *    These functions lock a mutex that insures mutual exclusion in the
1123  *    communication routines.
1124  *
1125  * CmiMyPe() and CmiMyRank()
1126  *
1127  *    The usual.  Implemented here, since a highly-optimized version
1128  *    is possible in the nonthreaded case.
1129  *
1130  *****************************************************************************/
1131
1132 #if CMK_SHARED_VARS_SUN_THREADS
1133
1134 static mutex_t memmutex;
1135 void CmiMemLock() { mutex_lock(&memmutex); }
1136 void CmiMemUnlock() { mutex_unlock(&memmutex); }
1137
1138 static thread_key_t Cmi_state_key;
1139 static CmiState     Cmi_state_vector;
1140
1141 CmiState CmiGetState()
1142 {
1143   CmiState result = 0;
1144   thr_getspecific(Cmi_state_key, (void **)&result);
1145   return result;
1146 }
1147
1148 int CmiMyPe()
1149 {
1150   CmiState result = 0;
1151   thr_getspecific(Cmi_state_key, (void **)&result);
1152   return result->pe;
1153 }
1154
1155 int CmiMyRank()
1156 {
1157   CmiState result = 0;
1158   thr_getspecific(Cmi_state_key, (void **)&result);
1159   return result->rank;
1160 }
1161
1162 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1163 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1164 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1165 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1166
1167 CmiNodeLock CmiCreateLock()
1168 {
1169   CmiNodeLock lk = (CmiNodeLock)malloc(sizeof(mutex_t));
1170   mutex_init(lk,0,0);
1171 }
1172
1173 void CmiDestroyLock(CmiNodeLock lk)
1174 {
1175   mutex_destroy(lk);
1176   free(lk);
1177 }
1178
1179 #define CmiYield() (thr_yield())
1180
1181 #define CmiGetStateN(n) (Cmi_state_vector+(n))
1182
1183 static mutex_t comm_mutex;
1184 #define CmiCommLock() (mutex_lock(&comm_mutex))
1185 #define CmiCommUnlock() (mutex_unlock(&comm_mutex))
1186
1187 static void comm_thread(void)
1188 {
1189   struct timeval tmo; fd_set rfds;
1190   while (Cmi_shutdown_done == 0) {
1191     CmiCommLock();
1192     CommunicationServer();
1193     CmiCommUnlock();
1194     tmo.tv_sec = 0;
1195     tmo.tv_usec = Cmi_tickspeed;
1196     FD_ZERO(&rfds);
1197     FD_SET(dataskt, &rfds);
1198     FD_SET(ctrlskt, &rfds);
1199     select(FD_SETSIZE, &rfds, 0, 0, &tmo);
1200   }
1201   thr_exit(0);
1202 }
1203
1204 static void *call_startfn(void *vindex)
1205 {
1206   int index = (int)vindex;
1207   CmiState state = Cmi_state_vector + index;
1208   thr_setspecific(Cmi_state_key, state);
1209   ConverseInitPE();
1210   Cmi_startfn(CountArgs(Cmi_argv), Cmi_argv);
1211   if (Cmi_usrsched == 0) CsdScheduler(-1);
1212   ConverseExit();
1213   thr_exit(0);
1214 }
1215
1216 static void CmiStartThreads()
1217 {
1218   int i, pid, ok;
1219   
1220   thr_setconcurrency(Cmi_mynodesize);
1221   thr_keycreate(&Cmi_state_key, 0);
1222   Cmi_state_vector =
1223     (CmiState)calloc(Cmi_mynodesize, sizeof(struct CmiStateStruct));
1224   for (i=0; i<Cmi_mynodesize; i++)
1225     CmiStateInit(i+Cmi_nodestart, i, CmiGetStateN(i));
1226   for (i=1; i<Cmi_mynodesize; i++) {
1227     ok = thr_create(0, 256000, call_startfn, (void *)i,
1228                     THR_DETACHED|THR_BOUND, &pid);
1229     if (ok<0) { perror("thr_create"); exit(1); }
1230   }
1231   thr_setspecific(Cmi_state_key, Cmi_state_vector);
1232   ok = thr_create(0, 256000, (void *(*)(void *))comm_thread,
1233                   0, THR_DETACHED, &pid);
1234   if (ok<0) { perror("thr_create"); exit(1); }
1235 }
1236
1237 #endif
1238
1239 #if CMK_SHARED_VARS_UNAVAILABLE
1240
1241 static int memflag;
1242 void CmiMemLock() { memflag=1; }
1243 void CmiMemUnlock() { memflag=0; }
1244
1245 static struct CmiStateStruct Cmi_state;
1246 int Cmi_mype;
1247 int Cmi_myrank;
1248 #define CmiGetState() (&Cmi_state)
1249 #define CmiGetStateN(n) (&Cmi_state)
1250
1251 static int comm_flag;
1252 #define CmiCommLock() (comm_flag=1)
1253 #define CmiCommUnlock() (comm_flag=0)
1254
1255 #define CmiYield() (sleep(1))
1256
1257 static void CommunicationInterrupt()
1258 {
1259   if (comm_flag) return;
1260   if (memflag) return;
1261   CommunicationServer();
1262 }
1263
1264 static void CmiStartThreads()
1265 {
1266   struct itimerval i;
1267   
1268   if ((Cmi_numpes != Cmi_numnodes) || (Cmi_mynodesize != 1))
1269     KillEveryone
1270       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
1271   
1272   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
1273   Cmi_mype = Cmi_nodestart;
1274   Cmi_myrank = 0;
1275   
1276 #if CMK_ASYNC_NOT_NEEDED
1277   CmiSignal(SIGALRM, 0, 0, CommunicationInterrupt);
1278 #else
1279   CmiSignal(SIGALRM, SIGIO, 0, CommunicationInterrupt);
1280   CmiEnableAsyncIO(dataskt);
1281   CmiEnableAsyncIO(ctrlskt);
1282 #endif
1283
1284   i.it_interval.tv_sec = 0;
1285   i.it_interval.tv_usec = Cmi_tickspeed;
1286   i.it_value.tv_sec = 0;
1287   i.it_value.tv_usec = Cmi_tickspeed;
1288   setitimer(ITIMER_REAL, &i, NULL);
1289 }
1290
1291 #endif
1292
1293 CpvDeclare(void *, CmiLocalQueue);
1294
1295 /****************************************************************************
1296  *                                                                          
1297  * Fast shutdown                                                            
1298  *                                                                          
1299  ****************************************************************************/
1300
1301 static void KillIndividual(int ip,int port,int timeout,char *cmd,char *flag)
1302 {
1303   int fd;
1304   if (*flag) return;
1305   fd = skt_connect(ip, port, timeout);
1306   if (fd>=0) {
1307     writeall(fd, cmd, strlen(cmd));
1308     close(fd);
1309     *flag = 1;
1310   }
1311 }
1312
1313 static void KillEveryone(msg)
1314 char *msg;
1315 {
1316   char cmd[1024]; char *killed; char host=0; int i;
1317   if (nodes == 0) {
1318     fprintf(stderr,"%s\n", msg);
1319     exit(1);
1320   }
1321   sprintf(cmd,"die %s",msg);
1322   killed = (char *)calloc(1, Cmi_numnodes);
1323   KillIndividual(Cmi_host_IP, Cmi_host_port, 2, cmd, &host);
1324   for (i=0; i<Cmi_numnodes; i++)
1325     KillIndividual(nodes[i].IP, nodes[i].ctrlport, 2, cmd, killed+i);
1326   KillIndividual(Cmi_host_IP, Cmi_host_port, 10, cmd, &host);
1327   for (i=0; i<Cmi_numnodes; i++)
1328     KillIndividual(nodes[i].IP, nodes[i].ctrlport, 10, cmd, killed+i);
1329   KillIndividual(Cmi_host_IP, Cmi_host_port, 30, cmd, &host);
1330   for (i=0; i<Cmi_numnodes; i++)
1331     KillIndividual(nodes[i].IP, nodes[i].ctrlport, 30, cmd, killed+i);
1332   log_done();
1333   exit(1);
1334 }
1335
1336 static void KillEveryoneCode(n)
1337 int n;
1338 {
1339   char buffer[1024];
1340   sprintf(buffer,"Internal error #%d (node %d)\n(Contact CHARM developers)\n",
1341           n,CmiMyPe());
1342   KillEveryone(buffer);
1343 }
1344
1345 static void KillOnSegv()
1346 {
1347   char buffer[1024];
1348   sprintf(buffer, "Node %d: Segmentation Fault.\n",CmiMyPe());
1349   KillEveryone(buffer);
1350 }
1351
1352 static void KillOnBus()
1353 {
1354   char buffer[1024];
1355   sprintf(buffer, "Node %d: Bus Error.\n",CmiMyPe());
1356   KillEveryone(buffer);
1357 }
1358
1359 static void KillOnIntr()
1360 {
1361   char buffer[1024];
1362   sprintf(buffer, "Node %d: Interrupted.\n",CmiMyPe());
1363   KillEveryone(buffer);
1364 }
1365
1366 static void KillOnCrash()
1367 {
1368   char buffer[1024];
1369   sprintf(buffer, "Node %d: Crashed.\n",CmiMyPe());
1370   KillEveryone(buffer);
1371 }
1372
1373 static void KillInit()
1374 {
1375   CmiSignal(SIGSEGV, 0, 0, KillOnSegv);
1376   CmiSignal(SIGBUS,  0, 0, KillOnBus);
1377   CmiSignal(SIGILL,  0, 0, KillOnCrash);
1378   CmiSignal(SIGABRT, 0, 0, KillOnCrash);
1379   CmiSignal(SIGFPE,  0, 0, KillOnCrash);
1380   CmiSignal(SIGPIPE, 0, 0, KillOnCrash);
1381   CmiSignal(SIGURG,  0, 0, KillOnCrash);
1382
1383 #ifdef SIGSYS
1384   CmiSignal(SIGSYS,  0, 0, KillOnCrash);
1385 #endif
1386
1387   CmiSignal(SIGTERM, 0, 0, KillOnIntr);
1388   CmiSignal(SIGQUIT, 0, 0, KillOnIntr);
1389   CmiSignal(SIGINT,  0, 0, KillOnIntr);
1390 }
1391
1392 /****************************************************************************
1393  *
1394  * ctrl_sendone
1395  *
1396  * Any thread can call this.  There's no need for concurrency control.
1397  *
1398  ****************************************************************************/
1399
1400 static void ctrl_sendone(va_alist) va_dcl
1401 {
1402   char buffer[1024];
1403   char *f; int fd, delay;
1404   va_list p;
1405   va_start(p);
1406   delay = va_arg(p, int);
1407   f = va_arg(p, char *);
1408   vsprintf(buffer, f, p);
1409   fd = skt_connect(Cmi_host_IP, Cmi_host_port, delay);
1410   if (fd<0) KillEveryone("cannot contact host");
1411   writeall(fd, buffer, strlen(buffer));
1412   close(fd);
1413 }
1414
1415 /****************************************************************************
1416  *
1417  * ctrl_getone
1418  *
1419  * This is handled only by the communication interrupt.
1420  *
1421  ****************************************************************************/
1422
1423 static void node_addresses_store();
1424
1425 static void ctrl_getone()
1426 {
1427   char line[10000];
1428   int ok, ip, port, fd;  FILE *f;
1429   skt_accept(ctrlskt, &ip, &port, &fd);
1430   f = fdopen(fd,"r");
1431   while (fgets(line, 9999, f)) {
1432     if (strncmp(line,"aval addr ",10)==0) {
1433       node_addresses_store(line);
1434     }
1435     else if (strncmp(line,"aval done ",10)==0) {
1436       Cmi_shutdown_done = 1;
1437     }
1438     else if (strncmp(line,"scanf-data ",11)==0) {
1439       Cmi_scanf_data=strdupl(line+11);
1440     } else if (strncmp(line,"die ",4)==0) {
1441       fprintf(stderr,"aborting: %s\n",line+4);
1442       log_done();
1443       exit(0);
1444     }
1445     else KillEveryoneCode(2932);
1446   }
1447   fclose(f);
1448   close(fd);
1449 }
1450
1451 /*****************************************************************************
1452  *
1453  * node_addresses
1454  *
1455  *  These two functions fill the node-table.
1456  *
1457  *
1458  *   This node, like all others, first sends its own address to the host
1459  *   using this command:
1460  *
1461  *     aset addr <my-nodeno> <ip-addr>.<ctrlport>.<dataport>.<nodestart>.<nodesize>
1462  *
1463  *   Then requests all addresses from the host using this command:
1464  *
1465  *     aget <my-ip-addr> <my-ctrlport> addr 0 <numnodes>
1466  *
1467  *   when the host has all the addresses, he sends a table to me:
1468  *
1469  *     aval addr <ip-addr-0>.<ctrlport-0>.<dataport-0> ...
1470  *
1471  *****************************************************************************/
1472
1473 static void node_addresses_obtain()
1474 {
1475   ctrl_sendone(120, "aset addr %d %s.%d.%d.%d.%d\n",
1476                Cmi_mynode, Cmi_self_IP_str, ctrlport, dataport,
1477                Cmi_nodestart, Cmi_mynodesize);
1478   ctrl_sendone(120, "aget %s %d addr 0 %d\n",
1479                Cmi_self_IP_str,ctrlport,Cmi_numnodes-1);
1480   while (nodes == 0) {
1481     if (wait_readable(ctrlskt, 300)<0)
1482       { perror("waiting for data"); KillEveryoneCode(21323); }
1483     ctrl_getone();
1484   }
1485 }
1486
1487 static void node_addresses_store(addrs) char *addrs;
1488 {
1489   char *p, *e; int i, j, lo, hi;
1490   OtherNode ntab, *bype;
1491   if (strncmp(addrs,"aval addr ",10)!=0) KillEveryoneCode(83473);
1492   ntab = (OtherNode)calloc(Cmi_numnodes, sizeof(struct OtherNodeStruct));
1493   p = skipblanks(addrs+10);
1494   p = parseint(p,&lo);
1495   p = parseint(p,&hi);
1496   if ((lo!=0)||(hi!=Cmi_numnodes - 1)) KillEveryoneCode(824793);
1497   for (i=0; i<Cmi_numnodes; i++) {
1498     unsigned int ip0,ip1,ip2,ip3,cport,dport,nodestart,nodesize,ip;
1499     p = parseint(p,&ip0);
1500     p = parseint(p,&ip1);
1501     p = parseint(p,&ip2);
1502     p = parseint(p,&ip3);
1503     p = parseint(p,&cport);
1504     p = parseint(p,&dport);
1505     p = parseint(p,&nodestart);
1506     p = parseint(p,&nodesize);
1507     ip = (ip0<<24)+(ip1<<16)+(ip2<<8)+ip3;
1508     ntab[i].nodestart = nodestart;
1509     ntab[i].nodesize  = nodesize;
1510     ntab[i].IP = ip;
1511     ntab[i].ctrlport = cport;
1512     ntab[i].dataport = dport;
1513     ntab[i].addr.sin_family      = AF_INET;
1514     ntab[i].addr.sin_port        = htons(dport);
1515     ntab[i].addr.sin_addr.s_addr = htonl(ip);
1516   }
1517   p = skipblanks(p);
1518   if (*p!=0) KillEveryoneCode(82283);
1519   bype = (OtherNode*)malloc(Cmi_numpes * sizeof(OtherNode));
1520   for (i=0; i<Cmi_numnodes; i++) {
1521     OtherNode node = ntab + i;
1522     node->send_window =
1523       (ImplicitDgram*)calloc(Cmi_window_size, sizeof(ImplicitDgram));
1524     node->recv_window =
1525       (ExplicitDgram*)calloc(Cmi_window_size, sizeof(ExplicitDgram));
1526     for (j=0; j<node->nodesize; j++)
1527       bype[j + node->nodestart] = node;
1528   }
1529   nodes_by_pe = bype;
1530   nodes = ntab;
1531 }
1532
1533 /*****************************************************************************
1534  *
1535  * CmiPrintf, CmiError, CmiScanf
1536  *
1537  *****************************************************************************/
1538
1539 static void InternalPrintf(f, l) char *f; va_list l;
1540 {
1541   char *p, *buf;
1542   char buffer[8192];
1543   vsprintf(buffer, f, l);
1544   buf = buffer;
1545   while (*buf) {
1546     p = strchr(buf, '\n');
1547     if (p) {
1548       *p=0; ctrl_sendone(120, "print %s\n", buf);
1549       *p='\n'; buf=p+1;
1550     } else {
1551       ctrl_sendone(120, "princ %s\n", buf);
1552       break;
1553     }
1554   }
1555 }
1556
1557 static void InternalError(f, l) char *f; va_list l;
1558 {
1559   char *p, *buf;
1560   char buffer[8192];
1561   vsprintf(buffer, f, l);
1562   buf = buffer;
1563   while (*buf) {
1564     p = strchr(buf, '\n');
1565     if (p) {
1566       *p=0; ctrl_sendone(120, "printerr %s\n", buf);
1567       *p='\n'; buf = p+1;
1568     } else {
1569       ctrl_sendone(120, "princerr %s\n", buf);
1570       break;
1571     }
1572   }
1573 }
1574
1575 static int InternalScanf(fmt, l)
1576     char *fmt;
1577     va_list l;
1578 {
1579   char *ptr[20];
1580   char *p; int nargs, i;
1581   nargs=0;
1582   p=fmt;
1583   while (*p) {
1584     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1585     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1586     if (p[0]=='%') { nargs++; p++; continue; }
1587     if (*p=='\n') *p=' '; p++;
1588   }
1589   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1590   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1591   CmiLock(Cmi_scanf_mutex);
1592   ctrl_sendone(120, "scanf %s %d %s", Cmi_self_IP_str, ctrlport, fmt);
1593   while (Cmi_scanf_data == 0) jsleep(0, 250000);
1594   i = sscanf(Cmi_scanf_data, fmt,
1595              ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1596              ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1597              ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1598   free(Cmi_scanf_data);
1599   Cmi_scanf_data=0;
1600   CmiUnlock(Cmi_scanf_mutex);
1601   return i;
1602 }
1603
1604 void CmiPrintf(va_alist) va_dcl
1605 {
1606   va_list p; char *f; va_start(p); f = va_arg(p, char *);
1607   InternalPrintf(f, p);
1608 }
1609
1610 void CmiError(va_alist) va_dcl
1611 {
1612   va_list p; char *f; va_start(p); f = va_arg(p, char *);
1613   InternalError(f, p);
1614 }
1615
1616 int CmiScanf(va_alist) va_dcl
1617 {
1618   va_list p; char *f; va_start(p); f = va_arg(p, char *);
1619   return InternalScanf(f, p);
1620 }
1621
1622
1623 /******************************************************************************
1624  *
1625  * Transmission Code
1626  *
1627  *****************************************************************************/
1628
1629 void DiscardImplicitDgram(ImplicitDgram dg)
1630 {
1631   OutgoingMsg ogm;
1632   ogm = dg->ogm;
1633   ogm->refcount--;
1634   if (ogm->refcount == 0) {
1635     if (ogm->freemode == 'A') {
1636       ogm->freemode = 'X';
1637     } else {
1638       CmiFree(ogm->data);
1639       FreeOutgoingMsg(ogm);
1640     }
1641   }
1642   FreeImplicitDgram(dg);
1643 }
1644
1645 int TransmitAckDatagram(OtherNode node)
1646 {
1647   DgramAck ack; unsigned int i, seqno, slot; ExplicitDgram dg;
1648   
1649   seqno = node->recv_next;
1650   DgramHeaderMake(&ack, DGRAM_ACKNOWLEDGE, Cmi_nodestart, Cmi_host_pid, seqno);
1651   LOG(Cmi_clock, Cmi_nodestart, 'A', node->nodestart, seqno);
1652   for (i=0; i<Cmi_window_size; i++) {
1653     slot = seqno % Cmi_window_size;
1654     dg = node->recv_window[slot];
1655     ack.window[i] = (dg && (dg->seqno == seqno));
1656     seqno = ((seqno+1) & DGRAM_SEQNO_MASK);
1657   }
1658   sendto(dataskt, (char *)&ack,
1659          DGRAM_HEADER_SIZE + Cmi_window_size, 0,
1660          (struct sockaddr *)&(node->addr),
1661          sizeof(struct sockaddr_in));
1662 }
1663
1664 void TransmitImplicitDgram(ImplicitDgram dg)
1665 {
1666   char *data; DgramHeader *head; int len; DgramHeader temp;
1667   OtherNode dest;
1668
1669   len = dg->datalen;
1670   data = dg->dataptr;
1671   head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
1672   temp = *head;
1673   dest = dg->dest;
1674   DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_host_pid, dg->seqno);
1675   LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
1676   sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
1677               (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
1678   *head = temp;
1679 }
1680
1681 void TransmitImplicitDgram1(ImplicitDgram dg)
1682 {
1683   char *data; DgramHeader *head; int len; DgramHeader temp;
1684   OtherNode dest;
1685
1686   len = dg->datalen;
1687   data = dg->dataptr;
1688   head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
1689   temp = *head;
1690   dest = dg->dest;
1691   DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_host_pid, dg->seqno);
1692   LOG(Cmi_clock, Cmi_nodestart, 'P', dest->nodestart, dg->seqno);
1693   sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
1694               (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
1695   *head = temp;
1696 }
1697
1698 int TransmitAcknowledgement()
1699 {
1700   int skip; static int nextnode=0; OtherNode node;
1701   
1702   for (skip=0; skip<Cmi_numnodes; skip++) {
1703     node = nodes+nextnode;
1704     nextnode = (nextnode + 1) % Cmi_numnodes;
1705     if (node->recv_ack_cnt) {
1706       if ((node->recv_ack_cnt > Cmi_half_window) ||
1707           (Cmi_clock >= node->recv_ack_time)) {
1708         TransmitAckDatagram(node);
1709         if (node->recv_winsz) {
1710           node->recv_ack_cnt  = 1;
1711           node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
1712         } else {
1713           node->recv_ack_cnt  = 0;
1714           node->recv_ack_time = 0.0;
1715         }
1716         return 1;
1717       }
1718     }
1719   }
1720   return 0;
1721 }
1722
1723 int TransmitDatagram()
1724 {
1725   ImplicitDgram dg; OtherNode node;
1726   static int nextnode=0; unsigned int skip, count, slot, seqno;
1727   
1728   for (skip=0; skip<Cmi_numnodes; skip++) {
1729     node = nodes+nextnode;
1730     nextnode = (nextnode + 1) % Cmi_numnodes;
1731     dg = node->send_queue_h;
1732     if (dg) {
1733       seqno = dg->seqno;
1734       slot = seqno % Cmi_window_size;
1735       if (node->send_window[slot] == 0) {
1736         node->send_queue_h = dg->next;
1737         node->send_window[slot] = dg;
1738         TransmitImplicitDgram(dg);
1739         if (seqno == ((node->send_last+1)&DGRAM_SEQNO_MASK))
1740           node->send_last = seqno;
1741         node->send_primer = Cmi_clock + Cmi_delay_retransmit;
1742         return 1;
1743       }
1744     }
1745     if (Cmi_clock > node->send_primer) {
1746       slot = (node->send_last % Cmi_window_size);
1747       for (count=0; count<Cmi_window_size; count++) {
1748         dg = node->send_window[slot];
1749         if (dg) break;
1750         slot = ((slot-1) % Cmi_window_size);
1751       }
1752       if (dg) {
1753         TransmitImplicitDgram1(node->send_window[slot]);
1754         node->send_primer = Cmi_clock + Cmi_delay_retransmit;
1755         return 1;
1756       }
1757     }
1758   }
1759   return 0;
1760 }
1761
1762 void EnqueueOutgoingDgram
1763         (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank)
1764 {
1765   unsigned int seqno, slot; int dst, src, dstrank; ImplicitDgram dg;
1766   src = ogm->src;
1767   dst = ogm->dst;
1768   seqno = node->send_next;
1769   node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
1770   MallocImplicitDgram(dg);
1771   dg->dest = node;
1772   dg->srcpe = src;
1773   dg->rank = rank;
1774   dg->seqno = seqno;
1775   dg->dataptr = ptr;
1776   dg->datalen = len;
1777   dg->ogm = ogm;
1778   ogm->refcount++;
1779   dg->next = 0;
1780   if (node->send_queue_h == 0) {
1781     node->send_queue_h = dg;
1782     node->send_queue_t = dg;
1783   } else {
1784     node->send_queue_t->next = dg;
1785     node->send_queue_t = dg;
1786   }
1787 }
1788
1789 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank)
1790 {
1791   int size, seqno; char *data;
1792   
1793   size = ogm->size - DGRAM_HEADER_SIZE;
1794   data = ogm->data + DGRAM_HEADER_SIZE;
1795   while (size > Cmi_dgram_max_data) {
1796     EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank);
1797     data += Cmi_dgram_max_data;
1798     size -= Cmi_dgram_max_data;
1799   }
1800   EnqueueOutgoingDgram(ogm, data, size, node, rank);
1801 }
1802
1803 void DeliverOutgoingMessage(OutgoingMsg ogm)
1804 {
1805   int i, rank, dst; OtherNode node;
1806   
1807   dst = ogm->dst;
1808   switch (dst) {
1809   case PE_BROADCAST_ALL:
1810     for (rank = 0; rank<Cmi_mynodesize; rank++)
1811       PCQueuePush(CmiGetStateN(rank)->recv,CopyMsg(ogm->data,ogm->size));
1812     for (i=0; i<Cmi_numnodes; i++)
1813       if (i!=Cmi_mynode)
1814         DeliverViaNetwork(ogm, nodes + i, DGRAM_BROADCAST);
1815     break;
1816   case PE_BROADCAST_OTHERS:
1817     for (rank = 0; rank<Cmi_mynodesize; rank++)
1818       if (rank + Cmi_nodestart != ogm->src)
1819         PCQueuePush(CmiGetStateN(rank)->recv,CopyMsg(ogm->data,ogm->size));
1820     for (i = 0; i<Cmi_numnodes; i++)
1821       if (i!=Cmi_mynode)
1822         DeliverViaNetwork(ogm, nodes + i, DGRAM_BROADCAST);
1823     break;
1824   default:
1825     node = nodes_by_pe[dst];
1826     rank = dst - node->nodestart;
1827     if (node->nodestart != Cmi_nodestart)
1828       DeliverViaNetwork(ogm, node, rank);
1829     else {
1830       if (ogm->freemode == 'A') {
1831         PCQueuePush(CmiGetStateN(rank)->recv,CopyMsg(ogm->data,ogm->size));
1832         ogm->freemode = 'X';
1833       } else {
1834         PCQueuePush(CmiGetStateN(rank)->recv, ogm->data);
1835         FreeOutgoingMsg(ogm);
1836       }
1837     }
1838   }
1839 }
1840
1841 void AssembleDatagram(OtherNode node, ExplicitDgram dg)
1842 {
1843   int i, size; char *msg;
1844   
1845   msg = node->asm_msg;
1846   if (msg == 0) {
1847     size = CmiMsgHeaderGetLength(dg->data);
1848     msg = (char *)CmiAlloc(size);
1849     if (size < dg->len) KillEveryoneCode(4559312);
1850     jmemcpy(msg, (char*)(dg->data), dg->len);
1851     node->asm_rank = dg->rank;
1852     node->asm_total = size;
1853     node->asm_fill = dg->len;
1854     node->asm_msg = msg;
1855   } else {
1856     size = dg->len - DGRAM_HEADER_SIZE;
1857     jmemcpy(msg + node->asm_fill, ((char*)(dg->data))+DGRAM_HEADER_SIZE, size);
1858     node->asm_fill += size;
1859   }
1860   if (node->asm_fill == node->asm_total) {
1861     if (node->asm_rank == DGRAM_BROADCAST) {
1862       int len = node->asm_total;
1863       for (i=1; i<Cmi_mynodesize; i++)
1864         PCQueuePush(CmiGetStateN(i)->recv, CopyMsg(msg, len));
1865       PCQueuePush(CmiGetStateN(0)->recv, msg);
1866     } else PCQueuePush(CmiGetStateN(node->asm_rank)->recv, msg);
1867     node->asm_msg = 0;
1868   }
1869   FreeExplicitDgram(dg);
1870 }
1871
1872 void AssembleReceivedDatagrams(OtherNode node)
1873 {
1874   unsigned int next, slot; ExplicitDgram dg;
1875   next = node->recv_next;
1876   while (1) {
1877     slot = (next % Cmi_window_size);
1878     dg = node->recv_window[slot];
1879     if (dg == 0) break;
1880     AssembleDatagram(node, dg);
1881     node->recv_window[slot] = 0;
1882     node->recv_winsz--;
1883     next = ((next + 1) & DGRAM_SEQNO_MASK);
1884   }
1885   node->recv_next = next;
1886 }
1887
1888 void IntegrateMessageDatagram(ExplicitDgram dg)
1889 {
1890   unsigned int seqno, slot; OtherNode node;
1891
1892   LOG(Cmi_clock, Cmi_nodestart, 'M', dg->srcpe, dg->seqno);
1893   node = nodes_by_pe[dg->srcpe];
1894   seqno = dg->seqno;
1895   node->recv_ack_cnt++;
1896   if (node->recv_ack_time == 0.0)
1897     node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
1898   if (((seqno - node->recv_next) & DGRAM_SEQNO_MASK) < Cmi_window_size) {
1899     slot = (seqno % Cmi_window_size);
1900     if (node->recv_window[slot] == 0) {
1901       node->recv_window[slot] = dg;
1902       node->recv_winsz++;
1903       if (seqno == node->recv_next)
1904         AssembleReceivedDatagrams(node);
1905       if (seqno > node->recv_expect)
1906         node->recv_ack_time = 0.0;
1907       if (seqno >= node->recv_expect)
1908         node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
1909       return;
1910     }
1911   }
1912   FreeExplicitDgram(dg);
1913 }
1914
1915 void IntegrateAckDatagram(ExplicitDgram dg)
1916 {
1917   OtherNode node; DgramAck *ack; ImplicitDgram idg;
1918   int i; unsigned int slot, rxing, dgseqno, seqno;
1919   
1920   node = nodes_by_pe[dg->srcpe];
1921   ack = ((DgramAck*)(dg->data));
1922   dgseqno = dg->seqno;
1923   seqno = (dgseqno + Cmi_window_size) & DGRAM_SEQNO_MASK;
1924   slot = seqno % Cmi_window_size;
1925   rxing = 0;
1926   LOG(Cmi_clock, Cmi_nodestart, 'R', node->nodestart, dg->seqno);
1927   for (i=Cmi_window_size-1; i>=0; i--) {
1928     slot--; if (slot== ((unsigned int)-1)) slot+=Cmi_window_size;
1929     seqno = (seqno-1) & DGRAM_SEQNO_MASK;
1930     idg = node->send_window[slot];
1931     if (idg) {
1932       if (idg->seqno == seqno) {
1933         if (ack->window[i]) {
1934           /*
1935           LOG(Cmi_clock, Cmi_nodestart, 'r', node->nodestart, seqno);
1936           */
1937           node->send_window[slot] = 0;
1938           DiscardImplicitDgram(idg);
1939           rxing = 1;
1940         } else if (rxing) {
1941           node->send_window[slot] = 0;
1942           idg->next = node->send_queue_h;
1943           if (node->send_queue_h == 0) {
1944             node->send_queue_t = idg;
1945           }
1946           node->send_queue_h = idg;
1947         }
1948       } else if (((idg->seqno - dgseqno) & DGRAM_SEQNO_MASK)>=Cmi_window_size){
1949         /*
1950         LOG(Cmi_clock, Cmi_nodestart, 'r', node->nodestart, idg->seqno);
1951         */
1952         node->send_window[slot] = 0;
1953         DiscardImplicitDgram(idg);
1954       }
1955     }
1956   }
1957   FreeExplicitDgram(dg);  
1958 }
1959
1960 void ReceiveDatagram()
1961 {
1962   ExplicitDgram dg; int ok, magic;
1963   MallocExplicitDgram(dg);
1964   ok = recv(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0);
1965   if (ok<0) KillEveryoneCode(37489437);
1966   dg->len = ok;
1967   if (ok >= DGRAM_HEADER_SIZE) {
1968     DgramHeaderBreak(dg->data, dg->rank, dg->srcpe, magic, dg->seqno);
1969     if (magic == Cmi_host_pid) {
1970       if (dg->rank == DGRAM_ACKNOWLEDGE)
1971         IntegrateAckDatagram(dg);
1972       else IntegrateMessageDatagram(dg);
1973     } else FreeExplicitDgram(dg);
1974   } else FreeExplicitDgram(dg);
1975 }
1976
1977 static void CommunicationServer()
1978 {
1979   LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
1980   while (1) {
1981     Cmi_clock = GetClock();
1982     CheckSocketsReady();
1983     if (ctrlskt_ready_read) { ctrl_getone(); continue; }
1984     if (dataskt_ready_write) { if (TransmitAcknowledgement()) continue; }
1985     if (dataskt_ready_read) { ReceiveDatagram(); continue; }
1986     if (dataskt_ready_write) { if (TransmitDatagram()) continue; }
1987     break;
1988   }
1989 }
1990
1991 /******************************************************************************
1992  *
1993  * CmiGetNonLocal
1994  *
1995  * The design of this system is that the communication thread does all the
1996  * work, to eliminate as many locking issues as possible.  This is the only
1997  * part of the code that happens in the receiver-thread.
1998  *
1999  * This operation is fairly cheap, it might be worthwhile to inline
2000  * the code into CmiDeliverMsgs to reduce function call overhead.
2001  *
2002  *****************************************************************************/
2003
2004 char *CmiGetNonLocal()
2005 {
2006   CmiState cs = CmiGetState();
2007   void *result = PCQueuePop(cs->recv);
2008   return result;
2009 }
2010
2011 /******************************************************************************
2012  *
2013  * CmiNotifyIdle()
2014  *
2015  *****************************************************************************/
2016
2017 void CmiNotifyIdle()
2018 {
2019 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2020   struct timeval tv;
2021   tv.tv_sec=0; tv.tv_usec=5000;
2022   select(0,0,0,0,&tv);
2023 #else
2024   CmiCommLock();
2025   CommunicationServer();
2026   CmiCommUnlock();
2027 #endif
2028 }
2029
2030 /******************************************************************************
2031  *
2032  * CmiGeneralSend
2033  *
2034  *****************************************************************************/
2035
2036 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
2037 {
2038   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2039
2040   if (freemode == 'S') {
2041     char *copy = (char *)CmiAlloc(size);
2042     memcpy(copy, data, size);
2043     data = copy; freemode = 'F';
2044   }
2045
2046   if (pe == cs->pe) {
2047     FIFO_EnQueue(cs->localqueue, data);
2048     if (freemode == 'A') {
2049       MallocOutgoingMsg(ogm);
2050       ogm->freemode = 'X';
2051       return ogm;
2052     } else return 0;
2053   }
2054   
2055   MallocOutgoingMsg(ogm);
2056   CmiMsgHeaderSetLength(data, size);
2057   ogm->size = size;
2058   ogm->data = data;
2059   ogm->src = cs->pe;
2060   ogm->dst = pe;
2061   ogm->freemode = freemode;
2062   ogm->refcount = 0;
2063   CmiCommLock();
2064   DeliverOutgoingMessage(ogm);
2065   CommunicationServer();
2066   CmiCommUnlock();
2067   return (CmiCommHandle)ogm;
2068 }
2069
2070 void CmiSyncSendFn(int p, int s, char *m)
2071 { CmiGeneralSend(p,s,'S',m); }
2072
2073 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
2074 { return CmiGeneralSend(p,s,'A',m); }
2075
2076 void CmiFreeSendFn(int p, int s, char *m)
2077 { CmiGeneralSend(p,s,'F',m); }
2078
2079 void CmiSyncBroadcastFn(int s, char *m)
2080 { CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); }
2081
2082 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
2083 { return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); }
2084
2085 void CmiFreeBroadcastFn(int s, char *m)
2086 { CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); }
2087
2088 void CmiSyncBroadcastAllFn(int s, char *m)
2089 { CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); }
2090
2091 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
2092 { return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); }
2093
2094 void CmiFreeBroadcastAllFn(int s, char *m)
2095 { CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); }
2096
2097 /******************************************************************************
2098  *
2099  * Comm Handle manipulation.
2100  *
2101  *****************************************************************************/
2102
2103 int CmiAsyncMsgSent(CmiCommHandle handle)
2104 {
2105   return (((OutgoingMsg)handle)->freemode == 'X');
2106 }
2107
2108 void CmiReleaseCommHandle(CmiCommHandle handle)
2109 {
2110   FreeOutgoingMsg(((OutgoingMsg)handle));
2111 }
2112
2113 /******************************************************************************
2114  *
2115  * Main code, Init, and Exit
2116  *
2117  *****************************************************************************/
2118
2119 void ConverseInitPE()
2120 {
2121   CmiState cs = CmiGetState();
2122   ConverseCommonInit(Cmi_argv);
2123   CthInit(Cmi_argv);
2124   CpvInitialize(void *,CmiLocalQueue);
2125   CpvAccess(CmiLocalQueue) = cs->localqueue;
2126 }
2127
2128 void ConverseExit()
2129 {
2130   ctrl_sendone(120,"aset done %d TRUE\n",CmiMyPe());
2131   while (Cmi_shutdown_done == 0) CmiYield();
2132   ctrl_sendone(120,"ending\n");
2133   if (CmiMyRank()==0) log_done();
2134 }
2135
2136 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int ret)
2137 {
2138 #if CMK_USE_HP_MAIN_FIX
2139 #if FOR_CPLUS
2140   _main(argc,argv);
2141 #endif
2142 #endif
2143   Cmi_argv = argv;
2144   Cmi_startfn = fn;
2145   Cmi_usrsched = usc;
2146   parse_netstart();
2147   extract_args(argv);
2148   log_init();
2149   skt_datagram(&dataport, &dataskt, Cmi_os_buffer_size);
2150   skt_server(&ctrlport, &ctrlskt);
2151   KillInit();
2152   ctrl_sendone(120,"notify-die %s %d\n",Cmi_self_IP_str,ctrlport);
2153   ctrl_sendone(120,"aget %s %d done 0 %d\n",
2154                Cmi_self_IP_str,ctrlport,Cmi_numpes-1);
2155   node_addresses_obtain();
2156   Cmi_scanf_mutex = CmiCreateLock();
2157   CmiTimerInit();
2158   CmiStartThreads();
2159   ConverseInitPE();
2160   if (ret==0) {
2161     fn(argc, argv);
2162     if (usc==0) CsdScheduler(-1);
2163     ConverseExit();
2164   }
2165 }