3cfb6b3e85e5b79900b3f9c702c2890651fc3e26
[charm.git] / src / arch / net / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8
9 /******************************************************************************
10  *
11  * THE DATAGRAM STREAM
12  *
13  * Messages are sent using UDP datagrams.  The sender allocates a
14  * struct for each datagram to be sent.  These structs stick around
15  * until slightly after the datagram is acknowledged.
16  *
17  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
18  * Each node has an OtherNode struct for every other node in the
19  * system.  The OtherNode struct contains:
20  *
21  *   send_queue   (all datagram-structs not yet transmitted)
22  *   send_window  (all datagram-structs transmitted but not ack'd)
23  *
24  * When an acknowledgement comes in, all packets in the send-window
25  * are either marked as acknowledged or pushed back into the send
26  * queue for retransmission.
27  *
28  * THE OUTGOING MESSAGE
29  *
30  * When you send or broadcast a message, the first thing the system
31  * does is system creates an OutgoingMsg struct to represent the
32  * operation.  The OutgoingMsg contains a very direct expression
33  * of what you want to do:
34  *
35  * OutgoingMsg:
36  *
37  *   size      --- size of message in bytes
38  *   data      --- pointer to the buffer containing the message
39  *   src       --- processor which sent the message
40  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
41  *   freemode  --- see below.
42  *   refcount  --- see below.
43  *
44  * The OutgoingMsg is kept around until the transmission is done, then
45  * it is garbage collected --- the refcount and freemode fields are
46  * to assist garbage collection.
47  *
48  * The freemode indicates which kind of buffer-management policy was
49  * used (sync, async, or freeing).  The sync policy is handled
50  * superficially by immediately converting sync sends into freeing
51  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
52  * (freeing).  If the freemode is 'F', then garbage collection
53  * involves freeing the data and the OutgoingMsg structure itself.  If
54  * the freemode is 'A', then the only cleanup is to change the
55  * freemode to 'X', a condition which is then detectable by
56  * CmiAsyncMsgSent.  In this case, the actual freeing of the
57  * OutgoingMsg is done by CmiReleaseCommHandle.
58  *
59  * When the transmission is initiated, the system computes how many
60  * datagrams need to be sent, total.  This number is stored in the
61  * refcount field.  Each time a datagram is delivered, the refcount
62  * is decremented, when it reaches zero, cleanup is performed.  There
63  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
64  * is a send (not a broadcast) and can be performed with shared
65  * memory, the entire datagram system is bypassed, the message is
66  * simply delivered and freed, not using the refcount mechanism at
67  * all.  Exception 2: If the message is a broadcast, then part of the
68  * broadcast that can be done via shared memory is performed prior to
69  * initiating the datagram/refcount system.
70  *
71  * DATAGRAM FORMATS AND MESSAGE FORMATS
72  *
73  * Datagrams have this format:
74  *
75  *   srcpe   (16 bits) --- source processor number.
76  *   magic   ( 8 bits) --- magic number to make sure DG is good.
77  *   dstrank ( 8 bits) --- destination processor rank.
78  *   seqno   (32 bits) --- packet sequence number.
79  *   data    (XX byte) --- user data.
80  *
81  * The only reason the srcpe is in there is because the receiver needs
82  * to know which receive window to use.  The dstrank field is needed
83  * because transmission is node-to-node.  Once the message is
84  * assembled by the node, it must be delivered to the appropriate PE.
85  * The dstrank field is used to encode certain special-case scenarios.
86  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
87  * and should be delivered to all processors in the node.  If the dstrank
88  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
89  * which case the srcpe is the number of the acknowledger, the seqno is
90  * always zero, and the user data is a list of the seqno's being
91  * acknowledged.  There may be other dstrank codes for special functions.
92  *
93  * To send a message, one chops it up into datagrams and stores those
94  * datagrams in a send-queue.  These outgoing datagrams aren't stored
95  * in the explicit format shown above.  Instead, they are stored as
96  * ImplicitDgrams, which contain the datagram header and a pointer to
97  * the user data (which is in the user message buffer, which is in the
98  * OutgoingMsg).  At transmission time these are combined together.
99
100  * The combination of the datagram header with the user's data is
101  * performed right in the user's message buffer.  Note that the
102  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
103  * of the user's message with a datagram header, sends the datagram
104  * straight from the user's message buffer, then restores the user's
105  * buffer to its original state.  There is a small problem with the
106  * first datagram of the message: one needs 64 bits of space to store
107  * the datagram header.  To make sure this space is there, we added a
108  * 64-bit unused space to the front of the Cmi message header.  In
109  * addition to this, we also add 32 bits to the Cmi message header
110  * to make room for a length-field, making it possible to identify
111  * message boundaries.
112  *
113  * CONCURRENCY CONTROL
114  *
115  * This has changed recently.
116  *
117  * EFFICIENCY NOTES
118  *
119  * The sender-side does little copying.  The async and freeing send
120  * routines do no copying at all.  The sync send routines copy the
121  * message, then use the freeing-send routines.  The other alternative
122  * is to not copy the message, and use the async send mechanism
123  * combined with a blocking wait.  Blocking wait seems like a bad
124  * idea, since it could take a VERY long time to get all those
125  * datagrams out the door.
126  *
127  * The receiver side, unfortunately, must copy.  To avoid copying,
128  * it would have to receive directly into a preallocated message buffer.
129  * Unfortunately, this can't work: there's no way to know how much
130  * memory to preallocate, and there's no way to know which datagram
131  * is coming next.  Thus, we receive into fixed-size (large) datagram
132  * buffers.  These are then inspected, and the messages extracted from
133  * them.
134  *
135  * Note that we are allocating a large number of structs: OutgoingMsg's,
136  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
137  * is a fixed-size structure.  Thus, we can do memory allocation by
138  * simply keeping a linked-list of unused structs around.  The only
139  * place where expensive memory allocation is performed is in the
140  * sync routines.
141  *
142  * Since the datagrams from one node to another are fully ordered,
143  * there is slightly more ordering than is needed: in theory, the
144  * datagrams of one message don't need to be ordered relative to the
145  * datagrams of another.  This was done to simplify the sequencing
146  * mechanisms: implementing a fully-ordered stream is much simpler
147  * than a partially-ordered one.  It also makes it possible to
148  * modularize, layering the message transmitter on top of the
149  * datagram-sequencer.  In other words, it was just easier this way.
150  * Hopefully, this won't cause serious degradation: LAN's rarely get
151  * datagrams out of order anyway.
152  *
153  * A potential efficiency problem is the lack of message-combining.
154  * One datagram could conceivably contain several messages.  This
155  * might be more efficient, it's not clear how much overhead is
156  * involved in sending a short datagram.  Message-combining isn't
157  * really ``integrated'' into the design of this software, but you
158  * could fudge it as follows.  Whenever you pull a short datagram from
159  * the send-queue, check the next one to see if it's also a short
160  * datagram.  If so, pack them together into a ``combined'' datagram.
161  * At the receive side, simply check for ``combined'' datagrams, and
162  * treat them as if they were simply two datagrams.  This would
163  * require extra copying.  I have no idea if this would be worthwhile.
164  *
165  *****************************************************************************/
166
167 /*****************************************************************************
168  *
169  * Include Files
170  *
171  ****************************************************************************/
172
173 #include <stdarg.h> /*<- was <varargs.h>*/
174
175 #define CMK_USE_PRINTF_HACK 0
176 #if CMK_USE_PRINTF_HACK
177 /*HACK: turn printf into CmiPrintf, by just defining our own
178 external symbol "printf".  This may be more trouble than it's worth,
179 since the only advantage is that it works properly with +syncprint.
180
181 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
182 because they don't call printf.  Has to be defined up here because we probably 
183 haven't properly guessed this compiler's prototype for "printf".
184 */
185 static void InternalPrintf(const char *f, va_list l);
186 int printf(const char *fmt, ...) {
187         int nChar;
188         va_list p; va_start(p, fmt);
189         InternalPrintf(fmt,p);
190         va_end(p);
191         return 10;
192 }
193 #endif
194
195
196 #include "converse.h"
197 #include "memory-isomalloc.h"
198
199 #include <stdio.h>
200 #include <stdlib.h>
201 #include <ctype.h>
202 #include <fcntl.h>
203 #include <errno.h>
204 #include <setjmp.h>
205 #include <signal.h>
206 #include <string.h>
207
208 /* define machine debug */
209 #include "machine.h"
210
211 /******************* Producer-Consumer Queues ************************/
212 #include "pcqueue.h"
213
214 #include "machine-smp.h"
215
216 #if CMK_USE_POLL
217 #include <poll.h>
218 #endif
219
220 #if CMK_USE_GM
221 #include "gm.h"
222 struct gm_port *gmport = NULL;
223 int  portFinish = 0;
224 #endif
225
226 #include "conv-ccs.h"
227 #include "ccs-server.h"
228 #include "sockRoutines.h"
229
230 #if defined(_WIN32) && ! defined(__CYGWIN__)
231 /*For windows systems:*/
232 #  include <windows.h>
233 #  include <wincon.h>
234 #  include <sys/types.h>
235 #  include <sys/timeb.h>
236 #  define fdopen _fdopen
237 #  define SIGBUS -1  /*These signals don't exist in Win32*/
238 #  define SIGKILL -1
239 #  define SIGQUIT -1
240 #  define SIGTERM -1
241
242 #else /*UNIX*/
243 #  include <pwd.h>
244 #  include <unistd.h>
245 #  include <fcntl.h>
246 #  include <sys/file.h>
247 #endif
248
249 #if CMK_PERSISTENT_COMM
250 #include "persist_impl.h"
251 #endif
252
253 #define PRINTBUFSIZE 16384
254
255 static void CommunicationServer(int withDelayMs);
256 static void CommunicationServerThread(int withDelayMs);
257 void CmiHandleImmediate();
258 extern int CmemInsideMem();
259 extern void CmemCallWhenMemAvail();
260 static void ConverseRunPE(int everReturn);
261 void CmiYield(void);
262 void ConverseCommonExit(void);
263
264 static unsigned int dataport=0;
265 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
266 static SOCKET       dataskt;
267
268 /****************************************************************************
269  *
270  * Handling Errors
271  *
272  * Errors should be handled by printing a message on stderr and
273  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
274  * communication should be made.  The other processes will notice the
275  * abnormal termination and will deal with it.
276  *
277  * Rationale: if an error triggers an attempt to send a message,
278  * the attempt to send a message is likely to trigger another error,
279  * leading to an infinite loop and a process that spins instead of
280  * shutting down.
281  *
282  *****************************************************************************/
283
284 static int machine_initiated_shutdown=0;
285 static int already_in_signal_handler=0;
286
287 static void CmiDestoryLocks();
288
289 static void machine_exit(int status)
290 {
291   machine_initiated_shutdown=1;
292
293   CmiDestoryLocks();            /* destory locks to prevent dead locking */
294
295 #if CMK_USE_GM
296   if (gmport) { 
297     gm_close(gmport); gmport = 0;
298     gm_finalize();
299   }
300 #endif
301   exit(status);
302 }
303
304 static void charmrun_abort(const char*);
305
306 static void KillEveryone(const char *msg)
307 {
308   charmrun_abort(msg);
309   machine_exit(1);
310 }
311
312 static void KillEveryoneCode(n)
313 int n;
314 {
315   char _s[100];
316   sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
317   charmrun_abort(_s);
318   machine_exit(1);
319 }
320
321 static void KillOnAllSigs(int sigNo)
322 {
323   const char *sig="unknown signal";
324   const char *suggestion="";
325   if (machine_initiated_shutdown ||
326       already_in_signal_handler) 
327         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
328   already_in_signal_handler=1;
329
330   CmiDestoryLocks();            /* destory locks */
331
332   if (sigNo==SIGSEGV) {
333      sig="segmentation violation";
334      suggestion="Try running with '++debug', or linking with '-memory paranoid'.\n";
335   }
336   if (sigNo==SIGFPE) {
337      sig="floating point exception";
338      suggestion="Check for integer or floating-point division by zero.\n";
339   }
340   if (sigNo==SIGBUS) {
341      sig="bus error";
342      suggestion="Check for misaligned reads or writes to memory.\n";
343   }
344   if (sigNo==SIGILL) {
345      sig="illegal instruction";
346      suggestion="Check for calls to uninitialized function pointers.\n";
347   }
348   if (sigNo==SIGKILL) sig="caught signal KILL";
349   if (sigNo==SIGQUIT) sig="caught signal QUIT";
350   if (sigNo==SIGTERM) sig="caught signal TERM";
351   MACHSTATE1(5,"     Caught signal %s ",sig);
352 /*ifdef this part*/
353 #ifdef __FAULT__
354   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
355                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
356   }else{
357 #else
358         {
359 #endif
360    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
361         "Signal: %s\n",CmiMyPe(),sig);
362         if (0!=suggestion[0])
363                 CmiError("Suggestion: %s",suggestion);
364         CmiPrintStackTrace(1);
365         charmrun_abort(sig);
366         machine_exit(1);                
367         }       
368 }
369
370 static void machine_atexit_check(void)
371 {
372   if (!machine_initiated_shutdown)
373     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
374   printf("Program finished.\n");
375 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
376   fgetc(stdin);
377 #endif
378 }
379
380 #if !defined(_WIN32) || defined(__CYGWIN__)
381 static void HandleUserSignals(int signum)
382 {
383   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
384   CcdRaiseCondition(condnum);
385 }
386 #endif
387
388 static void PerrorExit(const char *msg)
389 {
390   perror(msg);
391   machine_exit(1);
392 }
393
394 /*****************************************************************************
395  *
396  *     Utility routines for network machine interface.
397  *
398  *****************************************************************************/
399
400 /*
401 Horrific #defines to hide the differences between select() and poll().
402  */
403 #if CMK_USE_POLL /*poll() version*/
404 # define CMK_PIPE_DECL(delayMs) \
405         struct pollfd fds[10]; \
406         int nFds_sto=0; int *nFds=&nFds_sto; \
407         int pollDelayMs=delayMs;
408 # define CMK_PIPE_SUB fds,nFds
409 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
410
411 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
412 # define CMK_PIPE_ADDREAD(rd_fd) \
413         do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
414 # define CMK_PIPE_ADDWRITE(wr_fd) \
415         do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
416 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
417 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
418
419 #else /*select() version*/
420
421 # define CMK_PIPE_DECL(delayMs) \
422         fd_set rfds_sto,wfds_sto;\
423         fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
424         FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
425 # define CMK_PIPE_SUB rfds,wfds
426 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
427
428 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
429 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
430 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
431 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
432 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
433 #endif
434
435 static void CMK_PIPE_CHECKERR(void) {
436 #if defined(_WIN32) && !defined(__CYGWIN__)
437 /* Win32 socket seems to randomly return inexplicable errors
438 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
439         int err=WSAGetLastError();
440         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
441 o,err);
442 */
443 #else /*UNIX machine*/
444         if (errno!=EINTR)
445                 KillEveryone("Socket error in CheckSocketsReady!\n");
446 #endif
447 }
448
449
450 static void CmiStdoutFlush(void);
451 static int  CmiStdoutNeedsService(void);
452 static void CmiStdoutService(void);
453 static void CmiStdoutAdd(CMK_PIPE_PARAM);
454 static void CmiStdoutCheck(CMK_PIPE_PARAM);
455
456
457 double GetClock(void)
458 {
459 #if defined(_WIN32) && !defined(__CYGWIN__)
460   struct _timeb tv; 
461   _ftime(&tv);
462   return (tv.time * 1.0 + tv.millitm * 1.0E-3);
463 #else
464   struct timeval tv; int ok;
465   ok = gettimeofday(&tv, NULL);
466   if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
467   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
468 #endif
469 }
470
471 char *CopyMsg(char *msg, int len)
472 {
473   char *copy = (char *)CmiAlloc(len);
474   if (!copy)
475       fprintf(stderr, "Out of memory\n");
476   memcpy(copy, msg, len);
477   return copy;
478 }
479
480 /***********************************************************************
481  *
482  * Abort function:
483  *
484  ************************************************************************/
485
486 static int  Cmi_truecrash;
487 static int already_aborting=0;
488 void CmiAbort(const char *message)
489 {
490   if (already_aborting) machine_exit(1);
491   already_aborting=1;
492   MACHSTATE1(5,"CmiAbort(%s)",message);
493   
494   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
495         "Reason: %s\n",CmiMyPe(),message);
496   CmiPrintStackTrace(0);
497   
498   /*Send off any remaining prints*/
499   CmiStdoutFlush();
500   
501   if(Cmi_truecrash) {
502     printf("CHARM++ FATAL ERROR: %s\n", message);
503     *(int *)NULL = 0; /*Write to null, causing bus error*/
504   } else {
505     charmrun_abort(message);
506     machine_exit(1);
507   }
508 }
509
510
511 /******************************************************************************
512  *
513  * CmiEnableAsyncIO
514  *
515  * The net and tcp versions use a bunch of unix processes talking to each
516  * other via file descriptors.  We need for a signal SIGIO to be generated
517  * each time a message arrives, making it possible to write a signal
518  * handler to handle the messages.  The vast majority of unixes can,
519  * in fact, do this.  However, there isn't any standard for how this is
520  * supposed to be done, so each version of UNIX has a different set of
521  * calls to turn this signal on.  So, there is like one version here for
522  * every major brand of UNIX.
523  *
524  *****************************************************************************/
525
526 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
527 #include <fcntl.h>
528 void CmiEnableAsyncIO(int fd)
529 {
530   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
531     CmiError("setting socket owner: %s\n", strerror(errno)) ;
532     exit(1);
533   }
534   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
535     CmiError("setting socket async: %s\n", strerror(errno)) ;
536     exit(1);
537   }
538 }
539 #else
540 void CmiEnableAsyncIO(int fd) { }
541 #endif
542
543 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
544 #if !defined(_WIN32) || defined(__CYGWIN__)
545 void CmiEnableNonblockingIO(int fd) {
546   int on=1;
547   if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
548     CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
549     exit(1);
550   }
551 }
552 #else
553 void CmiEnableNonblockingIO(int fd) { }
554 #endif
555
556
557 /******************************************************************************
558  *
559  * Configuration Data
560  *
561  * This data is all read in from the NETSTART variable (provided by the
562  * charmrun) and from the command-line arguments.  Once read in, it is never
563  * modified.
564  *
565  *****************************************************************************/
566
567 int               _Cmi_mynode;    /* Which address space am I */
568 int               _Cmi_mynodesize;/* Number of processors in my address space */
569 int               _Cmi_numnodes;  /* Total number of address spaces */
570 int               _Cmi_numpes;    /* Total number of processors */
571 static int        Cmi_nodestart; /* First processor in this address space */
572 static skt_ip_t   Cmi_self_IP;
573 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
574 static int        Cmi_charmrun_port;
575 static int        Cmi_charmrun_pid;
576 static int        Cmi_charmrun_fd=-1;
577
578 static int    Cmi_netpoll;
579 static int    Cmi_idlepoll;
580 static int    Cmi_syncprint;
581 static int Cmi_print_stats = 0;
582
583 static void parse_netstart(void)
584 {
585   char *ns;
586   int nread;
587   int port;
588   ns = getenv("NETSTART");
589   if (ns!=0) 
590   {/*Read values set by Charmrun*/
591         char Cmi_charmrun_name[1024];
592         nread = sscanf(ns, "%d%s%d%d%d",
593                  &_Cmi_mynode,
594                  Cmi_charmrun_name, &Cmi_charmrun_port,
595                  &Cmi_charmrun_pid, &port);
596         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
597
598         if (nread!=5) {
599                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
600                 exit(1);
601         }
602   } else 
603   {/*No charmrun-- set flag values for standalone operation*/
604         _Cmi_mynode=0;
605         Cmi_charmrun_IP=_skt_invalid_ip;
606         Cmi_charmrun_port=0;
607         Cmi_charmrun_pid=0;
608         dataport = -1;
609   }
610 }
611
612 static void extract_common_args(char **argv)
613 {
614   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
615     Cmi_print_stats = 1;
616 }
617
618 /* for SMP */
619 #include "machine-smp.c"
620
621 CsvDeclare(CmiNodeState, NodeState);
622
623 /* Immediate message support */
624 #define CMI_DEST_RANK(msg)      *(int *)(msg)
625 #include "immediate.c"
626
627 /******************************************************************************
628  *
629  * Packet Performance Logging
630  *
631  * This module is designed to give a detailed log of the packets and their
632  * acknowledgements, for performance tuning.  It can be disabled.
633  *
634  *****************************************************************************/
635
636 #define LOGGING 0
637
638 #if LOGGING
639
640 typedef struct logent {
641   double time;
642   int seqno;
643   int srcpe;
644   int dstpe;
645   int kind;
646 } *logent;
647
648
649 logent log;
650 int    log_pos;
651 int    log_wrap;
652
653 static void log_init(void)
654 {
655   log = (logent)malloc(50000 * sizeof(struct logent));
656   _MEMCHECK(log);
657   log_pos = 0;
658   log_wrap = 0;
659 }
660
661 static void log_done(void)
662 {
663   char logname[100]; FILE *f; int i, size;
664   sprintf(logname, "log.%d", _Cmi_mynode);
665   f = fopen(logname, "w");
666   if (f==0) KillEveryone("fopen problem");
667   if (log_wrap) size = 50000; else size=log_pos;
668   for (i=0; i<size; i++) {
669     logent ent = log+i;
670     fprintf(f, "%1.4f %d %c %d %d\n",
671             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
672   }
673   fclose(f);
674 }
675
676 void printLog(void)
677 {
678   char logname[100]; FILE *f; int i, j, size;
679   static int logged = 0;
680   if (logged)
681       return;
682   logged = 1;
683   CmiPrintf("Logging: %d\n", _Cmi_mynode);
684   sprintf(logname, "log.%d", _Cmi_mynode);
685   f = fopen(logname, "w");
686   if (f==0) KillEveryone("fopen problem");
687   for (i = 5000; i; i--)
688   {
689   /*for (i=0; i<size; i++) */
690     j = log_pos - i;
691     if (j < 0)
692     {
693         if (log_wrap)
694             j = 5000 + j;
695         else
696             j = 0;
697     };
698     {
699     logent ent = log+j;
700     fprintf(f, "%1.4f %d %c %d %d\n",
701             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
702     }
703   }
704   fclose(f);
705   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
706 }
707
708 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
709
710 #endif
711
712 #if !LOGGING
713
714 #define log_init() /*empty*/
715 #define log_done() /*empty*/
716 #define printLog() /*empty*/
717 #define LOG(t,s,k,d,q) /*empty*/
718
719 #endif
720
721 /******************************************************************************
722  *
723  * Node state
724  *
725  *****************************************************************************/
726
727
728 static CmiNodeLock    Cmi_scanf_mutex;
729 static double         Cmi_clock;
730 static double         Cmi_check_delay = 3.0;
731
732 /******************************************************************************
733  *
734  * OS Threads
735  * SMP implementation moved to machine-smp.c
736  *****************************************************************************/
737
738 /************************ No kernel SMP threads ***************/
739 #if CMK_SHARED_VARS_UNAVAILABLE
740
741 static volatile int memflag=0;
742 void CmiMemLock() { memflag++; }
743 void CmiMemUnlock() { memflag--; }
744
745 static volatile int comm_flag=0;
746 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
747 #ifndef MACHLOCK_DEBUG
748 #  define CmiCommLock() (comm_flag=1)
749 #  define CmiCommUnlock() (comm_flag=0)
750 #else /* Error-checking flag locks */
751 void CmiCommLock(void) {
752   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
753   comm_flag=1;
754 }
755 void CmiCommUnlock(void) {
756   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
757   comm_flag=0;
758 }
759 #endif
760
761 static struct CmiStateStruct Cmi_state;
762 int _Cmi_mype;
763 int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
764 #define CmiGetState() (&Cmi_state)
765 #define CmiGetStateN(n) (&Cmi_state)
766
767 void CmiYield(void) { sleep(0); }
768
769 static void CommunicationInterrupt(int ignored)
770 {
771   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
772   if (memflag || comm_flag || _immRunning) 
773   { /* Already busy inside malloc, comm, or immediate messages */
774     MACHSTATE(5,"--SKIPPING SIGIO--");
775     return;
776   }
777   MACHSTATE(2,"--BEGIN SIGIO--")
778   {
779     /*Make sure any malloc's we do in here are NOT migratable:*/
780     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
781 /*    _Cmi_myrank=1; */
782     CommunicationServerThread(0);
783 /*    _Cmi_myrank=0; */
784     CmiIsomallocBlockListActivate(oldList);
785   }
786   MACHSTATE(2,"--END SIGIO--")
787 }
788
789 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
790
791 static void CmiStartThreads(char **argv)
792 {
793   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
794     KillEveryone
795       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
796   
797   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
798   _Cmi_mype = Cmi_nodestart;
799
800   /* Prepare Cpv's for immediate messages: */
801   _Cmi_myrank=1;
802   CommunicationServerInit();
803   _Cmi_myrank=0;
804   
805 #if !CMK_ASYNC_NOT_NEEDED
806   if (!Cmi_netpoll) {
807     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
808     if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
809     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
810   }
811 #endif
812 }
813
814 static void CmiDestoryLocks()
815 {
816   comm_flag = 0;
817   memflag = 0;
818 }
819
820 #endif
821
822 CpvDeclare(void *, CmiLocalQueue);
823
824
825 #ifndef CmiMyPe
826 int CmiMyPe(void) 
827
828   return CmiGetState()->pe; 
829 }
830 #endif
831 #ifndef CmiMyRank
832 int CmiMyRank(void)
833 {
834   return CmiGetState()->rank;
835 }
836 #endif
837
838 /*Add a message to this processor's receive queue 
839   Must be called while holding comm. lock
840 */
841 static void CmiPushPE(int pe,void *msg)
842 {
843   CmiState cs=CmiGetStateN(pe);
844   MACHSTATE1(2,"Pushing message into %d's queue",pe);  
845   MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
846
847 #if CMK_IMMEDIATE_MSG
848   if (CmiIsImmediate(msg)) {
849     CmiPushImmediateMsg(msg);
850     return;
851   }
852 #endif
853   PCQueuePush(cs->recv,msg);
854   CmiIdleLock_addMessage(&cs->idle);
855 }
856
857 #if CMK_NODE_QUEUE_AVAILABLE
858 /*Add a message to the node queue.  
859   Must be called while holding comm. lock
860 */
861 static void CmiPushNode(void *msg)
862 {
863   CmiState cs=CmiGetStateN(0);
864   MACHSTATE(2,"Pushing message into node queue");
865   MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
866   
867 #if CMK_IMMEDIATE_MSG
868   if (CmiIsImmediate(msg)) {
869   MACHSTATE(2,"Pushing Immediate message into queue");
870     CmiPushImmediateMsg(msg);
871     return;
872   }
873 #endif 
874   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
875   /*Silly: always try to wake up processor 0, so at least *somebody*
876     will be awake to handle the message*/
877   CmiIdleLock_addMessage(&cs->idle);
878 }
879 #endif
880
881 /***************************************************************
882  Communication with charmrun:
883  We can send (ctrl_sendone) and receive (ctrl_getone)
884  messages on a TCP socket connected to charmrun.
885  This is used for printfs, CCS, etc; and also for
886  killing ourselves if charmrun dies.
887 */
888
889 /*This flag prevents simultanious outgoing
890 messages on the charmrun socket.  It is protected
891 by the commlock.*/
892 static int Cmi_charmrun_fd_sendflag=0;
893
894 /* ctrl_sendone */
895 static int sendone_abort_fn(int code,const char *msg) {
896         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
897         machine_exit(1);
898         return -1;
899 }
900
901 static void ctrl_sendone_nolock(const char *type,
902                                 const char *data1,int dataLen1,
903                                 const char *data2,int dataLen2)
904 {
905   const void *bufs[3]; int lens[3]; int nBuffers=0;
906   ChMessageHeader hdr;
907   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
908   if (Cmi_charmrun_fd==-1) 
909         charmrun_abort("ctrl_sendone called in standalone!\n");
910   Cmi_charmrun_fd_sendflag=1;
911   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
912   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
913   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
914   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
915   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
916   Cmi_charmrun_fd_sendflag=0;
917   skt_set_abort(oldAbort);
918 }
919
920 static void ctrl_sendone_locking(const char *type,
921                                 const char *data1,int dataLen1,
922                                 const char *data2,int dataLen2)
923 {
924   CmiCommLock();
925   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
926   CmiCommUnlock();
927 }
928
929 static double Cmi_check_last;
930
931 static void pingCharmrun(void *ignored) 
932 {
933   double clock=GetClock();
934   if (clock > Cmi_check_last + Cmi_check_delay) {
935     MACHSTATE(1,"CommunicationsClock pinging charmrun");       
936     Cmi_check_last = clock; 
937     CmiCommLockOrElse(return;); /*Already busy doing communication*/
938     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
939     CmiCommLock();
940     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
941     CmiCommUnlock();
942   }
943 #if 1
944   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
945 #endif
946 }
947
948 /* periodic charm ping, for gm and netpoll */
949 static void pingCharmrunPeriodic(void *ignored)
950 {
951   pingCharmrun(ignored);
952   CcdCallFnAfter(pingCharmrunPeriodic,NULL,1000);
953 }
954
955 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
956 static void charmrun_abort(const char *s)
957 {
958   if (Cmi_charmrun_fd==-1) {/*Standalone*/
959         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
960         abort();
961   } else {
962         char msgBuf[80];
963         skt_set_abort(ignore_further_errors);
964         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
965         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
966   }
967 }
968
969 /* ctrl_getone */
970
971 static void ctrl_getone(void)
972 {
973   ChMessage msg;
974   MACHSTATE(2,"ctrl_getone")
975   MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
976   ChMessage_recv(Cmi_charmrun_fd,&msg);
977
978   if (strcmp(msg.header.type,"die")==0) {
979     fprintf(stderr,"aborting: %s\n",msg.data);
980     log_done();
981     ConverseCommonExit();
982     machine_exit(0);
983 #if CMK_CCS_AVAILABLE
984   } else if (strcmp(msg.header.type, "req_fw")==0) {
985     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
986         /*Sadly, I *can't* do a:
987       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
988         here, because I can't send converse messages in the
989         communication thread.  I *can* poke this message into 
990         any convenient processor's queue, though:  (OSL, 9/14/2000)
991         */
992         int pe=0;/*<- node-local processor number. Any one will do.*/
993         void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
994         MACHSTATE(2,"Incoming CCS request");
995         CmiPushPE(pe,cmsg);
996 #endif
997   }  else {
998   /* We do not use KillEveryOne here because it calls CmiMyPe(),
999    * which is not available to the communication thread on an SMP version.
1000    */
1001     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1002     machine_exit(1);
1003   }
1004   
1005   ChMessage_free(&msg);
1006 }
1007
1008 #if CMK_CCS_AVAILABLE
1009 /*Deliver this reply data to this reply socket.
1010   The data is forwarded to CCS server via charmrun.*/
1011 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
1012 {
1013   MACHSTATE(2,"Outgoing CCS reply");
1014   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
1015                        repData,repLen);
1016   MACHSTATE(1,"Outgoing CCS reply away");
1017 }
1018 #endif
1019
1020 /*****************************************************************************
1021  *
1022  * CmiPrintf, CmiError, CmiScanf
1023  *
1024  *****************************************************************************/
1025 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1026 static void InternalPrintf(const char *f, va_list l)
1027 {
1028   ChMessage replymsg;
1029   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1030   CmiStdoutFlush();
1031   vsprintf(buffer, f, l);
1032   if(Cmi_syncprint) {
1033           CmiCommLock();
1034           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1035           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1036           ChMessage_free(&replymsg);
1037           CmiCommUnlock();
1038   } else {
1039           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1040   }
1041   InternalWriteToTerminal(0,buffer,strlen(buffer));
1042   CmiTmpFree(buffer);
1043 }
1044
1045 static void InternalError(const char *f, va_list l)
1046 {
1047   ChMessage replymsg;
1048   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1049   CmiStdoutFlush();
1050   vsprintf(buffer, f, l);
1051   if(Cmi_syncprint) {
1052           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1053           CmiCommLock();
1054           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1055           ChMessage_free(&replymsg);
1056           CmiCommUnlock();
1057   } else {
1058           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1059   }
1060   InternalWriteToTerminal(1,buffer,strlen(buffer));
1061   CmiTmpFree(buffer);
1062 }
1063
1064 static int InternalScanf(char *fmt, va_list l)
1065 {
1066   ChMessage replymsg;
1067   char *ptr[20];
1068   char *p; int nargs, i;
1069   nargs=0;
1070   p=fmt;
1071   while (*p) {
1072     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1073     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1074     if (p[0]=='%') { nargs++; p++; continue; }
1075     if (*p=='\n') *p=' '; p++;
1076   }
1077   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1078   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1079   CmiLock(Cmi_scanf_mutex);
1080   if (Cmi_charmrun_fd!=-1)
1081   {/*Send charmrun the format string*/
1082         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1083         /*Wait for the reply (characters to scan) from charmrun*/
1084         CmiCommLock();
1085         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1086         i = sscanf((char*)replymsg.data, fmt,
1087                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1088                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1089                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1090         ChMessage_free(&replymsg);
1091         CmiCommUnlock();
1092   } else
1093   {/*Just do the scanf normally*/
1094         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1095                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1096                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1097   }
1098   CmiUnlock(Cmi_scanf_mutex);
1099   return i;
1100 }
1101
1102 #if CMK_CMIPRINTF_IS_A_BUILTIN
1103
1104 /*New stdarg.h declarations*/
1105 void CmiPrintf(const char *fmt, ...)
1106 {
1107   va_list p; va_start(p, fmt);
1108   if (Cmi_charmrun_fd!=-1)
1109     InternalPrintf(fmt, p);
1110   else
1111     vfprintf(stdout,fmt,p);
1112   va_end(p);
1113 }
1114
1115 void CmiError(const char *fmt, ...)
1116 {
1117   va_list p; va_start (p, fmt);
1118   if (Cmi_charmrun_fd!=-1)
1119     InternalError(fmt, p);
1120   else
1121     vfprintf(stderr,fmt,p);
1122   va_end(p);
1123 }
1124
1125 int CmiScanf(const char *fmt, ...)
1126 {
1127   va_list p; int i; va_start(p, fmt);
1128   i = InternalScanf((char *)fmt, p);
1129   va_end(p);
1130   return i;
1131 }
1132
1133 #endif
1134
1135 /***************************************************************************
1136  * Output redirection:
1137  *  When people don't use CkPrintf, like above, we'd still like to be able
1138  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
1139  * which lets us read the characters sent to stdout at our lesiure.
1140  ***************************************************************************/
1141
1142 /*Can read from stdout or stderr using these fd's*/
1143 static int readStdout[2]; 
1144 static int writeStdout[2]; /*The original stdout/stderr sockets*/ 
1145 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1146 #define readStdoutBufLen (16*1024)
1147 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1148 static int servicingStdout;
1149
1150 /*Initialization-- should only be called once per node*/
1151 static void CmiStdoutInit(void) {
1152         int i;
1153         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1154         
1155 /*There's some way to do this same thing in windows, but I don't know how*/
1156 #if !defined(_WIN32) || defined(__CYGWIN__)
1157         /*Prevent buffering in stdio library:*/
1158         setbuf(stdout,NULL); setbuf(stderr,NULL);
1159
1160         /*Reopen stdout and stderr fd's as new pipes:*/
1161         for (i=0;i<2;i++) {
1162                 int pair[2];
1163                 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1164                 
1165                 /*First, save a copy of the original stdout*/
1166                 writeStdout[i]=dup(srcFd);
1167 #if 0
1168                 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1169                 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1170 #else
1171                /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1172                 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair)) 
1173                         {perror("building stdio redirection socketpair"); exit(1);}
1174 #endif
1175                 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1176                 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1177                 
1178 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
1179                 CmiEnableNonblockingIO(srcFd);
1180 #endif
1181 #if CMK_SHARED_VARS_UNAVAILABLE && !CMK_USE_GM
1182                 if (!Cmi_netpoll) {
1183   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1184                         CmiEnableAsyncIO(readStdout[i]);
1185                 }
1186 #endif
1187         }
1188 #else
1189 /*Windows system-- just fake reads for now*/
1190 # ifndef read
1191 #  define read(x,y,z) 0
1192 # endif
1193 # ifndef write
1194 #  define write(x,y,z) 
1195 # endif
1196 #endif
1197 }
1198
1199 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1200 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1201 {
1202         write(writeStdout[isStdErr],str,len);   
1203 }
1204
1205 /*
1206   Service this particular stdout pipe.  
1207   Must hold comm. lock.
1208 */
1209 static void CmiStdoutServiceOne(int i) {
1210         int nBytes;
1211         const static char *cmdName[2]={"print","printerr"};
1212         servicingStdout=1;
1213         while(1) {
1214                 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1215                 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1216                 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1217                 if (nBytes<=0) break; /*Nothing to send*/
1218                 
1219                 /*Send these bytes off to charmrun*/
1220                 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1221                 nBytes++; /*Include zero-terminator in message to charmrun*/
1222                 
1223                 if (nBytes>=readStdoutBufLen-100) 
1224                 { /*We must have filled up our output pipe-- most output libraries
1225                    don't handle this well (e.g., glibc printf just drops the line).*/
1226                         
1227                         tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1228                                 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1229                         nBytes--; /*Remove terminator from user's data*/
1230                         tooMuchLen=strlen(tooMuchWarn)+1;
1231                 }
1232                 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1233                                     tooMuchWarn,tooMuchLen);
1234                 
1235                 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1236         }
1237         servicingStdout=0;
1238         serviceStdout[i]=0; /*This pipe is now serviced*/
1239 }
1240
1241 /*Service all stdout pipes, whether it looks like they need it
1242   or not.  Used when you aren't sure if select() has been called recently.
1243   Must hold comm. lock.
1244 */
1245 static void CmiStdoutServiceAll(void) {
1246         int i;
1247         for (i=0;i<2;i++) {
1248                 if (readStdout[i]==0) continue; /*Pipe not open*/
1249                 CmiStdoutServiceOne(i);
1250         }
1251 }
1252
1253 /*Service any outstanding stdout pipes.
1254   Must hold comm. lock.
1255 */
1256 static void CmiStdoutService(void) {
1257         CmiStdoutServiceAll();
1258 }
1259
1260 /*Add our pipes to the pile for select() or poll().
1261   Both can be called with or without the comm. lock.
1262 */
1263 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1264         int i;
1265         for (i=0;i<2;i++) {
1266                 if (readStdout[i]==0) continue; /*Pipe not open*/
1267                 CMK_PIPE_ADDREAD(readStdout[i]);
1268         }
1269 }
1270 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1271         int i;
1272         for (i=0;i<2;i++) {
1273                 if (readStdout[i]==0) continue; /*Pipe not open*/
1274                 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1275         }
1276 }
1277 static int CmiStdoutNeedsService(void) {
1278         return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1279 }
1280
1281 /*Called every few milliseconds to flush the stdout pipes*/
1282 static void CmiStdoutFlush(void) {
1283         if (servicingStdout) return; /* might be called by SIGALRM */
1284         CmiCommLockOrElse( return; )
1285         CmiCommLock();
1286         CmiStdoutServiceAll();
1287         CmiCommUnlock();
1288 }
1289
1290 /***************************************************************************
1291  * Message Delivery:
1292  *
1293  ***************************************************************************/
1294
1295 #include "machine-dgram.c"
1296
1297
1298 #ifndef CmiNodeFirst
1299 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1300 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1301 #endif
1302
1303 #ifndef CmiNodeOf
1304 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1305 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1306 #endif
1307
1308
1309 /*****************************************************************************
1310  *
1311  * node_addresses
1312  *
1313  *  These two functions fill the node-table.
1314  *
1315  *
1316  *   This node, like all others, first sends its own address to charmrun
1317  *   using this command:
1318  *
1319  *     Type: nodeinfo
1320  *     Data: Big-endian 4-byte ints
1321  *           <my-node #><Dataport>
1322  *
1323  *   When charmrun has all the addresses, he sends this table to me:
1324  *
1325  *     Type: nodes
1326  *     Data: Big-endian 4-byte ints
1327  *           <number of nodes n>
1328  *           <#PEs><IP><Dataport> Node 0
1329  *           <#PEs><IP><Dataport> Node 1
1330  *           ...
1331  *           <#PEs><IP><Dataport> Node n-1
1332  *
1333  *****************************************************************************/
1334
1335 /*Note: node_addresses_obtain is called before starting
1336   threads, so no locks are needed (or valid!)*/
1337 static void node_addresses_obtain(char **argv)
1338 {
1339   ChMessage nodetabmsg; /* info about all nodes*/
1340   if (Cmi_charmrun_fd==-1) 
1341   {/*Standalone-- fake a single-node nodetab message*/
1342         int npes=1;
1343         ChSingleNodeinfo *fakeTab;
1344         ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1345         fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
1346         CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1347 #if CMK_SHARED_VARS_UNAVAILABLE
1348         if (npes!=1) {
1349                 fprintf(stderr,
1350                         "To use multiple processors, you must run this program as:\n"
1351                         " > charmrun +p%d %s <args>\n"
1352                         "or build the %s-smp version of Charm++.\n",
1353                         npes,argv[0],CMK_MACHINE_NAME);
1354                 exit(1);
1355         }
1356 #endif
1357         /*This is a stupid hack: we expect the *number* of nodes
1358         followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1359         (which happens to have exactly that layout!) and stuff
1360         a 1 into the "node number" slot
1361         */
1362         fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1363         fakeTab->info.nPE=ChMessageInt_new(npes);
1364         fakeTab->info.dataport=ChMessageInt_new(0);
1365         fakeTab->info.IP=_skt_invalid_ip;
1366   }
1367   else 
1368   { /*Contact charmrun for machine info.*/
1369         ChSingleNodeinfo me;
1370
1371         me.nodeNo=ChMessageInt_new(_Cmi_mynode);
1372         /*The nPE and IP fields are set by charmrun--
1373           these values don't matter.
1374         */
1375         me.info.nPE=ChMessageInt_new(0);
1376         me.info.IP=_skt_invalid_ip;
1377         me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
1378         me.info.dataport=ChMessageInt_new(dataport);
1379
1380         /*Send our node info. to charmrun.
1381         CommLock hasn't been initialized yet-- 
1382         use non-locking version*/
1383         ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1384   
1385         /*We get the other node addresses from a message sent
1386           back via the charmrun control port.*/
1387         if (!skt_select1(Cmi_charmrun_fd,600*1000)) CmiAbort("Timeout waiting for nodetab!\n");
1388         ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1389   }
1390   node_addresses_store(&nodetabmsg);
1391   ChMessage_free(&nodetabmsg);
1392 }
1393
1394 #if CMK_NODE_QUEUE_AVAILABLE
1395
1396 /***********************************************************************
1397  * DeliverOutgoingNodeMessage()
1398  *
1399  * This function takes care of delivery of outgoing node messages from the
1400  * sender end. Broadcast messages are divided into sets of messages that 
1401  * are bound to the local node, and to remote nodes. For local
1402  * transmission, the messages are directly pushed into the recv
1403  * queues. For non-local transmission, the function DeliverViaNetwork()
1404  * is called
1405  ***********************************************************************/
1406 void DeliverOutgoingNodeMessage(OutgoingMsg ogm)
1407 {
1408   int i, rank, dst; OtherNode node;
1409
1410   dst = ogm->dst;
1411   switch (dst) {
1412   case NODE_BROADCAST_ALL:
1413     CmiPushNode(CopyMsg(ogm->data,ogm->size));
1414     /*case-fallthrough (no break)-- deliver to all other processors*/
1415   case NODE_BROADCAST_OTHERS:
1416     for (i=0; i<_Cmi_numnodes; i++)
1417       if (i!=_Cmi_mynode)
1418         DeliverViaNetwork(ogm, nodes + i, DGRAM_NODEMESSAGE);
1419     GarbageCollectMsg(ogm);
1420     break;
1421   default:
1422     node = nodes+dst;
1423     rank=DGRAM_NODEMESSAGE;
1424     if (dst != _Cmi_mynode) {
1425       DeliverViaNetwork(ogm, node, rank);
1426       GarbageCollectMsg(ogm);
1427     } else {
1428       if (ogm->freemode == 'A') {
1429         CmiPushNode(CopyMsg(ogm->data,ogm->size));
1430         ogm->freemode = 'X';
1431       } else {
1432         CmiPushNode(ogm->data);
1433         FreeOutgoingMsg(ogm);
1434       }
1435     }
1436   }
1437 }
1438
1439 #else
1440
1441 #define DeliverOutgoingNodeMessage(msg) DeliverOutgoingMessage(msg)
1442
1443 #endif
1444
1445 /***********************************************************************
1446  * DeliverOutgoingMessage()
1447  *
1448  * This function takes care of delivery of outgoing messages from the
1449  * sender end. Broadcast messages are divided into sets of messages that 
1450  * are bound to the local node, and to remote nodes. For local
1451  * transmission, the messages are directly pushed into the recv
1452  * queues. For non-local transmission, the function DeliverViaNetwork()
1453  * is called
1454  ***********************************************************************/
1455 void DeliverOutgoingMessage(OutgoingMsg ogm)
1456 {
1457   int i, rank, dst; OtherNode node;
1458   
1459   dst = ogm->dst;
1460   switch (dst) {
1461   case PE_BROADCAST_ALL:
1462     for (rank = 0; rank<_Cmi_mynodesize; rank++) {
1463       CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1464     }
1465     for (i=0; i<_Cmi_numnodes; i++)
1466       if (i!=_Cmi_mynode)
1467         DeliverViaNetwork(ogm, nodes + i, DGRAM_BROADCAST);
1468     GarbageCollectMsg(ogm);
1469     break;
1470   case PE_BROADCAST_OTHERS:
1471     for (rank = 0; rank<_Cmi_mynodesize; rank++)
1472       if (rank + Cmi_nodestart != ogm->src) {
1473         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1474       }
1475     for (i = 0; i<_Cmi_numnodes; i++)
1476       if (i!=_Cmi_mynode)
1477         DeliverViaNetwork(ogm, nodes + i, DGRAM_BROADCAST);
1478     GarbageCollectMsg(ogm);
1479     break;
1480   default:
1481 #ifndef CMK_OPTIMIZE
1482     if (dst<0 || dst>=CmiNumPes())
1483       CmiAbort("Send to out-of-bounds processor!");
1484 #endif
1485     node = nodes_by_pe[dst];
1486     rank = dst - node->nodestart;
1487     if (node->nodestart != Cmi_nodestart) {
1488       DeliverViaNetwork(ogm, node, rank);
1489       GarbageCollectMsg(ogm);
1490     } else {
1491       if (ogm->freemode == 'A') {
1492         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1493         ogm->freemode = 'X';
1494       } else {
1495         CmiPushPE(rank, ogm->data);
1496         FreeOutgoingMsg(ogm);
1497       }
1498     }
1499   }
1500 }
1501
1502
1503 /******************************************************************************
1504  *
1505  * CmiGetNonLocal
1506  *
1507  * The design of this system is that the communication thread does all the
1508  * work, to eliminate as many locking issues as possible.  This is the only
1509  * part of the code that happens in the receiver-thread.
1510  *
1511  * This operation is fairly cheap, it might be worthwhile to inline
1512  * the code into CmiDeliverMsgs to reduce function call overhead.
1513  *
1514  *****************************************************************************/
1515
1516 #if CMK_NODE_QUEUE_AVAILABLE
1517 char *CmiGetNonLocalNodeQ(void)
1518 {
1519   char *result = 0;
1520   if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1521     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1522     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1523     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1524   }
1525   return result;
1526 }
1527 #endif
1528
1529 void *CmiGetNonLocal(void)
1530 {
1531   CmiState cs = CmiGetState();
1532   CmiIdleLock_checkMessage(&cs->idle);
1533   return (void *) PCQueuePop(cs->recv);
1534 }
1535
1536
1537 /**
1538  * Set up and OutgoingMsg structure for this message.
1539  */
1540 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
1541   OutgoingMsg ogm;
1542   MallocOutgoingMsg(ogm);
1543   CmiMsgHeaderSetLength(data, size);\r
1544   MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1545   ogm->size = size;
1546   ogm->data = data;
1547   ogm->src = cs->pe;
1548   ogm->dst = pe;
1549   ogm->freemode = freemode;
1550   ogm->refcount = 0;
1551   return (CmiCommHandle)ogm;    
1552 }
1553
1554 #if CMK_NODE_QUEUE_AVAILABLE
1555
1556 /******************************************************************************
1557  *
1558  * CmiGeneralNodeSend
1559  *
1560  * Description: This is a generic message sending routine. All the
1561  * converse message send functions are implemented in terms of this
1562  * function. (By setting appropriate flags (eg freemode) that tell
1563  * CmiGeneralSend() how exactly to handle the particular case of
1564  * message send)
1565  *
1566  *****************************************************************************/
1567
1568 CmiCommHandle CmiGeneralNodeSend(int pe, int size, int freemode, char *data)
1569 {
1570   
1571   CmiState cs = CmiGetState(); OutgoingMsg ogm;
1572
1573   if (freemode == 'S') {
1574     char *copy = (char *)CmiAlloc(size);
1575     if (!copy)
1576       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
1577     memcpy(copy, data, size);
1578     data = copy; freemode = 'F';
1579   }
1580
1581   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
1582   CmiCommLock();
1583   DeliverOutgoingNodeMessage(ogm);
1584   CmiCommUnlock();
1585   /* Check if any packets have arrived recently (preserves kernel network buffers). */
1586   CommunicationServer(0);
1587   return (CmiCommHandle)ogm;
1588 }
1589
1590 #endif
1591
1592
1593 /******************************************************************************
1594  *
1595  * CmiGeneralSend
1596  *
1597  * Description: This is a generic message sending routine. All the
1598  * converse message send functions are implemented in terms of this
1599  * function. (By setting appropriate flags (eg freemode) that tell
1600  * CmiGeneralSend() how exactly to handle the particular case of
1601  * message send)
1602  *
1603  *****************************************************************************/
1604
1605 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
1606 {
1607   CmiState cs = CmiGetState(); OutgoingMsg ogm;
1608
1609   if (freemode == 'S') {
1610     char *copy = (char *)CmiAlloc(size);
1611     if (!copy)
1612       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
1613     memcpy(copy, data, size);
1614     data = copy; freemode = 'F';
1615   }
1616   
1617   if (pe == cs->pe) {
1618 #if CMK_IMMEDIATE_MSG
1619       /* execute the immediate message right away */
1620     if (CmiIsImmediate(data)) {
1621       CmiHandleImmediateMessage(data);
1622       return 0;
1623     }
1624 #endif
1625 #if ! CMK_SMP
1626     if (!_immRunning) /* CdsFifo_Enqueue, below, isn't SIGIO or thread safe.  
1627                       The SMP comm thread never gets here, because of the pe test. */
1628 #endif
1629     {
1630     CdsFifo_Enqueue(cs->localqueue, data);
1631     if (freemode == 'A') {
1632       MallocOutgoingMsg(ogm);
1633       ogm->freemode = 'X';
1634       return ogm;
1635     } else return 0;
1636     }
1637   }
1638
1639 #if CMK_PERSISTENT_COMM
1640   if (phs) {
1641       CmiAssert(phsSize == 1);
1642       CmiSendPersistentMsg(*phs, pe, size, data);
1643       return NULL;
1644   }
1645 #endif
1646
1647   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
1648   CmiCommLock();
1649   DeliverOutgoingMessage(ogm);
1650   CmiCommUnlock();
1651   /* Check if any packets have arrived recently (preserves kernel network buffers). */
1652   CommunicationServer(0);
1653   return (CmiCommHandle)ogm;
1654 }
1655
1656
1657 void CmiSyncSendFn(int p, int s, char *m)
1658
1659   CQdCreate(CpvAccess(cQdState), 1);
1660   CmiGeneralSend(p,s,'S',m); 
1661 }
1662
1663 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
1664
1665   CQdCreate(CpvAccess(cQdState), 1);
1666   return CmiGeneralSend(p,s,'A',m); 
1667 }
1668
1669 void CmiFreeSendFn(int p, int s, char *m)
1670
1671   CQdCreate(CpvAccess(cQdState), 1);
1672   CmiGeneralSend(p,s,'F',m); 
1673 }
1674
1675 void CmiSyncBroadcastFn(int s, char *m)
1676
1677   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
1678   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); 
1679 }
1680
1681 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
1682
1683   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
1684   return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); 
1685 }
1686
1687 void CmiFreeBroadcastFn(int s, char *m)
1688
1689   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
1690   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); 
1691 }
1692
1693 void CmiSyncBroadcastAllFn(int s, char *m)
1694
1695   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
1696   CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); 
1697 }
1698
1699 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
1700
1701   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
1702   return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); 
1703 }
1704
1705 void CmiFreeBroadcastAllFn(int s, char *m)
1706
1707   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
1708   CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); 
1709 }
1710
1711 #if CMK_NODE_QUEUE_AVAILABLE
1712
1713 void CmiSyncNodeSendFn(int p, int s, char *m)
1714
1715   CQdCreate(CpvAccess(cQdState), 1);
1716   CmiGeneralNodeSend(p,s,'S',m); 
1717 }
1718
1719 CmiCommHandle CmiAsyncNodeSendFn(int p, int s, char *m)
1720
1721   CQdCreate(CpvAccess(cQdState), 1);
1722   return CmiGeneralNodeSend(p,s,'A',m); 
1723 }
1724
1725 void CmiFreeNodeSendFn(int p, int s, char *m)
1726
1727   CQdCreate(CpvAccess(cQdState), 1);
1728   CmiGeneralNodeSend(p,s,'F',m); 
1729 }
1730
1731 void CmiSyncNodeBroadcastFn(int s, char *m)
1732
1733   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1734   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'S',m); 
1735 }
1736
1737 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1738
1739   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1740   return CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'A',m);
1741 }
1742
1743 void CmiFreeNodeBroadcastFn(int s, char *m)
1744
1745   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
1746   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'F',m); 
1747 }
1748
1749 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1750
1751   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
1752   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'S',m); 
1753 }
1754
1755 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1756
1757   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
1758   return CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'A',m); 
1759 }
1760
1761 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1762
1763   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
1764   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'F',m); 
1765 }
1766 #endif
1767
1768 /******************************************************************************
1769  *
1770  * Comm Handle manipulation.
1771  *
1772  *****************************************************************************/
1773
1774 int CmiAsyncMsgSent(CmiCommHandle handle)
1775 {
1776   return (((OutgoingMsg)handle)->freemode == 'X');
1777 }
1778
1779 void CmiReleaseCommHandle(CmiCommHandle handle)
1780 {
1781   FreeOutgoingMsg(((OutgoingMsg)handle));
1782 }
1783
1784
1785 #if CMK_IMMEDIATE_MSG
1786 void CmiProbeImmediateMsg()
1787 {
1788   CommunicationServerThread(0);
1789 }
1790 #endif
1791
1792 /******************************************************************************
1793  *
1794  * Main code, Init, and Exit
1795  *
1796  *****************************************************************************/
1797 extern void CthInit(char **argv);
1798 extern void ConverseCommonInit(char **);
1799
1800 static char     **Cmi_argv;
1801 static char     **Cmi_argvcopy;
1802 static CmiStartFn Cmi_startfn;   /* The start function */
1803 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1804
1805 static void ConverseRunPE(int everReturn)
1806 {
1807   CmiIdleState *s=CmiNotifyGetState();
1808   CmiState cs;
1809   char** CmiMyArgv;
1810   CmiNodeAllBarrier();
1811   cs = CmiGetState();
1812   CpvInitialize(void *,CmiLocalQueue);
1813   CpvAccess(CmiLocalQueue) = cs->localqueue;
1814
1815   /* all non 0 rank use the copied one while rank 0 will modify the actual argv */
1816   if (CmiMyRank())
1817     CmiMyArgv = CmiCopyArgs(Cmi_argvcopy);
1818   else
1819     CmiMyArgv = Cmi_argv;
1820   CthInit(CmiMyArgv);
1821
1822 #if CMK_USE_GM
1823   CmiCheckGmStatus();
1824 #endif
1825
1826   ConverseCommonInit(CmiMyArgv);
1827
1828   /* better to show the status here */
1829   if (Cmi_netpoll == 1 && CmiMyPe() == 0)
1830     CmiPrintf("Charm++: scheduler running in netpoll mode.\n");
1831   
1832 #if CMK_USE_GM
1833   if (Cmi_charmrun_fd != -1)
1834 #endif
1835   {
1836   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
1837       (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
1838   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
1839       (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
1840   }
1841 #if CMK_SHARED_VARS_UNAVAILABLE
1842   if (Cmi_netpoll) /*Repeatedly call CommServer*/
1843     CcdCallOnConditionKeep(CcdPERIODIC, 
1844         (CcdVoidFn) CommunicationPeriodic, NULL);
1845   else /*Only need this for retransmits*/
1846     CcdCallOnConditionKeep(CcdPERIODIC_10ms, 
1847         (CcdVoidFn) CommunicationPeriodic, NULL);
1848 #endif
1849
1850   if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
1851     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
1852 #if CMK_SHARED_VARS_UNAVAILABLE
1853     if (Cmi_netpoll == 1) {
1854     /* gm cannot live with setitimer */
1855     CcdCallFnAfter(pingCharmrunPeriodic,NULL,1000);
1856     }
1857     else {
1858     /*Occasionally ping charmrun, to test if it's dead*/
1859     struct itimerval i;
1860     CmiSignal(SIGALRM, 0, 0, pingCharmrun);
1861     i.it_interval.tv_sec = 1;
1862     i.it_interval.tv_usec = 0;
1863     i.it_value.tv_sec = 1;
1864     i.it_value.tv_usec = 0;
1865     setitimer(ITIMER_REAL, &i, NULL);
1866     }
1867
1868 #if ! CMK_USE_GM && ! CMK_USE_TCP
1869     /*Occasionally check for retransmissions, outgoing acks, etc.*/
1870     /*no need in GM case */
1871     CcdCallFnAfter(CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
1872 #endif
1873 #endif
1874     
1875     /*Initialize the clock*/
1876     Cmi_clock=GetClock();
1877   }
1878   
1879 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
1880   srand((int)(1024.0*CmiWallTimer()));
1881   if (CmiMyPe()==0)
1882     CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
1883         CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
1884 #endif
1885
1886   /* Converse initialization finishes, immediate messages can be processed.
1887      node barrier previously should take care of the node synchronization */
1888   _immediateReady = 1;
1889
1890   /* communication thread */
1891   if (CmiMyRank() == CmiMyNodeSize()) {
1892     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1893     if (Cmi_charmrun_fd!=-1)
1894           while (1) CommunicationServerThread(5);
1895   }
1896   else
1897   if (!everReturn) {
1898     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1899     if (Cmi_usrsched==0) CsdScheduler(-1);
1900     ConverseExit();
1901   }
1902 }
1903
1904 void ConverseExit(void)
1905 {
1906   machine_initiated_shutdown=1;
1907   if (CmiMyRank()==0) {
1908     if(Cmi_print_stats)
1909       printNetStatistics();
1910     log_done();
1911     CmiStdoutFlush();
1912     ConverseCommonExit();
1913     if (Cmi_charmrun_fd==-1) exit(0); /*Standalone version-- just leave*/
1914   }
1915   if (Cmi_charmrun_fd!=-1) {
1916         ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away */
1917 #if CMK_SHARED_VARS_UNAVAILABLE
1918         while (1) CommunicationServer(500);
1919 #endif
1920   }
1921 /*Comm. thread will kill us.*/
1922   while (1) CmiYield();
1923 }
1924
1925 static void set_signals(void)
1926 {
1927   if(!Cmi_truecrash) {
1928     signal(SIGSEGV, KillOnAllSigs);
1929     signal(SIGFPE, KillOnAllSigs);
1930     signal(SIGILL, KillOnAllSigs);
1931     signal(SIGINT, KillOnAllSigs);
1932     signal(SIGTERM, KillOnAllSigs);
1933     signal(SIGABRT, KillOnAllSigs);
1934 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1935     signal(SIGQUIT, KillOnAllSigs);
1936     signal(SIGBUS, KillOnAllSigs);
1937 #     if CMK_HANDLE_SIGUSR
1938     signal(SIGUSR1, HandleUserSignals);
1939     signal(SIGUSR2, HandleUserSignals);
1940 #     endif
1941 #   endif /*UNIX*/
1942   }
1943 }
1944
1945 /*Socket idle function to use before addresses have been
1946   obtained.  During the real program, we idle with CmiYield.
1947 */
1948 static void obtain_idleFn(void) {sleep(0);}
1949
1950 static int net_default_skt_abort(int code,const char *msg)
1951 {
1952   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
1953   machine_exit(1);
1954   return -1;
1955 }
1956
1957 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
1958 {
1959 #if MACHINE_DEBUG
1960   debugLog=NULL;
1961 #endif
1962 #if CMK_USE_HP_MAIN_FIX
1963 #if FOR_CPLUS
1964   _main(argc,argv);
1965 #endif
1966 #endif
1967   Cmi_argvcopy = CmiCopyArgs(argv);
1968   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usc;
1969   Cmi_netpoll = 0;
1970 #if CMK_NETPOLL
1971   Cmi_netpoll = 1;
1972 #endif
1973 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
1974   Cmi_idlepoll = 0;
1975 #else
1976   Cmi_idlepoll = 1;
1977 #endif
1978   Cmi_truecrash = 0;
1979   if (CmiGetArgFlagDesc(argv,"+truecrash","Do not install signal handlers") ||
1980       CmiGetArgFlagDesc(argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
1981     /* netpoll disable signal */
1982   if (CmiGetArgFlagDesc(argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
1983     /* idlepoll use poll instead if sleep when idle */
1984   if (CmiGetArgFlagDesc(argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
1985     /* idlesleep use sleep instead if busywait when idle */
1986   if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
1987   Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
1988
1989   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
1990
1991   skt_init();
1992   /* use special abort handler instead of default_skt_abort to 
1993      prevent exit trapped by atexit_check() due to the exit() call  */
1994   skt_set_abort(net_default_skt_abort);
1995   atexit(machine_atexit_check);
1996   parse_netstart();
1997   extract_args(argv);
1998   log_init();
1999   Cmi_scanf_mutex = CmiCreateLock();
2000
2001 #if MACHINE_DEBUG_LOG
2002   {
2003     char ln[200];
2004     sprintf(ln,"debugLog.%d",_Cmi_mynode);
2005     debugLog=fopen(ln,"w");
2006   }
2007 #endif
2008
2009   skt_set_idle(obtain_idleFn);
2010   if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2011         set_signals();
2012 #if CMK_USE_TCP
2013         dataskt=skt_server(&dataport);
2014 #elif !CMK_USE_GM
2015         dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2016 #else
2017         dataskt=-1;
2018 #endif
2019         MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2020         Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2021         MACHSTATE1(5,"Opened data socket at port %d",dataport);
2022         CmiStdoutInit();
2023   } else {/*Standalone operation*/
2024         printf("Charm++: standalone mode (not using charmrun)\n");
2025         dataskt=-1;
2026         Cmi_charmrun_fd=-1;
2027   }
2028
2029   CmiMachineInit();
2030
2031   node_addresses_obtain(argv);
2032
2033 #if CMK_USE_TCP
2034   open_tcp_sockets();
2035 #endif
2036
2037   skt_set_idle(CmiYield);
2038   Cmi_check_delay = 2.0+0.5*_Cmi_numnodes;
2039   if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2040         Cmi_check_delay=1.0e30;
2041
2042   CsvInitialize(CmiNodeState, NodeState);
2043   CmiNodeStateInit(&CsvAccess(NodeState));
2044
2045   CmiStartThreads(argv);
2046   ConverseRunPE(everReturn);
2047 }
2048
2049
2050 #if CMK_PERSISTENT_COMM
2051
2052 int persistentSendMsgHandlerIdx;
2053
2054 static void sendPerMsgHandler(char *msg)
2055 {
2056   int msgSize;
2057   void *destAddr, *destSizeAddr;
2058
2059   msgSize = CmiMsgHeaderGetLength(msg);
2060   msgSize -= 2*sizeof(void *);
2061   destAddr = *(void **)(msg + msgSize);
2062   destSizeAddr = *(void **)(msg + msgSize + sizeof(void*));
2063 /*CmiPrintf("msgSize:%d destAddr:%p, destSizeAddr:%p\n", msgSize, destAddr, destSizeAddr);*/
2064   CmiSetHandler(msg, CmiGetXHandler(msg));
2065   *((int *)destSizeAddr) = msgSize;
2066   memcpy(destAddr, msg, msgSize);
2067 }
2068
2069 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
2070 {
2071   CmiAssert(h!=NULL);
2072   PersistentSendsTable *slot = (PersistentSendsTable *)h;
2073   CmiAssert(slot->used == 1);
2074   CmiAssert(slot->destPE == destPE);
2075   if (size > slot->sizeMax) {
2076     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
2077     CmiAbort("Abort: Invalid size\n");
2078   }
2079
2080 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destpe=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), slot->destPE, slot->destAddress, size);*/
2081
2082   if (slot->destAddress[0]) {
2083     int newsize = size + sizeof(void *)*2;
2084     char *newmsg = (char*)CmiAlloc(newsize);
2085     memcpy(newmsg, m, size);
2086     memcpy(newmsg+size, &slot->destAddress[0], sizeof(void *));
2087     memcpy(newmsg+size+sizeof(void*), &slot->destSizeAddress[0], sizeof(void *));
2088     CmiFree(m);
2089     CmiMsgHeaderSetLength(newmsg, size + sizeof(void *)*2);
2090     CmiSetXHandler(newmsg, CmiGetHandler(newmsg));
2091     CmiSetHandler(newmsg, persistentSendMsgHandlerIdx);
2092     phs = NULL; phsSize = 0;
2093     CmiSyncSendAndFree(slot->destPE, newsize, newmsg);
2094   }
2095   else {
2096 #if 1
2097     if (slot->messageBuf != NULL) {
2098       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
2099       CmiAbort("");
2100     }
2101     slot->messageBuf = m;
2102     slot->messageSize = size;
2103 #else
2104     /* normal send */
2105     PersistentHandle  *phs_tmp = phs;
2106     int phsSize_tmp = phsSize;
2107     phs = NULL; phsSize = 0;
2108     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
2109     CmiSyncSendAndFree(slot->destPE, size, m);
2110     phs = phs_tmp; phsSize = phsSize_tmp;
2111 #endif
2112   }
2113 }
2114
2115 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
2116 {
2117   char *dupmsg = (char *) CmiAlloc(size);
2118   memcpy(dupmsg, msg, size);
2119
2120   /*  CmiPrintf("Setting root to %d\n", 0); */
2121   if (CmiMyPe()==destPE) {
2122     CQdCreate(CpvAccess(cQdState), 1);
2123     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
2124   }
2125   else
2126     CmiSendPersistentMsg(h, destPE, size, dupmsg);
2127 }
2128
2129 /* called in PumpMsgs */
2130 int PumpPersistent()
2131 {
2132   PersistentReceivesTable *slot = persistentReceivesTableHead;
2133   int status = 0;
2134   while (slot) {
2135     unsigned int size = *(slot->recvSizePtr[0]);
2136     if (size > 0)
2137     {
2138       char *msg = slot->messagePtr[0];
2139 /*CmiPrintf("size: %d msg:%p %p\n", size, msg, slot->messagePtr);*/
2140
2141 #if 0
2142       void *dupmsg;
2143       dupmsg = CmiAlloc(size);
2144       
2145       _MEMCHECK(dupmsg);
2146       memcpy(dupmsg, msg, size);
2147       msg = dupmsg;
2148 #else
2149       /* return messagePtr directly and user MUST make sure not to delete it. */
2150       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
2151
2152       CmiReference(msg);
2153 #endif
2154
2155       CmiPushPE(CMI_DEST_RANK(msg), msg);
2156 #if CMK_BROADCAST_SPANNING_TREE
2157       if (CMI_BROADCAST_ROOT(msg))
2158           SendSpanningChildren(size, msg);
2159 #endif
2160       *(slot->recvSizePtr[0]) = 0;
2161       status = 1;
2162     }
2163     slot = slot->next;
2164   }
2165   return status;
2166 }
2167
2168 void *PerAlloc(int size)
2169 {
2170   return CmiAlloc(size);
2171 }
2172                                                                                 
2173 void PerFree(char *msg)
2174 {
2175     CmiFree(msg);
2176 }
2177
2178 void persist_machine_init() 
2179 {
2180   persistentSendMsgHandlerIdx =
2181        CmiRegisterHandler((CmiHandler)sendPerMsgHandler);
2182 }
2183
2184 #endif