netlrts: replace some CMK_SHARED_VARS_UNAVAILABLE with not CMK_SMP
[charm.git] / src / arch / net / machine.c
1
2 /** @file
3  * Basic NET implementation of Converse machine layer
4  * @ingroup NET
5  */
6
7 /** @defgroup NET
8  * NET implementation of machine layer, ethernet in particular
9  * @ingroup Machine
10  *
11  * THE DATAGRAM STREAM
12  *
13  * Messages are sent using UDP datagrams.  The sender allocates a
14  * struct for each datagram to be sent.  These structs stick around
15  * until slightly after the datagram is acknowledged.
16  *
17  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
18  * Each node has an OtherNode struct for every other node in the
19  * system.  The OtherNode struct contains:
20  *
21  *   send_queue   (all datagram-structs not yet transmitted)
22  *   send_window  (all datagram-structs transmitted but not ack'd)
23  *
24  * When an acknowledgement comes in, all packets in the send-window
25  * are either marked as acknowledged or pushed back into the send
26  * queue for retransmission.
27  *
28  * THE OUTGOING MESSAGE
29  *
30  * When you send or broadcast a message, the first thing the system
31  * does is system creates an OutgoingMsg struct to represent the
32  * operation.  The OutgoingMsg contains a very direct expression
33  * of what you want to do:
34  *
35  * OutgoingMsg:
36  *
37  *   size      --- size of message in bytes
38  *   data      --- pointer to the buffer containing the message
39  *   src       --- processor which sent the message
40  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
41  *   freemode  --- see below.
42  *   refcount  --- see below.
43  *
44  * The OutgoingMsg is kept around until the transmission is done, then
45  * it is garbage collected --- the refcount and freemode fields are
46  * to assist garbage collection.
47  *
48  * The freemode indicates which kind of buffer-management policy was
49  * used (sync, async, or freeing).  The sync policy is handled
50  * superficially by immediately converting sync sends into freeing
51  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
52  * (freeing).  If the freemode is 'F', then garbage collection
53  * involves freeing the data and the OutgoingMsg structure itself.  If
54  * the freemode is 'A', then the only cleanup is to change the
55  * freemode to 'X', a condition which is then detectable by
56  * CmiAsyncMsgSent.  In this case, the actual freeing of the
57  * OutgoingMsg is done by CmiReleaseCommHandle.
58  *
59  * When the transmission is initiated, the system computes how many
60  * datagrams need to be sent, total.  This number is stored in the
61  * refcount field.  Each time a datagram is delivered, the refcount
62  * is decremented, when it reaches zero, cleanup is performed.  There
63  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
64  * is a send (not a broadcast) and can be performed with shared
65  * memory, the entire datagram system is bypassed, the message is
66  * simply delivered and freed, not using the refcount mechanism at
67  * all.  Exception 2: If the message is a broadcast, then part of the
68  * broadcast that can be done via shared memory is performed prior to
69  * initiating the datagram/refcount system.
70  *
71  * DATAGRAM FORMATS AND MESSAGE FORMATS
72  *
73  * Datagrams have this format:
74  *
75  *   srcpe   (16 bits) --- source processor number.
76  *   magic   ( 8 bits) --- magic number to make sure DG is good.
77  *   dstrank ( 8 bits) --- destination processor rank.
78  *   seqno   (32 bits) --- packet sequence number.
79  *   data    (XX byte) --- user data.
80  *
81  * The only reason the srcpe is in there is because the receiver needs
82  * to know which receive window to use.  The dstrank field is needed
83  * because transmission is node-to-node.  Once the message is
84  * assembled by the node, it must be delivered to the appropriate PE.
85  * The dstrank field is used to encode certain special-case scenarios.
86  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
87  * and should be delivered to all processors in the node.  If the dstrank
88  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
89  * which case the srcpe is the number of the acknowledger, the seqno is
90  * always zero, and the user data is a list of the seqno's being
91  * acknowledged.  There may be other dstrank codes for special functions.
92  *
93  * To send a message, one chops it up into datagrams and stores those
94  * datagrams in a send-queue.  These outgoing datagrams aren't stored
95  * in the explicit format shown above.  Instead, they are stored as
96  * ImplicitDgrams, which contain the datagram header and a pointer to
97  * the user data (which is in the user message buffer, which is in the
98  * OutgoingMsg).  At transmission time these are combined together.
99
100  * The combination of the datagram header with the user's data is
101  * performed right in the user's message buffer.  Note that the
102  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
103  * of the user's message with a datagram header, sends the datagram
104  * straight from the user's message buffer, then restores the user's
105  * buffer to its original state.  There is a small problem with the
106  * first datagram of the message: one needs 64 bits of space to store
107  * the datagram header.  To make sure this space is there, we added a
108  * 64-bit unused space to the front of the Cmi message header.  In
109  * addition to this, we also add 32 bits to the Cmi message header
110  * to make room for a length-field, making it possible to identify
111  * message boundaries.
112  *
113  * CONCURRENCY CONTROL
114  *
115  * This has changed recently.
116  *
117  * EFFICIENCY NOTES
118  *
119  * The sender-side does little copying.  The async and freeing send
120  * routines do no copying at all.  The sync send routines copy the
121  * message, then use the freeing-send routines.  The other alternative
122  * is to not copy the message, and use the async send mechanism
123  * combined with a blocking wait.  Blocking wait seems like a bad
124  * idea, since it could take a VERY long time to get all those
125  * datagrams out the door.
126  *
127  * The receiver side, unfortunately, must copy.  To avoid copying,
128  * it would have to receive directly into a preallocated message buffer.
129  * Unfortunately, this can't work: there's no way to know how much
130  * memory to preallocate, and there's no way to know which datagram
131  * is coming next.  Thus, we receive into fixed-size (large) datagram
132  * buffers.  These are then inspected, and the messages extracted from
133  * them.
134  *
135  * Note that we are allocating a large number of structs: OutgoingMsg's,
136  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
137  * is a fixed-size structure.  Thus, we can do memory allocation by
138  * simply keeping a linked-list of unused structs around.  The only
139  * place where expensive memory allocation is performed is in the
140  * sync routines.
141  *
142  * Since the datagrams from one node to another are fully ordered,
143  * there is slightly more ordering than is needed: in theory, the
144  * datagrams of one message don't need to be ordered relative to the
145  * datagrams of another.  This was done to simplify the sequencing
146  * mechanisms: implementing a fully-ordered stream is much simpler
147  * than a partially-ordered one.  It also makes it possible to
148  * modularize, layering the message transmitter on top of the
149  * datagram-sequencer.  In other words, it was just easier this way.
150  * Hopefully, this won't cause serious degradation: LAN's rarely get
151  * datagrams out of order anyway.
152  *
153  * A potential efficiency problem is the lack of message-combining.
154  * One datagram could conceivably contain several messages.  This
155  * might be more efficient, it's not clear how much overhead is
156  * involved in sending a short datagram.  Message-combining isn't
157  * really ``integrated'' into the design of this software, but you
158  * could fudge it as follows.  Whenever you pull a short datagram from
159  * the send-queue, check the next one to see if it's also a short
160  * datagram.  If so, pack them together into a ``combined'' datagram.
161  * At the receive side, simply check for ``combined'' datagrams, and
162  * treat them as if they were simply two datagrams.  This would
163  * require extra copying.  I have no idea if this would be worthwhile.
164  *
165  *****************************************************************************/
166
167 /**
168  * @addtogroup NET
169  * @{
170  */
171
172 /*****************************************************************************
173  *
174  * Include Files
175  *
176  ****************************************************************************/
177
178 #define _GNU_SOURCE 1
179 #include <stdarg.h> /*<- was <varargs.h>*/
180
181 #define CMK_USE_PRINTF_HACK 0
182 #if CMK_USE_PRINTF_HACK
183 /*HACK: turn printf into CmiPrintf, by just defining our own
184 external symbol "printf".  This may be more trouble than it's worth,
185 since the only advantage is that it works properly with +syncprint.
186
187 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
188 because they don't call printf.  Has to be defined up here because we probably 
189 haven't properly guessed this compiler's prototype for "printf".
190 */
191 static void InternalPrintf(const char *f, va_list l);
192 int printf(const char *fmt, ...) {
193         int nChar;
194         va_list p; va_start(p, fmt);
195         InternalPrintf(fmt,p);
196         va_end(p);
197         return 10;
198 }
199 #endif
200
201
202 #include "converse.h"
203 #include "memory-isomalloc.h"
204
205 #include <stdio.h>
206 #include <stdlib.h>
207 #include <ctype.h>
208 #include <fcntl.h>
209 #include <errno.h>
210 #include <setjmp.h>
211 #include <signal.h>
212 #include <string.h>
213 #include <unistd.h>
214
215 /* define machine debug */
216 #include "machine.h"
217
218 /******************* Producer-Consumer Queues ************************/
219 #include "pcqueue.h"
220
221 #include "machine-smp.h"
222
223 #include "machine-lrts.h"
224 #include "machine-common-core.c"
225
226 #if CMK_USE_KQUEUE
227 #include <sys/event.h>
228 int _kq = -1;
229 #endif
230
231 #if CMK_USE_POLL
232 #include <poll.h>
233 #endif
234
235 #if CMK_USE_GM
236 #include "gm.h"
237 struct gm_port *gmport = NULL;
238 int  portFinish = 0;
239 #endif
240
241 #if CMK_USE_MX
242 #include "myriexpress.h"
243 mx_endpoint_t      endpoint;
244 mx_endpoint_addr_t endpoint_addr;
245 int MX_FILTER   =  123456;
246 static uint64_t Cmi_nic_id=0; /* Machine-specific identifier (MX-only) */
247 #endif
248
249 #if CMK_MULTICORE
250 int Cmi_commthread = 0;
251 #endif
252
253 #include "conv-ccs.h"
254 #include "ccs-server.h"
255 #include "sockRoutines.h"
256
257 #if defined(_WIN32) && ! defined(__CYGWIN__)
258 /*For windows systems:*/
259 #  include <windows.h>
260 #  include <wincon.h>
261 #  include <sys/types.h>
262 #  include <sys/timeb.h>
263 #  define fdopen _fdopen
264 #  define SIGBUS -1  /*These signals don't exist in Win32*/
265 #  define SIGKILL -1
266 #  define SIGQUIT -1
267 /*#  define SIGTERM -1*/       /* VC++ ver 8 now has SIGTERM */
268
269 #else /*UNIX*/
270 #  include <pwd.h>
271 #  include <unistd.h>
272 #  include <fcntl.h>
273 #  include <sys/file.h>
274 #endif
275
276 #if CMK_PERSISTENT_COMM
277 #include "machine-persistent.c" 
278 #endif
279
280 #define PRINTBUFSIZE 16384
281
282 #ifdef __ONESIDED_IMPL
283 #ifdef __ONESIDED_NO_HARDWARE
284 int putSrcHandler;
285 int putDestHandler;
286 int getSrcHandler;
287 int getDestHandler;
288 #include "conv-onesided.c"
289 #endif
290 #endif
291
292
293 static void CommunicationServerNet(int withDelayMs, int where);
294 //static void CommunicationServer(int withDelayMs);
295
296 void CmiHandleImmediate();
297 extern int CmemInsideMem();
298 extern void CmemCallWhenMemAvail();
299
300 static unsigned int dataport=0;
301 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
302 static SOCKET       dataskt;
303
304 extern void TokenUpdatePeriodic();
305 extern void getAvailSysMem();
306
307 /****************************************************************************
308  *
309  * Handling Errors
310  *
311  * Errors should be handled by printing a message on stderr and
312  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
313  * communication should be made.  The other processes will notice the
314  * abnormal termination and will deal with it.
315  *
316  * Rationale: if an error triggers an attempt to send a message,
317  * the attempt to send a message is likely to trigger another error,
318  * leading to an infinite loop and a process that spins instead of
319  * shutting down.
320  *
321  *****************************************************************************/
322
323 static int machine_initiated_shutdown=0;
324 static int already_in_signal_handler=0;
325
326 static void CmiDestroyLocks();
327
328 void MachineExit();
329
330 static void machine_exit(int status)
331 {
332   MACHSTATE(3,"     machine_exit");
333   machine_initiated_shutdown=1;
334
335   CmiDestroyLocks();            /* destory locks to prevent dead locking */
336   EmergencyExit();
337
338 #if CMK_USE_GM
339   if (gmport) { 
340     gm_close(gmport); gmport = 0;
341     gm_finalize();
342   }
343 #endif
344   MachineExit();
345   exit(status);
346 }
347
348 static void charmrun_abort(const char*);
349
350 static void KillEveryone(const char *msg)
351 {
352   charmrun_abort(msg);
353   machine_exit(1);
354 }
355
356 static void KillEveryoneCode(n)
357 int n;
358 {
359   char _s[100];
360   sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
361   charmrun_abort(_s);
362   machine_exit(1);
363 }
364
365 CpvExtern(int, freezeModeFlag);
366
367 static void KillOnAllSigs(int sigNo)
368 {
369   const char *sig="unknown signal";
370   const char *suggestion="";
371   if (machine_initiated_shutdown ||
372       already_in_signal_handler) 
373         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
374   already_in_signal_handler=1;
375
376 #if CMK_CCS_AVAILABLE
377   if (CpvAccess(cmiArgDebugFlag)) {
378     int reply = 0;
379     CpdNotify(CPD_SIGNAL,sigNo);
380 #if ! CMK_BIGSIM_CHARM
381     CcsSendReplyNoError(4,&reply);/*Send an empty reply if not*/
382     CpvAccess(freezeModeFlag) = 1;
383     CpdFreezeModeScheduler();
384 #else
385     CpdFreeze();
386 #endif
387   }
388 #endif
389   
390   CmiDestroyLocks();            /* destory locks */
391
392   if (sigNo==SIGSEGV) {
393      sig="segmentation violation";
394      suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).\n";
395   }
396   if (sigNo==SIGFPE) {
397      sig="floating point exception";
398      suggestion="Check for integer or floating-point division by zero.\n";
399   }
400   if (sigNo==SIGBUS) {
401      sig="bus error";
402      suggestion="Check for misaligned reads or writes to memory.\n";
403   }
404   if (sigNo==SIGILL) {
405      sig="illegal instruction";
406      suggestion="Check for calls to uninitialized function pointers.\n";
407   }
408   if (sigNo==SIGKILL) sig="caught signal KILL";
409   if (sigNo==SIGQUIT) sig="caught signal QUIT";
410   if (sigNo==SIGTERM) sig="caught signal TERM";
411   MACHSTATE1(5,"     Caught signal %s ",sig);
412 /*ifdef this part*/
413 #ifdef __FAULT__
414   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
415                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
416   }else{
417 #else
418         {
419 #endif
420    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
421         "Signal: %s\n",CmiMyPe(),sig);
422         if (0!=suggestion[0])
423                 CmiError("Suggestion: %s",suggestion);
424         CmiPrintStackTrace(1);
425         charmrun_abort(sig);
426         machine_exit(1);                
427         }       
428 }
429
430 static void machine_atexit_check(void)
431 {
432   if (!machine_initiated_shutdown)
433     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
434   printf("Program finished.\n");
435 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
436   fgetc(stdin);
437 #endif
438 }
439
440 #if !defined(_WIN32) || defined(__CYGWIN__)
441 static void HandleUserSignals(int signum)
442 {
443   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
444   CcdRaiseCondition(condnum);
445 }
446 #endif
447
448 /*****************************************************************************
449  *
450  *     Utility routines for network machine interface.
451  *
452  *****************************************************************************/
453
454 /*
455 Horrific #defines to hide the differences between select() and poll().
456  */
457 #if CMK_USE_POLL /*poll() version*/
458 # define CMK_PIPE_DECL(delayMs) \
459         struct pollfd fds[10]; \
460         int nFds_sto=0; int *nFds=&nFds_sto; \
461         int pollDelayMs=delayMs;
462 # define CMK_PIPE_SUB fds,nFds
463 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
464
465 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
466 # define CMK_PIPE_ADDREAD(rd_fd) \
467         do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
468 # define CMK_PIPE_ADDWRITE(wr_fd) \
469         do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
470 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
471 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
472
473 #elif CMK_USE_KQUEUE /* kqueue version */
474
475 # define CMK_PIPE_DECL(delayMs) \
476         if (_kq == -1) _kq = kqueue(); \
477     struct kevent ke_sto; \
478     struct kevent* ke = &ke_sto; \
479     struct timespec tmo; \
480     tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
481 # define CMK_PIPE_SUB ke
482 # define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)
483
484 # define CMK_PIPE_PARAM struct kevent* ke
485 # define CMK_PIPE_ADDREAD(rd_fd) \
486         do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
487                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
488 # define CMK_PIPE_ADDWRITE(wr_fd) \
489         do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
490                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
491 # define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
492 # define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)
493
494 #else /*select() version*/
495
496 # define CMK_PIPE_DECL(delayMs) \
497         fd_set rfds_sto,wfds_sto;\
498         fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
499         FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
500 # define CMK_PIPE_SUB rfds,wfds
501 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
502
503 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
504 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
505 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
506 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
507 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
508 #endif
509
510 static void CMK_PIPE_CHECKERR(void) {
511 #if defined(_WIN32) && !defined(__CYGWIN__)
512 /* Win32 socket seems to randomly return inexplicable errors
513 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
514         int err=WSAGetLastError();
515         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
516 o,err);
517 */
518 #else /*UNIX machine*/
519         if (errno!=EINTR)
520                 KillEveryone("Socket error in CheckSocketsReady!\n");
521 #endif
522 }
523
524
525 static void CmiStdoutFlush(void);
526 static int  CmiStdoutNeedsService(void);
527 static void CmiStdoutService(void);
528 static void CmiStdoutAdd(CMK_PIPE_PARAM);
529 static void CmiStdoutCheck(CMK_PIPE_PARAM);
530
531
532 double GetClock(void)
533 {
534 #if defined(_WIN32) && !defined(__CYGWIN__)
535   struct _timeb tv; 
536   _ftime(&tv);
537   return (tv.time * 1.0 + tv.millitm * 1.0E-3);
538 #else
539   struct timeval tv; int ok;
540   ok = gettimeofday(&tv, NULL);
541   if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
542   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
543 #endif
544 }
545
546
547 /***********************************************************************
548  *
549  * Abort function:
550  *
551  ************************************************************************/
552
553 static int  Cmi_truecrash;
554 static int already_aborting=0;
555 void LrtsAbort(const char *message)
556 {
557   if (already_aborting) machine_exit(1);
558   already_aborting=1;
559   MACHSTATE1(5,"CmiAbort(%s)",message);
560
561 #if CMK_CCS_AVAILABLE
562   /* if CharmDebug is attached simply try to send a message to it */
563   if (CpvAccess(cmiArgDebugFlag)) {
564     CpdNotify(CPD_ABORT, message);
565     CpdFreeze();
566   }
567 #endif
568   
569   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
570         "Reason: %s\n",CmiMyPe(),message);
571   CmiPrintStackTrace(0);
572   
573   /*Send off any remaining prints*/
574   CmiStdoutFlush();
575   
576   if(Cmi_truecrash) {
577     printf("CHARM++ FATAL ERROR: %s\n", message);
578     *(int *)NULL = 0; /*Write to null, causing bus error*/
579   } else {
580     charmrun_abort(message);
581     machine_exit(1);
582   }
583 }
584
585
586 /******************************************************************************
587  *
588  * CmiEnableAsyncIO
589  *
590  * The net and tcp versions use a bunch of unix processes talking to each
591  * other via file descriptors.  We need for a signal SIGIO to be generated
592  * each time a message arrives, making it possible to write a signal
593  * handler to handle the messages.  The vast majority of unixes can,
594  * in fact, do this.  However, there isn't any standard for how this is
595  * supposed to be done, so each version of UNIX has a different set of
596  * calls to turn this signal on.  So, there is like one version here for
597  * every major brand of UNIX.
598  *
599  *****************************************************************************/
600
601 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
602 #include <fcntl.h>
603 void CmiEnableAsyncIO(int fd)
604 {
605   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
606     CmiError("setting socket owner: %s\n", strerror(errno)) ;
607     exit(1);
608   }
609   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
610     CmiError("setting socket async: %s\n", strerror(errno)) ;
611     exit(1);
612   }
613 }
614 #else
615 void CmiEnableAsyncIO(int fd) { }
616 #endif
617
618 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
619 #if !defined(_WIN32) || defined(__CYGWIN__)
620 void CmiEnableNonblockingIO(int fd) {
621   int on=1;
622   if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
623     CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
624     exit(1);
625   }
626 }
627 #else
628 void CmiEnableNonblockingIO(int fd) { }
629 #endif
630
631
632 /******************************************************************************
633 *
634 * Configuration Data
635 *
636 * This data is all read in from the NETSTART variable (provided by the
637 * charmrun) and from the command-line arguments.  Once read in, it is never
638  * modified.
639  *
640  *****************************************************************************/
641
642 static skt_ip_t   Cmi_self_IP;
643 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
644 static int        Cmi_charmrun_port;
645 static int        Cmi_charmrun_pid;
646 static int        Cmi_charmrun_fd=-1;
647 /* Magic number to be used for sanity check in messege header */
648 static int                              Cmi_net_magic;
649
650 static int    Cmi_netpoll;
651 static int    Cmi_asyncio;
652 static int    Cmi_idlepoll;
653 static int    Cmi_syncprint;
654 static int Cmi_print_stats = 0;
655
656 #if ! defined(_WIN32)\r
657 /* parse forks only used in non-smp mode */
658 static void parse_forks(void) {
659   char *forkstr;
660   int nread;
661   int forks;
662   int i,pid;
663   forkstr=getenv("CmiMyForks");
664   if(forkstr!=0) { /* charmrun */
665         nread = sscanf(forkstr,"%d",&forks);
666         for(i=1;i<=forks;i++) { /* by default forks = 0 */ 
667                 pid=fork();
668                 if(pid<0) CmiAbort("Fork returned an error");
669                 if(pid==0) { /* forked process */
670                         /* reset mynode,pe & exit loop */
671                         _Cmi_mynode+=i;
672 #if ! CMK_SMP\r
673                         _Cmi_mype+=i;
674 #endif\r
675                         break;
676                 }
677         }
678   }
679 }
680 #endif
681 static void parse_magic(void)
682 {
683         char* nm;       
684         int nread;
685   nm = getenv("NETMAGIC");
686   if (nm!=0) 
687   {/*Read values set by Charmrun*/
688         nread = sscanf(nm, "%d",&Cmi_net_magic);
689         }
690 }
691 static void parse_netstart(void)
692 {
693   char *ns;
694   int nread;
695   int port;
696   ns = getenv("NETSTART");
697   if (ns!=0) 
698   {/*Read values set by Charmrun*/
699         char Cmi_charmrun_name[1024];
700         nread = sscanf(ns, "%d%s%d%d%d",
701                  &_Cmi_mynode,
702                  Cmi_charmrun_name, &Cmi_charmrun_port,
703                  &Cmi_charmrun_pid, &port);
704         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
705
706         if (nread!=5) {
707                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
708                 exit(1);
709         }
710   } else 
711   {/*No charmrun-- set flag values for standalone operation*/
712         _Cmi_mynode=0;
713         Cmi_charmrun_IP=_skt_invalid_ip;
714         Cmi_charmrun_port=0;
715         Cmi_charmrun_pid=0;
716         dataport = -1;
717   }
718 #if CMK_USE_IBVERBS | CMK_USE_IBUD
719         char *cmi_num_nodes = getenv("CmiNumNodes");
720         if(cmi_num_nodes != NULL){
721                 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
722         }
723 #endif  
724 }
725
726 static void extract_common_args(char **argv)
727 {
728   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
729     Cmi_print_stats = 1;
730 }
731
732
733 /******************************************************************************
734  *
735  * Packet Performance Logging
736  *
737  * This module is designed to give a detailed log of the packets and their
738  * acknowledgements, for performance tuning.  It can be disabled.
739  *
740  *****************************************************************************/
741
742 #define LOGGING 0
743
744 #if LOGGING
745
746 typedef struct logent {
747   double time;
748   int seqno;
749   int srcpe;
750   int dstpe;
751   int kind;
752 } *logent;
753
754
755 logent log;
756 int    log_pos;
757 int    log_wrap;
758
759 static void log_init(void)
760 {
761   log = (logent)malloc(50000 * sizeof(struct logent));
762   _MEMCHECK(log);
763   log_pos = 0;
764   log_wrap = 0;
765 }
766
767 static void log_done(void)
768 {
769   char logname[100]; FILE *f; int i, size;
770   sprintf(logname, "log.%d", _Cmi_mynode);
771   f = fopen(logname, "w");
772   if (f==0) KillEveryone("fopen problem");
773   if (log_wrap) size = 50000; else size=log_pos;
774   for (i=0; i<size; i++) {
775     logent ent = log+i;
776     fprintf(f, "%1.4f %d %c %d %d\n",
777             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
778   }
779   fclose(f);
780 }
781
782 void printLog(void)
783 {
784   char logname[100]; FILE *f; int i, j, size;
785   static int logged = 0;
786   if (logged)
787       return;
788   logged = 1;
789   CmiPrintf("Logging: %d\n", _Cmi_mynode);
790   sprintf(logname, "log.%d", _Cmi_mynode);
791   f = fopen(logname, "w");
792   if (f==0) KillEveryone("fopen problem");
793   for (i = 5000; i; i--)
794   {
795   /*for (i=0; i<size; i++) */
796     j = log_pos - i;
797     if (j < 0)
798     {
799         if (log_wrap)
800             j = 5000 + j;
801         else
802             j = 0;
803     };
804     {
805     logent ent = log+j;
806     fprintf(f, "%1.4f %d %c %d %d\n",
807             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
808     }
809   }
810   fclose(f);
811   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
812 }
813
814 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
815
816 #endif
817
818 #if !LOGGING
819
820 #define log_init() /*empty*/
821 #define log_done() /*empty*/
822 #define printLog() /*empty*/
823 #define LOG(t,s,k,d,q) /*empty*/
824
825 #endif
826
827 /******************************************************************************
828  *
829  * Node state
830  *
831  *****************************************************************************/
832
833
834 static CmiNodeLock    Cmi_scanf_mutex;
835 static double         Cmi_clock;
836 static double         Cmi_check_delay = 3.0;
837
838 /******************************************************************************
839  *
840  * OS Threads
841  * SMP implementation moved to machine-smp.c
842  *****************************************************************************/
843
844 /************************ No kernel SMP threads ***************/
845 #if !CMK_SMP 
846
847 static volatile int memflag=0;
848 void CmiMemLock() { memflag++; }
849 void CmiMemUnlock() { memflag--; }
850
851 static volatile int comm_flag=0;
852 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
853 #ifndef MACHLOCK_DEBUG
854 #  define CmiCommLock() (comm_flag=1)
855 #  define CmiCommUnlock() (comm_flag=0)
856 #else /* Error-checking flag locks */
857 void CmiCommLock(void) {
858   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
859   comm_flag=1;
860 }
861 void CmiCommUnlock(void) {
862   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
863   comm_flag=0;
864 }
865 #endif
866
867 //int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
868 _Cmi_myrank=0;
869
870 static void CommunicationInterrupt(int ignored)
871 {
872   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
873   if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0)) 
874   { /* Already busy inside malloc, comm, or immediate messages */
875     MACHSTATE(5,"--SKIPPING SIGIO--");
876     return;
877   }
878   MACHSTATE1(2,"--BEGIN SIGIO comm_mutex_isLocked: %d--", comm_flag)
879   {
880     /*Make sure any malloc's we do in here are NOT migratable:*/
881     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
882 /*    _Cmi_myrank=1; */
883     CommunicationServerNet(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
884     //CommunicationServer(0);  /* from interrupt */
885 /*    _Cmi_myrank=0; */
886     CmiIsomallocBlockListActivate(oldList);
887   }
888   MACHSTATE(2,"--END SIGIO--")
889 }
890
891 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
892
893 static void CmiStartThreadsNet(char **argv)
894 {
895   MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
896   MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
897   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
898     KillEveryone
899       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
900
901   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
902   _Cmi_mype = Cmi_nodestart;
903
904   /* Prepare Cpv's for immediate messages: */
905   _Cmi_myrank=1;
906   CommunicationServerInit();
907   _Cmi_myrank=0;
908
909 #if !CMK_ASYNC_NOT_NEEDED
910   if (Cmi_asyncio)
911   {
912     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
913     if (!Cmi_netpoll) {
914       if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
915       if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
916     }
917 #if CMK_USE_GM || CMK_USE_MX
918       /* charmrun is serviced in interrupt for gm */
919     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
920 #endif
921   }
922 #endif
923 }
924
925 static void CmiDestroyLocks()
926 {
927   comm_flag = 0;
928   memflag = 0;
929 }
930
931 #endif
932
933 CpvExtern(int,_charmEpoch);
934
935 /*Add a message to this processor's receive queue 
936   Must be called while holding comm. lock
937 */
938
939 extern double evacTime;
940
941
942 /***************************************************************
943  Communication with charmrun:
944  We can send (ctrl_sendone) and receive (ctrl_getone)
945  messages on a TCP socket connected to charmrun.
946  This is used for printfs, CCS, etc; and also for
947  killing ourselves if charmrun dies.
948 */
949
950 /*This flag prevents simultanious outgoing
951 messages on the charmrun socket.  It is protected
952 by the commlock.*/
953 static int Cmi_charmrun_fd_sendflag=0;
954
955 /* ctrl_sendone */
956 static int sendone_abort_fn(int code,const char *msg) {
957         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
958         machine_exit(1);
959         return -1;
960 }
961
962 static void ctrl_sendone_nolock(const char *type,
963                                 const char *data1,int dataLen1,
964                                 const char *data2,int dataLen2)
965 {
966   const void *bufs[3]; int lens[3]; int nBuffers=0;
967   ChMessageHeader hdr;
968   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
969   MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
970   if (Cmi_charmrun_fd==-1) 
971         charmrun_abort("ctrl_sendone called in standalone!\n");
972   Cmi_charmrun_fd_sendflag=1;
973   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
974   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
975   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
976   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
977   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
978   Cmi_charmrun_fd_sendflag=0;
979   skt_set_abort(oldAbort);
980   MACHSTATE(2,"} ctrl_sendone_nolock");
981 }
982
983 static void ctrl_sendone_locking(const char *type,
984                                 const char *data1,int dataLen1,
985                                 const char *data2,int dataLen2)
986 {
987   CmiCommLock();
988   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
989   CmiCommUnlock();
990 }
991
992 #ifndef MEMORYUSAGE_OUTPUT
993 #define MEMORYUSAGE_OUTPUT 0 
994 #endif
995 #if MEMORYUSAGE_OUTPUT 
996 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
997 static int memoryusage_counter;
998 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
999 #define memoryusage_output {\
1000   memoryusage_counter++;\
1001   if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
1002 #endif
1003
1004 static double Cmi_check_last;
1005
1006 /* if charmrun dies, we finish */
1007 static void pingCharmrun(void *ignored) 
1008 {
1009 #if MEMORYUSAGE_OUTPUT
1010   memoryusage_output;
1011   if(memoryusage_isOutput){
1012     memoryusage_counter = 0;
1013 #else
1014   {
1015 #endif 
1016
1017   double clock=GetClock();
1018   if (clock > Cmi_check_last + Cmi_check_delay) {
1019     MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1020     Cmi_check_last = clock; 
1021 #if CMK_USE_GM || CMK_USE_MX
1022     if (!Cmi_netpoll)  /* GM netpoll, charmrun service is done in interrupt */
1023 #endif
1024     CmiCommLockOrElse(return;); /*Already busy doing communication*/
1025     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1026     CmiCommLock();
1027     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1028     CmiCommUnlock();
1029   }
1030 #if 1
1031 #if CMK_USE_GM || CMK_USE_MX
1032   if (!Cmi_netpoll)
1033 #endif
1034   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1035 #endif
1036   }
1037 }
1038
1039 /* periodic charm ping, for gm and netpoll */
1040 static void pingCharmrunPeriodic(void *ignored)
1041 {
1042   pingCharmrun(ignored);
1043   CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1044 }
1045
1046 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
1047 static void charmrun_abort(const char *s)
1048 {
1049   if (Cmi_charmrun_fd==-1) {/*Standalone*/
1050         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1051 CmiPrintStackTrace(0);
1052         abort();
1053   } else {
1054         char msgBuf[80];
1055         skt_set_abort(ignore_further_errors);
1056         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1057         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
1058   }
1059 }
1060
1061 /* ctrl_getone */
1062
1063 #ifdef __FAULT__
1064 #include "machine-recover.c"
1065 #endif
1066
1067 static void node_addresses_store(ChMessage *msg);
1068
1069 static int barrierReceived = 0;
1070
1071 static void ctrl_getone(void)
1072 {
1073   ChMessage msg;
1074   MACHSTATE(2,"ctrl_getone")
1075   MACHLOCK_ASSERT(comm_mutex_isLocked,"ctrl_getone")
1076   ChMessage_recv(Cmi_charmrun_fd,&msg);
1077   MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1078
1079   if (strcmp(msg.header.type,"die")==0) {
1080     MACHSTATE(2,"ctrl_getone bye bye")
1081     fprintf(stderr,"aborting: %s\n",msg.data);
1082     log_done();
1083     ConverseCommonExit();
1084     machine_exit(0);
1085   } 
1086 #if CMK_CCS_AVAILABLE
1087   else if (strcmp(msg.header.type, "req_fw")==0) {
1088     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1089         /*Sadly, I *can't* do a:
1090       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1091         here, because I can't send converse messages in the
1092         communication thread.  I *can* poke this message into 
1093         any convenient processor's queue, though:  (OSL, 9/14/2000)
1094         */
1095     int pe=0;/*<- node-local processor number. Any one will do.*/
1096     void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1097     MACHSTATE(2,"Incoming CCS request");
1098     if (cmsg!=NULL) CmiPushPE(pe,cmsg);
1099   }
1100 #endif
1101 #ifdef __FAULT__        
1102   else if(strcmp(msg.header.type,"crashnode")==0) {
1103         crash_node_handle(&msg);
1104   }
1105   else if(strcmp(msg.header.type,"initnodetab")==0) {
1106         /** A processor crashed and got recreated. So charmrun sent 
1107           across the whole nodetable data to update this processor*/
1108         node_addresses_store(&msg);
1109         // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1110   }
1111 #endif
1112   else if(strcmp(msg.header.type,"barrier")==0) {
1113         barrierReceived = 1;
1114   }
1115   else if(strcmp(msg.header.type,"barrier0")==0) {
1116         barrierReceived = 2;
1117   }
1118   else {
1119   /* We do not use KillEveryOne here because it calls CmiMyPe(),
1120    * which is not available to the communication thread on an SMP version.
1121    */
1122     /* CmiPrintf("Unknown message: %s\n", msg.header.type); */
1123     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1124     machine_exit(1);
1125   }
1126   
1127   MACHSTATE(2,"ctrl_getone done")
1128   ChMessage_free(&msg);
1129 }
1130
1131 #if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
1132 /*Deliver this reply data to this reply socket.
1133   The data is forwarded to CCS server via charmrun.*/
1134 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
1135 {
1136   MACHSTATE(2,"Outgoing CCS reply");
1137   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
1138       repData,repLen);
1139   MACHSTATE(1,"Outgoing CCS reply away");
1140 }
1141 #endif
1142
1143 /*****************************************************************************
1144  *
1145  * CmiPrintf, CmiError, CmiScanf
1146  *
1147  *****************************************************************************/
1148 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1149 static void InternalPrintf(const char *f, va_list l)
1150 {
1151   ChMessage replymsg;
1152   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1153   CmiStdoutFlush();
1154   vsprintf(buffer, f, l);
1155   if(Cmi_syncprint) {
1156           CmiCommLock();
1157           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1158           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1159           ChMessage_free(&replymsg);
1160           CmiCommUnlock();
1161   } else {
1162           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1163   }
1164   InternalWriteToTerminal(0,buffer,strlen(buffer));
1165   CmiTmpFree(buffer);
1166 }
1167
1168 static void InternalError(const char *f, va_list l)
1169 {
1170   ChMessage replymsg;
1171   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1172   CmiStdoutFlush();
1173   vsprintf(buffer, f, l);
1174   if(Cmi_syncprint) {
1175           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1176           CmiCommLock();
1177           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1178           ChMessage_free(&replymsg);
1179           CmiCommUnlock();
1180   } else {
1181           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1182   }
1183   InternalWriteToTerminal(1,buffer,strlen(buffer));
1184   CmiTmpFree(buffer);
1185 }
1186
1187 static int InternalScanf(char *fmt, va_list l)
1188 {
1189   ChMessage replymsg;
1190   char *ptr[20];
1191   char *p; int nargs, i;
1192   nargs=0;
1193   p=fmt;
1194   while (*p) {
1195     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1196     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1197     if (p[0]=='%') { nargs++; p++; continue; }
1198     if (*p=='\n') *p=' '; p++;
1199   }
1200   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1201   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1202   CmiLock(Cmi_scanf_mutex);
1203   if (Cmi_charmrun_fd!=-1)
1204   {/*Send charmrun the format string*/
1205         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1206         /*Wait for the reply (characters to scan) from charmrun*/
1207         CmiCommLock();
1208         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1209         i = sscanf((char*)replymsg.data, fmt,
1210                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1211                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1212                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1213         ChMessage_free(&replymsg);
1214         CmiCommUnlock();
1215   } else
1216   {/*Just do the scanf normally*/
1217         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1218                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1219                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1220   }
1221   CmiUnlock(Cmi_scanf_mutex);
1222   return i;
1223 }
1224
1225 #if CMK_CMIPRINTF_IS_A_BUILTIN
1226
1227 /*New stdarg.h declarations*/
1228 void CmiPrintf(const char *fmt, ...)
1229 {
1230   CpdSystemEnter();
1231   {
1232   va_list p; va_start(p, fmt);
1233   if (Cmi_charmrun_fd!=-1)
1234     InternalPrintf(fmt, p);
1235   else
1236     vfprintf(stdout,fmt,p);
1237   va_end(p);
1238   }
1239   CpdSystemExit();
1240 }
1241
1242 void CmiError(const char *fmt, ...)
1243 {
1244   CpdSystemEnter();
1245   {
1246   va_list p; va_start (p, fmt);
1247   if (Cmi_charmrun_fd!=-1)
1248     InternalError(fmt, p);
1249   else
1250     vfprintf(stderr,fmt,p);
1251   va_end(p);
1252   }
1253   CpdSystemExit();
1254 }
1255
1256 int CmiScanf(const char *fmt, ...)
1257 {
1258   int i;
1259   CpdSystemEnter();
1260   {
1261   va_list p; va_start(p, fmt);
1262   i = InternalScanf((char *)fmt, p);
1263   va_end(p);
1264   }
1265   CpdSystemExit();
1266   return i;
1267 }
1268
1269 #endif
1270
1271 /***************************************************************************
1272  * Output redirection:
1273  *  When people don't use CkPrintf, like above, we'd still like to be able
1274  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
1275  * which lets us read the characters sent to stdout at our lesiure.
1276  ***************************************************************************/
1277
1278 /*Can read from stdout or stderr using these fd's*/
1279 static int readStdout[2]; 
1280 static int writeStdout[2]; /*The original stdout/stderr sockets*/ 
1281 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1282 #define readStdoutBufLen (16*1024)
1283 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1284 static int servicingStdout;
1285
1286 /*Initialization-- should only be called once per node*/
1287 static void CmiStdoutInit(void) {
1288         int i;
1289         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1290
1291 /*There's some way to do this same thing in windows, but I don't know how*/
1292 #if !defined(_WIN32) || defined(__CYGWIN__)
1293         /*Prevent buffering in stdio library:*/
1294         setbuf(stdout,NULL); setbuf(stderr,NULL);
1295
1296         /*Reopen stdout and stderr fd's as new pipes:*/
1297         for (i=0;i<2;i++) {
1298                 int pair[2];
1299                 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1300                 
1301                 /*First, save a copy of the original stdout*/
1302                 writeStdout[i]=dup(srcFd);
1303 #if 0
1304                 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1305                 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1306 #else
1307                /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1308                 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair)) 
1309                         {perror("building stdio redirection socketpair"); exit(1);}
1310 #endif
1311                 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1312                 if (-1==dup2(srcFd,pair[1])) {perror("dup2 redirection pipe"); exit(1);}
1313                 
1314 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
1315                 CmiEnableNonblockingIO(srcFd);
1316 #endif
1317 //NOTSURE #if CMK_SHARED_VARS_UNAVAILABLE
1318 #if !CMK_SMP 
1319                 if (Cmi_asyncio)
1320                 {
1321   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1322                         CmiEnableAsyncIO(readStdout[i]);
1323                 }
1324 #endif
1325         }
1326 #else
1327 /*Windows system-- just fake reads for now*/
1328 # ifndef read
1329 #  define read(x,y,z) 0
1330 # endif
1331 # ifndef write
1332 #  define write(x,y,z) 
1333 # endif
1334 #endif
1335 }
1336
1337 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1338 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1339 {
1340         write(writeStdout[isStdErr],str,len);   
1341 }
1342
1343 /*
1344   Service this particular stdout pipe.  
1345   Must hold comm. lock.
1346 */
1347 static void CmiStdoutServiceOne(int i) {
1348         int nBytes;
1349         const static char *cmdName[2]={"print","printerr"};
1350         servicingStdout=1;
1351         while(1) {
1352                 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1353                 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1354                 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1355                 if (nBytes<=0) break; /*Nothing to send*/
1356                 
1357                 /*Send these bytes off to charmrun*/
1358                 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1359                 nBytes++; /*Include zero-terminator in message to charmrun*/
1360                 
1361                 if (nBytes>=readStdoutBufLen-100) 
1362                 { /*We must have filled up our output pipe-- most output libraries
1363                    don't handle this well (e.g., glibc printf just drops the line).*/
1364                         
1365                         tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1366                                 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1367                         nBytes--; /*Remove terminator from user's data*/
1368                         tooMuchLen=strlen(tooMuchWarn)+1;
1369                 }
1370                 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1371                                     tooMuchWarn,tooMuchLen);
1372                 
1373                 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1374         }
1375         servicingStdout=0;
1376         serviceStdout[i]=0; /*This pipe is now serviced*/
1377 }
1378
1379 /*Service all stdout pipes, whether it looks like they need it
1380   or not.  Used when you aren't sure if select() has been called recently.
1381   Must hold comm. lock.
1382 */
1383 static void CmiStdoutServiceAll(void) {
1384         int i;
1385         for (i=0;i<2;i++) {
1386                 if (readStdout[i]==0) continue; /*Pipe not open*/
1387                 CmiStdoutServiceOne(i);
1388         }
1389 }
1390
1391 /*Service any outstanding stdout pipes.
1392   Must hold comm. lock.
1393 */
1394 static void CmiStdoutService(void) {
1395         CmiStdoutServiceAll();
1396 }
1397
1398 /*Add our pipes to the pile for select() or poll().
1399   Both can be called with or without the comm. lock.
1400 */
1401 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1402         int i;
1403         for (i=0;i<2;i++) {
1404                 if (readStdout[i]==0) continue; /*Pipe not open*/
1405                 CMK_PIPE_ADDREAD(readStdout[i]);
1406         }
1407 }
1408 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1409         int i;
1410         for (i=0;i<2;i++) {
1411                 if (readStdout[i]==0) continue; /*Pipe not open*/
1412                 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1413         }
1414 }
1415 static int CmiStdoutNeedsService(void) {
1416         return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1417 }
1418
1419 /*Called every few milliseconds to flush the stdout pipes*/
1420 static void CmiStdoutFlush(void) {
1421         if (servicingStdout) return; /* might be called by SIGALRM */
1422         CmiCommLockOrElse( return; )
1423         CmiCommLock();
1424         CmiStdoutServiceAll();
1425         CmiCommUnlock();
1426 }
1427
1428 /***************************************************************************
1429  * Message Delivery:
1430  *
1431  ***************************************************************************/
1432
1433 #include "machine-dgram.c"
1434
1435
1436 /*****************************************************************************
1437  *
1438  * node_addresses
1439  *
1440  *  These two functions fill the node-table.
1441  *
1442  *
1443  *   This node, like all others, first sends its own address to charmrun
1444  *   using this command:
1445  *
1446  *     Type: nodeinfo
1447  *     Data: Big-endian 4-byte ints
1448  *           <my-node #><Dataport>
1449  *
1450  *   When charmrun has all the addresses, he sends this table to me:
1451  *
1452  *     Type: nodes
1453  *     Data: Big-endian 4-byte ints
1454  *           <number of nodes n>
1455  *           <#PEs><IP><Dataport> Node 0
1456  *           <#PEs><IP><Dataport> Node 1
1457  *           ...
1458  *           <#PEs><IP><Dataport> Node n-1
1459  *
1460  *****************************************************************************/
1461
1462 #if CMK_USE_IBVERBS
1463 void copyInfiAddr(ChInfiAddr *qpList);
1464 #endif
1465
1466 #if CMK_IBVERBS_FAST_START
1467 static void send_partial_init()
1468 {
1469   ChMessageInt_t nodeNo = ChMessageInt_new(_Cmi_mynode);
1470         ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);
1471 }       
1472 #endif
1473
1474
1475 /*Note: node_addresses_obtain is called before starting
1476   threads, so no locks are needed (or valid!)*/
1477 static void node_addresses_obtain(char **argv)
1478 {
1479   ChMessage nodetabmsg; /* info about all nodes*/
1480   MACHSTATE(3,"node_addresses_obtain { ");
1481   if (Cmi_charmrun_fd==-1) 
1482   {/*Standalone-- fake a single-node nodetab message*/
1483         int npes=1;
1484         ChSingleNodeinfo *fakeTab;
1485         ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1486         fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
1487         CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1488 //#if CMK_SHARED_VARS_UNAVAILABLE
1489 #if !CMK_SMP 
1490         if (npes!=1) {
1491                 fprintf(stderr,
1492                         "To use multiple processors, you must run this program as:\n"
1493                         " > charmrun +p%d %s <args>\n"
1494                         "or build the %s-smp version of Charm++.\n",
1495                         npes,argv[0],CMK_MACHINE_NAME);
1496                 exit(1);
1497         }
1498 #else
1499         /* standalone smp version reads ppn */
1500         if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) || 
1501                CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
1502           npes = _Cmi_mynodesize;
1503 #endif
1504         /*This is a stupid hack: we expect the *number* of nodes
1505         followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1506         (which happens to have exactly that layout!) and stuff
1507         a 1 into the "node number" slot
1508         */
1509         fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1510         fakeTab->info.nPE=ChMessageInt_new(npes);
1511         fakeTab->info.dataport=ChMessageInt_new(0);
1512         fakeTab->info.IP=_skt_invalid_ip;
1513   }
1514   else 
1515   { /*Contact charmrun for machine info.*/
1516         ChSingleNodeinfo me;
1517
1518         me.nodeNo=ChMessageInt_new(_Cmi_mynode);
1519
1520 #if CMK_USE_IBVERBS
1521         {
1522                 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
1523                 me.info.qpList = malloc(qpListSize);
1524                 copyInfiAddr(me.info.qpList);
1525                 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
1526                 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
1527                 free(me.info.qpList);
1528         }
1529 #else
1530         /*The nPE fields are set by charmrun--
1531           these values don't matter. 
1532           Set IP in case it is mpiexec mode where charmrun does not have IP yet
1533         */
1534         me.info.nPE=ChMessageInt_new(0);
1535         /* me.info.IP=_skt_invalid_ip; */
1536         me.info.IP=skt_innode_my_ip();
1537         me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
1538 #ifdef CMK_USE_MX
1539         me.info.nic_id=ChMessageLong_new(Cmi_nic_id);
1540 #endif
1541 #if CMK_USE_IBUD
1542         me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
1543         me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
1544         me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
1545         MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",me.info.qp.lid,me.info.qp.qpn,me.info.qp.psn);
1546 #endif
1547         me.info.dataport=ChMessageInt_new(dataport);
1548
1549         /*Send our node info. to charmrun.
1550         CommLock hasn't been initialized yet-- 
1551         use non-locking version*/
1552         ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1553         MACHSTATE1(5,"send initnode - dataport:%d", dataport);
1554 #endif  //CMK_USE_IBVERBS
1555
1556         MACHSTATE(3,"initnode sent");
1557   
1558         /*We get the other node addresses from a message sent
1559           back via the charmrun control port.*/
1560         if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
1561                 CmiAbort("Timeout waiting for nodetab!\n");
1562         }
1563         MACHSTATE(2,"recv initnode {");
1564         ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1565         MACHSTATE(2,"} recv initnode");
1566   }
1567 //#if CMK_USE_IBVERBS   
1568 //#else
1569   node_addresses_store(&nodetabmsg);
1570   ChMessage_free(&nodetabmsg);
1571 //#endif        
1572   MACHSTATE(3,"} node_addresses_obtain ");
1573 }
1574
1575
1576 /***********************************************************************
1577  * DeliverOutgoingMessage()
1578  *
1579  * This function takes care of delivery of outgoing messages from the
1580  * sender end. Broadcast messages are divided into sets of messages that 
1581  * are bound to the local node, and to remote nodes. For local
1582  * transmission, the messages are directly pushed into the recv
1583  * queues. For non-local transmission, the function DeliverViaNetwork()
1584  * is called
1585  ***********************************************************************/
1586 int DeliverOutgoingMessage(OutgoingMsg ogm)
1587 {
1588   int i, rank, dst; OtherNode node;
1589         
1590   int network = 1;
1591
1592   dst = ogm->dst;
1593
1594   //printf("deliver outgoing message, dest: %d \n", dst);
1595 #ifndef CMK_OPTIMIZE
1596     if (dst<0 || dst>=CmiNumPes())
1597       CmiAbort("Send to out-of-bounds processor!");
1598 #endif
1599     node = nodes_by_pe[dst];
1600     rank = dst - node->nodestart;
1601     if (node->nodestart != Cmi_nodestart) {
1602 #if !CMK_SMP_NOT_RELAX_LOCK                     
1603         CmiCommLock();
1604 #endif          
1605         DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1606         GarbageCollectMsg(ogm);
1607 #if !CMK_SMP_NOT_RELAX_LOCK                     
1608         CmiCommUnlock();
1609 #endif          
1610   }
1611 #if CMK_MULTICORE
1612   network = 0;
1613 #endif
1614   return network;
1615 }
1616
1617
1618 /******************************************************************************
1619  *
1620  * CmiGetNonLocal
1621  *
1622  * The design of this system is that the communication thread does all the
1623  * work, to eliminate as many locking issues as possible.  This is the only
1624  * part of the code that happens in the receiver-thread.
1625  *
1626  * This operation is fairly cheap, it might be worthwhile to inline
1627  * the code into CmiDeliverMsgs to reduce function call overhead.
1628  *
1629  *****************************************************************************/
1630
1631
1632 /**
1633  * Set up an OutgoingMsg structure for this message.
1634  */
1635 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
1636   OutgoingMsg ogm;
1637   MallocOutgoingMsg(ogm);
1638   MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1639   ogm->size = size;
1640   ogm->data = data;
1641   ogm->src = cs->pe;
1642   ogm->dst = pe;
1643   ogm->freemode = freemode;
1644   ogm->refcount = 0;
1645   return (CmiCommHandle)ogm;    
1646 }
1647
1648
1649 /******************************************************************************
1650  *
1651  * CmiGeneralSend
1652  *
1653  * Description: This is a generic message sending routine. All the
1654  * converse message send functions are implemented in terms of this
1655  * function. (By setting appropriate flags (eg freemode) that tell
1656  * CmiGeneralSend() how exactly to handle the particular case of
1657  * message send)
1658  *
1659  *****************************************************************************/
1660
1661 //CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
1662 CmiCommHandle LrtsSendFunc(int destNode, int pe, int size, char *data, int freemode)
1663 {
1664   int sendonnetwork;
1665   CmiState cs = CmiGetState(); OutgoingMsg ogm;
1666   MACHSTATE(1,"CmiGeneralSend {");
1667
1668   CmiMsgHeaderSetLength(data, size);
1669   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
1670
1671 #if CMK_SMP_NOT_RELAX_LOCK  
1672   CmiCommLock();
1673 #endif  
1674   
1675   sendonnetwork = DeliverOutgoingMessage(ogm);
1676   
1677 #if CMK_SMP_NOT_RELAX_LOCK  
1678   CmiCommUnlock();
1679 #endif  
1680   
1681   MACHSTATE(1,"}  LrtsSend");
1682   return (CmiCommHandle)ogm;
1683 }
1684
1685
1686 /******************************************************************************
1687  *
1688  * Comm Handle manipulation.
1689  *
1690  *****************************************************************************/
1691
1692 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1693
1694 /*****************************************************************************
1695  *
1696  * NET version List-Cast and Multicast Code
1697  *
1698  ****************************************************************************/
1699                                                                                 
1700 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1701 {
1702   int i;
1703   for(i=0;i<npes;i++) {
1704     CmiReference(msg);
1705     CmiSyncSendAndFree(pes[i], len, msg);
1706   }
1707 }
1708                                                                                 
1709 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1710 {
1711   CmiError("ListSend not implemented.");
1712   return (CmiCommHandle) 0;
1713 }
1714                                                                                 
1715 /* 
1716   because in all net versions, the message buffer after CmiSyncSendAndFree
1717   returns is not changed, we can use memory reference trick to avoid 
1718   memory copying here
1719 */
1720 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1721 {
1722   int i;
1723   for(i=0;i<npes;i++) {
1724     CmiReference(msg);
1725     CmiSyncSendAndFree(pes[i], len, msg);
1726   }
1727   CmiFree(msg);
1728 }
1729
1730 #endif
1731
1732
1733 void LrtsDrainResources()
1734 {
1735 }
1736
1737 void LrtsPostNonLocal()
1738 {
1739 }
1740
1741 /* Network progress function is used to poll the network when for
1742    messages. This flushes receive buffers on some implementations*/
1743     
1744 #if CMK_MACHINE_PROGRESS_DEFINED
1745 void CmiMachineProgressImpl(){
1746         LrtsAdvanceCommunication(0);
1747 }
1748 #endif
1749
1750 void LrtsAdvanceCommunication(int whileidle)
1751 {
1752   CommunicationServerNet(0, COMM_SERVER_FROM_WORKER);
1753 }
1754
1755 /******************************************************************************
1756  *
1757  * Main code, Init, and Exit
1758  *
1759  *****************************************************************************/
1760
1761 #if CMK_BARRIER_USE_COMMON_CODE
1762
1763 /* happen at node level */
1764 /* must be called on every PE including communication processors */
1765 int CmiBarrier()
1766 {
1767   int len, size, i;
1768   int status;
1769   int numnodes = CmiNumNodes();
1770   static int barrier_phase = 0;
1771
1772   if (Cmi_charmrun_fd == -1) return 0;                // standalone
1773   if (numnodes == 1) {
1774     CmiNodeAllBarrier();
1775     return 0;
1776   }
1777
1778   if (CmiMyRank() == 0) {
1779     ctrl_sendone_locking("barrier",NULL,0,NULL,0);
1780     while (barrierReceived != 1) {
1781       CmiCommLock();
1782       ctrl_getone();
1783       CmiCommUnlock();
1784     }
1785     barrierReceived = 0;
1786     barrier_phase ++;
1787   }
1788
1789   CmiNodeAllBarrier();
1790   /* printf("[%d] OUT of barrier %d \n", CmiMyPe(), barrier_phase); */
1791   return 0;
1792 }
1793
1794
1795 int CmiBarrierZero()
1796 {
1797   int i;
1798   int numnodes = CmiNumNodes();
1799   ChMessage msg;
1800
1801   if (Cmi_charmrun_fd == -1) return 0;                // standalone
1802   if (numnodes == 1) {
1803     CmiNodeAllBarrier();
1804     return 0;
1805   }
1806
1807   if (CmiMyRank() == 0) {
1808     char str[64];
1809     sprintf(str, "%d", CmiMyNode());
1810     ctrl_sendone_locking("barrier0",str,strlen(str)+1,NULL,0);
1811     if (CmiMyNode() == 0) {
1812       while (barrierReceived != 2) {
1813         CmiCommLock();
1814         ctrl_getone();
1815         CmiCommUnlock();
1816       }
1817       barrierReceived = 0;
1818     }
1819   }
1820
1821   CmiNodeAllBarrier();
1822   return 0;
1823 }
1824
1825 #endif
1826
1827 /******************************************************************************
1828  *
1829  * Main code, Init, and Exit
1830  *
1831  *****************************************************************************/
1832
1833 void LrtsPreCommonInit(int everReturn)
1834 {
1835   CmiNodeAllBarrier();
1836
1837 #if CMK_USE_GM
1838   CmiCheckGmStatus();
1839 #endif
1840     
1841 }
1842
1843 void LrtsPostCommonInit(int everReturn)
1844 {
1845   CmiIdleState *s=CmiNotifyGetState();
1846
1847    /* better to show the status here */
1848   if (CmiMyPe() == 0) {
1849     if (Cmi_netpoll == 1) {
1850       CmiPrintf("Charm++> scheduler running in netpoll mode.\n");
1851     }
1852 #if CMK_SHARED_VARS_UNAVAILABLE
1853     else {
1854       if (CmiMemoryIs(CMI_MEMORY_IS_OS))
1855         CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
1856     }
1857 #endif
1858   }       
1859
1860 #if MEMORYUSAGE_OUTPUT
1861   memoryusage_counter = 0;
1862 #endif
1863 #if CMK_USE_GM || CMK_USE_MX
1864   if (Cmi_charmrun_fd != -1)
1865 #endif
1866   {
1867   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
1868       (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
1869   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
1870       (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
1871   }
1872
1873 #if CMK_SHARED_VARS_UNAVAILABLE
1874   if (Cmi_netpoll) /*Repeatedly call CommServer*/
1875     CcdCallOnConditionKeep(CcdPERIODIC, 
1876         (CcdVoidFn) CommunicationPeriodic, NULL);
1877   else /*Only need this for retransmits*/
1878     CcdCallOnConditionKeep(CcdPERIODIC_10ms, 
1879         (CcdVoidFn) CommunicationPeriodic, NULL);
1880 #endif
1881     
1882   if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
1883     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
1884 #if CMK_SHARED_VARS_UNAVAILABLE
1885     if (!Cmi_asyncio) {
1886     /* gm cannot live with setitimer */
1887     CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1888     }
1889     else {
1890     /*Occasionally ping charmrun, to test if it's dead*/
1891     struct itimerval i;
1892     CmiSignal(SIGALRM, 0, 0, pingCharmrun);
1893 #if MEMORYUSAGE_OUTPUT
1894     i.it_interval.tv_sec = 0;
1895     i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
1896     i.it_value.tv_sec = 0;
1897     i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
1898 #else
1899     i.it_interval.tv_sec = 1;
1900     i.it_interval.tv_usec = 0;
1901     i.it_value.tv_sec = 1;
1902     i.it_value.tv_usec = 0;
1903 #endif
1904     setitimer(ITIMER_REAL, &i, NULL);
1905     }
1906
1907 #if ! CMK_USE_GM && ! CMK_USE_MX && ! CMK_USE_TCP && ! CMK_USE_IBVERBS
1908     /*Occasionally check for retransmissions, outgoing acks, etc.*/
1909     /*no need for GM case */
1910     CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
1911 #endif
1912 #endif
1913       
1914     /*Initialize the clock*/
1915     Cmi_clock=GetClock();
1916   }
1917
1918 #ifdef IGET_FLOWCONTROL 
1919   /* Call the function once to determine the amount of physical memory available */
1920   getAvailSysMem();
1921   /* Call the function to periodically call the token adapt function */
1922   CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
1923   CcdCallOnConditionKeep(CcdPERIODIC_10s,   // magic number of PERIOD 10s
1924         (CcdVoidFn) TokenUpdatePeriodic, NULL);
1925 #endif
1926   
1927 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
1928   srand((int)(1024.0*CmiWallTimer()));
1929   if (CmiMyPe()==0)
1930     CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
1931         CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
1932 #endif
1933
1934 #ifdef __ONESIDED_IMPL
1935 #ifdef __ONESIDED_NO_HARDWARE
1936   putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
1937   putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
1938   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
1939   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
1940 #endif
1941 #ifdef __ONESIDED_GM_HARDWARE
1942   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
1943   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
1944 #endif
1945 #endif
1946     
1947 }
1948
1949 void LrtsExit()
1950 {
1951   MACHSTATE(2,"ConverseExit {");
1952   machine_initiated_shutdown=1;
1953
1954   CmiNodeBarrier();        /* single node SMP, make sure every rank is done */
1955   if (CmiMyRank()==0) CmiStdoutFlush();
1956   if (Cmi_charmrun_fd==-1) {
1957     if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
1958     else while (1) CmiYield();
1959   }
1960   else {
1961     ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
1962 #if CMK_SHARED_VARS_UNAVAILABLE
1963     Cmi_check_delay = 1.0;      /* speed up checking of charmrun */
1964     while (1) CommunicationServerNet(500, COMM_SERVER_FROM_WORKER);
1965 #elif CMK_MULTICORE
1966         if (!Cmi_commthread && CmiMyRank()==0) {
1967           Cmi_check_delay = 1.0;    /* speed up checking of charmrun */
1968           while (1) CommunicationServerNet(500, COMM_SERVER_FROM_WORKER);
1969         }
1970 #endif
1971   }
1972   MACHSTATE(2,"} ConverseExit");
1973
1974 }
1975
1976 static void set_signals(void)
1977 {
1978   if(!Cmi_truecrash) {
1979     signal(SIGSEGV, KillOnAllSigs);
1980     signal(SIGFPE, KillOnAllSigs);
1981     signal(SIGILL, KillOnAllSigs);
1982     signal(SIGINT, KillOnAllSigs);
1983     signal(SIGTERM, KillOnAllSigs);
1984     signal(SIGABRT, KillOnAllSigs);
1985 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1986     signal(SIGQUIT, KillOnAllSigs);
1987     signal(SIGBUS, KillOnAllSigs);
1988 #     if CMK_HANDLE_SIGUSR
1989     signal(SIGUSR1, HandleUserSignals);
1990     signal(SIGUSR2, HandleUserSignals);
1991 #     endif
1992 #   endif /*UNIX*/
1993   }
1994 }
1995
1996 /*Socket idle function to use before addresses have been
1997   obtained.  During the real program, we idle with CmiYield.
1998 */
1999 static void obtain_idleFn(void) {sleep(0);}
2000
2001 static int net_default_skt_abort(int code,const char *msg)
2002 {
2003   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2004   machine_exit(1);
2005   return -1;
2006 }
2007
2008 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
2009 {
2010 #if CMK_USE_HP_MAIN_FIX
2011 #if FOR_CPLUS
2012   _main(argc,*argv);
2013 #endif
2014 #endif
2015   Cmi_netpoll = 0;
2016 #if CMK_NETPOLL
2017   Cmi_netpoll = 1;
2018 #endif
2019 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2020   Cmi_idlepoll = 0;
2021 #else
2022   Cmi_idlepoll = 1;
2023 #endif
2024   Cmi_truecrash = 0;
2025   if (CmiGetArgFlagDesc(*argv,"+truecrash","Do not install signal handlers") ||
2026       CmiGetArgFlagDesc(*argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2027     /* netpoll disable signal */
2028   if (CmiGetArgFlagDesc(*argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2029   if (CmiGetArgFlagDesc(*argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
2030     /* idlepoll use poll instead if sleep when idle */
2031   if (CmiGetArgFlagDesc(*argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2032     /* idlesleep use sleep instead if busywait when idle */
2033   if (CmiGetArgFlagDesc(*argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2034   Cmi_syncprint = CmiGetArgFlagDesc(*argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2035
2036   Cmi_asyncio = 1;
2037 #if CMK_ASYNC_NOT_NEEDED
2038   Cmi_asyncio = 0;
2039 #endif
2040   if (CmiGetArgFlagDesc(*argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2041   if (CmiGetArgFlagDesc(*argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2042 #if CMK_MULTICORE
2043   if (CmiGetArgFlagDesc(*argv,"+commthread","Use communication thread")) {
2044     Cmi_commthread = 1;
2045 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2046     _Cmi_noprocforcommthread = 1;   /* worker thread go sleep */
2047 #endif
2048     if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2049   }
2050 #endif
2051
2052   skt_init();
2053   /* use special abort handler instead of default_skt_abort to 
2054      prevent exit trapped by atexit_check() due to the exit() call  */
2055   skt_set_abort(net_default_skt_abort);
2056   atexit(machine_atexit_check);
2057   parse_netstart();
2058   parse_magic();
2059 #if ! defined(_WIN32)\r
2060   /* only get forks in non-smp mode */
2061   parse_forks();
2062 #endif
2063   extract_args(*argv);
2064   log_init();
2065   Cmi_scanf_mutex = CmiCreateLock();
2066
2067     /* NOTE: can not acutally call timer before timerInit ! GZ */
2068   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2069
2070   skt_set_idle(obtain_idleFn);
2071   if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2072         set_signals();
2073 #if CMK_USE_TCP
2074         dataskt=skt_server(&dataport);
2075 #elif !CMK_USE_GM && !CMK_USE_MX
2076         dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2077 #else
2078           /* GM and MX do not need to create any socket for communication */
2079         dataskt=-1;
2080 #endif
2081         MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2082         Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2083         MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
2084         skt_tcp_no_nagle(Cmi_charmrun_fd);
2085         CmiStdoutInit();
2086   } else {/*Standalone operation*/
2087         printf("Charm++: standalone mode (not using charmrun)\n");
2088         dataskt=-1;
2089         Cmi_charmrun_fd=-1;
2090   }
2091
2092   CmiMachineInit(*argv);
2093
2094   node_addresses_obtain(*argv);
2095   MACHSTATE(5,"node_addresses_obtain done");
2096
2097   CmiCommunicationInit(*argv);
2098
2099   skt_set_idle(CmiYield);
2100   Cmi_check_delay = 1.0+0.25*_Cmi_numnodes;
2101
2102   if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2103       Cmi_check_delay=1.0e30;
2104
2105 }
2106
2107
2108 #if CMK_CELL
2109
2110 #include "spert_ppu.h"
2111
2112 void machine_OffloadAPIProgress() {
2113   CmiCommLock();
2114   OffloadAPIProgress();
2115   CmiCommUnlock();
2116 }
2117 #endif
2118
2119
2120
2121 /*@}*/