A few bug fixes related to the mlogft machine.
[charm.git] / src / arch / net / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * Basic NET implementation of Converse machine layer
10  * @ingroup NET
11  */
12
13 /** @defgroup NET
14  * NET implementation of machine layer, ethernet in particular
15  * @ingroup Machine
16  *
17  * THE DATAGRAM STREAM
18  *
19  * Messages are sent using UDP datagrams.  The sender allocates a
20  * struct for each datagram to be sent.  These structs stick around
21  * until slightly after the datagram is acknowledged.
22  *
23  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
24  * Each node has an OtherNode struct for every other node in the
25  * system.  The OtherNode struct contains:
26  *
27  *   send_queue   (all datagram-structs not yet transmitted)
28  *   send_window  (all datagram-structs transmitted but not ack'd)
29  *
30  * When an acknowledgement comes in, all packets in the send-window
31  * are either marked as acknowledged or pushed back into the send
32  * queue for retransmission.
33  *
34  * THE OUTGOING MESSAGE
35  *
36  * When you send or broadcast a message, the first thing the system
37  * does is system creates an OutgoingMsg struct to represent the
38  * operation.  The OutgoingMsg contains a very direct expression
39  * of what you want to do:
40  *
41  * OutgoingMsg:
42  *
43  *   size      --- size of message in bytes
44  *   data      --- pointer to the buffer containing the message
45  *   src       --- processor which sent the message
46  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
47  *   freemode  --- see below.
48  *   refcount  --- see below.
49  *
50  * The OutgoingMsg is kept around until the transmission is done, then
51  * it is garbage collected --- the refcount and freemode fields are
52  * to assist garbage collection.
53  *
54  * The freemode indicates which kind of buffer-management policy was
55  * used (sync, async, or freeing).  The sync policy is handled
56  * superficially by immediately converting sync sends into freeing
57  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
58  * (freeing).  If the freemode is 'F', then garbage collection
59  * involves freeing the data and the OutgoingMsg structure itself.  If
60  * the freemode is 'A', then the only cleanup is to change the
61  * freemode to 'X', a condition which is then detectable by
62  * CmiAsyncMsgSent.  In this case, the actual freeing of the
63  * OutgoingMsg is done by CmiReleaseCommHandle.
64  *
65  * When the transmission is initiated, the system computes how many
66  * datagrams need to be sent, total.  This number is stored in the
67  * refcount field.  Each time a datagram is delivered, the refcount
68  * is decremented, when it reaches zero, cleanup is performed.  There
69  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
70  * is a send (not a broadcast) and can be performed with shared
71  * memory, the entire datagram system is bypassed, the message is
72  * simply delivered and freed, not using the refcount mechanism at
73  * all.  Exception 2: If the message is a broadcast, then part of the
74  * broadcast that can be done via shared memory is performed prior to
75  * initiating the datagram/refcount system.
76  *
77  * DATAGRAM FORMATS AND MESSAGE FORMATS
78  *
79  * Datagrams have this format:
80  *
81  *   srcpe   (16 bits) --- source processor number.
82  *   magic   ( 8 bits) --- magic number to make sure DG is good.
83  *   dstrank ( 8 bits) --- destination processor rank.
84  *   seqno   (32 bits) --- packet sequence number.
85  *   data    (XX byte) --- user data.
86  *
87  * The only reason the srcpe is in there is because the receiver needs
88  * to know which receive window to use.  The dstrank field is needed
89  * because transmission is node-to-node.  Once the message is
90  * assembled by the node, it must be delivered to the appropriate PE.
91  * The dstrank field is used to encode certain special-case scenarios.
92  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
93  * and should be delivered to all processors in the node.  If the dstrank
94  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
95  * which case the srcpe is the number of the acknowledger, the seqno is
96  * always zero, and the user data is a list of the seqno's being
97  * acknowledged.  There may be other dstrank codes for special functions.
98  *
99  * To send a message, one chops it up into datagrams and stores those
100  * datagrams in a send-queue.  These outgoing datagrams aren't stored
101  * in the explicit format shown above.  Instead, they are stored as
102  * ImplicitDgrams, which contain the datagram header and a pointer to
103  * the user data (which is in the user message buffer, which is in the
104  * OutgoingMsg).  At transmission time these are combined together.
105
106  * The combination of the datagram header with the user's data is
107  * performed right in the user's message buffer.  Note that the
108  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
109  * of the user's message with a datagram header, sends the datagram
110  * straight from the user's message buffer, then restores the user's
111  * buffer to its original state.  There is a small problem with the
112  * first datagram of the message: one needs 64 bits of space to store
113  * the datagram header.  To make sure this space is there, we added a
114  * 64-bit unused space to the front of the Cmi message header.  In
115  * addition to this, we also add 32 bits to the Cmi message header
116  * to make room for a length-field, making it possible to identify
117  * message boundaries.
118  *
119  * CONCURRENCY CONTROL
120  *
121  * This has changed recently.
122  *
123  * EFFICIENCY NOTES
124  *
125  * The sender-side does little copying.  The async and freeing send
126  * routines do no copying at all.  The sync send routines copy the
127  * message, then use the freeing-send routines.  The other alternative
128  * is to not copy the message, and use the async send mechanism
129  * combined with a blocking wait.  Blocking wait seems like a bad
130  * idea, since it could take a VERY long time to get all those
131  * datagrams out the door.
132  *
133  * The receiver side, unfortunately, must copy.  To avoid copying,
134  * it would have to receive directly into a preallocated message buffer.
135  * Unfortunately, this can't work: there's no way to know how much
136  * memory to preallocate, and there's no way to know which datagram
137  * is coming next.  Thus, we receive into fixed-size (large) datagram
138  * buffers.  These are then inspected, and the messages extracted from
139  * them.
140  *
141  * Note that we are allocating a large number of structs: OutgoingMsg's,
142  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
143  * is a fixed-size structure.  Thus, we can do memory allocation by
144  * simply keeping a linked-list of unused structs around.  The only
145  * place where expensive memory allocation is performed is in the
146  * sync routines.
147  *
148  * Since the datagrams from one node to another are fully ordered,
149  * there is slightly more ordering than is needed: in theory, the
150  * datagrams of one message don't need to be ordered relative to the
151  * datagrams of another.  This was done to simplify the sequencing
152  * mechanisms: implementing a fully-ordered stream is much simpler
153  * than a partially-ordered one.  It also makes it possible to
154  * modularize, layering the message transmitter on top of the
155  * datagram-sequencer.  In other words, it was just easier this way.
156  * Hopefully, this won't cause serious degradation: LAN's rarely get
157  * datagrams out of order anyway.
158  *
159  * A potential efficiency problem is the lack of message-combining.
160  * One datagram could conceivably contain several messages.  This
161  * might be more efficient, it's not clear how much overhead is
162  * involved in sending a short datagram.  Message-combining isn't
163  * really ``integrated'' into the design of this software, but you
164  * could fudge it as follows.  Whenever you pull a short datagram from
165  * the send-queue, check the next one to see if it's also a short
166  * datagram.  If so, pack them together into a ``combined'' datagram.
167  * At the receive side, simply check for ``combined'' datagrams, and
168  * treat them as if they were simply two datagrams.  This would
169  * require extra copying.  I have no idea if this would be worthwhile.
170  *
171  *****************************************************************************/
172
173 /**
174  * @addtogroup NET
175  * @{
176  */
177
178 /*****************************************************************************
179  *
180  * Include Files
181  *
182  ****************************************************************************/
183
184 #include <stdarg.h> /*<- was <varargs.h>*/
185
186 #define CMK_USE_PRINTF_HACK 0
187 #if CMK_USE_PRINTF_HACK
188 /*HACK: turn printf into CmiPrintf, by just defining our own
189 external symbol "printf".  This may be more trouble than it's worth,
190 since the only advantage is that it works properly with +syncprint.
191
192 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
193 because they don't call printf.  Has to be defined up here because we probably 
194 haven't properly guessed this compiler's prototype for "printf".
195 */
196 static void InternalPrintf(const char *f, va_list l);
197 int printf(const char *fmt, ...) {
198         int nChar;
199         va_list p; va_start(p, fmt);
200         InternalPrintf(fmt,p);
201         va_end(p);
202         return 10;
203 }
204 #endif
205
206
207 #include "converse.h"
208 #include "memory-isomalloc.h"
209
210 #include <stdio.h>
211 #include <stdlib.h>
212 #include <ctype.h>
213 #include <fcntl.h>
214 #include <errno.h>
215 #include <setjmp.h>
216 #include <signal.h>
217 #include <string.h>
218
219 /* define machine debug */
220 #include "machine.h"
221
222 /******************* Producer-Consumer Queues ************************/
223 #include "pcqueue.h"
224
225 #include "machine-smp.h"
226
227 #if CMK_USE_KQUEUE
228 #include <sys/event.h>
229 int _kq = -1;
230 #endif
231
232 #if CMK_USE_POLL
233 #include <poll.h>
234 #endif
235
236 #if CMK_USE_GM
237 #include "gm.h"
238 struct gm_port *gmport = NULL;
239 int  portFinish = 0;
240 #endif
241
242 #if CMK_USE_MX
243 #include "myriexpress.h"
244 mx_endpoint_t      endpoint;
245 mx_endpoint_addr_t endpoint_addr;
246 int MX_FILTER   =  123456;
247 static uint64_t Cmi_nic_id=0; /* Machine-specific identifier (MX-only) */
248 #endif
249
250 #if CMK_USE_AMMASSO
251   #include "clustercore/ccil_api.h"
252 #endif
253
254 #if CMK_MULTICORE
255 int Cmi_commthread = 0;
256 #endif
257
258 #include "conv-ccs.h"
259 #include "ccs-server.h"
260 #include "sockRoutines.h"
261
262 #if defined(_WIN32) && ! defined(__CYGWIN__)
263 /*For windows systems:*/
264 #  include <windows.h>
265 #  include <wincon.h>
266 #  include <sys/types.h>
267 #  include <sys/timeb.h>
268 #  define fdopen _fdopen
269 #  define SIGBUS -1  /*These signals don't exist in Win32*/
270 #  define SIGKILL -1
271 #  define SIGQUIT -1
272 /*#  define SIGTERM -1*/       /* VC++ ver 8 now has SIGTERM */
273
274 #else /*UNIX*/
275 #  include <pwd.h>
276 #  include <unistd.h>
277 #  include <fcntl.h>
278 #  include <sys/file.h>
279 #endif
280
281 #if CMK_PERSISTENT_COMM
282 #include "persist_impl.h"
283 #endif
284
285 #define PRINTBUFSIZE 16384
286
287 #ifdef __ONESIDED_IMPL
288 #ifdef __ONESIDED_NO_HARDWARE
289 int putSrcHandler;
290 int putDestHandler;
291 int getSrcHandler;
292 int getDestHandler;
293 #include "conv-onesided.c"
294 #endif
295 #endif
296
297 static void CommunicationServer(int withDelayMs, int where);
298
299 void CmiHandleImmediate();
300 extern int CmemInsideMem();
301 extern void CmemCallWhenMemAvail();
302 static void ConverseRunPE(int everReturn);
303 void CmiYield(void);
304 void ConverseCommonExit(void);
305
306 static unsigned int dataport=0;
307 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
308 static SOCKET       dataskt;
309
310 extern void TokenUpdatePeriodic();
311 extern void getAvailSysMem();
312
313 #define BROADCAST_SPANNING_FACTOR               4
314
315 /****************************************************************************
316  *
317  * Handling Errors
318  *
319  * Errors should be handled by printing a message on stderr and
320  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
321  * communication should be made.  The other processes will notice the
322  * abnormal termination and will deal with it.
323  *
324  * Rationale: if an error triggers an attempt to send a message,
325  * the attempt to send a message is likely to trigger another error,
326  * leading to an infinite loop and a process that spins instead of
327  * shutting down.
328  *
329  *****************************************************************************/
330
331 static int machine_initiated_shutdown=0;
332 static int already_in_signal_handler=0;
333
334 static void CmiDestoryLocks();
335
336 #if CMK_USE_SYSVSHM /* define teardown function before use */
337 void tearDownSharedBuffers();
338 #endif 
339
340 static void machine_exit(int status)
341 {
342   MACHSTATE(3,"     machine_exit");
343   machine_initiated_shutdown=1;
344
345   CmiDestoryLocks();            /* destory locks to prevent dead locking */
346
347 #if CMK_USE_GM
348   if (gmport) { 
349     gm_close(gmport); gmport = 0;
350     gm_finalize();
351   }
352 #endif
353 #if CMK_USE_SYSVSHM
354         tearDownSharedBuffers();
355 #endif
356   exit(status);
357 }
358
359 static void charmrun_abort(const char*);
360
361 static void KillEveryone(const char *msg)
362 {
363   charmrun_abort(msg);
364   machine_exit(1);
365 }
366
367 static void KillEveryoneCode(n)
368 int n;
369 {
370   char _s[100];
371   sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
372   charmrun_abort(_s);
373   machine_exit(1);
374 }
375
376 static void KillOnAllSigs(int sigNo)
377 {
378   const char *sig="unknown signal";
379   const char *suggestion="";
380   if (machine_initiated_shutdown ||
381       already_in_signal_handler) 
382         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
383   already_in_signal_handler=1;
384
385   if (CpvAccess(cmiArgDebugFlag)) {
386     CmiPrintf("CPD: Signal received on processor %d: %d\n",CmiMyPe(),sigNo);
387     CpdFreeze();
388   }
389   
390   CmiDestoryLocks();            /* destory locks */
391
392   if (sigNo==SIGSEGV) {
393      sig="segmentation violation";
394      suggestion="Try running with '++debug', or linking with '-memory paranoid'.\n";
395   }
396   if (sigNo==SIGFPE) {
397      sig="floating point exception";
398      suggestion="Check for integer or floating-point division by zero.\n";
399   }
400   if (sigNo==SIGBUS) {
401      sig="bus error";
402      suggestion="Check for misaligned reads or writes to memory.\n";
403   }
404   if (sigNo==SIGILL) {
405      sig="illegal instruction";
406      suggestion="Check for calls to uninitialized function pointers.\n";
407   }
408   if (sigNo==SIGKILL) sig="caught signal KILL";
409   if (sigNo==SIGQUIT) sig="caught signal QUIT";
410   if (sigNo==SIGTERM) sig="caught signal TERM";
411   MACHSTATE1(5,"     Caught signal %s ",sig);
412 /*ifdef this part*/
413 #ifdef __FAULT__
414   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
415                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
416   }else{
417 #else
418         {
419 #endif
420    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
421         "Signal: %s\n",CmiMyPe(),sig);
422         if (0!=suggestion[0])
423                 CmiError("Suggestion: %s",suggestion);
424         CmiPrintStackTrace(1);
425         charmrun_abort(sig);
426         machine_exit(1);                
427         }       
428 }
429
430 static void machine_atexit_check(void)
431 {
432   if (!machine_initiated_shutdown)
433     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
434   printf("Program finished.\n");
435 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
436   fgetc(stdin);
437 #endif
438 }
439
440 #if !defined(_WIN32) || defined(__CYGWIN__)
441 static void HandleUserSignals(int signum)
442 {
443   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
444   CcdRaiseCondition(condnum);
445 }
446 #endif
447
448 static void PerrorExit(const char *msg)
449 {
450   perror(msg);
451   machine_exit(1);
452 }
453
454 /*****************************************************************************
455  *
456  *     Utility routines for network machine interface.
457  *
458  *****************************************************************************/
459
460 /*
461 Horrific #defines to hide the differences between select() and poll().
462  */
463 #if CMK_USE_POLL /*poll() version*/
464 # define CMK_PIPE_DECL(delayMs) \
465         struct pollfd fds[10]; \
466         int nFds_sto=0; int *nFds=&nFds_sto; \
467         int pollDelayMs=delayMs;
468 # define CMK_PIPE_SUB fds,nFds
469 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
470
471 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
472 # define CMK_PIPE_ADDREAD(rd_fd) \
473         do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
474 # define CMK_PIPE_ADDWRITE(wr_fd) \
475         do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
476 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
477 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
478
479 #elif CMK_USE_KQUEUE /* kqueue version */
480
481 # define CMK_PIPE_DECL(delayMs) \
482         if (_kq == -1) _kq = kqueue(); \
483     struct kevent ke_sto; \
484     struct kevent* ke = &ke_sto; \
485     struct timespec tmo; \
486     tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
487 # define CMK_PIPE_SUB ke
488 # define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)
489
490 # define CMK_PIPE_PARAM struct kevent* ke
491 # define CMK_PIPE_ADDREAD(rd_fd) \
492         do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
493                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
494 # define CMK_PIPE_ADDWRITE(wr_fd) \
495         do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
496                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
497 # define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
498 # define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)
499
500 #else /*select() version*/
501
502 # define CMK_PIPE_DECL(delayMs) \
503         fd_set rfds_sto,wfds_sto;\
504         fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
505         FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
506 # define CMK_PIPE_SUB rfds,wfds
507 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
508
509 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
510 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
511 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
512 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
513 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
514 #endif
515
516 static void CMK_PIPE_CHECKERR(void) {
517 #if defined(_WIN32) && !defined(__CYGWIN__)
518 /* Win32 socket seems to randomly return inexplicable errors
519 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
520         int err=WSAGetLastError();
521         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
522 o,err);
523 */
524 #else /*UNIX machine*/
525         if (errno!=EINTR)
526                 KillEveryone("Socket error in CheckSocketsReady!\n");
527 #endif
528 }
529
530
531 static void CmiStdoutFlush(void);
532 static int  CmiStdoutNeedsService(void);
533 static void CmiStdoutService(void);
534 static void CmiStdoutAdd(CMK_PIPE_PARAM);
535 static void CmiStdoutCheck(CMK_PIPE_PARAM);
536
537
538 double GetClock(void)
539 {
540 #if defined(_WIN32) && !defined(__CYGWIN__)
541   struct _timeb tv; 
542   _ftime(&tv);
543   return (tv.time * 1.0 + tv.millitm * 1.0E-3);
544 #else
545   struct timeval tv; int ok;
546   ok = gettimeofday(&tv, NULL);
547   if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
548   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
549 #endif
550 }
551
552 char *CopyMsg(char *msg, int len)
553 {
554   char *copy = (char *)CmiAlloc(len);
555   if (!copy)
556       fprintf(stderr, "Out of memory\n");
557   memcpy(copy, msg, len);
558   return copy;
559 }
560
561 /***********************************************************************
562  *
563  * Abort function:
564  *
565  ************************************************************************/
566
567 static int  Cmi_truecrash;
568 static int already_aborting=0;
569 void CmiAbort(const char *message)
570 {
571   if (already_aborting) machine_exit(1);
572   already_aborting=1;
573         {
574 /*       char str[100];
575          sprintf(str,"dead.%d",CmiMyNode());
576          FILE *fp = fopen(str,"w");
577          fprintf(fp,"%s",message);
578          fclose(fp);*/
579         }
580   MACHSTATE1(5,"CmiAbort(%s)",message);
581
582   /* if CharmDebug is attached simply try to send a message to it */
583   if (CpvAccess(cmiArgDebugFlag)) {
584     CmiPrintf("CPD: CmiAbort called on processor %d\n",CmiMyPe());
585     CpdFreeze();
586   }
587   
588   /* CmiDestoryLocks();  */
589
590   {
591 /*    char str[22];
592     snprintf(str,18,"dead.%d",CmiMyPe());
593     FILE *fp = fopen(str,"w");
594     fprintf(fp,"Abort:%s\n",message);
595     fclose(fp);*/
596   }
597
598   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
599         "Reason: %s\n",CmiMyPe(),message);
600   CmiPrintStackTrace(0);
601   
602   /*Send off any remaining prints*/
603   CmiStdoutFlush();
604   
605   if(Cmi_truecrash) {
606     printf("CHARM++ FATAL ERROR: %s\n", message);
607     *(int *)NULL = 0; /*Write to null, causing bus error*/
608   } else {
609     charmrun_abort(message);
610     machine_exit(1);
611   }
612 }
613
614
615 /******************************************************************************
616  *
617  * CmiEnableAsyncIO
618  *
619  * The net and tcp versions use a bunch of unix processes talking to each
620  * other via file descriptors.  We need for a signal SIGIO to be generated
621  * each time a message arrives, making it possible to write a signal
622  * handler to handle the messages.  The vast majority of unixes can,
623  * in fact, do this.  However, there isn't any standard for how this is
624  * supposed to be done, so each version of UNIX has a different set of
625  * calls to turn this signal on.  So, there is like one version here for
626  * every major brand of UNIX.
627  *
628  *****************************************************************************/
629
630 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
631 #include <fcntl.h>
632 void CmiEnableAsyncIO(int fd)
633 {
634   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
635     CmiError("setting socket owner: %s\n", strerror(errno)) ;
636     exit(1);
637   }
638   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
639     CmiError("setting socket async: %s\n", strerror(errno)) ;
640     exit(1);
641   }
642 }
643 #else
644 void CmiEnableAsyncIO(int fd) { }
645 #endif
646
647 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
648 #if !defined(_WIN32) || defined(__CYGWIN__)
649 void CmiEnableNonblockingIO(int fd) {
650   int on=1;
651   if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
652     CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
653     exit(1);
654   }
655 }
656 #else
657 void CmiEnableNonblockingIO(int fd) { }
658 #endif
659
660
661 /******************************************************************************
662 *
663 * Configuration Data
664 *
665 * This data is all read in from the NETSTART variable (provided by the
666 * charmrun) and from the command-line arguments.  Once read in, it is never
667  * modified.
668  *
669  *****************************************************************************/
670
671 int               _Cmi_mynode;    /* Which address space am I */
672 int               _Cmi_mynodesize;/* Number of processors in my address space */
673 int               _Cmi_numnodes;  /* Total number of address spaces */
674 int               _Cmi_numpes;    /* Total number of processors */
675 static int        Cmi_nodestart; /* First processor in this address space */
676 static skt_ip_t   Cmi_self_IP;
677 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
678 static int        Cmi_charmrun_port;
679 static int        Cmi_charmrun_pid;
680 static int        Cmi_charmrun_fd=-1;
681
682 static int    Cmi_netpoll;
683 static int    Cmi_asyncio;
684 static int    Cmi_idlepoll;
685 static int    Cmi_syncprint;
686 static int Cmi_print_stats = 0;
687
688 #if ! CMK_SMP && ! defined(_WIN32)
689 /* parse forks only used in non-smp mode */
690 static void parse_forks(void) {
691   char *forkstr;
692   int nread;
693   int forks;
694   int i,pid;
695   forkstr=getenv("CmiMyForks");
696   if(forkstr!=0) { /* charmrun */
697         nread = sscanf(forkstr,"%d",&forks);
698         for(i=1;i<=forks;i++) { /* by default forks = 0 */ 
699                 pid=fork();
700                 if(pid<0) CmiAbort("Fork returned an error");
701                 if(pid==0) { /* forked process */
702                         /* reset mynode,pe & exit loop */
703                         _Cmi_mynode+=i;
704                         _Cmi_mype+=i;
705                         break;
706                 }
707         }
708   }
709 }
710 #endif
711
712 static void parse_netstart(void)
713 {
714   char *ns;
715   int nread;
716   int port;
717   ns = getenv("NETSTART");
718   if (ns!=0) 
719   {/*Read values set by Charmrun*/
720         char Cmi_charmrun_name[1024];
721         nread = sscanf(ns, "%d%s%d%d%d",
722                  &_Cmi_mynode,
723                  Cmi_charmrun_name, &Cmi_charmrun_port,
724                  &Cmi_charmrun_pid, &port);
725         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
726
727         if (nread!=5) {
728                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
729                 exit(1);
730         }
731   } else 
732   {/*No charmrun-- set flag values for standalone operation*/
733         _Cmi_mynode=0;
734         Cmi_charmrun_IP=_skt_invalid_ip;
735         Cmi_charmrun_port=0;
736         Cmi_charmrun_pid=0;
737         dataport = -1;
738   }
739 #if CMK_USE_IBVERBS | CMK_USE_IBUD
740         char *cmi_num_nodes = getenv("CmiNumNodes");
741         if(cmi_num_nodes != NULL){
742                 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
743         }
744 #endif  
745 }
746
747 static void extract_common_args(char **argv)
748 {
749   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
750     Cmi_print_stats = 1;
751 }
752
753 /* for SMP */
754 #include "machine-smp.c"
755
756 CsvDeclare(CmiNodeState, NodeState);
757
758 /* Immediate message support */
759 #define CMI_DEST_RANK(msg)      *(int *)(msg)
760 #include "immediate.c"
761
762 /******************************************************************************
763  *
764  * Packet Performance Logging
765  *
766  * This module is designed to give a detailed log of the packets and their
767  * acknowledgements, for performance tuning.  It can be disabled.
768  *
769  *****************************************************************************/
770
771 #define LOGGING 0
772
773 #if LOGGING
774
775 typedef struct logent {
776   double time;
777   int seqno;
778   int srcpe;
779   int dstpe;
780   int kind;
781 } *logent;
782
783
784 logent log;
785 int    log_pos;
786 int    log_wrap;
787
788 static void log_init(void)
789 {
790   log = (logent)malloc(50000 * sizeof(struct logent));
791   _MEMCHECK(log);
792   log_pos = 0;
793   log_wrap = 0;
794 }
795
796 static void log_done(void)
797 {
798   char logname[100]; FILE *f; int i, size;
799   sprintf(logname, "log.%d", _Cmi_mynode);
800   f = fopen(logname, "w");
801   if (f==0) KillEveryone("fopen problem");
802   if (log_wrap) size = 50000; else size=log_pos;
803   for (i=0; i<size; i++) {
804     logent ent = log+i;
805     fprintf(f, "%1.4f %d %c %d %d\n",
806             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
807   }
808   fclose(f);
809 }
810
811 void printLog(void)
812 {
813   char logname[100]; FILE *f; int i, j, size;
814   static int logged = 0;
815   if (logged)
816       return;
817   logged = 1;
818   CmiPrintf("Logging: %d\n", _Cmi_mynode);
819   sprintf(logname, "log.%d", _Cmi_mynode);
820   f = fopen(logname, "w");
821   if (f==0) KillEveryone("fopen problem");
822   for (i = 5000; i; i--)
823   {
824   /*for (i=0; i<size; i++) */
825     j = log_pos - i;
826     if (j < 0)
827     {
828         if (log_wrap)
829             j = 5000 + j;
830         else
831             j = 0;
832     };
833     {
834     logent ent = log+j;
835     fprintf(f, "%1.4f %d %c %d %d\n",
836             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
837     }
838   }
839   fclose(f);
840   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
841 }
842
843 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
844
845 #endif
846
847 #if !LOGGING
848
849 #define log_init() /*empty*/
850 #define log_done() /*empty*/
851 #define printLog() /*empty*/
852 #define LOG(t,s,k,d,q) /*empty*/
853
854 #endif
855
856 /******************************************************************************
857  *
858  * Node state
859  *
860  *****************************************************************************/
861
862
863 static CmiNodeLock    Cmi_scanf_mutex;
864 static double         Cmi_clock;
865 static double         Cmi_check_delay = 3.0;
866
867 /******************************************************************************
868  *
869  * OS Threads
870  * SMP implementation moved to machine-smp.c
871  *****************************************************************************/
872
873 /************************ No kernel SMP threads ***************/
874 #if CMK_SHARED_VARS_UNAVAILABLE
875
876 static volatile int memflag=0;
877 void CmiMemLock() { memflag++; }
878 void CmiMemUnlock() { memflag--; }
879
880 static volatile int comm_flag=0;
881 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
882 #ifndef MACHLOCK_DEBUG
883 #  define CmiCommLock() (comm_flag=1)
884 #  define CmiCommUnlock() (comm_flag=0)
885 #else /* Error-checking flag locks */
886 void CmiCommLock(void) {
887   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
888   comm_flag=1;
889 }
890 void CmiCommUnlock(void) {
891   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
892   comm_flag=0;
893 }
894 #endif
895
896 static struct CmiStateStruct Cmi_state;
897 int _Cmi_mype;
898 int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
899 #define CmiGetState() (&Cmi_state)
900 #define CmiGetStateN(n) (&Cmi_state)
901
902 void CmiYield(void) { sleep(0); }
903
904 static void CommunicationInterrupt(int ignored)
905 {
906   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
907   if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0)) 
908   { /* Already busy inside malloc, comm, or immediate messages */
909     MACHSTATE(5,"--SKIPPING SIGIO--");
910     return;
911   }
912   MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
913   {
914     /*Make sure any malloc's we do in here are NOT migratable:*/
915     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
916 /*    _Cmi_myrank=1; */
917     CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
918 /*    _Cmi_myrank=0; */
919     CmiIsomallocBlockListActivate(oldList);
920   }
921   MACHSTATE(2,"--END SIGIO--")
922 }
923
924 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
925
926 static void CmiStartThreads(char **argv)
927 {
928   MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
929   MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
930   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
931     KillEveryone
932       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
933   
934   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
935   _Cmi_mype = Cmi_nodestart;
936
937   /* Prepare Cpv's for immediate messages: */
938   _Cmi_myrank=1;
939   CommunicationServerInit();
940   _Cmi_myrank=0;
941   
942 #if !CMK_ASYNC_NOT_NEEDED
943   if (Cmi_asyncio)
944   {
945     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
946     if (!Cmi_netpoll) {
947       if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
948       if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
949     }
950 #if CMK_USE_GM || CMK_USE_MX
951       /* charmrun is serviced in interrupt for gm */
952     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
953 #endif
954   }
955 #endif
956 }
957
958 static void CmiDestoryLocks()
959 {
960   comm_flag = 0;
961   memflag = 0;
962 }
963
964 #endif
965
966 /*Network progress utility variables. Period controls the rate at
967   which the network poll is called */
968
969 CpvDeclare(unsigned , networkProgressCount);
970 int networkProgressPeriod;
971
972 CpvDeclare(void *, CmiLocalQueue);
973
974
975 #ifndef CmiMyPe
976 int CmiMyPe(void) 
977
978   return CmiGetState()->pe; 
979 }
980 #endif
981 #ifndef CmiMyRank
982 int CmiMyRank(void)
983 {
984   return CmiGetState()->rank;
985 }
986 #endif
987
988 CpvExtern(int,_charmEpoch);
989
990 /*Add a message to this processor's receive queue 
991   Must be called while holding comm. lock
992 */
993
994 extern double evacTime;
995
996 static void CmiPushPE(int pe,void *msg)
997 {
998   CmiState cs=CmiGetStateN(pe);
999         /*
1000                 FAULT_EVAC
1001         
1002         if(CpvAccess(_charmEpoch)&&!CmiNodeAlive(CmiMyPe())){
1003                 printf("[%d] Message after stop at %.6lf in %.6lf \n",CmiMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
1004         }*/
1005   MACHSTATE1(2,"Pushing message into %d's queue",pe);  
1006   MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
1007
1008 #if CMK_IMMEDIATE_MSG
1009   if (CmiIsImmediate(msg)) {
1010     CmiPushImmediateMsg(msg);
1011     return;
1012   }
1013 #endif
1014   PCQueuePush(cs->recv,msg);
1015 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1016   if (_Cmi_noprocforcommthread) 
1017 #endif
1018   CmiIdleLock_addMessage(&cs->idle);
1019 }
1020
1021 #if CMK_NODE_QUEUE_AVAILABLE
1022 /*Add a message to the node queue.  
1023   Must be called while holding comm. lock
1024 */
1025 static void CmiPushNode(void *msg)
1026 {
1027   CmiState cs=CmiGetStateN(0);
1028   
1029   MACHSTATE(2,"Pushing message into node queue");
1030   MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
1031   
1032 #if CMK_IMMEDIATE_MSG
1033   if (CmiIsImmediate(msg)) {
1034     MACHSTATE(2,"Pushing Immediate message into queue");
1035     CmiPushImmediateMsg(msg);
1036     return;
1037   }
1038 #endif 
1039   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
1040   /*Silly: always try to wake up processor 0, so at least *somebody*
1041     will be awake to handle the message*/
1042 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1043   if (_Cmi_noprocforcommthread) 
1044 #endif
1045   CmiIdleLock_addMessage(&cs->idle);
1046 }
1047 #endif
1048
1049 /***************************************************************
1050  Communication with charmrun:
1051  We can send (ctrl_sendone) and receive (ctrl_getone)
1052  messages on a TCP socket connected to charmrun.
1053  This is used for printfs, CCS, etc; and also for
1054  killing ourselves if charmrun dies.
1055 */
1056
1057 /*This flag prevents simultanious outgoing
1058 messages on the charmrun socket.  It is protected
1059 by the commlock.*/
1060 static int Cmi_charmrun_fd_sendflag=0;
1061
1062 /* ctrl_sendone */
1063 static int sendone_abort_fn(int code,const char *msg) {
1064         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
1065         machine_exit(1);
1066         return -1;
1067 }
1068
1069 static void ctrl_sendone_nolock(const char *type,
1070                                 const char *data1,int dataLen1,
1071                                 const char *data2,int dataLen2)
1072 {
1073   const void *bufs[3]; int lens[3]; int nBuffers=0;
1074   ChMessageHeader hdr;
1075   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
1076   MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
1077   if (Cmi_charmrun_fd==-1) 
1078         charmrun_abort("ctrl_sendone called in standalone!\n");
1079   Cmi_charmrun_fd_sendflag=1;
1080   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
1081   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
1082   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
1083   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
1084   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
1085   Cmi_charmrun_fd_sendflag=0;
1086   skt_set_abort(oldAbort);
1087   MACHSTATE(2,"} ctrl_sendone_nolock");
1088 }
1089
1090 static void ctrl_sendone_locking(const char *type,
1091                                 const char *data1,int dataLen1,
1092                                 const char *data2,int dataLen2)
1093 {
1094   CmiCommLock();
1095   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
1096   CmiCommUnlock();
1097 }
1098
1099 #ifndef MEMORYUSAGE_OUTPUT
1100 #define MEMORYUSAGE_OUTPUT 0 
1101 #endif
1102 #if MEMORYUSAGE_OUTPUT 
1103 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
1104 static int memoryusage_counter;
1105 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
1106 #define memoryusage_output {\
1107   memoryusage_counter++;\
1108   if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
1109 #endif
1110
1111 static double Cmi_check_last;
1112
1113 /* if charmrun dies, we finish */
1114 static void pingCharmrun(void *ignored) 
1115 {
1116 #if MEMORYUSAGE_OUTPUT
1117   memoryusage_output;
1118   if(memoryusage_isOutput){
1119     memoryusage_counter = 0;
1120 #else
1121   {
1122 #endif 
1123
1124   double clock=GetClock();
1125   if (clock > Cmi_check_last + Cmi_check_delay) {
1126     MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1127     Cmi_check_last = clock; 
1128 #if CMK_USE_GM || CMK_USE_MX
1129     if (!Cmi_netpoll)  /* GM netpoll, charmrun service is done in interrupt */
1130 #endif
1131     CmiCommLockOrElse(return;); /*Already busy doing communication*/
1132     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1133     CmiCommLock();
1134     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1135     CmiCommUnlock();
1136   }
1137 #if 1
1138 #if CMK_USE_GM || CMK_USE_MX
1139   if (!Cmi_netpoll)
1140 #endif
1141   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1142 #endif
1143   }
1144 }
1145
1146 /* periodic charm ping, for gm and netpoll */
1147 static void pingCharmrunPeriodic(void *ignored)
1148 {
1149   pingCharmrun(ignored);
1150   CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1151 }
1152
1153 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
1154 static void charmrun_abort(const char *s)
1155 {
1156   if (Cmi_charmrun_fd==-1) {/*Standalone*/
1157         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1158 CmiPrintStackTrace(0);
1159         abort();
1160   } else {
1161         char msgBuf[80];
1162         skt_set_abort(ignore_further_errors);
1163         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1164         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
1165   }
1166 }
1167
1168 /* ctrl_getone */
1169
1170 #ifdef __FAULT__
1171 #include "machine-recover.c"
1172 #endif
1173
1174 static void node_addresses_store(ChMessage *msg);
1175
1176 static void ctrl_getone(void)
1177 {
1178   ChMessage msg;
1179   MACHSTATE(2,"ctrl_getone")
1180   MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
1181   ChMessage_recv(Cmi_charmrun_fd,&msg);
1182   MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1183
1184   if (strcmp(msg.header.type,"die")==0) {
1185     MACHSTATE(2,"ctrl_getone bye bye")
1186     fprintf(stderr,"aborting: %s\n",msg.data);
1187     log_done();
1188     ConverseCommonExit();
1189     machine_exit(0);
1190   } 
1191 #if CMK_CCS_AVAILABLE
1192   else if (strcmp(msg.header.type, "req_fw")==0) {
1193     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1194         /*Sadly, I *can't* do a:
1195       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1196         here, because I can't send converse messages in the
1197         communication thread.  I *can* poke this message into 
1198         any convenient processor's queue, though:  (OSL, 9/14/2000)
1199         */
1200     int pe=0;/*<- node-local processor number. Any one will do.*/
1201     void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1202     MACHSTATE(2,"Incoming CCS request");
1203     CmiPushPE(pe,cmsg);
1204   }
1205 #endif
1206 #ifdef __FAULT__        
1207   else if(strcmp(msg.header.type,"crashnode")==0) {
1208         crash_node_handle(&msg);
1209   }
1210   else if(strcmp(msg.header.type,"initnodetab")==0) {
1211         /** A processor crashed and got recreated. So charmrun sent 
1212           across the whole nodetable data to update this processor*/
1213         node_addresses_store(&msg);
1214         // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1215   }
1216 #endif
1217   else {
1218   /* We do not use KillEveryOne here because it calls CmiMyPe(),
1219    * which is not available to the communication thread on an SMP version.
1220    */
1221     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1222     machine_exit(1);
1223   }
1224   
1225   MACHSTATE(2,"ctrl_getone done")
1226   ChMessage_free(&msg);
1227 }
1228
1229 #if CMK_CCS_AVAILABLE
1230 /*Deliver this reply data to this reply socket.
1231   The data is forwarded to CCS server via charmrun.*/
1232 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
1233 {
1234   MACHSTATE(2,"Outgoing CCS reply");
1235   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
1236                        repData,repLen);
1237   MACHSTATE(1,"Outgoing CCS reply away");
1238 }
1239 #endif
1240
1241 /*****************************************************************************
1242  *
1243  * CmiPrintf, CmiError, CmiScanf
1244  *
1245  *****************************************************************************/
1246 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1247 static void InternalPrintf(const char *f, va_list l)
1248 {
1249   ChMessage replymsg;
1250   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1251   CmiStdoutFlush();
1252   vsprintf(buffer, f, l);
1253   if(Cmi_syncprint) {
1254           CmiCommLock();
1255           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1256           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1257           ChMessage_free(&replymsg);
1258           CmiCommUnlock();
1259   } else {
1260           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1261   }
1262   InternalWriteToTerminal(0,buffer,strlen(buffer));
1263   CmiTmpFree(buffer);
1264 }
1265
1266 static void InternalError(const char *f, va_list l)
1267 {
1268   ChMessage replymsg;
1269   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1270   CmiStdoutFlush();
1271   vsprintf(buffer, f, l);
1272   if(Cmi_syncprint) {
1273           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1274           CmiCommLock();
1275           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1276           ChMessage_free(&replymsg);
1277           CmiCommUnlock();
1278   } else {
1279           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1280   }
1281   InternalWriteToTerminal(1,buffer,strlen(buffer));
1282   CmiTmpFree(buffer);
1283 }
1284
1285 static int InternalScanf(char *fmt, va_list l)
1286 {
1287   ChMessage replymsg;
1288   char *ptr[20];
1289   char *p; int nargs, i;
1290   nargs=0;
1291   p=fmt;
1292   while (*p) {
1293     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1294     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1295     if (p[0]=='%') { nargs++; p++; continue; }
1296     if (*p=='\n') *p=' '; p++;
1297   }
1298   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1299   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1300   CmiLock(Cmi_scanf_mutex);
1301   if (Cmi_charmrun_fd!=-1)
1302   {/*Send charmrun the format string*/
1303         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1304         /*Wait for the reply (characters to scan) from charmrun*/
1305         CmiCommLock();
1306         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1307         i = sscanf((char*)replymsg.data, fmt,
1308                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1309                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1310                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1311         ChMessage_free(&replymsg);
1312         CmiCommUnlock();
1313   } else
1314   {/*Just do the scanf normally*/
1315         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1316                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1317                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1318   }
1319   CmiUnlock(Cmi_scanf_mutex);
1320   return i;
1321 }
1322
1323 #if CMK_CMIPRINTF_IS_A_BUILTIN
1324
1325 /*New stdarg.h declarations*/
1326 void CmiPrintf(const char *fmt, ...)
1327 {
1328   va_list p; va_start(p, fmt);
1329   if (Cmi_charmrun_fd!=-1)
1330     InternalPrintf(fmt, p);
1331   else
1332     vfprintf(stdout,fmt,p);
1333   va_end(p);
1334 }
1335
1336 void CmiError(const char *fmt, ...)
1337 {
1338   va_list p; va_start (p, fmt);
1339   if (Cmi_charmrun_fd!=-1)
1340     InternalError(fmt, p);
1341   else
1342     vfprintf(stderr,fmt,p);
1343   va_end(p);
1344 }
1345
1346 int CmiScanf(const char *fmt, ...)
1347 {
1348   va_list p; int i; va_start(p, fmt);
1349   i = InternalScanf((char *)fmt, p);
1350   va_end(p);
1351   return i;
1352 }
1353
1354 #endif
1355
1356 /***************************************************************************
1357  * Output redirection:
1358  *  When people don't use CkPrintf, like above, we'd still like to be able
1359  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
1360  * which lets us read the characters sent to stdout at our lesiure.
1361  ***************************************************************************/
1362
1363 /*Can read from stdout or stderr using these fd's*/
1364 static int readStdout[2]; 
1365 static int writeStdout[2]; /*The original stdout/stderr sockets*/ 
1366 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1367 #define readStdoutBufLen (16*1024)
1368 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1369 static int servicingStdout;
1370
1371 /*Initialization-- should only be called once per node*/
1372 static void CmiStdoutInit(void) {
1373         int i;
1374         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1375
1376 /*There's some way to do this same thing in windows, but I don't know how*/
1377 #if !defined(_WIN32) || defined(__CYGWIN__)
1378         /*Prevent buffering in stdio library:*/
1379         setbuf(stdout,NULL); setbuf(stderr,NULL);
1380
1381         /*Reopen stdout and stderr fd's as new pipes:*/
1382         for (i=0;i<2;i++) {
1383                 int pair[2];
1384                 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1385                 
1386                 /*First, save a copy of the original stdout*/
1387                 writeStdout[i]=dup(srcFd);
1388 #if 0
1389                 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1390                 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1391 #else
1392                /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1393                 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair)) 
1394                         {perror("building stdio redirection socketpair"); exit(1);}
1395 #endif
1396                 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1397                 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1398                 
1399 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
1400                 CmiEnableNonblockingIO(srcFd);
1401 #endif
1402 #if CMK_SHARED_VARS_UNAVAILABLE
1403                 if (Cmi_asyncio)
1404                 {
1405   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1406                         CmiEnableAsyncIO(readStdout[i]);
1407                 }
1408 #endif
1409         }
1410 #else
1411 /*Windows system-- just fake reads for now*/
1412 # ifndef read
1413 #  define read(x,y,z) 0
1414 # endif
1415 # ifndef write
1416 #  define write(x,y,z) 
1417 # endif
1418 #endif
1419 }
1420
1421 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1422 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1423 {
1424         write(writeStdout[isStdErr],str,len);   
1425 }
1426
1427 /*
1428   Service this particular stdout pipe.  
1429   Must hold comm. lock.
1430 */
1431 static void CmiStdoutServiceOne(int i) {
1432         int nBytes;
1433         const static char *cmdName[2]={"print","printerr"};
1434         servicingStdout=1;
1435         while(1) {
1436                 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1437                 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1438                 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1439                 if (nBytes<=0) break; /*Nothing to send*/
1440                 
1441                 /*Send these bytes off to charmrun*/
1442                 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1443                 nBytes++; /*Include zero-terminator in message to charmrun*/
1444                 
1445                 if (nBytes>=readStdoutBufLen-100) 
1446                 { /*We must have filled up our output pipe-- most output libraries
1447                    don't handle this well (e.g., glibc printf just drops the line).*/
1448                         
1449                         tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1450                                 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1451                         nBytes--; /*Remove terminator from user's data*/
1452                         tooMuchLen=strlen(tooMuchWarn)+1;
1453                 }
1454                 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1455                                     tooMuchWarn,tooMuchLen);
1456                 
1457                 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1458         }
1459         servicingStdout=0;
1460         serviceStdout[i]=0; /*This pipe is now serviced*/
1461 }
1462
1463 /*Service all stdout pipes, whether it looks like they need it
1464   or not.  Used when you aren't sure if select() has been called recently.
1465   Must hold comm. lock.
1466 */
1467 static void CmiStdoutServiceAll(void) {
1468         int i;
1469         for (i=0;i<2;i++) {
1470                 if (readStdout[i]==0) continue; /*Pipe not open*/
1471                 CmiStdoutServiceOne(i);
1472         }
1473 }
1474
1475 /*Service any outstanding stdout pipes.
1476   Must hold comm. lock.
1477 */
1478 static void CmiStdoutService(void) {
1479         CmiStdoutServiceAll();
1480 }
1481
1482 /*Add our pipes to the pile for select() or poll().
1483   Both can be called with or without the comm. lock.
1484 */
1485 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1486         int i;
1487         for (i=0;i<2;i++) {
1488                 if (readStdout[i]==0) continue; /*Pipe not open*/
1489                 CMK_PIPE_ADDREAD(readStdout[i]);
1490         }
1491 }
1492 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1493         int i;
1494         for (i=0;i<2;i++) {
1495                 if (readStdout[i]==0) continue; /*Pipe not open*/
1496                 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1497         }
1498 }
1499 static int CmiStdoutNeedsService(void) {
1500         return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1501 }
1502
1503 /*Called every few milliseconds to flush the stdout pipes*/
1504 static void CmiStdoutFlush(void) {
1505         if (servicingStdout) return; /* might be called by SIGALRM */
1506         CmiCommLockOrElse( return; )
1507         CmiCommLock();
1508         CmiStdoutServiceAll();
1509         CmiCommUnlock();
1510 }
1511
1512 /***************************************************************************
1513  * Message Delivery:
1514  *
1515  ***************************************************************************/
1516
1517 #include "machine-dgram.c"
1518
1519
1520 #ifndef CmiNodeFirst
1521 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1522 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1523 #endif
1524
1525 #ifndef CmiNodeOf
1526 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1527 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1528 #endif
1529
1530
1531 /*****************************************************************************
1532  *
1533  * node_addresses
1534  *
1535  *  These two functions fill the node-table.
1536  *
1537  *
1538  *   This node, like all others, first sends its own address to charmrun
1539  *   using this command:
1540  *
1541  *     Type: nodeinfo
1542  *     Data: Big-endian 4-byte ints
1543  *           <my-node #><Dataport>
1544  *
1545  *   When charmrun has all the addresses, he sends this table to me:
1546  *
1547  *     Type: nodes
1548  *     Data: Big-endian 4-byte ints
1549  *           <number of nodes n>
1550  *           <#PEs><IP><Dataport> Node 0
1551  *           <#PEs><IP><Dataport> Node 1
1552  *           ...
1553  *           <#PEs><IP><Dataport> Node n-1
1554  *
1555  *****************************************************************************/
1556
1557 #if CMK_USE_IBVERBS
1558 void copyInfiAddr(ChInfiAddr *qpList);
1559 #endif
1560
1561 #if CMK_IBVERBS_FAST_START
1562 static void send_partial_init()
1563 {
1564   ChMessageInt_t nodeNo = ChMessageInt_new(_Cmi_mynode);
1565         ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);
1566 }       
1567 #endif
1568
1569
1570 /*Note: node_addresses_obtain is called before starting
1571   threads, so no locks are needed (or valid!)*/
1572 static void node_addresses_obtain(char **argv)
1573 {
1574   ChMessage nodetabmsg; /* info about all nodes*/
1575   MACHSTATE(3,"node_addresses_obtain { ");
1576   if (Cmi_charmrun_fd==-1) 
1577   {/*Standalone-- fake a single-node nodetab message*/
1578         int npes=1;
1579         ChSingleNodeinfo *fakeTab;
1580         ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1581         fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
1582         CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1583 #if CMK_SHARED_VARS_UNAVAILABLE
1584         if (npes!=1) {
1585                 fprintf(stderr,
1586                         "To use multiple processors, you must run this program as:\n"
1587                         " > charmrun +p%d %s <args>\n"
1588                         "or build the %s-smp version of Charm++.\n",
1589                         npes,argv[0],CMK_MACHINE_NAME);
1590                 exit(1);
1591         }
1592 #else
1593         /* standalone smp version reads ppn */
1594         if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) || 
1595                CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
1596           npes = _Cmi_mynodesize;
1597 #endif
1598         /*This is a stupid hack: we expect the *number* of nodes
1599         followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1600         (which happens to have exactly that layout!) and stuff
1601         a 1 into the "node number" slot
1602         */
1603         fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1604         fakeTab->info.nPE=ChMessageInt_new(npes);
1605         fakeTab->info.dataport=ChMessageInt_new(0);
1606         fakeTab->info.IP=_skt_invalid_ip;
1607   }
1608   else 
1609   { /*Contact charmrun for machine info.*/
1610         ChSingleNodeinfo me;
1611
1612         me.nodeNo=ChMessageInt_new(_Cmi_mynode);
1613
1614 #if CMK_USE_IBVERBS
1615         {
1616                 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
1617                 me.info.qpList = malloc(qpListSize);
1618                 copyInfiAddr(me.info.qpList);
1619                 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
1620                 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
1621                 free(me.info.qpList);
1622         }
1623 #else
1624         /*The nPE and IP fields are set by charmrun--
1625           these values don't matter. */
1626         me.info.nPE=ChMessageInt_new(0);
1627         me.info.IP=_skt_invalid_ip;
1628         me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
1629 #ifdef CMK_USE_MX
1630         me.info.nic_id=ChMessageLong_new(Cmi_nic_id);
1631 #endif
1632 #if CMK_USE_IBUD
1633         me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
1634         me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
1635         me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
1636         MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",me.info.qp.lid,me.info.qp.qpn,me.info.qp.psn);
1637 #endif
1638         me.info.dataport=ChMessageInt_new(dataport);
1639
1640         /*Send our node info. to charmrun.
1641         CommLock hasn't been initialized yet-- 
1642         use non-locking version*/
1643         ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1644         MACHSTATE1(5,"send initnode - dataport:%d", dataport);
1645 #endif  //CMK_USE_IBVERBS
1646
1647         MACHSTATE(3,"initnode sent");
1648   
1649         /*We get the other node addresses from a message sent
1650           back via the charmrun control port.*/
1651         if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
1652                 CmiAbort("Timeout waiting for nodetab!\n");
1653         }
1654         MACHSTATE(2,"recv initnode {");
1655         ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1656         MACHSTATE(2,"} recv initnode");
1657   }
1658 //#if CMK_USE_IBVERBS   
1659 //#else
1660   node_addresses_store(&nodetabmsg);
1661   ChMessage_free(&nodetabmsg);
1662 //#endif        
1663   MACHSTATE(3,"} node_addresses_obtain ");
1664 }
1665
1666 #if CMK_NODE_QUEUE_AVAILABLE
1667
1668 /***********************************************************************
1669  * DeliverOutgoingNodeMessage()
1670  *
1671  * This function takes care of delivery of outgoing node messages from the
1672  * sender end. Broadcast messages are divided into sets of messages that 
1673  * are bound to the local node, and to remote nodes. For local
1674  * transmission, the messages are directly pushed into the recv
1675  * queues. For non-local transmission, the function DeliverViaNetwork()
1676  * is called
1677  ***********************************************************************/
1678 void DeliverOutgoingNodeMessage(OutgoingMsg ogm)
1679 {
1680   int i, rank, dst; OtherNode node;
1681
1682   dst = ogm->dst;
1683   switch (dst) {
1684   case NODE_BROADCAST_ALL:
1685     CmiPushNode(CopyMsg(ogm->data,ogm->size));
1686     /*case-fallthrough (no break)-- deliver to all other processors*/
1687   case NODE_BROADCAST_OTHERS:
1688 #if CMK_BROADCAST_SPANNING_TREE
1689     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1690 #elif CMK_BROADCAST_HYPERCUBE
1691     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1692 #else
1693     for (i=0; i<_Cmi_numnodes; i++)
1694       if (i!=_Cmi_mynode)
1695         DeliverViaNetwork(ogm, nodes + i, DGRAM_NODEMESSAGE, DGRAM_ROOTPE_MASK, 1);
1696 #endif
1697     GarbageCollectMsg(ogm);
1698     break;
1699   default:
1700     node = nodes+dst;
1701     rank=DGRAM_NODEMESSAGE;
1702     if (dst != _Cmi_mynode) {
1703       DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1704       GarbageCollectMsg(ogm);
1705     } else {
1706       if (ogm->freemode == 'A') {
1707         CmiPushNode(CopyMsg(ogm->data,ogm->size));
1708         ogm->freemode = 'X';
1709       } else {
1710         CmiPushNode(ogm->data);
1711         FreeOutgoingMsg(ogm);
1712       }
1713     }
1714   }
1715 }
1716
1717 #else
1718
1719 #define DeliverOutgoingNodeMessage(msg) DeliverOutgoingMessage(msg)
1720
1721 #endif
1722
1723 #if CMK_C_INLINE
1724 inline static
1725 #endif
1726 void DeliverViaNetworkOrPxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot,int copy){
1727 #if CMK_USE_SYSVSHM
1728         {
1729 #if SYSVSHM_STATS
1730         double _startValidTime = CmiWallTimer();
1731 #endif
1732         int ret=CmiValidSysvshm(ogm,node);
1733 #if SYSVSHM_STATS
1734         sysvshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1735 #endif                  
1736         MACHSTATE4(3,"Msg ogm %p size %d dst %d useSysvShm %d",ogm,ogm->size,ogm->dst,ret);
1737         if(ret){
1738                 CmiSendMessageSysvshm(ogm,node,rank,broot);
1739         }else{
1740                 DeliverViaNetwork(ogm, node, rank, broot,copy);
1741         }
1742         } 
1743 #elif CMK_USE_PXSHM
1744      {
1745 #if PXSHM_STATS
1746         double _startValidTime = CmiWallTimer();
1747 #endif
1748       int ret=CmiValidPxshm(ogm,node);
1749 #if PXSHM_STATS
1750         pxshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1751 #endif                  
1752       MACHSTATE4(3,"Msg ogm %p size %d dst %d usePxShm %d",ogm,ogm->size,ogm->dst,ret);
1753       if(ret){
1754          CmiSendMessagePxshm(ogm,node,rank,broot);
1755        }else{
1756          DeliverViaNetwork(ogm, node, rank, broot,copy);
1757        }
1758       } 
1759 #else
1760       DeliverViaNetwork(ogm, node, rank, broot, copy);
1761 #endif                  
1762         
1763 }
1764
1765
1766
1767 /***********************************************************************
1768  * DeliverOutgoingMessage()
1769  *
1770  * This function takes care of delivery of outgoing messages from the
1771  * sender end. Broadcast messages are divided into sets of messages that 
1772  * are bound to the local node, and to remote nodes. For local
1773  * transmission, the messages are directly pushed into the recv
1774  * queues. For non-local transmission, the function DeliverViaNetwork()
1775  * is called
1776  ***********************************************************************/
1777 int DeliverOutgoingMessage(OutgoingMsg ogm)
1778 {
1779   int i, rank, dst; OtherNode node;
1780   int network = 1;
1781   
1782   dst = ogm->dst;
1783
1784   switch (dst) {
1785   case PE_BROADCAST_ALL:
1786     CmiCommLock();
1787     for (rank = 0; rank<_Cmi_mynodesize; rank++) {
1788       CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1789     }
1790 #if CMK_BROADCAST_SPANNING_TREE
1791     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1792 #elif CMK_BROADCAST_HYPERCUBE
1793     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1794 #else
1795     for (i=0; i<_Cmi_numnodes; i++)
1796       if (i!=_Cmi_mynode){
1797         /*FAULT_EVAC : is the target processor valid*/
1798         if(CmiNodeAlive(i)){
1799           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1800         }
1801       } 
1802 #endif
1803     GarbageCollectMsg(ogm);
1804     CmiCommUnlock();
1805     break;
1806   case PE_BROADCAST_OTHERS:
1807     CmiCommLock();
1808     for (rank = 0; rank<_Cmi_mynodesize; rank++)
1809       if (rank + Cmi_nodestart != ogm->src) {
1810         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1811       }
1812 #if CMK_BROADCAST_SPANNING_TREE
1813     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1814 #elif CMK_BROADCAST_HYPERCUBE
1815     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1816 #else
1817     for (i = 0; i<_Cmi_numnodes; i++)
1818       if (i!=_Cmi_mynode){
1819         /*FAULT_EVAC : is the target processor valid*/
1820         if(CmiNodeAlive(i)){
1821           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1822         }
1823       } 
1824 #endif
1825     GarbageCollectMsg(ogm);
1826     CmiCommUnlock();
1827     break;
1828   default:
1829 #ifndef CMK_OPTIMIZE
1830     if (dst<0 || dst>=CmiNumPes())
1831       CmiAbort("Send to out-of-bounds processor!");
1832 #endif
1833     node = nodes_by_pe[dst];
1834     rank = dst - node->nodestart;
1835     if (node->nodestart != Cmi_nodestart) {
1836         CmiCommLock();
1837         DeliverViaNetworkOrPxshm(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1838         GarbageCollectMsg(ogm);
1839         CmiCommUnlock();
1840     } else {
1841       network = 0;
1842       if (ogm->freemode == 'A') {
1843         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1844         ogm->freemode = 'X';
1845       } else {
1846         CmiPushPE(rank, ogm->data);
1847         FreeOutgoingMsg(ogm);
1848       }
1849     }
1850   }
1851 #if CMK_MULTICORE
1852   network = 0;
1853 #endif
1854   return network;
1855 }
1856
1857
1858 /******************************************************************************
1859  *
1860  * CmiGetNonLocal
1861  *
1862  * The design of this system is that the communication thread does all the
1863  * work, to eliminate as many locking issues as possible.  This is the only
1864  * part of the code that happens in the receiver-thread.
1865  *
1866  * This operation is fairly cheap, it might be worthwhile to inline
1867  * the code into CmiDeliverMsgs to reduce function call overhead.
1868  *
1869  *****************************************************************************/
1870
1871 #if CMK_NODE_QUEUE_AVAILABLE
1872 char *CmiGetNonLocalNodeQ(void)
1873 {
1874   char *result = 0;
1875   if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1876     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1877     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1878     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1879   }
1880   return result;
1881 }
1882 #endif
1883
1884 void *CmiGetNonLocal(void)
1885 {
1886   CmiState cs = CmiGetState();
1887   CmiIdleLock_checkMessage(&cs->idle);
1888   return (void *) PCQueuePop(cs->recv);
1889 }
1890
1891
1892 /**
1893  * Set up an OutgoingMsg structure for this message.
1894  */
1895 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
1896   OutgoingMsg ogm;
1897   MallocOutgoingMsg(ogm);
1898   MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1899   ogm->size = size;
1900   ogm->data = data;
1901   ogm->src = cs->pe;
1902   ogm->dst = pe;
1903   ogm->freemode = freemode;
1904   ogm->refcount = 0;
1905   return (CmiCommHandle)ogm;    
1906 }
1907
1908 #if CMK_NODE_QUEUE_AVAILABLE
1909
1910 /******************************************************************************
1911  *
1912  * CmiGeneralNodeSend
1913  *
1914  * Description: This is a generic message sending routine. All the
1915  * converse message send functions are implemented in terms of this
1916  * function. (By setting appropriate flags (eg freemode) that tell
1917  * CmiGeneralSend() how exactly to handle the particular case of
1918  * message send)
1919  *
1920  *****************************************************************************/
1921
1922 CmiCommHandle CmiGeneralNodeSend(int node, int size, int freemode, char *data)
1923 {
1924   CmiState cs = CmiGetState(); OutgoingMsg ogm;
1925   MACHSTATE(1,"CmiGeneralNodeSend {");
1926
1927   if (freemode == 'S') {
1928     char *copy = (char *)CmiAlloc(size);
1929     if (!copy)
1930       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
1931     memcpy(copy, data, size);
1932     data = copy; freemode = 'F';
1933   }
1934
1935 #if CMK_IMMEDIATE_MSG
1936     /* execute the immediate message right away */
1937   if (node == CmiMyNode() && CmiIsImmediate(data)) {
1938     CmiPushImmediateMsg(data);
1939       /* only communication thread executes immediate messages in SMP */
1940     if (!_immRunning) CmiHandleImmediate();
1941     return;
1942   }
1943 #endif
1944
1945   CmiMsgHeaderSetLength(data, size);
1946   ogm=PrepareOutgoing(cs,node,size,freemode,data);
1947   CmiCommLock();
1948   DeliverOutgoingNodeMessage(ogm);
1949   CmiCommUnlock();
1950   /* Check if any packets have arrived recently (preserves kernel network buffers). */
1951   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
1952   MACHSTATE(1,"} CmiGeneralNodeSend");
1953   return (CmiCommHandle)ogm;
1954 }
1955
1956 #endif
1957
1958
1959 /******************************************************************************
1960  *
1961  * CmiGeneralSend
1962  *
1963  * Description: This is a generic message sending routine. All the
1964  * converse message send functions are implemented in terms of this
1965  * function. (By setting appropriate flags (eg freemode) that tell
1966  * CmiGeneralSend() how exactly to handle the particular case of
1967  * message send)
1968  *
1969  *****************************************************************************/
1970
1971 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
1972 {
1973   int sendonnetwork;
1974   CmiState cs = CmiGetState(); OutgoingMsg ogm;
1975   MACHSTATE(1,"CmiGeneralSend {");
1976
1977   if (freemode == 'S') {
1978 #if CMK_USE_GM
1979     if (pe != cs->pe) {
1980       freemode = 'G';
1981     }
1982     else
1983 #endif
1984     {
1985     char *copy = (char *)CmiAlloc(size);
1986     if (!copy)
1987       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
1988     memcpy(copy, data, size);
1989     data = copy; freemode = 'F';
1990     }
1991   }
1992   
1993   if (pe == cs->pe) {
1994 #if ! CMK_SMP
1995     if (!_immRunning) /* CdsFifo_Enqueue, below, isn't SIGIO or thread safe.  
1996                       The SMP comm thread never gets here, because of the pe test. */
1997 #endif
1998     {
1999 #if CMK_IMMEDIATE_MSG
2000       /* execute the immediate message right away */
2001       /* but to avoid infinite recursive call, don't do this if _immRunning */
2002     if (CmiIsImmediate(data)) {
2003       CmiPushImmediateMsg(data);
2004       CmiHandleImmediate();
2005       return 0;
2006     }
2007 #endif
2008     CdsFifo_Enqueue(cs->localqueue, data);
2009     if (freemode == 'A') {
2010       MallocOutgoingMsg(ogm);
2011       ogm->freemode = 'X';
2012       return ogm;
2013     } else return 0;
2014     }
2015   }
2016
2017 #if CMK_PERSISTENT_COMM
2018   if (phs) {
2019       CmiAssert(phsSize == 1);
2020       CmiSendPersistentMsg(*phs, pe, size, data);
2021       return NULL;
2022   }
2023 #endif
2024
2025   CmiMsgHeaderSetLength(data, size);
2026   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
2027   /* CmiCommLock(); */
2028   sendonnetwork = DeliverOutgoingMessage(ogm);
2029   /* CmiCommUnlock(); */
2030   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2031 #if CMK_USE_SYSVSHM
2032         CommunicationServerSysvshm();
2033 #elif CMK_USE_PXSHM
2034         CommunicationServerPxshm();
2035 #endif
2036 #if !CMK_SHARED_VARS_UNAVAILABLE
2037   if (sendonnetwork!=0)   /* only call server when we send msg on network in SMP */
2038 #endif
2039   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2040   MACHSTATE(1,"}  CmiGeneralSend");
2041   return (CmiCommHandle)ogm;
2042 }
2043
2044
2045 void CmiSyncSendFn(int p, int s, char *m)
2046
2047   CQdCreate(CpvAccess(cQdState), 1);
2048   CmiGeneralSend(p,s,'S',m); 
2049 }
2050
2051 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
2052
2053   CQdCreate(CpvAccess(cQdState), 1);
2054   return CmiGeneralSend(p,s,'A',m); 
2055 }
2056
2057 void CmiFreeSendFn(int p, int s, char *m)
2058
2059   CQdCreate(CpvAccess(cQdState), 1);
2060   CmiGeneralSend(p,s,'F',m); 
2061 }
2062
2063 void CmiSyncBroadcastFn(int s, char *m)
2064
2065   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2066   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); 
2067 }
2068
2069 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
2070
2071   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2072   return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); 
2073 }
2074
2075 void CmiFreeBroadcastFn(int s, char *m)
2076
2077   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
2078   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); 
2079 }
2080
2081 void CmiSyncBroadcastAllFn(int s, char *m)
2082
2083   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2084   CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); 
2085 }
2086
2087 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
2088
2089   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2090   return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); 
2091 }
2092
2093 void CmiFreeBroadcastAllFn(int s, char *m)
2094
2095   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2096   CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); 
2097 }
2098
2099 #if CMK_NODE_QUEUE_AVAILABLE
2100
2101 void CmiSyncNodeSendFn(int p, int s, char *m)
2102
2103   CQdCreate(CpvAccess(cQdState), 1);
2104   CmiGeneralNodeSend(p,s,'S',m); 
2105 }
2106
2107 CmiCommHandle CmiAsyncNodeSendFn(int p, int s, char *m)
2108
2109   CQdCreate(CpvAccess(cQdState), 1);
2110   return CmiGeneralNodeSend(p,s,'A',m); 
2111 }
2112
2113 void CmiFreeNodeSendFn(int p, int s, char *m)
2114
2115   CQdCreate(CpvAccess(cQdState), 1);
2116   CmiGeneralNodeSend(p,s,'F',m); 
2117 }
2118
2119 void CmiSyncNodeBroadcastFn(int s, char *m)
2120
2121   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2122   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'S',m); 
2123 }
2124
2125 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
2126
2127   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2128   return CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'A',m);
2129 }
2130
2131 void CmiFreeNodeBroadcastFn(int s, char *m)
2132
2133   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2134   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'F',m); 
2135 }
2136
2137 void CmiSyncNodeBroadcastAllFn(int s, char *m)
2138
2139   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2140   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'S',m); 
2141 }
2142
2143 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
2144
2145   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2146   return CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'A',m); 
2147 }
2148
2149 void CmiFreeNodeBroadcastAllFn(int s, char *m)
2150
2151   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2152   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'F',m); 
2153 }
2154 #endif
2155
2156 /******************************************************************************
2157  *
2158  * Comm Handle manipulation.
2159  *
2160  *****************************************************************************/
2161
2162 int CmiAsyncMsgSent(CmiCommHandle handle)
2163 {
2164   return (((OutgoingMsg)handle)->freemode == 'X');
2165 }
2166
2167 void CmiReleaseCommHandle(CmiCommHandle handle)
2168 {
2169   FreeOutgoingMsg(((OutgoingMsg)handle));
2170 }
2171
2172 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
2173
2174 /*****************************************************************************
2175  *
2176  * NET version List-Cast and Multicast Code
2177  *
2178  ****************************************************************************/
2179                                                                                 
2180 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
2181 {
2182   int i;
2183   for(i=0;i<npes;i++) {
2184     CmiReference(msg);
2185     CmiSyncSendAndFree(pes[i], len, msg);
2186   }
2187 }
2188                                                                                 
2189 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
2190 {
2191   CmiError("ListSend not implemented.");
2192   return (CmiCommHandle) 0;
2193 }
2194                                                                                 
2195 /* 
2196   because in all net versions, the message buffer after CmiSyncSendAndFree
2197   returns is not changed, we can use memory reference trick to avoid 
2198   memory copying here
2199 */
2200 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2201 {
2202   int i;
2203   for(i=0;i<npes;i++) {
2204     CmiReference(msg);
2205     CmiSyncSendAndFree(pes[i], len, msg);
2206   }
2207   CmiFree(msg);
2208 }
2209
2210 #endif
2211
2212 #if CMK_BROADCAST_SPANNING_TREE
2213 /*
2214   if root is 1, it is called from the broadcast root, only ogm is needed;
2215   if root is 0, it is called in the tree, ogm must be NULL and msg, size and startpe are needed
2216   note: function leaves msg buffer untouched
2217 */
2218 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int noderank)
2219 {
2220   CmiState cs = CmiGetState();
2221   int i;
2222
2223   if (root) startpe = _Cmi_mynode;
2224   else ogm = NULL;
2225
2226   CmiAssert(startpe>=0 && startpe<_Cmi_numnodes);
2227
2228   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
2229     int p = _Cmi_mynode-startpe;
2230     if (p<0) p+=_Cmi_numnodes;
2231     p = BROADCAST_SPANNING_FACTOR*p + i;
2232     if (p > _Cmi_numnodes - 1) break;
2233     p += startpe;
2234     p = p%_Cmi_numnodes;
2235     CmiAssert(p!=_Cmi_mynode);
2236     /* CmiPrintf("SendSpanningChildren: %d => %d\n", _Cmi_mynode, p); */
2237     if (!root && !ogm) ogm=PrepareOutgoing(cs, PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2238   //  DeliverViaNetwork(ogm, nodes + p, noderank, startpe, 1);
2239     DeliverViaNetworkOrPxshm(ogm, nodes+p, noderank, startpe, 1);
2240   }
2241   if (!root && ogm) GarbageCollectMsg(ogm);
2242 }
2243 #endif
2244
2245 #if CMK_BROADCAST_HYPERCUBE
2246 int log_of_2 (int i) {
2247   int m;
2248   for (m=0; i>(1<<m); ++m);
2249   return m;
2250 }
2251
2252 /* called from root - send msg along the hypercube in broadcast.
2253   note: function leaves msg buffer untouched
2254 */
2255 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int srcpe, int noderank)
2256 {
2257   CmiState cs = CmiGetState();
2258   int i, k, npes, tmp;
2259   int *dest_pes;
2260
2261   if (root) {
2262     msg = ogm->data;
2263     srcpe = CmiMyNode();
2264   }
2265   else ogm = NULL;
2266
2267   tmp = srcpe ^ CmiMyNode();
2268   k = log_of_2(CmiNumNodes()) + 2;
2269   if (tmp) {
2270      do {--k;} while (!(tmp>>k));
2271   }
2272
2273         MACHSTATE2(3,"Broadcast SendHypercube ogm %p size %d",ogm,size);
2274
2275   dest_pes = CmiTmpAlloc(sizeof(int)*(k+1));
2276   k--;
2277   npes = HypercubeGetBcastDestinations(CmiMyNode(), CmiNumNodes(), k, dest_pes);
2278   
2279   for (i = 0; i < npes; i++) {
2280     int p = dest_pes[i];
2281     /* CmiPrintf("SendHypercube: %d => %d (%d)\n", cs->pe, p, i); */
2282     if (!root && ! ogm) 
2283       ogm=PrepareOutgoing(cs ,PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2284     DeliverViaNetworkOrPxshm(ogm, nodes + p, noderank, CmiMyNode(), 1);
2285   }
2286   if (!root && ogm) GarbageCollectMsg(ogm);
2287   CmiTmpFree(dest_pes);
2288 }
2289 #endif
2290
2291 /*
2292 #if CMK_IMMEDIATE_MSG
2293 void CmiProbeImmediateMsg()
2294 {
2295   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2296 }
2297 #endif
2298 */
2299
2300 /* Network progress function is used to poll the network when for
2301    messages. This flushes receive buffers on some implementations*/ 
2302 void CmiMachineProgressImpl()
2303 {
2304 #if CMK_USE_SYSVSHM
2305         CommunicationServerSysvshm();
2306 #elif CMK_USE_PXSHM
2307         CommunicationServerPxshm();
2308 #endif
2309   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2310 }
2311
2312 /******************************************************************************
2313  *
2314  * Main code, Init, and Exit
2315  *
2316  *****************************************************************************/
2317 extern void CthInit(char **argv);
2318 extern void ConverseCommonInit(char **);
2319
2320 static char     **Cmi_argv;
2321 static char     **Cmi_argvcopy;
2322 static CmiStartFn Cmi_startfn;   /* The start function */
2323 static int        Cmi_usrsched;  /* Continue after start function finishes? */
2324
2325 static void ConverseRunPE(int everReturn)
2326 {
2327   CmiIdleState *s=CmiNotifyGetState();
2328   CmiState cs;
2329   char** CmiMyArgv;
2330   CmiNodeAllBarrier();
2331   cs = CmiGetState();
2332   CpvInitialize(void *,CmiLocalQueue);
2333   CpvAccess(CmiLocalQueue) = cs->localqueue;
2334
2335   /* all non 0 rank use the copied one while rank 0 will modify the actual argv */
2336   if (CmiMyRank())
2337     CmiMyArgv = CmiCopyArgs(Cmi_argvcopy);
2338   else
2339     CmiMyArgv = Cmi_argv;
2340   CthInit(CmiMyArgv);
2341
2342 #if CMK_USE_GM
2343   CmiCheckGmStatus();
2344 #endif
2345
2346 #if CMK_USE_MX
2347   CmiMXMakeConnection();  
2348 #endif
2349
2350   ConverseCommonInit(CmiMyArgv);
2351
2352   /* initialize the network progress counter*/
2353   /* Network progress function is used to poll the network when for
2354      messages. This flushes receive buffers on some  implementations*/ 
2355   CpvInitialize(int , networkProgressCount);
2356   CpvAccess(networkProgressCount) = 0;
2357
2358   /* better to show the status here */
2359   if (CmiMyPe() == 0) {
2360     if (Cmi_netpoll == 1) {
2361       CmiPrintf("Charm++: scheduler running in netpoll mode.\n");
2362     }
2363 #if CMK_SHARED_VARS_UNAVAILABLE
2364     else {
2365       if (CmiMemoryIs(CMI_MEMORY_IS_OS))
2366         CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
2367     }
2368 #endif
2369   }
2370 #if MEMORYUSAGE_OUTPUT
2371   memoryusage_counter = 0;
2372 #endif
2373 #if CMK_USE_GM || CMK_USE_MX
2374   if (Cmi_charmrun_fd != -1)
2375 #endif
2376   {
2377   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
2378       (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
2379   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
2380       (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
2381 #if CMK_USE_SYSVSHM
2382          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdleSysvshm, NULL);
2383          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdleSysvshm, NULL);
2384 #elif CMK_USE_PXSHM
2385                 //TODO: add pxshm notify idle
2386          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdlePxshm, NULL);
2387          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdlePxshm, NULL);
2388 #endif
2389   }
2390
2391 #if CMK_SHARED_VARS_UNAVAILABLE
2392   if (Cmi_netpoll) /*Repeatedly call CommServer*/
2393     CcdCallOnConditionKeep(CcdPERIODIC, 
2394         (CcdVoidFn) CommunicationPeriodic, NULL);
2395   else /*Only need this for retransmits*/
2396     CcdCallOnConditionKeep(CcdPERIODIC_10ms, 
2397         (CcdVoidFn) CommunicationPeriodic, NULL);
2398 #endif
2399
2400   if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
2401     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
2402 #if CMK_SHARED_VARS_UNAVAILABLE
2403     if (!Cmi_asyncio) {
2404     /* gm cannot live with setitimer */
2405     CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
2406     }
2407     else {
2408     /*Occasionally ping charmrun, to test if it's dead*/
2409     struct itimerval i;
2410     CmiSignal(SIGALRM, 0, 0, pingCharmrun);
2411 #if MEMORYUSAGE_OUTPUT
2412     i.it_interval.tv_sec = 0;
2413     i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2414     i.it_value.tv_sec = 0;
2415     i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2416 #else
2417     i.it_interval.tv_sec = 1;
2418     i.it_interval.tv_usec = 0;
2419     i.it_value.tv_sec = 1;
2420     i.it_value.tv_usec = 0;
2421 #endif
2422     setitimer(ITIMER_REAL, &i, NULL);
2423     }
2424
2425 #if ! CMK_USE_GM && ! CMK_USE_MX && ! CMK_USE_TCP && ! CMK_USE_IBVERBS
2426     /*Occasionally check for retransmissions, outgoing acks, etc.*/
2427     /*no need for GM case */
2428     CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
2429 #endif
2430 #endif
2431     
2432     /*Initialize the clock*/
2433     Cmi_clock=GetClock();
2434   }
2435
2436 #ifdef IGET_FLOWCONTROL 
2437   /* Call the function once to determine the amount of physical memory available */
2438   getAvailSysMem();
2439   /* Call the function to periodically call the token adapt function */
2440   CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
2441   CcdCallOnConditionKeep(CcdPERIODIC_10s,   // magic number of PERIOD 10s
2442         (CcdVoidFn) TokenUpdatePeriodic, NULL);
2443 #endif
2444   
2445 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
2446   srand((int)(1024.0*CmiWallTimer()));
2447   if (CmiMyPe()==0)
2448     CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
2449         CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
2450 #endif
2451
2452 #ifdef __ONESIDED_IMPL
2453 #ifdef __ONESIDED_NO_HARDWARE
2454   putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
2455   putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
2456   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2457   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2458 #endif
2459 #ifdef __ONESIDED_GM_HARDWARE
2460   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2461   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2462 #endif
2463 #endif
2464
2465   /* communication thread */
2466   if (CmiMyRank() == CmiMyNodeSize()) {
2467     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2468     if (Cmi_charmrun_fd!=-1)
2469           while (1) CommunicationServer(5, COMM_SERVER_FROM_SMP);
2470   }
2471   else
2472   if (!everReturn) {
2473     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2474     /* turn on immediate messages only now
2475      node barrier previously should take care of the node synchronization */
2476     _immediateReady = 1;
2477     if (Cmi_usrsched==0) CsdScheduler(-1);
2478     ConverseExit();
2479   }
2480 }
2481
2482 void ConverseExit(void)
2483 {
2484   MACHSTATE(2,"ConverseExit {");
2485   machine_initiated_shutdown=1;
2486   if (CmiMyRank()==0) {
2487     if(Cmi_print_stats)
2488       printNetStatistics();
2489     log_done();
2490   }
2491 #if CMK_USE_SYSVSHM
2492         CmiExitSysvshm();
2493 #elif CMK_USE_PXSHM
2494         CmiExitPxshm();
2495 #endif
2496   CmiMachineExit();
2497   ConverseCommonExit();               /* should be called by every rank */
2498   CmiNodeBarrier();        /* single node SMP, make sure every rank is done */
2499   if (CmiMyRank()==0) CmiStdoutFlush();
2500   if (Cmi_charmrun_fd==-1) {
2501     if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
2502     else while (1) CmiYield();
2503   }
2504   else {
2505         ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
2506 #if CMK_SHARED_VARS_UNAVAILABLE
2507         Cmi_check_delay = 1.0;          /* speed up checking of charmrun */
2508         while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2509 #elif CMK_MULTICORE
2510         if (!Cmi_commthread && CmiMyRank()==0) {
2511           Cmi_check_delay = 1.0;        /* speed up checking of charmrun */
2512           while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2513         }
2514 #endif
2515   }
2516   MACHSTATE(2,"} ConverseExit");
2517
2518 /*Comm. thread will kill us.*/
2519   while (1) CmiYield();
2520 }
2521
2522 static void set_signals(void)
2523 {
2524   if(!Cmi_truecrash) {
2525     signal(SIGSEGV, KillOnAllSigs);
2526     signal(SIGFPE, KillOnAllSigs);
2527     signal(SIGILL, KillOnAllSigs);
2528     signal(SIGINT, KillOnAllSigs);
2529     signal(SIGTERM, KillOnAllSigs);
2530     signal(SIGABRT, KillOnAllSigs);
2531 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2532     signal(SIGQUIT, KillOnAllSigs);
2533     signal(SIGBUS, KillOnAllSigs);
2534 #     if CMK_HANDLE_SIGUSR
2535     signal(SIGUSR1, HandleUserSignals);
2536     signal(SIGUSR2, HandleUserSignals);
2537 #     endif
2538 #   endif /*UNIX*/
2539   }
2540 }
2541
2542 /*Socket idle function to use before addresses have been
2543   obtained.  During the real program, we idle with CmiYield.
2544 */
2545 static void obtain_idleFn(void) {sleep(0);}
2546
2547 static int net_default_skt_abort(int code,const char *msg)
2548 {
2549   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2550   machine_exit(1);
2551   return -1;
2552 }
2553
2554 #if MACHINE_DEBUG_LOG
2555 FILE *debugLog = NULL;
2556 #endif
2557
2558 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
2559 {
2560 #if MACHINE_DEBUG
2561   debugLog=NULL;
2562 #endif
2563 #if CMK_USE_HP_MAIN_FIX
2564 #if FOR_CPLUS
2565   _main(argc,argv);
2566 #endif
2567 #endif
2568   Cmi_startfn = fn; Cmi_usrsched = usc;
2569   Cmi_netpoll = 0;
2570 #if CMK_NETPOLL
2571   Cmi_netpoll = 1;
2572 #endif
2573 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2574   Cmi_idlepoll = 0;
2575 #else
2576   Cmi_idlepoll = 1;
2577 #endif
2578   Cmi_truecrash = 0;
2579   if (CmiGetArgFlagDesc(argv,"+truecrash","Do not install signal handlers") ||
2580       CmiGetArgFlagDesc(argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2581     /* netpoll disable signal */
2582   if (CmiGetArgFlagDesc(argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2583     /* idlepoll use poll instead if sleep when idle */
2584   if (CmiGetArgFlagDesc(argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2585     /* idlesleep use sleep instead if busywait when idle */
2586   if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2587   Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2588
2589   Cmi_asyncio = 1;
2590 #if CMK_ASYNC_NOT_NEEDED
2591   Cmi_asyncio = 0;
2592 #endif
2593   if (CmiGetArgFlagDesc(argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2594   if (CmiGetArgFlagDesc(argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2595 #if CMK_MULTICORE
2596   if (CmiGetArgFlagDesc(argv,"+commthread","Use communication thread")) {
2597     Cmi_commthread = 1;
2598 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2599     _Cmi_noprocforcommthread = 1;   /* worker thread go sleep */
2600 #endif
2601     if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2602   }
2603 #endif
2604
2605   skt_init();
2606   /* use special abort handler instead of default_skt_abort to 
2607      prevent exit trapped by atexit_check() due to the exit() call  */
2608   skt_set_abort(net_default_skt_abort);
2609   atexit(machine_atexit_check);
2610   parse_netstart();
2611 #if ! CMK_SMP && ! defined(_WIN32)
2612   /* only get forks in non-smp mode */
2613   parse_forks();
2614 #endif
2615   extract_args(argv);
2616   log_init();
2617   Cmi_scanf_mutex = CmiCreateLock();
2618
2619 #if MACHINE_DEBUG_LOG
2620   {
2621     char ln[200];
2622     sprintf(ln,"debugLog.%d",_Cmi_mynode);
2623     debugLog=fopen(ln,"w");
2624   }
2625 #endif
2626
2627     /* NOTE: can not acutally call timer before timerInit ! GZ */
2628   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2629
2630   skt_set_idle(obtain_idleFn);
2631   if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2632         set_signals();
2633 #if CMK_USE_TCP
2634         dataskt=skt_server(&dataport);
2635 #elif !CMK_USE_GM && !CMK_USE_MX
2636         dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2637 #else
2638           /* GM and MX do not need to create any socket for communication */
2639         dataskt=-1;
2640 #endif
2641         MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2642         Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2643         MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
2644         CmiStdoutInit();
2645   } else {/*Standalone operation*/
2646         printf("Charm++: standalone mode (not using charmrun)\n");
2647         dataskt=-1;
2648         Cmi_charmrun_fd=-1;
2649   }
2650
2651   CmiMachineInit(argv);
2652
2653   node_addresses_obtain(argv);
2654   MACHSTATE(5,"node_addresses_obtain done");
2655
2656   CmiCommunicationInit(argv);
2657
2658 #if CMK_USE_SYSVSHM
2659   CmiInitSysvshm(argv);
2660 #elif CMK_USE_PXSHM
2661   CmiInitPxshm(argv);
2662 #endif
2663
2664   skt_set_idle(CmiYield);
2665   Cmi_check_delay = 1.0+0.25*_Cmi_numnodes;
2666
2667   if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2668       Cmi_check_delay=1.0e30;
2669
2670   CsvInitialize(CmiNodeState, NodeState);
2671   CmiNodeStateInit(&CsvAccess(NodeState));
2672
2673   /* Network progress function is used to poll the network when for
2674      messages. This flushes receive buffers on some  implementations*/ 
2675   networkProgressPeriod = 0;  
2676   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2677
2678   Cmi_argvcopy = CmiCopyArgs(argv);
2679   Cmi_argv = argv; 
2680
2681   CmiStartThreads(argv);
2682
2683 #if CMK_USE_AMMASSO
2684   CmiAmmassoOpenQueuePairs();
2685 #endif
2686
2687   ConverseRunPE(everReturn);
2688 }
2689
2690 #if CMK_PERSISTENT_COMM
2691
2692 int persistentSendMsgHandlerIdx;
2693
2694 static void sendPerMsgHandler(char *msg)
2695 {
2696   int msgSize;
2697   void *destAddr, *destSizeAddr;
2698
2699   msgSize = CmiMsgHeaderGetLength(msg);
2700   msgSize -= 2*sizeof(void *);
2701   destAddr = *(void **)(msg + msgSize);
2702   destSizeAddr = *(void **)(msg + msgSize + sizeof(void*));
2703 /*CmiPrintf("msgSize:%d destAddr:%p, destSizeAddr:%p\n", msgSize, destAddr, destSizeAddr);*/
2704   CmiSetHandler(msg, CmiGetXHandler(msg));
2705   *((int *)destSizeAddr) = msgSize;
2706   memcpy(destAddr, msg, msgSize);
2707 }
2708
2709 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
2710 {
2711   CmiAssert(h!=NULL);
2712   PersistentSendsTable *slot = (PersistentSendsTable *)h;
2713   CmiAssert(slot->used == 1);
2714   CmiAssert(slot->destPE == destPE);
2715   if (size > slot->sizeMax) {
2716     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
2717     CmiAbort("Abort: Invalid size\n");
2718   }
2719
2720 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destpe=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), slot->destPE, slot->destAddress, size);*/
2721
2722   if (slot->destAddress[0]) {
2723     int newsize = size + sizeof(void *)*2;
2724     char *newmsg = (char*)CmiAlloc(newsize);
2725     memcpy(newmsg, m, size);
2726     memcpy(newmsg+size, &slot->destAddress[0], sizeof(void *));
2727     memcpy(newmsg+size+sizeof(void*), &slot->destSizeAddress[0], sizeof(void *));
2728     CmiFree(m);
2729     CmiMsgHeaderSetLength(newmsg, size + sizeof(void *)*2);
2730     CmiSetXHandler(newmsg, CmiGetHandler(newmsg));
2731     CmiSetHandler(newmsg, persistentSendMsgHandlerIdx);
2732     phs = NULL; phsSize = 0;
2733     CmiSyncSendAndFree(slot->destPE, newsize, newmsg);
2734   }
2735   else {
2736 #if 1
2737     if (slot->messageBuf != NULL) {
2738       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
2739       CmiAbort("");
2740     }
2741     slot->messageBuf = m;
2742     slot->messageSize = size;
2743 #else
2744     /* normal send */
2745     PersistentHandle  *phs_tmp = phs;
2746     int phsSize_tmp = phsSize;
2747     phs = NULL; phsSize = 0;
2748     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
2749     CmiSyncSendAndFree(slot->destPE, size, m);
2750     phs = phs_tmp; phsSize = phsSize_tmp;
2751 #endif
2752   }
2753 }
2754
2755 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
2756 {
2757   char *dupmsg = (char *) CmiAlloc(size);
2758   memcpy(dupmsg, msg, size);
2759
2760   /*  CmiPrintf("Setting root to %d\n", 0); */
2761   if (CmiMyPe()==destPE) {
2762     CQdCreate(CpvAccess(cQdState), 1);
2763     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
2764   }
2765   else
2766     CmiSendPersistentMsg(h, destPE, size, dupmsg);
2767 }
2768
2769 /* called in PumpMsgs */
2770 int PumpPersistent()
2771 {
2772   PersistentReceivesTable *slot = persistentReceivesTableHead;
2773   int status = 0;
2774   while (slot) {
2775     unsigned int size = *(slot->recvSizePtr[0]);
2776     if (size > 0)
2777     {
2778       char *msg = slot->messagePtr[0];
2779 /*CmiPrintf("size: %d msg:%p %p\n", size, msg, slot->messagePtr);*/
2780
2781 #if 0
2782       void *dupmsg;
2783       dupmsg = CmiAlloc(size);
2784       
2785       _MEMCHECK(dupmsg);
2786       memcpy(dupmsg, msg, size);
2787       msg = dupmsg;
2788 #else
2789       /* return messagePtr directly and user MUST make sure not to delete it. */
2790       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
2791
2792       CmiReference(msg);
2793 #endif
2794
2795       CmiPushPE(CMI_DEST_RANK(msg), msg);
2796 #if CMK_BROADCAST_SPANNING_TREE
2797       if (CMI_BROADCAST_ROOT(msg))
2798           SendSpanningChildren(size, msg);
2799 #endif
2800       *(slot->recvSizePtr[0]) = 0;
2801       status = 1;
2802     }
2803     slot = slot->next;
2804   }
2805   return status;
2806 }
2807
2808 void *PerAlloc(int size)
2809 {
2810   return CmiAlloc(size);
2811 }
2812                                                                                 
2813 void PerFree(char *msg)
2814 {
2815     CmiFree(msg);
2816 }
2817
2818 void persist_machine_init() 
2819 {
2820   persistentSendMsgHandlerIdx =
2821        CmiRegisterHandler((CmiHandler)sendPerMsgHandler);
2822 }
2823
2824 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
2825 {
2826   int i;
2827   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
2828     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
2829     _MEMCHECK(buf);
2830     memset(buf, 0, maxBytes+sizeof(int)*2);
2831     slot->messagePtr[i] = buf;
2832     slot->recvSizePtr[i] = (unsigned int*)CmiAlloc(sizeof(unsigned int));
2833   }
2834   slot->sizeMax = maxBytes;
2835 }
2836
2837 #endif
2838
2839
2840 #if CMK_CELL
2841
2842 #include "spert_ppu.h"
2843
2844 void machine_OffloadAPIProgress() {
2845   CmiCommLock();
2846   OffloadAPIProgress();
2847   CmiCommUnlock();
2848 }
2849 #endif
2850
2851
2852
2853 /*@}*/