29ed435f79c7ff8a3e175497d4c01c86500faf65
[charm.git] / src / arch / net / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * Basic NET implementation of Converse machine layer
10  * @ingroup NET
11  */
12
13 /** @defgroup NET
14  * NET implementation of machine layer, ethernet in particular
15  * @ingroup Machine
16  *
17  * THE DATAGRAM STREAM
18  *
19  * Messages are sent using UDP datagrams.  The sender allocates a
20  * struct for each datagram to be sent.  These structs stick around
21  * until slightly after the datagram is acknowledged.
22  *
23  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
24  * Each node has an OtherNode struct for every other node in the
25  * system.  The OtherNode struct contains:
26  *
27  *   send_queue   (all datagram-structs not yet transmitted)
28  *   send_window  (all datagram-structs transmitted but not ack'd)
29  *
30  * When an acknowledgement comes in, all packets in the send-window
31  * are either marked as acknowledged or pushed back into the send
32  * queue for retransmission.
33  *
34  * THE OUTGOING MESSAGE
35  *
36  * When you send or broadcast a message, the first thing the system
37  * does is system creates an OutgoingMsg struct to represent the
38  * operation.  The OutgoingMsg contains a very direct expression
39  * of what you want to do:
40  *
41  * OutgoingMsg:
42  *
43  *   size      --- size of message in bytes
44  *   data      --- pointer to the buffer containing the message
45  *   src       --- processor which sent the message
46  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
47  *   freemode  --- see below.
48  *   refcount  --- see below.
49  *
50  * The OutgoingMsg is kept around until the transmission is done, then
51  * it is garbage collected --- the refcount and freemode fields are
52  * to assist garbage collection.
53  *
54  * The freemode indicates which kind of buffer-management policy was
55  * used (sync, async, or freeing).  The sync policy is handled
56  * superficially by immediately converting sync sends into freeing
57  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
58  * (freeing).  If the freemode is 'F', then garbage collection
59  * involves freeing the data and the OutgoingMsg structure itself.  If
60  * the freemode is 'A', then the only cleanup is to change the
61  * freemode to 'X', a condition which is then detectable by
62  * CmiAsyncMsgSent.  In this case, the actual freeing of the
63  * OutgoingMsg is done by CmiReleaseCommHandle.
64  *
65  * When the transmission is initiated, the system computes how many
66  * datagrams need to be sent, total.  This number is stored in the
67  * refcount field.  Each time a datagram is delivered, the refcount
68  * is decremented, when it reaches zero, cleanup is performed.  There
69  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
70  * is a send (not a broadcast) and can be performed with shared
71  * memory, the entire datagram system is bypassed, the message is
72  * simply delivered and freed, not using the refcount mechanism at
73  * all.  Exception 2: If the message is a broadcast, then part of the
74  * broadcast that can be done via shared memory is performed prior to
75  * initiating the datagram/refcount system.
76  *
77  * DATAGRAM FORMATS AND MESSAGE FORMATS
78  *
79  * Datagrams have this format:
80  *
81  *   srcpe   (16 bits) --- source processor number.
82  *   magic   ( 8 bits) --- magic number to make sure DG is good.
83  *   dstrank ( 8 bits) --- destination processor rank.
84  *   seqno   (32 bits) --- packet sequence number.
85  *   data    (XX byte) --- user data.
86  *
87  * The only reason the srcpe is in there is because the receiver needs
88  * to know which receive window to use.  The dstrank field is needed
89  * because transmission is node-to-node.  Once the message is
90  * assembled by the node, it must be delivered to the appropriate PE.
91  * The dstrank field is used to encode certain special-case scenarios.
92  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
93  * and should be delivered to all processors in the node.  If the dstrank
94  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
95  * which case the srcpe is the number of the acknowledger, the seqno is
96  * always zero, and the user data is a list of the seqno's being
97  * acknowledged.  There may be other dstrank codes for special functions.
98  *
99  * To send a message, one chops it up into datagrams and stores those
100  * datagrams in a send-queue.  These outgoing datagrams aren't stored
101  * in the explicit format shown above.  Instead, they are stored as
102  * ImplicitDgrams, which contain the datagram header and a pointer to
103  * the user data (which is in the user message buffer, which is in the
104  * OutgoingMsg).  At transmission time these are combined together.
105
106  * The combination of the datagram header with the user's data is
107  * performed right in the user's message buffer.  Note that the
108  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
109  * of the user's message with a datagram header, sends the datagram
110  * straight from the user's message buffer, then restores the user's
111  * buffer to its original state.  There is a small problem with the
112  * first datagram of the message: one needs 64 bits of space to store
113  * the datagram header.  To make sure this space is there, we added a
114  * 64-bit unused space to the front of the Cmi message header.  In
115  * addition to this, we also add 32 bits to the Cmi message header
116  * to make room for a length-field, making it possible to identify
117  * message boundaries.
118  *
119  * CONCURRENCY CONTROL
120  *
121  * This has changed recently.
122  *
123  * EFFICIENCY NOTES
124  *
125  * The sender-side does little copying.  The async and freeing send
126  * routines do no copying at all.  The sync send routines copy the
127  * message, then use the freeing-send routines.  The other alternative
128  * is to not copy the message, and use the async send mechanism
129  * combined with a blocking wait.  Blocking wait seems like a bad
130  * idea, since it could take a VERY long time to get all those
131  * datagrams out the door.
132  *
133  * The receiver side, unfortunately, must copy.  To avoid copying,
134  * it would have to receive directly into a preallocated message buffer.
135  * Unfortunately, this can't work: there's no way to know how much
136  * memory to preallocate, and there's no way to know which datagram
137  * is coming next.  Thus, we receive into fixed-size (large) datagram
138  * buffers.  These are then inspected, and the messages extracted from
139  * them.
140  *
141  * Note that we are allocating a large number of structs: OutgoingMsg's,
142  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
143  * is a fixed-size structure.  Thus, we can do memory allocation by
144  * simply keeping a linked-list of unused structs around.  The only
145  * place where expensive memory allocation is performed is in the
146  * sync routines.
147  *
148  * Since the datagrams from one node to another are fully ordered,
149  * there is slightly more ordering than is needed: in theory, the
150  * datagrams of one message don't need to be ordered relative to the
151  * datagrams of another.  This was done to simplify the sequencing
152  * mechanisms: implementing a fully-ordered stream is much simpler
153  * than a partially-ordered one.  It also makes it possible to
154  * modularize, layering the message transmitter on top of the
155  * datagram-sequencer.  In other words, it was just easier this way.
156  * Hopefully, this won't cause serious degradation: LAN's rarely get
157  * datagrams out of order anyway.
158  *
159  * A potential efficiency problem is the lack of message-combining.
160  * One datagram could conceivably contain several messages.  This
161  * might be more efficient, it's not clear how much overhead is
162  * involved in sending a short datagram.  Message-combining isn't
163  * really ``integrated'' into the design of this software, but you
164  * could fudge it as follows.  Whenever you pull a short datagram from
165  * the send-queue, check the next one to see if it's also a short
166  * datagram.  If so, pack them together into a ``combined'' datagram.
167  * At the receive side, simply check for ``combined'' datagrams, and
168  * treat them as if they were simply two datagrams.  This would
169  * require extra copying.  I have no idea if this would be worthwhile.
170  *
171  *****************************************************************************/
172
173 /**
174  * @addtogroup NET
175  * @{
176  */
177
178 /*****************************************************************************
179  *
180  * Include Files
181  *
182  ****************************************************************************/
183
184 #define _GNU_SOURCE 1
185 #include <stdarg.h> /*<- was <varargs.h>*/
186
187 #define CMK_USE_PRINTF_HACK 0
188 #if CMK_USE_PRINTF_HACK
189 /*HACK: turn printf into CmiPrintf, by just defining our own
190 external symbol "printf".  This may be more trouble than it's worth,
191 since the only advantage is that it works properly with +syncprint.
192
193 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
194 because they don't call printf.  Has to be defined up here because we probably 
195 haven't properly guessed this compiler's prototype for "printf".
196 */
197 static void InternalPrintf(const char *f, va_list l);
198 int printf(const char *fmt, ...) {
199         int nChar;
200         va_list p; va_start(p, fmt);
201         InternalPrintf(fmt,p);
202         va_end(p);
203         return 10;
204 }
205 #endif
206
207
208 #include "converse.h"
209 #include "memory-isomalloc.h"
210
211 #include <stdio.h>
212 #include <stdlib.h>
213 #include <ctype.h>
214 #include <fcntl.h>
215 #include <errno.h>
216 #include <setjmp.h>
217 #include <signal.h>
218 #include <string.h>
219
220 /* define machine debug */
221 #include "machine.h"
222
223 /******************* Producer-Consumer Queues ************************/
224 #include "pcqueue.h"
225
226 #include "machine-smp.h"
227
228 #if CMK_USE_KQUEUE
229 #include <sys/event.h>
230 int _kq = -1;
231 #endif
232
233 #if CMK_USE_POLL
234 #include <poll.h>
235 #endif
236
237 #if CMK_USE_GM
238 #include "gm.h"
239 struct gm_port *gmport = NULL;
240 int  portFinish = 0;
241 #endif
242
243 #if CMK_USE_MX
244 #include "myriexpress.h"
245 mx_endpoint_t      endpoint;
246 mx_endpoint_addr_t endpoint_addr;
247 int MX_FILTER   =  123456;
248 static uint64_t Cmi_nic_id=0; /* Machine-specific identifier (MX-only) */
249 #endif
250
251 #if CMK_USE_AMMASSO
252   #include "clustercore/ccil_api.h"
253 #endif
254
255 #if CMK_MULTICORE
256 int Cmi_commthread = 0;
257 #endif
258
259 #include "conv-ccs.h"
260 #include "ccs-server.h"
261 #include "sockRoutines.h"
262
263 #if defined(_WIN32) && ! defined(__CYGWIN__)
264 /*For windows systems:*/
265 #  include <windows.h>
266 #  include <wincon.h>
267 #  include <sys/types.h>
268 #  include <sys/timeb.h>
269 #  define fdopen _fdopen
270 #  define SIGBUS -1  /*These signals don't exist in Win32*/
271 #  define SIGKILL -1
272 #  define SIGQUIT -1
273 /*#  define SIGTERM -1*/       /* VC++ ver 8 now has SIGTERM */
274
275 #else /*UNIX*/
276 #  include <pwd.h>
277 #  include <unistd.h>
278 #  include <fcntl.h>
279 #  include <sys/file.h>
280 #endif
281
282 #if CMK_PERSISTENT_COMM
283 #include "persist_impl.h"
284 #endif
285
286 #define PRINTBUFSIZE 16384
287
288 #ifdef __ONESIDED_IMPL
289 #ifdef __ONESIDED_NO_HARDWARE
290 int putSrcHandler;
291 int putDestHandler;
292 int getSrcHandler;
293 int getDestHandler;
294 #include "conv-onesided.c"
295 #endif
296 #endif
297
298 static void CommunicationServer(int withDelayMs, int where);
299
300 void CmiHandleImmediate();
301 extern int CmemInsideMem();
302 extern void CmemCallWhenMemAvail();
303 static void ConverseRunPE(int everReturn);
304 void CmiYield(void);
305 void ConverseCommonExit(void);
306
307 static unsigned int dataport=0;
308 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
309 static SOCKET       dataskt;
310
311 extern void TokenUpdatePeriodic();
312 extern void getAvailSysMem();
313
314 #define BROADCAST_SPANNING_FACTOR               4
315
316 /****************************************************************************
317  *
318  * Handling Errors
319  *
320  * Errors should be handled by printing a message on stderr and
321  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
322  * communication should be made.  The other processes will notice the
323  * abnormal termination and will deal with it.
324  *
325  * Rationale: if an error triggers an attempt to send a message,
326  * the attempt to send a message is likely to trigger another error,
327  * leading to an infinite loop and a process that spins instead of
328  * shutting down.
329  *
330  *****************************************************************************/
331
332 static int machine_initiated_shutdown=0;
333 static int already_in_signal_handler=0;
334
335 static void CmiDestoryLocks();
336
337 void CmiMachineExit();
338
339 #if CMK_USE_SYSVSHM /* define teardown function before use */
340 void tearDownSharedBuffers();
341 #endif 
342
343 static void machine_exit(int status)
344 {
345   MACHSTATE(3,"     machine_exit");
346   machine_initiated_shutdown=1;
347
348   CmiDestoryLocks();            /* destory locks to prevent dead locking */
349   EmergencyExit();
350
351 #if CMK_USE_GM
352   if (gmport) { 
353     gm_close(gmport); gmport = 0;
354     gm_finalize();
355   }
356 #endif
357 #if CMK_USE_SYSVSHM
358   tearDownSharedBuffers();
359 #endif
360   CmiMachineExit();
361   exit(status);
362 }
363
364 static void charmrun_abort(const char*);
365
366 static void KillEveryone(const char *msg)
367 {
368   charmrun_abort(msg);
369   machine_exit(1);
370 }
371
372 static void KillEveryoneCode(n)
373 int n;
374 {
375   char _s[100];
376   sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
377   charmrun_abort(_s);
378   machine_exit(1);
379 }
380
381 static void KillOnAllSigs(int sigNo)
382 {
383   const char *sig="unknown signal";
384   const char *suggestion="";
385   if (machine_initiated_shutdown ||
386       already_in_signal_handler) 
387         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
388   already_in_signal_handler=1;
389
390   if (CpvAccess(cmiArgDebugFlag)) {
391     CpdNotify(CPD_SIGNAL,sigNo);
392     CpdFreeze();
393   }
394   
395   CmiDestoryLocks();            /* destory locks */
396
397   if (sigNo==SIGSEGV) {
398      sig="segmentation violation";
399      suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).\n";
400   }
401   if (sigNo==SIGFPE) {
402      sig="floating point exception";
403      suggestion="Check for integer or floating-point division by zero.\n";
404   }
405   if (sigNo==SIGBUS) {
406      sig="bus error";
407      suggestion="Check for misaligned reads or writes to memory.\n";
408   }
409   if (sigNo==SIGILL) {
410      sig="illegal instruction";
411      suggestion="Check for calls to uninitialized function pointers.\n";
412   }
413   if (sigNo==SIGKILL) sig="caught signal KILL";
414   if (sigNo==SIGQUIT) sig="caught signal QUIT";
415   if (sigNo==SIGTERM) sig="caught signal TERM";
416   MACHSTATE1(5,"     Caught signal %s ",sig);
417 /*ifdef this part*/
418 #ifdef __FAULT__
419   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
420                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
421   }else{
422 #else
423         {
424 #endif
425    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
426         "Signal: %s\n",CmiMyPe(),sig);
427         if (0!=suggestion[0])
428                 CmiError("Suggestion: %s",suggestion);
429         CmiPrintStackTrace(1);
430         charmrun_abort(sig);
431         machine_exit(1);                
432         }       
433 }
434
435 static void machine_atexit_check(void)
436 {
437   if (!machine_initiated_shutdown)
438     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
439   printf("Program finished.\n");
440 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
441   fgetc(stdin);
442 #endif
443 }
444
445 #if !defined(_WIN32) || defined(__CYGWIN__)
446 static void HandleUserSignals(int signum)
447 {
448   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
449   CcdRaiseCondition(condnum);
450 }
451 #endif
452
453 static void PerrorExit(const char *msg)
454 {
455   perror(msg);
456   machine_exit(1);
457 }
458
459 /*****************************************************************************
460  *
461  *     Utility routines for network machine interface.
462  *
463  *****************************************************************************/
464
465 /*
466 Horrific #defines to hide the differences between select() and poll().
467  */
468 #if CMK_USE_POLL /*poll() version*/
469 # define CMK_PIPE_DECL(delayMs) \
470         struct pollfd fds[10]; \
471         int nFds_sto=0; int *nFds=&nFds_sto; \
472         int pollDelayMs=delayMs;
473 # define CMK_PIPE_SUB fds,nFds
474 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
475
476 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
477 # define CMK_PIPE_ADDREAD(rd_fd) \
478         do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
479 # define CMK_PIPE_ADDWRITE(wr_fd) \
480         do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
481 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
482 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
483
484 #elif CMK_USE_KQUEUE /* kqueue version */
485
486 # define CMK_PIPE_DECL(delayMs) \
487         if (_kq == -1) _kq = kqueue(); \
488     struct kevent ke_sto; \
489     struct kevent* ke = &ke_sto; \
490     struct timespec tmo; \
491     tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
492 # define CMK_PIPE_SUB ke
493 # define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)
494
495 # define CMK_PIPE_PARAM struct kevent* ke
496 # define CMK_PIPE_ADDREAD(rd_fd) \
497         do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
498                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
499 # define CMK_PIPE_ADDWRITE(wr_fd) \
500         do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
501                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
502 # define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
503 # define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)
504
505 #else /*select() version*/
506
507 # define CMK_PIPE_DECL(delayMs) \
508         fd_set rfds_sto,wfds_sto;\
509         fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
510         FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
511 # define CMK_PIPE_SUB rfds,wfds
512 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
513
514 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
515 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
516 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
517 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
518 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
519 #endif
520
521 static void CMK_PIPE_CHECKERR(void) {
522 #if defined(_WIN32) && !defined(__CYGWIN__)
523 /* Win32 socket seems to randomly return inexplicable errors
524 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
525         int err=WSAGetLastError();
526         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
527 o,err);
528 */
529 #else /*UNIX machine*/
530         if (errno!=EINTR)
531                 KillEveryone("Socket error in CheckSocketsReady!\n");
532 #endif
533 }
534
535
536 static void CmiStdoutFlush(void);
537 static int  CmiStdoutNeedsService(void);
538 static void CmiStdoutService(void);
539 static void CmiStdoutAdd(CMK_PIPE_PARAM);
540 static void CmiStdoutCheck(CMK_PIPE_PARAM);
541
542
543 double GetClock(void)
544 {
545 #if defined(_WIN32) && !defined(__CYGWIN__)
546   struct _timeb tv; 
547   _ftime(&tv);
548   return (tv.time * 1.0 + tv.millitm * 1.0E-3);
549 #else
550   struct timeval tv; int ok;
551   ok = gettimeofday(&tv, NULL);
552   if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
553   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
554 #endif
555 }
556
557 char *CopyMsg(char *msg, int len)
558 {
559   char *copy = (char *)CmiAlloc(len);
560   if (!copy)
561       fprintf(stderr, "Out of memory\n");
562   memcpy(copy, msg, len);
563   return copy;
564 }
565
566 /***********************************************************************
567  *
568  * Abort function:
569  *
570  ************************************************************************/
571
572 static int  Cmi_truecrash;
573 static int already_aborting=0;
574 void CmiAbort(const char *message)
575 {
576   if (already_aborting) machine_exit(1);
577   already_aborting=1;
578         {
579 /*       char str[100];
580          sprintf(str,"dead.%d",CmiMyNode());
581          FILE *fp = fopen(str,"w");
582          fprintf(fp,"%s",message);
583          fclose(fp);*/
584         }
585   MACHSTATE1(5,"CmiAbort(%s)",message);
586
587   /* if CharmDebug is attached simply try to send a message to it */
588   if (CpvAccess(cmiArgDebugFlag)) {
589     CpdNotify(CPD_ABORT, message);
590     CpdFreeze();
591   }
592   
593   /* CmiDestoryLocks();  */
594
595   {
596 /*    char str[22];
597     snprintf(str,18,"dead.%d",CmiMyPe());
598     FILE *fp = fopen(str,"w");
599     fprintf(fp,"Abort:%s\n",message);
600     fclose(fp);*/
601   }
602
603   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
604         "Reason: %s\n",CmiMyPe(),message);
605   CmiPrintStackTrace(0);
606   
607   /*Send off any remaining prints*/
608   CmiStdoutFlush();
609   
610   if(Cmi_truecrash) {
611     printf("CHARM++ FATAL ERROR: %s\n", message);
612     *(int *)NULL = 0; /*Write to null, causing bus error*/
613   } else {
614     charmrun_abort(message);
615     machine_exit(1);
616   }
617 }
618
619
620 /******************************************************************************
621  *
622  * CmiEnableAsyncIO
623  *
624  * The net and tcp versions use a bunch of unix processes talking to each
625  * other via file descriptors.  We need for a signal SIGIO to be generated
626  * each time a message arrives, making it possible to write a signal
627  * handler to handle the messages.  The vast majority of unixes can,
628  * in fact, do this.  However, there isn't any standard for how this is
629  * supposed to be done, so each version of UNIX has a different set of
630  * calls to turn this signal on.  So, there is like one version here for
631  * every major brand of UNIX.
632  *
633  *****************************************************************************/
634
635 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
636 #include <fcntl.h>
637 void CmiEnableAsyncIO(int fd)
638 {
639   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
640     CmiError("setting socket owner: %s\n", strerror(errno)) ;
641     exit(1);
642   }
643   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
644     CmiError("setting socket async: %s\n", strerror(errno)) ;
645     exit(1);
646   }
647 }
648 #else
649 void CmiEnableAsyncIO(int fd) { }
650 #endif
651
652 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
653 #if !defined(_WIN32) || defined(__CYGWIN__)
654 void CmiEnableNonblockingIO(int fd) {
655   int on=1;
656   if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
657     CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
658     exit(1);
659   }
660 }
661 #else
662 void CmiEnableNonblockingIO(int fd) { }
663 #endif
664
665
666 /******************************************************************************
667 *
668 * Configuration Data
669 *
670 * This data is all read in from the NETSTART variable (provided by the
671 * charmrun) and from the command-line arguments.  Once read in, it is never
672  * modified.
673  *
674  *****************************************************************************/
675
676 int               _Cmi_mynode;    /* Which address space am I */
677 int               _Cmi_mynodesize;/* Number of processors in my address space */
678 int               _Cmi_numnodes;  /* Total number of address spaces */
679 int               _Cmi_numpes;    /* Total number of processors */
680 static int        Cmi_nodestart; /* First processor in this address space */
681 static skt_ip_t   Cmi_self_IP;
682 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
683 static int        Cmi_charmrun_port;
684 static int        Cmi_charmrun_pid;
685 static int        Cmi_charmrun_fd=-1;
686
687 static int    Cmi_netpoll;
688 static int    Cmi_asyncio;
689 static int    Cmi_idlepoll;
690 static int    Cmi_syncprint;
691 static int Cmi_print_stats = 0;
692
693 #if ! CMK_SMP && ! defined(_WIN32)
694 /* parse forks only used in non-smp mode */
695 static void parse_forks(void) {
696   char *forkstr;
697   int nread;
698   int forks;
699   int i,pid;
700   forkstr=getenv("CmiMyForks");
701   if(forkstr!=0) { /* charmrun */
702         nread = sscanf(forkstr,"%d",&forks);
703         for(i=1;i<=forks;i++) { /* by default forks = 0 */ 
704                 pid=fork();
705                 if(pid<0) CmiAbort("Fork returned an error");
706                 if(pid==0) { /* forked process */
707                         /* reset mynode,pe & exit loop */
708                         _Cmi_mynode+=i;
709                         _Cmi_mype+=i;
710                         break;
711                 }
712         }
713   }
714 }
715 #endif
716
717 static void parse_netstart(void)
718 {
719   char *ns;
720   int nread;
721   int port;
722   ns = getenv("NETSTART");
723   if (ns!=0) 
724   {/*Read values set by Charmrun*/
725         char Cmi_charmrun_name[1024];
726         nread = sscanf(ns, "%d%s%d%d%d",
727                  &_Cmi_mynode,
728                  Cmi_charmrun_name, &Cmi_charmrun_port,
729                  &Cmi_charmrun_pid, &port);
730         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
731
732         if (nread!=5) {
733                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
734                 exit(1);
735         }
736   } else 
737   {/*No charmrun-- set flag values for standalone operation*/
738         _Cmi_mynode=0;
739         Cmi_charmrun_IP=_skt_invalid_ip;
740         Cmi_charmrun_port=0;
741         Cmi_charmrun_pid=0;
742         dataport = -1;
743   }
744 #if CMK_USE_IBVERBS | CMK_USE_IBUD
745         char *cmi_num_nodes = getenv("CmiNumNodes");
746         if(cmi_num_nodes != NULL){
747                 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
748         }
749 #endif  
750 }
751
752 static void extract_common_args(char **argv)
753 {
754   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
755     Cmi_print_stats = 1;
756 }
757
758 /* for SMP */
759 #include "machine-smp.c"
760
761 CsvDeclare(CmiNodeState, NodeState);
762
763 /* Immediate message support */
764 #define CMI_DEST_RANK(msg)      *(int *)(msg)
765 #include "immediate.c"
766
767 /******************************************************************************
768  *
769  * Packet Performance Logging
770  *
771  * This module is designed to give a detailed log of the packets and their
772  * acknowledgements, for performance tuning.  It can be disabled.
773  *
774  *****************************************************************************/
775
776 #define LOGGING 0
777
778 #if LOGGING
779
780 typedef struct logent {
781   double time;
782   int seqno;
783   int srcpe;
784   int dstpe;
785   int kind;
786 } *logent;
787
788
789 logent log;
790 int    log_pos;
791 int    log_wrap;
792
793 static void log_init(void)
794 {
795   log = (logent)malloc(50000 * sizeof(struct logent));
796   _MEMCHECK(log);
797   log_pos = 0;
798   log_wrap = 0;
799 }
800
801 static void log_done(void)
802 {
803   char logname[100]; FILE *f; int i, size;
804   sprintf(logname, "log.%d", _Cmi_mynode);
805   f = fopen(logname, "w");
806   if (f==0) KillEveryone("fopen problem");
807   if (log_wrap) size = 50000; else size=log_pos;
808   for (i=0; i<size; i++) {
809     logent ent = log+i;
810     fprintf(f, "%1.4f %d %c %d %d\n",
811             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
812   }
813   fclose(f);
814 }
815
816 void printLog(void)
817 {
818   char logname[100]; FILE *f; int i, j, size;
819   static int logged = 0;
820   if (logged)
821       return;
822   logged = 1;
823   CmiPrintf("Logging: %d\n", _Cmi_mynode);
824   sprintf(logname, "log.%d", _Cmi_mynode);
825   f = fopen(logname, "w");
826   if (f==0) KillEveryone("fopen problem");
827   for (i = 5000; i; i--)
828   {
829   /*for (i=0; i<size; i++) */
830     j = log_pos - i;
831     if (j < 0)
832     {
833         if (log_wrap)
834             j = 5000 + j;
835         else
836             j = 0;
837     };
838     {
839     logent ent = log+j;
840     fprintf(f, "%1.4f %d %c %d %d\n",
841             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
842     }
843   }
844   fclose(f);
845   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
846 }
847
848 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
849
850 #endif
851
852 #if !LOGGING
853
854 #define log_init() /*empty*/
855 #define log_done() /*empty*/
856 #define printLog() /*empty*/
857 #define LOG(t,s,k,d,q) /*empty*/
858
859 #endif
860
861 /******************************************************************************
862  *
863  * Node state
864  *
865  *****************************************************************************/
866
867
868 static CmiNodeLock    Cmi_scanf_mutex;
869 static double         Cmi_clock;
870 static double         Cmi_check_delay = 3.0;
871
872 /******************************************************************************
873  *
874  * OS Threads
875  * SMP implementation moved to machine-smp.c
876  *****************************************************************************/
877
878 /************************ No kernel SMP threads ***************/
879 #if CMK_SHARED_VARS_UNAVAILABLE
880
881 static volatile int memflag=0;
882 void CmiMemLock() { memflag++; }
883 void CmiMemUnlock() { memflag--; }
884
885 static volatile int comm_flag=0;
886 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
887 #ifndef MACHLOCK_DEBUG
888 #  define CmiCommLock() (comm_flag=1)
889 #  define CmiCommUnlock() (comm_flag=0)
890 #else /* Error-checking flag locks */
891 void CmiCommLock(void) {
892   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
893   comm_flag=1;
894 }
895 void CmiCommUnlock(void) {
896   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
897   comm_flag=0;
898 }
899 #endif
900
901 static struct CmiStateStruct Cmi_state;
902 int _Cmi_mype;
903 int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
904 #define CmiGetState() (&Cmi_state)
905 #define CmiGetStateN(n) (&Cmi_state)
906
907 void CmiYield(void) { sleep(0); }
908
909 static void CommunicationInterrupt(int ignored)
910 {
911   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
912   if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0)) 
913   { /* Already busy inside malloc, comm, or immediate messages */
914     MACHSTATE(5,"--SKIPPING SIGIO--");
915     return;
916   }
917   MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
918   {
919     /*Make sure any malloc's we do in here are NOT migratable:*/
920     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
921 /*    _Cmi_myrank=1; */
922     CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
923 /*    _Cmi_myrank=0; */
924     CmiIsomallocBlockListActivate(oldList);
925   }
926   MACHSTATE(2,"--END SIGIO--")
927 }
928
929 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
930
931 static void CmiStartThreads(char **argv)
932 {
933   MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
934   MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
935   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
936     KillEveryone
937       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
938   
939   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
940   _Cmi_mype = Cmi_nodestart;
941
942   /* Prepare Cpv's for immediate messages: */
943   _Cmi_myrank=1;
944   CommunicationServerInit();
945   _Cmi_myrank=0;
946   
947 #if !CMK_ASYNC_NOT_NEEDED
948   if (Cmi_asyncio)
949   {
950     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
951     if (!Cmi_netpoll) {
952       if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
953       if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
954     }
955 #if CMK_USE_GM || CMK_USE_MX
956       /* charmrun is serviced in interrupt for gm */
957     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
958 #endif
959   }
960 #endif
961 }
962
963 static void CmiDestoryLocks()
964 {
965   comm_flag = 0;
966   memflag = 0;
967 }
968
969 #endif
970
971 /*Network progress utility variables. Period controls the rate at
972   which the network poll is called */
973
974 CpvDeclare(unsigned , networkProgressCount);
975 int networkProgressPeriod;
976
977 CpvDeclare(void *, CmiLocalQueue);
978
979
980 #ifndef CmiMyPe
981 int CmiMyPe(void) 
982
983   return CmiGetState()->pe; 
984 }
985 #endif
986 #ifndef CmiMyRank
987 int CmiMyRank(void)
988 {
989   return CmiGetState()->rank;
990 }
991 #endif
992
993 CpvExtern(int,_charmEpoch);
994
995 /*Add a message to this processor's receive queue 
996   Must be called while holding comm. lock
997 */
998
999 extern double evacTime;
1000
1001 void CmiPushPE(int pe,void *msg)
1002 {
1003   CmiState cs=CmiGetStateN(pe);
1004         /*
1005                 FAULT_EVAC
1006         
1007         if(CpvAccess(_charmEpoch)&&!CmiNodeAlive(CmiMyPe())){
1008                 printf("[%d] Message after stop at %.6lf in %.6lf \n",CmiMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
1009         }*/
1010   MACHSTATE1(2,"Pushing message into %d's queue",pe);  
1011   MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
1012
1013 #if CMK_IMMEDIATE_MSG
1014   if (CmiIsImmediate(msg)) {
1015     CmiPushImmediateMsg(msg);
1016     return;
1017   }
1018 #endif
1019 #if !CMK_SMP_MULTIQ
1020   PCQueuePush(cs->recv,msg);
1021 #else
1022   PCQueuePush(cs->recv[CmiGetState()->myGrpIdx], msg);
1023 #endif
1024
1025 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1026   if (_Cmi_noprocforcommthread) 
1027 #endif
1028   CmiIdleLock_addMessage(&cs->idle);
1029 }
1030
1031 #if CMK_NODE_QUEUE_AVAILABLE
1032 /*Add a message to the node queue.  
1033   Must be called while holding comm. lock
1034 */
1035 static void CmiPushNode(void *msg)
1036 {
1037   CmiState cs=CmiGetStateN(0);
1038   
1039   MACHSTATE(2,"Pushing message into node queue");
1040   MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
1041   
1042 #if CMK_IMMEDIATE_MSG
1043   if (CmiIsImmediate(msg)) {
1044     MACHSTATE(2,"Pushing Immediate message into queue");
1045     CmiPushImmediateMsg(msg);
1046     return;
1047   }
1048 #endif 
1049 /* 
1050  * if CMK_SMP_MULTIQ is enabled, then PCQUEUE's push lock
1051  * may be disabled. In this case, the lock for node recv
1052  * queue has to be used.
1053  * */
1054 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1055   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1056 #endif
1057   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
1058 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1059   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1060 #endif
1061
1062   /*Silly: always try to wake up processor 0, so at least *somebody*
1063     will be awake to handle the message*/
1064 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1065   if (_Cmi_noprocforcommthread) 
1066 #endif
1067   CmiIdleLock_addMessage(&cs->idle);
1068 }
1069 #endif
1070
1071 /***************************************************************
1072  Communication with charmrun:
1073  We can send (ctrl_sendone) and receive (ctrl_getone)
1074  messages on a TCP socket connected to charmrun.
1075  This is used for printfs, CCS, etc; and also for
1076  killing ourselves if charmrun dies.
1077 */
1078
1079 /*This flag prevents simultanious outgoing
1080 messages on the charmrun socket.  It is protected
1081 by the commlock.*/
1082 static int Cmi_charmrun_fd_sendflag=0;
1083
1084 /* ctrl_sendone */
1085 static int sendone_abort_fn(int code,const char *msg) {
1086         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
1087         machine_exit(1);
1088         return -1;
1089 }
1090
1091 static void ctrl_sendone_nolock(const char *type,
1092                                 const char *data1,int dataLen1,
1093                                 const char *data2,int dataLen2)
1094 {
1095   const void *bufs[3]; int lens[3]; int nBuffers=0;
1096   ChMessageHeader hdr;
1097   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
1098   MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
1099   if (Cmi_charmrun_fd==-1) 
1100         charmrun_abort("ctrl_sendone called in standalone!\n");
1101   Cmi_charmrun_fd_sendflag=1;
1102   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
1103   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
1104   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
1105   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
1106   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
1107   Cmi_charmrun_fd_sendflag=0;
1108   skt_set_abort(oldAbort);
1109   MACHSTATE(2,"} ctrl_sendone_nolock");
1110 }
1111
1112 static void ctrl_sendone_locking(const char *type,
1113                                 const char *data1,int dataLen1,
1114                                 const char *data2,int dataLen2)
1115 {
1116   CmiCommLock();
1117   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
1118   CmiCommUnlock();
1119 }
1120
1121 #ifndef MEMORYUSAGE_OUTPUT
1122 #define MEMORYUSAGE_OUTPUT 0 
1123 #endif
1124 #if MEMORYUSAGE_OUTPUT 
1125 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
1126 static int memoryusage_counter;
1127 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
1128 #define memoryusage_output {\
1129   memoryusage_counter++;\
1130   if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
1131 #endif
1132
1133 static double Cmi_check_last;
1134
1135 /* if charmrun dies, we finish */
1136 static void pingCharmrun(void *ignored) 
1137 {
1138 #if MEMORYUSAGE_OUTPUT
1139   memoryusage_output;
1140   if(memoryusage_isOutput){
1141     memoryusage_counter = 0;
1142 #else
1143   {
1144 #endif 
1145
1146   double clock=GetClock();
1147   if (clock > Cmi_check_last + Cmi_check_delay) {
1148     MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1149     Cmi_check_last = clock; 
1150 #if CMK_USE_GM || CMK_USE_MX
1151     if (!Cmi_netpoll)  /* GM netpoll, charmrun service is done in interrupt */
1152 #endif
1153     CmiCommLockOrElse(return;); /*Already busy doing communication*/
1154     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1155     CmiCommLock();
1156     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1157     CmiCommUnlock();
1158   }
1159 #if 1
1160 #if CMK_USE_GM || CMK_USE_MX
1161   if (!Cmi_netpoll)
1162 #endif
1163   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1164 #endif
1165   }
1166 }
1167
1168 /* periodic charm ping, for gm and netpoll */
1169 static void pingCharmrunPeriodic(void *ignored)
1170 {
1171   pingCharmrun(ignored);
1172   CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1173 }
1174
1175 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
1176 static void charmrun_abort(const char *s)
1177 {
1178   if (Cmi_charmrun_fd==-1) {/*Standalone*/
1179         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1180 CmiPrintStackTrace(0);
1181         abort();
1182   } else {
1183         char msgBuf[80];
1184         skt_set_abort(ignore_further_errors);
1185         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1186         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
1187   }
1188 }
1189
1190 /* ctrl_getone */
1191
1192 #ifdef __FAULT__
1193 #include "machine-recover.c"
1194 #endif
1195
1196 static void node_addresses_store(ChMessage *msg);
1197
1198 static int barrierReceived = 0;
1199
1200 static void ctrl_getone(void)
1201 {
1202   ChMessage msg;
1203   MACHSTATE(2,"ctrl_getone")
1204   MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
1205   ChMessage_recv(Cmi_charmrun_fd,&msg);
1206   MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1207
1208   if (strcmp(msg.header.type,"die")==0) {
1209     MACHSTATE(2,"ctrl_getone bye bye")
1210     fprintf(stderr,"aborting: %s\n",msg.data);
1211     log_done();
1212     ConverseCommonExit();
1213     machine_exit(0);
1214   } 
1215 #if CMK_CCS_AVAILABLE
1216   else if (strcmp(msg.header.type, "req_fw")==0) {
1217     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1218         /*Sadly, I *can't* do a:
1219       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1220         here, because I can't send converse messages in the
1221         communication thread.  I *can* poke this message into 
1222         any convenient processor's queue, though:  (OSL, 9/14/2000)
1223         */
1224     int pe=0;/*<- node-local processor number. Any one will do.*/
1225     void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1226     MACHSTATE(2,"Incoming CCS request");
1227     CmiPushPE(pe,cmsg);
1228   }
1229 #endif
1230 #ifdef __FAULT__        
1231   else if(strcmp(msg.header.type,"crashnode")==0) {
1232         crash_node_handle(&msg);
1233   }
1234   else if(strcmp(msg.header.type,"initnodetab")==0) {
1235         /** A processor crashed and got recreated. So charmrun sent 
1236           across the whole nodetable data to update this processor*/
1237         node_addresses_store(&msg);
1238         // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1239   }
1240 #endif
1241   else if(strcmp(msg.header.type,"barrier")==0) {
1242         barrierReceived = 1;
1243   }
1244   else if(strcmp(msg.header.type,"barrier0")==0) {
1245         barrierReceived = 2;
1246   }
1247   else {
1248   /* We do not use KillEveryOne here because it calls CmiMyPe(),
1249    * which is not available to the communication thread on an SMP version.
1250    */
1251     /* CmiPrintf("Unknown message: %s\n", msg.header.type); */
1252     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1253     machine_exit(1);
1254   }
1255   
1256   MACHSTATE(2,"ctrl_getone done")
1257   ChMessage_free(&msg);
1258 }
1259
1260 #if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
1261 /*Deliver this reply data to this reply socket.
1262   The data is forwarded to CCS server via charmrun.*/
1263 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
1264 {
1265   MACHSTATE(2,"Outgoing CCS reply");
1266   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
1267       repData,repLen);
1268   MACHSTATE(1,"Outgoing CCS reply away");
1269 }
1270 #endif
1271
1272 /*****************************************************************************
1273  *
1274  * CmiPrintf, CmiError, CmiScanf
1275  *
1276  *****************************************************************************/
1277 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1278 static void InternalPrintf(const char *f, va_list l)
1279 {
1280   ChMessage replymsg;
1281   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1282   CmiStdoutFlush();
1283   vsprintf(buffer, f, l);
1284   if(Cmi_syncprint) {
1285           CmiCommLock();
1286           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1287           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1288           ChMessage_free(&replymsg);
1289           CmiCommUnlock();
1290   } else {
1291           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1292   }
1293   InternalWriteToTerminal(0,buffer,strlen(buffer));
1294   CmiTmpFree(buffer);
1295 }
1296
1297 static void InternalError(const char *f, va_list l)
1298 {
1299   ChMessage replymsg;
1300   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1301   CmiStdoutFlush();
1302   vsprintf(buffer, f, l);
1303   if(Cmi_syncprint) {
1304           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1305           CmiCommLock();
1306           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1307           ChMessage_free(&replymsg);
1308           CmiCommUnlock();
1309   } else {
1310           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1311   }
1312   InternalWriteToTerminal(1,buffer,strlen(buffer));
1313   CmiTmpFree(buffer);
1314 }
1315
1316 static int InternalScanf(char *fmt, va_list l)
1317 {
1318   ChMessage replymsg;
1319   char *ptr[20];
1320   char *p; int nargs, i;
1321   nargs=0;
1322   p=fmt;
1323   while (*p) {
1324     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1325     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1326     if (p[0]=='%') { nargs++; p++; continue; }
1327     if (*p=='\n') *p=' '; p++;
1328   }
1329   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1330   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1331   CmiLock(Cmi_scanf_mutex);
1332   if (Cmi_charmrun_fd!=-1)
1333   {/*Send charmrun the format string*/
1334         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1335         /*Wait for the reply (characters to scan) from charmrun*/
1336         CmiCommLock();
1337         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1338         i = sscanf((char*)replymsg.data, fmt,
1339                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1340                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1341                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1342         ChMessage_free(&replymsg);
1343         CmiCommUnlock();
1344   } else
1345   {/*Just do the scanf normally*/
1346         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1347                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1348                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1349   }
1350   CmiUnlock(Cmi_scanf_mutex);
1351   return i;
1352 }
1353
1354 #if CMK_CMIPRINTF_IS_A_BUILTIN
1355
1356 /*New stdarg.h declarations*/
1357 void CmiPrintf(const char *fmt, ...)
1358 {
1359   CpdSystemEnter();
1360   {
1361   va_list p; va_start(p, fmt);
1362   if (Cmi_charmrun_fd!=-1)
1363     InternalPrintf(fmt, p);
1364   else
1365     vfprintf(stdout,fmt,p);
1366   va_end(p);
1367   }
1368   CpdSystemExit();
1369 }
1370
1371 void CmiError(const char *fmt, ...)
1372 {
1373   CpdSystemEnter();
1374   {
1375   va_list p; va_start (p, fmt);
1376   if (Cmi_charmrun_fd!=-1)
1377     InternalError(fmt, p);
1378   else
1379     vfprintf(stderr,fmt,p);
1380   va_end(p);
1381   }
1382   CpdSystemExit();
1383 }
1384
1385 int CmiScanf(const char *fmt, ...)
1386 {
1387   int i;
1388   CpdSystemEnter();
1389   {
1390   va_list p; va_start(p, fmt);
1391   i = InternalScanf((char *)fmt, p);
1392   va_end(p);
1393   }
1394   CpdSystemExit();
1395   return i;
1396 }
1397
1398 #endif
1399
1400 /***************************************************************************
1401  * Output redirection:
1402  *  When people don't use CkPrintf, like above, we'd still like to be able
1403  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
1404  * which lets us read the characters sent to stdout at our lesiure.
1405  ***************************************************************************/
1406
1407 /*Can read from stdout or stderr using these fd's*/
1408 static int readStdout[2]; 
1409 static int writeStdout[2]; /*The original stdout/stderr sockets*/ 
1410 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1411 #define readStdoutBufLen (16*1024)
1412 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1413 static int servicingStdout;
1414
1415 /*Initialization-- should only be called once per node*/
1416 static void CmiStdoutInit(void) {
1417         int i;
1418         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1419
1420 /*There's some way to do this same thing in windows, but I don't know how*/
1421 #if !defined(_WIN32) || defined(__CYGWIN__)
1422         /*Prevent buffering in stdio library:*/
1423         setbuf(stdout,NULL); setbuf(stderr,NULL);
1424
1425         /*Reopen stdout and stderr fd's as new pipes:*/
1426         for (i=0;i<2;i++) {
1427                 int pair[2];
1428                 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1429                 
1430                 /*First, save a copy of the original stdout*/
1431                 writeStdout[i]=dup(srcFd);
1432 #if 0
1433                 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1434                 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1435 #else
1436                /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1437                 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair)) 
1438                         {perror("building stdio redirection socketpair"); exit(1);}
1439 #endif
1440                 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1441                 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1442                 
1443 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
1444                 CmiEnableNonblockingIO(srcFd);
1445 #endif
1446 #if CMK_SHARED_VARS_UNAVAILABLE
1447                 if (Cmi_asyncio)
1448                 {
1449   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1450                         CmiEnableAsyncIO(readStdout[i]);
1451                 }
1452 #endif
1453         }
1454 #else
1455 /*Windows system-- just fake reads for now*/
1456 # ifndef read
1457 #  define read(x,y,z) 0
1458 # endif
1459 # ifndef write
1460 #  define write(x,y,z) 
1461 # endif
1462 #endif
1463 }
1464
1465 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1466 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1467 {
1468         write(writeStdout[isStdErr],str,len);   
1469 }
1470
1471 /*
1472   Service this particular stdout pipe.  
1473   Must hold comm. lock.
1474 */
1475 static void CmiStdoutServiceOne(int i) {
1476         int nBytes;
1477         const static char *cmdName[2]={"print","printerr"};
1478         servicingStdout=1;
1479         while(1) {
1480                 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1481                 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1482                 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1483                 if (nBytes<=0) break; /*Nothing to send*/
1484                 
1485                 /*Send these bytes off to charmrun*/
1486                 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1487                 nBytes++; /*Include zero-terminator in message to charmrun*/
1488                 
1489                 if (nBytes>=readStdoutBufLen-100) 
1490                 { /*We must have filled up our output pipe-- most output libraries
1491                    don't handle this well (e.g., glibc printf just drops the line).*/
1492                         
1493                         tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1494                                 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1495                         nBytes--; /*Remove terminator from user's data*/
1496                         tooMuchLen=strlen(tooMuchWarn)+1;
1497                 }
1498                 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1499                                     tooMuchWarn,tooMuchLen);
1500                 
1501                 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1502         }
1503         servicingStdout=0;
1504         serviceStdout[i]=0; /*This pipe is now serviced*/
1505 }
1506
1507 /*Service all stdout pipes, whether it looks like they need it
1508   or not.  Used when you aren't sure if select() has been called recently.
1509   Must hold comm. lock.
1510 */
1511 static void CmiStdoutServiceAll(void) {
1512         int i;
1513         for (i=0;i<2;i++) {
1514                 if (readStdout[i]==0) continue; /*Pipe not open*/
1515                 CmiStdoutServiceOne(i);
1516         }
1517 }
1518
1519 /*Service any outstanding stdout pipes.
1520   Must hold comm. lock.
1521 */
1522 static void CmiStdoutService(void) {
1523         CmiStdoutServiceAll();
1524 }
1525
1526 /*Add our pipes to the pile for select() or poll().
1527   Both can be called with or without the comm. lock.
1528 */
1529 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1530         int i;
1531         for (i=0;i<2;i++) {
1532                 if (readStdout[i]==0) continue; /*Pipe not open*/
1533                 CMK_PIPE_ADDREAD(readStdout[i]);
1534         }
1535 }
1536 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1537         int i;
1538         for (i=0;i<2;i++) {
1539                 if (readStdout[i]==0) continue; /*Pipe not open*/
1540                 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1541         }
1542 }
1543 static int CmiStdoutNeedsService(void) {
1544         return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1545 }
1546
1547 /*Called every few milliseconds to flush the stdout pipes*/
1548 static void CmiStdoutFlush(void) {
1549         if (servicingStdout) return; /* might be called by SIGALRM */
1550         CmiCommLockOrElse( return; )
1551         CmiCommLock();
1552         CmiStdoutServiceAll();
1553         CmiCommUnlock();
1554 }
1555
1556 /***************************************************************************
1557  * Message Delivery:
1558  *
1559  ***************************************************************************/
1560
1561 #include "machine-dgram.c"
1562
1563
1564 #ifndef CmiNodeFirst
1565 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1566 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1567 #endif
1568
1569 #ifndef CmiNodeOf
1570 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1571 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1572 #endif
1573
1574
1575 /*****************************************************************************
1576  *
1577  * node_addresses
1578  *
1579  *  These two functions fill the node-table.
1580  *
1581  *
1582  *   This node, like all others, first sends its own address to charmrun
1583  *   using this command:
1584  *
1585  *     Type: nodeinfo
1586  *     Data: Big-endian 4-byte ints
1587  *           <my-node #><Dataport>
1588  *
1589  *   When charmrun has all the addresses, he sends this table to me:
1590  *
1591  *     Type: nodes
1592  *     Data: Big-endian 4-byte ints
1593  *           <number of nodes n>
1594  *           <#PEs><IP><Dataport> Node 0
1595  *           <#PEs><IP><Dataport> Node 1
1596  *           ...
1597  *           <#PEs><IP><Dataport> Node n-1
1598  *
1599  *****************************************************************************/
1600
1601 #if CMK_USE_IBVERBS
1602 void copyInfiAddr(ChInfiAddr *qpList);
1603 #endif
1604
1605 #if CMK_IBVERBS_FAST_START
1606 static void send_partial_init()
1607 {
1608   ChMessageInt_t nodeNo = ChMessageInt_new(_Cmi_mynode);
1609         ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);
1610 }       
1611 #endif
1612
1613
1614 /*Note: node_addresses_obtain is called before starting
1615   threads, so no locks are needed (or valid!)*/
1616 static void node_addresses_obtain(char **argv)
1617 {
1618   ChMessage nodetabmsg; /* info about all nodes*/
1619   MACHSTATE(3,"node_addresses_obtain { ");
1620   if (Cmi_charmrun_fd==-1) 
1621   {/*Standalone-- fake a single-node nodetab message*/
1622         int npes=1;
1623         ChSingleNodeinfo *fakeTab;
1624         ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1625         fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
1626         CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1627 #if CMK_SHARED_VARS_UNAVAILABLE
1628         if (npes!=1) {
1629                 fprintf(stderr,
1630                         "To use multiple processors, you must run this program as:\n"
1631                         " > charmrun +p%d %s <args>\n"
1632                         "or build the %s-smp version of Charm++.\n",
1633                         npes,argv[0],CMK_MACHINE_NAME);
1634                 exit(1);
1635         }
1636 #else
1637         /* standalone smp version reads ppn */
1638         if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) || 
1639                CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
1640           npes = _Cmi_mynodesize;
1641 #endif
1642         /*This is a stupid hack: we expect the *number* of nodes
1643         followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1644         (which happens to have exactly that layout!) and stuff
1645         a 1 into the "node number" slot
1646         */
1647         fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1648         fakeTab->info.nPE=ChMessageInt_new(npes);
1649         fakeTab->info.dataport=ChMessageInt_new(0);
1650         fakeTab->info.IP=_skt_invalid_ip;
1651   }
1652   else 
1653   { /*Contact charmrun for machine info.*/
1654         ChSingleNodeinfo me;
1655
1656         me.nodeNo=ChMessageInt_new(_Cmi_mynode);
1657
1658 #if CMK_USE_IBVERBS
1659         {
1660                 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
1661                 me.info.qpList = malloc(qpListSize);
1662                 copyInfiAddr(me.info.qpList);
1663                 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
1664                 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
1665                 free(me.info.qpList);
1666         }
1667 #else
1668         /*The nPE and IP fields are set by charmrun--
1669           these values don't matter. */
1670         me.info.nPE=ChMessageInt_new(0);
1671         me.info.IP=_skt_invalid_ip;
1672         me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
1673 #ifdef CMK_USE_MX
1674         me.info.nic_id=ChMessageLong_new(Cmi_nic_id);
1675 #endif
1676 #if CMK_USE_IBUD
1677         me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
1678         me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
1679         me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
1680         MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",me.info.qp.lid,me.info.qp.qpn,me.info.qp.psn);
1681 #endif
1682         me.info.dataport=ChMessageInt_new(dataport);
1683
1684         /*Send our node info. to charmrun.
1685         CommLock hasn't been initialized yet-- 
1686         use non-locking version*/
1687         ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1688         MACHSTATE1(5,"send initnode - dataport:%d", dataport);
1689 #endif  //CMK_USE_IBVERBS
1690
1691         MACHSTATE(3,"initnode sent");
1692   
1693         /*We get the other node addresses from a message sent
1694           back via the charmrun control port.*/
1695         if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
1696                 CmiAbort("Timeout waiting for nodetab!\n");
1697         }
1698         MACHSTATE(2,"recv initnode {");
1699         ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1700         MACHSTATE(2,"} recv initnode");
1701   }
1702 //#if CMK_USE_IBVERBS   
1703 //#else
1704   node_addresses_store(&nodetabmsg);
1705   ChMessage_free(&nodetabmsg);
1706 //#endif        
1707   MACHSTATE(3,"} node_addresses_obtain ");
1708 }
1709
1710 #if CMK_NODE_QUEUE_AVAILABLE
1711
1712 /***********************************************************************
1713  * DeliverOutgoingNodeMessage()
1714  *
1715  * This function takes care of delivery of outgoing node messages from the
1716  * sender end. Broadcast messages are divided into sets of messages that 
1717  * are bound to the local node, and to remote nodes. For local
1718  * transmission, the messages are directly pushed into the recv
1719  * queues. For non-local transmission, the function DeliverViaNetwork()
1720  * is called
1721  ***********************************************************************/
1722 void DeliverOutgoingNodeMessage(OutgoingMsg ogm)
1723 {
1724   int i, rank, dst; OtherNode node;
1725
1726   dst = ogm->dst;
1727   switch (dst) {
1728   case NODE_BROADCAST_ALL:
1729     CmiPushNode(CopyMsg(ogm->data,ogm->size));
1730     /*case-fallthrough (no break)-- deliver to all other processors*/
1731   case NODE_BROADCAST_OTHERS:
1732 #if CMK_BROADCAST_SPANNING_TREE
1733     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1734 #elif CMK_BROADCAST_HYPERCUBE
1735     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1736 #else
1737     for (i=0; i<_Cmi_numnodes; i++)
1738       if (i!=_Cmi_mynode)
1739         DeliverViaNetwork(ogm, nodes + i, DGRAM_NODEMESSAGE, DGRAM_ROOTPE_MASK, 1);
1740 #endif
1741     GarbageCollectMsg(ogm);
1742     break;
1743   default:
1744     node = nodes+dst;
1745     rank=DGRAM_NODEMESSAGE;
1746     if (dst != _Cmi_mynode) {
1747       DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1748       GarbageCollectMsg(ogm);
1749     } else {
1750       if (ogm->freemode == 'A') {
1751         CmiPushNode(CopyMsg(ogm->data,ogm->size));
1752         ogm->freemode = 'X';
1753       } else {
1754         CmiPushNode(ogm->data);
1755         FreeOutgoingMsg(ogm);
1756       }
1757     }
1758   }
1759 }
1760
1761 #else
1762
1763 #define DeliverOutgoingNodeMessage(msg) DeliverOutgoingMessage(msg)
1764
1765 #endif
1766
1767 #if CMK_C_INLINE
1768 inline static
1769 #endif
1770 void DeliverViaNetworkOrPxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot,int copy){
1771 #if CMK_USE_SYSVSHM
1772         {
1773 #if SYSVSHM_STATS
1774         double _startValidTime = CmiWallTimer();
1775 #endif
1776         int ret=CmiValidSysvshm(ogm,node);
1777 #if SYSVSHM_STATS
1778         sysvshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1779 #endif                  
1780         MACHSTATE4(3,"Msg ogm %p size %d dst %d useSysvShm %d",ogm,ogm->size,ogm->dst,ret);
1781         if(ret){
1782                 CmiSendMessageSysvshm(ogm,node,rank,broot);
1783         }else{
1784                 DeliverViaNetwork(ogm, node, rank, broot,copy);
1785         }
1786         } 
1787 #elif CMK_USE_PXSHM
1788      {
1789 #if PXSHM_STATS
1790         double _startValidTime = CmiWallTimer();
1791 #endif
1792       int ret=CmiValidPxshm(ogm,node);
1793 #if PXSHM_STATS
1794         pxshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1795 #endif                  
1796       MACHSTATE4(3,"Msg ogm %p size %d dst %d usePxShm %d",ogm,ogm->size,ogm->dst,ret);
1797       if(ret){
1798          CmiSendMessagePxshm(ogm,node,rank,broot);
1799        }else{
1800          DeliverViaNetwork(ogm, node, rank, broot,copy);
1801        }
1802       } 
1803 #else
1804       DeliverViaNetwork(ogm, node, rank, broot, copy);
1805 #endif                  
1806         
1807 }
1808
1809
1810
1811 /***********************************************************************
1812  * DeliverOutgoingMessage()
1813  *
1814  * This function takes care of delivery of outgoing messages from the
1815  * sender end. Broadcast messages are divided into sets of messages that 
1816  * are bound to the local node, and to remote nodes. For local
1817  * transmission, the messages are directly pushed into the recv
1818  * queues. For non-local transmission, the function DeliverViaNetwork()
1819  * is called
1820  ***********************************************************************/
1821 int DeliverOutgoingMessage(OutgoingMsg ogm)
1822 {
1823   int i, rank, dst; OtherNode node;
1824         
1825   int network = 1;
1826
1827         
1828   dst = ogm->dst;
1829
1830   switch (dst) {
1831   case PE_BROADCAST_ALL:
1832 #if !CMK_SMP_NOT_RELAX_LOCK       
1833     CmiCommLock();
1834 #endif
1835     for (rank = 0; rank<_Cmi_mynodesize; rank++) {
1836       CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1837     }
1838 #if CMK_BROADCAST_SPANNING_TREE
1839     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1840 #elif CMK_BROADCAST_HYPERCUBE
1841     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1842 #else
1843     for (i=0; i<_Cmi_numnodes; i++)
1844       if (i!=_Cmi_mynode){
1845         /*FAULT_EVAC : is the target processor valid*/
1846         if(CmiNodeAlive(i)){
1847           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1848         }
1849       } 
1850 #endif
1851     GarbageCollectMsg(ogm);
1852 #if !CMK_SMP_NOT_RELAX_LOCK               
1853     CmiCommUnlock();
1854 #endif    
1855     break;
1856   case PE_BROADCAST_OTHERS:
1857 #if !CMK_SMP_NOT_RELAX_LOCK               
1858     CmiCommLock();
1859 #endif  
1860     for (rank = 0; rank<_Cmi_mynodesize; rank++)
1861       if (rank + Cmi_nodestart != ogm->src) {
1862         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1863       }
1864 #if CMK_BROADCAST_SPANNING_TREE
1865     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1866 #elif CMK_BROADCAST_HYPERCUBE
1867     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1868 #else
1869     for (i = 0; i<_Cmi_numnodes; i++)
1870       if (i!=_Cmi_mynode){
1871         /*FAULT_EVAC : is the target processor valid*/
1872         if(CmiNodeAlive(i)){
1873           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1874         }
1875       } 
1876 #endif
1877     GarbageCollectMsg(ogm);
1878 #if !CMK_SMP_NOT_RELAX_LOCK               
1879     CmiCommUnlock();
1880 #endif    
1881     break;
1882   default:
1883 #ifndef CMK_OPTIMIZE
1884     if (dst<0 || dst>=CmiNumPes())
1885       CmiAbort("Send to out-of-bounds processor!");
1886 #endif
1887     node = nodes_by_pe[dst];
1888     rank = dst - node->nodestart;
1889     if (node->nodestart != Cmi_nodestart) {
1890 #if !CMK_SMP_NOT_RELAX_LOCK                     
1891         CmiCommLock();
1892 #endif          
1893         DeliverViaNetworkOrPxshm(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1894         GarbageCollectMsg(ogm);
1895 #if !CMK_SMP_NOT_RELAX_LOCK                     
1896         CmiCommUnlock();
1897 #endif          
1898     } else {
1899       network = 0;
1900       if (ogm->freemode == 'A') {
1901         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1902         ogm->freemode = 'X';
1903       } else {
1904         CmiPushPE(rank, ogm->data);
1905         FreeOutgoingMsg(ogm);
1906       }
1907     }
1908   }
1909 #if CMK_MULTICORE
1910   network = 0;
1911 #endif
1912   return network;
1913 }
1914
1915
1916 /******************************************************************************
1917  *
1918  * CmiGetNonLocal
1919  *
1920  * The design of this system is that the communication thread does all the
1921  * work, to eliminate as many locking issues as possible.  This is the only
1922  * part of the code that happens in the receiver-thread.
1923  *
1924  * This operation is fairly cheap, it might be worthwhile to inline
1925  * the code into CmiDeliverMsgs to reduce function call overhead.
1926  *
1927  *****************************************************************************/
1928
1929 #if CMK_NODE_QUEUE_AVAILABLE
1930 char *CmiGetNonLocalNodeQ(void)
1931 {
1932   char *result = 0;
1933   if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1934     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1935     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1936     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1937   }
1938   return result;
1939 }
1940 #endif
1941
1942 void *CmiGetNonLocal(void)
1943 {
1944 #if CMK_SMP_MULTIQ
1945   int i;
1946 #endif
1947
1948   CmiState cs = CmiGetState();
1949   CmiIdleLock_checkMessage(&cs->idle);
1950
1951 #if !CMK_SMP_MULTIQ
1952   return (void *) PCQueuePop(cs->recv);
1953 #else
1954   void *retVal = NULL;
1955   for(i=cs->curPolledIdx; i<MULTIQ_GRPSIZE; i++){
1956     retVal = (void *)PCQueuePop(cs->recv[i]);
1957     if(retVal!=NULL) {
1958         cs->curPolledIdx = i+1;
1959         return retVal;
1960     }
1961   }
1962   cs->curPolledIdx=0;
1963   return NULL;
1964 #endif
1965 }
1966
1967
1968 /**
1969  * Set up an OutgoingMsg structure for this message.
1970  */
1971 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
1972   OutgoingMsg ogm;
1973   MallocOutgoingMsg(ogm);
1974   MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1975   ogm->size = size;
1976   ogm->data = data;
1977   ogm->src = cs->pe;
1978   ogm->dst = pe;
1979   ogm->freemode = freemode;
1980   ogm->refcount = 0;
1981   return (CmiCommHandle)ogm;    
1982 }
1983
1984 #if CMK_NODE_QUEUE_AVAILABLE
1985
1986 /******************************************************************************
1987  *
1988  * CmiGeneralNodeSend
1989  *
1990  * Description: This is a generic message sending routine. All the
1991  * converse message send functions are implemented in terms of this
1992  * function. (By setting appropriate flags (eg freemode) that tell
1993  * CmiGeneralSend() how exactly to handle the particular case of
1994  * message send)
1995  *
1996  *****************************************************************************/
1997
1998 CmiCommHandle CmiGeneralNodeSend(int node, int size, int freemode, char *data)
1999 {
2000   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2001   MACHSTATE(1,"CmiGeneralNodeSend {");
2002
2003   if (freemode == 'S') {
2004     char *copy = (char *)CmiAlloc(size);
2005     if (!copy)
2006       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
2007     memcpy(copy, data, size);
2008     data = copy; freemode = 'F';
2009   }
2010
2011 #if CMK_IMMEDIATE_MSG
2012     /* execute the immediate message right away */
2013   if (node == CmiMyNode() && CmiIsImmediate(data)) {
2014     CmiPushImmediateMsg(data);
2015       /* only communication thread executes immediate messages in SMP */
2016     if (!_immRunning) CmiHandleImmediate();
2017     return;
2018   }
2019 #endif
2020
2021   CmiMsgHeaderSetLength(data, size);
2022   ogm=PrepareOutgoing(cs,node,size,freemode,data);
2023   CmiCommLock();
2024   DeliverOutgoingNodeMessage(ogm);
2025   CmiCommUnlock();
2026   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2027   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2028   MACHSTATE(1,"} CmiGeneralNodeSend");
2029   return (CmiCommHandle)ogm;
2030 }
2031
2032 #endif
2033
2034
2035 /******************************************************************************
2036  *
2037  * CmiGeneralSend
2038  *
2039  * Description: This is a generic message sending routine. All the
2040  * converse message send functions are implemented in terms of this
2041  * function. (By setting appropriate flags (eg freemode) that tell
2042  * CmiGeneralSend() how exactly to handle the particular case of
2043  * message send)
2044  *
2045  *****************************************************************************/
2046
2047 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
2048 {
2049   int sendonnetwork;
2050   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2051   MACHSTATE(1,"CmiGeneralSend {");
2052
2053   if (freemode == 'S') {
2054 #if CMK_USE_GM
2055     if (pe != cs->pe) {
2056       freemode = 'G';
2057     }
2058     else
2059 #endif
2060     {
2061     char *copy = (char *)CmiAlloc(size);
2062     if (!copy)
2063       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
2064     memcpy(copy, data, size);
2065     data = copy; freemode = 'F';
2066     }
2067   }
2068   
2069   if (pe == cs->pe) {
2070 #if ! CMK_SMP
2071     if (!_immRunning) /* CdsFifo_Enqueue, below, isn't SIGIO or thread safe.  
2072                       The SMP comm thread never gets here, because of the pe test. */
2073 #endif
2074     {
2075 #if CMK_IMMEDIATE_MSG
2076       /* execute the immediate message right away */
2077       /* but to avoid infinite recursive call, don't do this if _immRunning */
2078     if (CmiIsImmediate(data)) {
2079       CmiPushImmediateMsg(data);
2080       CmiHandleImmediate();
2081       return 0;
2082     }
2083 #endif
2084     CdsFifo_Enqueue(cs->localqueue, data);
2085     if (freemode == 'A') {
2086       MallocOutgoingMsg(ogm);
2087       ogm->freemode = 'X';
2088       return ogm;
2089     } else return 0;
2090     }
2091   }
2092
2093 #if CMK_PERSISTENT_COMM
2094   if (phs) {
2095       CmiAssert(phsSize == 1);
2096       CmiSendPersistentMsg(*phs, pe, size, data);
2097       return NULL;
2098   }
2099 #endif
2100
2101   CmiMsgHeaderSetLength(data, size);
2102   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
2103
2104   #if CMK_SMP_NOT_RELAX_LOCK  
2105   CmiCommLock();
2106 #endif  
2107   
2108   sendonnetwork = DeliverOutgoingMessage(ogm);
2109   
2110 #if CMK_SMP_NOT_RELAX_LOCK  
2111   CmiCommUnlock();
2112 #endif  
2113   
2114   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2115 #if CMK_USE_SYSVSHM
2116         CommunicationServerSysvshm();
2117 #elif CMK_USE_PXSHM
2118         CommunicationServerPxshm();
2119 #endif
2120 #if !CMK_SHARED_VARS_UNAVAILABLE
2121 #if !CMK_SMP_NOT_SKIP_COMMSERVER
2122   if (sendonnetwork!=0)   /* only call server when we send msg on network in SMP */
2123 #endif
2124 #endif
2125   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2126   MACHSTATE(1,"}  CmiGeneralSend");
2127   return (CmiCommHandle)ogm;
2128 }
2129
2130
2131 void CmiSyncSendFn(int p, int s, char *m)
2132
2133   CQdCreate(CpvAccess(cQdState), 1);
2134   CmiGeneralSend(p,s,'S',m); 
2135 }
2136
2137 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
2138
2139   CQdCreate(CpvAccess(cQdState), 1);
2140   return CmiGeneralSend(p,s,'A',m); 
2141 }
2142
2143 void CmiFreeSendFn(int p, int s, char *m)
2144
2145   CQdCreate(CpvAccess(cQdState), 1);
2146   CmiGeneralSend(p,s,'F',m); 
2147 }
2148
2149 void CmiSyncBroadcastFn(int s, char *m)
2150
2151   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2152   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); 
2153 }
2154
2155 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
2156
2157   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2158   return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); 
2159 }
2160
2161 void CmiFreeBroadcastFn(int s, char *m)
2162
2163   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
2164   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); 
2165 }
2166
2167 void CmiSyncBroadcastAllFn(int s, char *m)
2168
2169   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2170   CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); 
2171 }
2172
2173 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
2174
2175   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2176   return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); 
2177 }
2178
2179 void CmiFreeBroadcastAllFn(int s, char *m)
2180
2181   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2182   CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); 
2183 }
2184
2185 #if CMK_NODE_QUEUE_AVAILABLE
2186
2187 void CmiSyncNodeSendFn(int p, int s, char *m)
2188
2189   CQdCreate(CpvAccess(cQdState), 1);
2190   CmiGeneralNodeSend(p,s,'S',m); 
2191 }
2192
2193 CmiCommHandle CmiAsyncNodeSendFn(int p, int s, char *m)
2194
2195   CQdCreate(CpvAccess(cQdState), 1);
2196   return CmiGeneralNodeSend(p,s,'A',m); 
2197 }
2198
2199 void CmiFreeNodeSendFn(int p, int s, char *m)
2200
2201   CQdCreate(CpvAccess(cQdState), 1);
2202   CmiGeneralNodeSend(p,s,'F',m); 
2203 }
2204
2205 void CmiSyncNodeBroadcastFn(int s, char *m)
2206
2207   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2208   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'S',m); 
2209 }
2210
2211 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
2212
2213   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2214   return CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'A',m);
2215 }
2216
2217 void CmiFreeNodeBroadcastFn(int s, char *m)
2218
2219   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2220   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'F',m); 
2221 }
2222
2223 void CmiSyncNodeBroadcastAllFn(int s, char *m)
2224
2225   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2226   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'S',m); 
2227 }
2228
2229 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
2230
2231   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2232   return CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'A',m); 
2233 }
2234
2235 void CmiFreeNodeBroadcastAllFn(int s, char *m)
2236
2237   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2238   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'F',m); 
2239 }
2240 #endif
2241
2242 /******************************************************************************
2243  *
2244  * Comm Handle manipulation.
2245  *
2246  *****************************************************************************/
2247
2248 int CmiAsyncMsgSent(CmiCommHandle handle)
2249 {
2250   return (((OutgoingMsg)handle)->freemode == 'X');
2251 }
2252
2253 void CmiReleaseCommHandle(CmiCommHandle handle)
2254 {
2255   FreeOutgoingMsg(((OutgoingMsg)handle));
2256 }
2257
2258 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
2259
2260 /*****************************************************************************
2261  *
2262  * NET version List-Cast and Multicast Code
2263  *
2264  ****************************************************************************/
2265                                                                                 
2266 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
2267 {
2268   int i;
2269   for(i=0;i<npes;i++) {
2270     CmiReference(msg);
2271     CmiSyncSendAndFree(pes[i], len, msg);
2272   }
2273 }
2274                                                                                 
2275 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
2276 {
2277   CmiError("ListSend not implemented.");
2278   return (CmiCommHandle) 0;
2279 }
2280                                                                                 
2281 /* 
2282   because in all net versions, the message buffer after CmiSyncSendAndFree
2283   returns is not changed, we can use memory reference trick to avoid 
2284   memory copying here
2285 */
2286 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2287 {
2288   int i;
2289   for(i=0;i<npes;i++) {
2290     CmiReference(msg);
2291     CmiSyncSendAndFree(pes[i], len, msg);
2292   }
2293   CmiFree(msg);
2294 }
2295
2296 #endif
2297
2298 #if CMK_BROADCAST_SPANNING_TREE
2299 /*
2300   if root is 1, it is called from the broadcast root, only ogm is needed;
2301   if root is 0, it is called in the tree, ogm must be NULL and msg, size and startpe are needed
2302   note: function leaves msg buffer untouched
2303 */
2304 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int noderank)
2305 {
2306   CmiState cs = CmiGetState();
2307   int i;
2308
2309   if (root) startpe = _Cmi_mynode;
2310   else ogm = NULL;
2311
2312   CmiAssert(startpe>=0 && startpe<_Cmi_numnodes);
2313
2314   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
2315     int p = _Cmi_mynode-startpe;
2316     if (p<0) p+=_Cmi_numnodes;
2317     p = BROADCAST_SPANNING_FACTOR*p + i;
2318     if (p > _Cmi_numnodes - 1) break;
2319     p += startpe;
2320     p = p%_Cmi_numnodes;
2321     CmiAssert(p!=_Cmi_mynode);
2322     /* CmiPrintf("SendSpanningChildren: %d => %d\n", _Cmi_mynode, p); */
2323     if (!root && !ogm) ogm=PrepareOutgoing(cs, PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2324   //  DeliverViaNetwork(ogm, nodes + p, noderank, startpe, 1);
2325     DeliverViaNetworkOrPxshm(ogm, nodes+p, noderank, startpe, 1);
2326   }
2327   if (!root && ogm) GarbageCollectMsg(ogm);
2328 }
2329 #endif
2330
2331 #if CMK_BROADCAST_HYPERCUBE
2332 int log_of_2 (int i) {
2333   int m;
2334   for (m=0; i>(1<<m); ++m);
2335   return m;
2336 }
2337
2338 /* called from root - send msg along the hypercube in broadcast.
2339   note: function leaves msg buffer untouched
2340 */
2341 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int srcpe, int noderank)
2342 {
2343   CmiState cs = CmiGetState();
2344   int i, k, npes, tmp;
2345   int *dest_pes;
2346
2347   if (root) {
2348     msg = ogm->data;
2349     srcpe = CmiMyNode();
2350   }
2351   else ogm = NULL;
2352
2353   tmp = srcpe ^ CmiMyNode();
2354   k = log_of_2(CmiNumNodes()) + 2;
2355   if (tmp) {
2356      do {--k;} while (!(tmp>>k));
2357   }
2358
2359         MACHSTATE2(3,"Broadcast SendHypercube ogm %p size %d",ogm,size);
2360
2361   dest_pes = CmiTmpAlloc(sizeof(int)*(k+1));
2362   k--;
2363   npes = HypercubeGetBcastDestinations(CmiMyNode(), CmiNumNodes(), k, dest_pes);
2364   
2365   for (i = 0; i < npes; i++) {
2366     int p = dest_pes[i];
2367     /* CmiPrintf("SendHypercube: %d => %d (%d)\n", cs->pe, p, i); */
2368     if (!root && ! ogm) 
2369       ogm=PrepareOutgoing(cs ,PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2370     DeliverViaNetworkOrPxshm(ogm, nodes + p, noderank, CmiMyNode(), 1);
2371   }
2372   if (!root && ogm) GarbageCollectMsg(ogm);
2373   CmiTmpFree(dest_pes);
2374 }
2375 #endif
2376
2377 /*
2378 #if CMK_IMMEDIATE_MSG
2379 void CmiProbeImmediateMsg()
2380 {
2381   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2382 }
2383 #endif
2384 */
2385
2386 /* Network progress function is used to poll the network when for
2387    messages. This flushes receive buffers on some implementations*/ 
2388 void CmiMachineProgressImpl()
2389 {
2390 #if CMK_USE_SYSVSHM
2391         CommunicationServerSysvshm();
2392 #elif CMK_USE_PXSHM
2393         CommunicationServerPxshm();
2394 #endif
2395   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2396 }
2397
2398 /******************************************************************************
2399  *
2400  * Main code, Init, and Exit
2401  *
2402  *****************************************************************************/
2403 extern void CthInit(char **argv);
2404 extern void ConverseCommonInit(char **);
2405
2406 static char     **Cmi_argv;
2407 static char     **Cmi_argvcopy;
2408 static CmiStartFn Cmi_startfn;   /* The start function */
2409 static int        Cmi_usrsched;  /* Continue after start function finishes? */
2410
2411 static void ConverseRunPE(int everReturn)
2412 {
2413   CmiIdleState *s=CmiNotifyGetState();
2414   CmiState cs;
2415   char** CmiMyArgv;
2416   CmiNodeAllBarrier();
2417   cs = CmiGetState();
2418   CpvInitialize(void *,CmiLocalQueue);
2419   CpvAccess(CmiLocalQueue) = cs->localqueue;
2420
2421   /* all non 0 rank use the copied one while rank 0 will modify the actual argv */
2422   if (CmiMyRank())
2423     CmiMyArgv = CmiCopyArgs(Cmi_argvcopy);
2424   else
2425     CmiMyArgv = Cmi_argv;
2426   CthInit(CmiMyArgv);
2427
2428 #if CMK_USE_GM
2429   CmiCheckGmStatus();
2430 #endif
2431
2432   ConverseCommonInit(CmiMyArgv);
2433
2434   /* initialize the network progress counter*/
2435   /* Network progress function is used to poll the network when for
2436      messages. This flushes receive buffers on some  implementations*/ 
2437   CpvInitialize(int , networkProgressCount);
2438   CpvAccess(networkProgressCount) = 0;
2439
2440   /* better to show the status here */
2441   if (CmiMyPe() == 0) {
2442     if (Cmi_netpoll == 1) {
2443       CmiPrintf("Charm++: scheduler running in netpoll mode.\n");
2444     }
2445 #if CMK_SHARED_VARS_UNAVAILABLE
2446     else {
2447       if (CmiMemoryIs(CMI_MEMORY_IS_OS))
2448         CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
2449     }
2450 #endif
2451   }
2452 #if MEMORYUSAGE_OUTPUT
2453   memoryusage_counter = 0;
2454 #endif
2455 #if CMK_USE_GM || CMK_USE_MX
2456   if (Cmi_charmrun_fd != -1)
2457 #endif
2458   {
2459   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
2460       (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
2461   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
2462       (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
2463 #if CMK_USE_SYSVSHM
2464          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdleSysvshm, NULL);
2465          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdleSysvshm, NULL);
2466 #elif CMK_USE_PXSHM
2467                 //TODO: add pxshm notify idle
2468          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdlePxshm, NULL);
2469          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdlePxshm, NULL);
2470 #endif
2471   }
2472
2473 #if CMK_SHARED_VARS_UNAVAILABLE
2474   if (Cmi_netpoll) /*Repeatedly call CommServer*/
2475     CcdCallOnConditionKeep(CcdPERIODIC, 
2476         (CcdVoidFn) CommunicationPeriodic, NULL);
2477   else /*Only need this for retransmits*/
2478     CcdCallOnConditionKeep(CcdPERIODIC_10ms, 
2479         (CcdVoidFn) CommunicationPeriodic, NULL);
2480 #endif
2481
2482   if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
2483     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
2484 #if CMK_SHARED_VARS_UNAVAILABLE
2485     if (!Cmi_asyncio) {
2486     /* gm cannot live with setitimer */
2487     CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
2488     }
2489     else {
2490     /*Occasionally ping charmrun, to test if it's dead*/
2491     struct itimerval i;
2492     CmiSignal(SIGALRM, 0, 0, pingCharmrun);
2493 #if MEMORYUSAGE_OUTPUT
2494     i.it_interval.tv_sec = 0;
2495     i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2496     i.it_value.tv_sec = 0;
2497     i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2498 #else
2499     i.it_interval.tv_sec = 1;
2500     i.it_interval.tv_usec = 0;
2501     i.it_value.tv_sec = 1;
2502     i.it_value.tv_usec = 0;
2503 #endif
2504     setitimer(ITIMER_REAL, &i, NULL);
2505     }
2506
2507 #if ! CMK_USE_GM && ! CMK_USE_MX && ! CMK_USE_TCP && ! CMK_USE_IBVERBS
2508     /*Occasionally check for retransmissions, outgoing acks, etc.*/
2509     /*no need for GM case */
2510     CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
2511 #endif
2512 #endif
2513     
2514     /*Initialize the clock*/
2515     Cmi_clock=GetClock();
2516   }
2517
2518 #ifdef IGET_FLOWCONTROL 
2519   /* Call the function once to determine the amount of physical memory available */
2520   getAvailSysMem();
2521   /* Call the function to periodically call the token adapt function */
2522   CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
2523   CcdCallOnConditionKeep(CcdPERIODIC_10s,   // magic number of PERIOD 10s
2524         (CcdVoidFn) TokenUpdatePeriodic, NULL);
2525 #endif
2526   
2527 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
2528   srand((int)(1024.0*CmiWallTimer()));
2529   if (CmiMyPe()==0)
2530     CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
2531         CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
2532 #endif
2533
2534 #ifdef __ONESIDED_IMPL
2535 #ifdef __ONESIDED_NO_HARDWARE
2536   putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
2537   putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
2538   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2539   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2540 #endif
2541 #ifdef __ONESIDED_GM_HARDWARE
2542   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2543   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2544 #endif
2545 #endif
2546
2547   /* communication thread */
2548   if (CmiMyRank() == CmiMyNodeSize()) {
2549     if(!everReturn) Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2550     if (Cmi_charmrun_fd!=-1)
2551           while (1) CommunicationServer(5, COMM_SERVER_FROM_SMP);
2552   }
2553   else{
2554     if (!everReturn) {
2555       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2556       /* turn on immediate messages only now
2557        node barrier previously should take care of the node synchronization */
2558       _immediateReady = 1;
2559       if (Cmi_usrsched==0) CsdScheduler(-1);
2560       ConverseExit();
2561     }else{
2562       _immediateReady = 1;
2563     }
2564   }
2565 }
2566
2567 void ConverseExit(void)
2568 {
2569   MACHSTATE(2,"ConverseExit {");
2570   machine_initiated_shutdown=1;
2571   if (CmiMyRank()==0) {
2572     if(Cmi_print_stats)
2573       printNetStatistics();
2574     log_done();
2575   }
2576 #if CMK_USE_SYSVSHM
2577         CmiExitSysvshm();
2578 #elif CMK_USE_PXSHM
2579         CmiExitPxshm();
2580 #endif
2581   ConverseCommonExit();               /* should be called by every rank */
2582   CmiNodeBarrier();        /* single node SMP, make sure every rank is done */
2583   if (CmiMyRank()==0) CmiStdoutFlush();
2584   if (Cmi_charmrun_fd==-1) {
2585     if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
2586     else while (1) CmiYield();
2587   }
2588   else {
2589         ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
2590 #if CMK_SHARED_VARS_UNAVAILABLE
2591         Cmi_check_delay = 1.0;          /* speed up checking of charmrun */
2592         while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2593 #elif CMK_MULTICORE
2594         if (!Cmi_commthread && CmiMyRank()==0) {
2595           Cmi_check_delay = 1.0;        /* speed up checking of charmrun */
2596           while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2597         }
2598 #endif
2599   }
2600   MACHSTATE(2,"} ConverseExit");
2601
2602 /*Comm. thread will kill us.*/
2603   while (1) CmiYield();
2604 }
2605
2606 static void set_signals(void)
2607 {
2608   if(!Cmi_truecrash) {
2609     signal(SIGSEGV, KillOnAllSigs);
2610     signal(SIGFPE, KillOnAllSigs);
2611     signal(SIGILL, KillOnAllSigs);
2612     signal(SIGINT, KillOnAllSigs);
2613     signal(SIGTERM, KillOnAllSigs);
2614     signal(SIGABRT, KillOnAllSigs);
2615 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2616     signal(SIGQUIT, KillOnAllSigs);
2617     signal(SIGBUS, KillOnAllSigs);
2618 #     if CMK_HANDLE_SIGUSR
2619     signal(SIGUSR1, HandleUserSignals);
2620     signal(SIGUSR2, HandleUserSignals);
2621 #     endif
2622 #   endif /*UNIX*/
2623   }
2624 }
2625
2626 /*Socket idle function to use before addresses have been
2627   obtained.  During the real program, we idle with CmiYield.
2628 */
2629 static void obtain_idleFn(void) {sleep(0);}
2630
2631 static int net_default_skt_abort(int code,const char *msg)
2632 {
2633   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2634   machine_exit(1);
2635   return -1;
2636 }
2637
2638 #if MACHINE_DEBUG_LOG
2639 FILE *debugLog = NULL;
2640 #endif
2641
2642 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
2643 {
2644 #if MACHINE_DEBUG
2645   debugLog=NULL;
2646 #endif
2647 #if CMK_USE_HP_MAIN_FIX
2648 #if FOR_CPLUS
2649   _main(argc,argv);
2650 #endif
2651 #endif
2652   Cmi_startfn = fn; Cmi_usrsched = usc;
2653   Cmi_netpoll = 0;
2654 #if CMK_NETPOLL
2655   Cmi_netpoll = 1;
2656 #endif
2657 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2658   Cmi_idlepoll = 0;
2659 #else
2660   Cmi_idlepoll = 1;
2661 #endif
2662   Cmi_truecrash = 0;
2663   if (CmiGetArgFlagDesc(argv,"+truecrash","Do not install signal handlers") ||
2664       CmiGetArgFlagDesc(argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2665     /* netpoll disable signal */
2666   if (CmiGetArgFlagDesc(argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2667   if (CmiGetArgFlagDesc(argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
2668     /* idlepoll use poll instead if sleep when idle */
2669   if (CmiGetArgFlagDesc(argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2670     /* idlesleep use sleep instead if busywait when idle */
2671   if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2672   Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2673
2674   Cmi_asyncio = 1;
2675 #if CMK_ASYNC_NOT_NEEDED
2676   Cmi_asyncio = 0;
2677 #endif
2678   if (CmiGetArgFlagDesc(argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2679   if (CmiGetArgFlagDesc(argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2680 #if CMK_MULTICORE
2681   if (CmiGetArgFlagDesc(argv,"+commthread","Use communication thread")) {
2682     Cmi_commthread = 1;
2683 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2684     _Cmi_noprocforcommthread = 1;   /* worker thread go sleep */
2685 #endif
2686     if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2687   }
2688 #endif
2689
2690   skt_init();
2691   /* use special abort handler instead of default_skt_abort to 
2692      prevent exit trapped by atexit_check() due to the exit() call  */
2693   skt_set_abort(net_default_skt_abort);
2694   atexit(machine_atexit_check);
2695   parse_netstart();
2696 #if ! CMK_SMP && ! defined(_WIN32)
2697   /* only get forks in non-smp mode */
2698   parse_forks();
2699 #endif
2700   extract_args(argv);
2701   log_init();
2702   Cmi_scanf_mutex = CmiCreateLock();
2703
2704 #if MACHINE_DEBUG_LOG
2705   {
2706     char ln[200];
2707     sprintf(ln,"debugLog.%d",_Cmi_mynode);
2708     debugLog=fopen(ln,"w");
2709   }
2710 #endif
2711
2712     /* NOTE: can not acutally call timer before timerInit ! GZ */
2713   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2714
2715   skt_set_idle(obtain_idleFn);
2716   if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2717         set_signals();
2718 #if CMK_USE_TCP
2719         dataskt=skt_server(&dataport);
2720 #elif !CMK_USE_GM && !CMK_USE_MX
2721         dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2722 #else
2723           /* GM and MX do not need to create any socket for communication */
2724         dataskt=-1;
2725 #endif
2726         MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2727         Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2728         MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
2729         skt_tcp_no_nagle(Cmi_charmrun_fd);
2730         CmiStdoutInit();
2731   } else {/*Standalone operation*/
2732         printf("Charm++: standalone mode (not using charmrun)\n");
2733         dataskt=-1;
2734         Cmi_charmrun_fd=-1;
2735   }
2736
2737   CmiMachineInit(argv);
2738
2739   node_addresses_obtain(argv);
2740   MACHSTATE(5,"node_addresses_obtain done");
2741
2742 #if CMK_USE_IBVERBS
2743   if (Cmi_charmrun_fd==-1) CmiAbort("Fatal error: standalone mode is not supported in ibverbs. \n");
2744 #endif
2745
2746   CmiCommunicationInit(argv);
2747
2748 #if CMK_USE_SYSVSHM
2749   CmiInitSysvshm(argv);
2750 #elif CMK_USE_PXSHM
2751   CmiInitPxshm(argv);
2752 #endif
2753
2754   skt_set_idle(CmiYield);
2755   Cmi_check_delay = 1.0+0.25*_Cmi_numnodes;
2756
2757   if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2758       Cmi_check_delay=1.0e30;
2759
2760   CsvInitialize(CmiNodeState, NodeState);
2761   CmiNodeStateInit(&CsvAccess(NodeState));
2762  
2763
2764   /* Network progress function is used to poll the network when for
2765      messages. This flushes receive buffers on some  implementations*/ 
2766   networkProgressPeriod = 0;  
2767   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2768
2769   Cmi_argvcopy = CmiCopyArgs(argv);
2770   Cmi_argv = argv; 
2771
2772   CmiStartThreads(argv);
2773
2774 #if CMK_USE_AMMASSO
2775   CmiAmmassoOpenQueuePairs();
2776 #endif
2777
2778   ConverseRunPE(everReturn);
2779 }
2780
2781 #if CMK_PERSISTENT_COMM
2782
2783 int persistentSendMsgHandlerIdx;
2784
2785 static void sendPerMsgHandler(char *msg)
2786 {
2787   int msgSize;
2788   void *destAddr, *destSizeAddr;
2789
2790   msgSize = CmiMsgHeaderGetLength(msg);
2791   msgSize -= 2*sizeof(void *);
2792   destAddr = *(void **)(msg + msgSize);
2793   destSizeAddr = *(void **)(msg + msgSize + sizeof(void*));
2794 /*CmiPrintf("msgSize:%d destAddr:%p, destSizeAddr:%p\n", msgSize, destAddr, destSizeAddr);*/
2795   CmiSetHandler(msg, CmiGetXHandler(msg));
2796   *((int *)destSizeAddr) = msgSize;
2797   memcpy(destAddr, msg, msgSize);
2798 }
2799
2800 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
2801 {
2802   CmiAssert(h!=NULL);
2803   PersistentSendsTable *slot = (PersistentSendsTable *)h;
2804   CmiAssert(slot->used == 1);
2805   CmiAssert(slot->destPE == destPE);
2806   if (size > slot->sizeMax) {
2807     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
2808     CmiAbort("Abort: Invalid size\n");
2809   }
2810
2811 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destpe=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), slot->destPE, slot->destAddress, size);*/
2812
2813   if (slot->destAddress[0]) {
2814     int newsize = size + sizeof(void *)*2;
2815     char *newmsg = (char*)CmiAlloc(newsize);
2816     memcpy(newmsg, m, size);
2817     memcpy(newmsg+size, &slot->destAddress[0], sizeof(void *));
2818     memcpy(newmsg+size+sizeof(void*), &slot->destSizeAddress[0], sizeof(void *));
2819     CmiFree(m);
2820     CmiMsgHeaderSetLength(newmsg, size + sizeof(void *)*2);
2821     CmiSetXHandler(newmsg, CmiGetHandler(newmsg));
2822     CmiSetHandler(newmsg, persistentSendMsgHandlerIdx);
2823     phs = NULL; phsSize = 0;
2824     CmiSyncSendAndFree(slot->destPE, newsize, newmsg);
2825   }
2826   else {
2827 #if 1
2828     if (slot->messageBuf != NULL) {
2829       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
2830       CmiAbort("");
2831     }
2832     slot->messageBuf = m;
2833     slot->messageSize = size;
2834 #else
2835     /* normal send */
2836     PersistentHandle  *phs_tmp = phs;
2837     int phsSize_tmp = phsSize;
2838     phs = NULL; phsSize = 0;
2839     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
2840     CmiSyncSendAndFree(slot->destPE, size, m);
2841     phs = phs_tmp; phsSize = phsSize_tmp;
2842 #endif
2843   }
2844 }
2845
2846 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
2847 {
2848   char *dupmsg = (char *) CmiAlloc(size);
2849   memcpy(dupmsg, msg, size);
2850
2851   /*  CmiPrintf("Setting root to %d\n", 0); */
2852   if (CmiMyPe()==destPE) {
2853     CQdCreate(CpvAccess(cQdState), 1);
2854     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
2855   }
2856   else
2857     CmiSendPersistentMsg(h, destPE, size, dupmsg);
2858 }
2859
2860 /* called in PumpMsgs */
2861 int PumpPersistent()
2862 {
2863   PersistentReceivesTable *slot = persistentReceivesTableHead;
2864   int status = 0;
2865   while (slot) {
2866     unsigned int size = *(slot->recvSizePtr[0]);
2867     if (size > 0)
2868     {
2869       char *msg = slot->messagePtr[0];
2870 /*CmiPrintf("size: %d msg:%p %p\n", size, msg, slot->messagePtr);*/
2871
2872 #if 0
2873       void *dupmsg;
2874       dupmsg = CmiAlloc(size);
2875       
2876       _MEMCHECK(dupmsg);
2877       memcpy(dupmsg, msg, size);
2878       msg = dupmsg;
2879 #else
2880       /* return messagePtr directly and user MUST make sure not to delete it. */
2881       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
2882
2883       CmiReference(msg);
2884 #endif
2885
2886       CmiPushPE(CMI_DEST_RANK(msg), msg);
2887 #if CMK_BROADCAST_SPANNING_TREE
2888       if (CMI_BROADCAST_ROOT(msg))
2889           SendSpanningChildren(size, msg);
2890 #endif
2891       *(slot->recvSizePtr[0]) = 0;
2892       status = 1;
2893     }
2894     slot = slot->next;
2895   }
2896   return status;
2897 }
2898
2899 void *PerAlloc(int size)
2900 {
2901   return CmiAlloc(size);
2902 }
2903                                                                                 
2904 void PerFree(char *msg)
2905 {
2906     CmiFree(msg);
2907 }
2908
2909 void persist_machine_init() 
2910 {
2911   persistentSendMsgHandlerIdx =
2912        CmiRegisterHandler((CmiHandler)sendPerMsgHandler);
2913 }
2914
2915 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
2916 {
2917   int i;
2918   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
2919     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
2920     _MEMCHECK(buf);
2921     memset(buf, 0, maxBytes+sizeof(int)*2);
2922     slot->messagePtr[i] = buf;
2923     slot->recvSizePtr[i] = (unsigned int*)CmiAlloc(sizeof(unsigned int));
2924   }
2925   slot->sizeMax = maxBytes;
2926 }
2927
2928 #endif
2929
2930
2931 #if CMK_CELL
2932
2933 #include "spert_ppu.h"
2934
2935 void machine_OffloadAPIProgress() {
2936   CmiCommLock();
2937   OffloadAPIProgress();
2938   CmiCommUnlock();
2939 }
2940 #endif
2941
2942
2943
2944 /*@}*/