added CcdPERIODIC_5minute
[charm.git] / src / arch / net / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * Basic NET implementation of Converse machine layer
10  * @ingroup NET
11  */
12
13 /** @defgroup NET
14  * NET implementation of machine layer, ethernet in particular
15  * @ingroup Machine
16  *
17  * THE DATAGRAM STREAM
18  *
19  * Messages are sent using UDP datagrams.  The sender allocates a
20  * struct for each datagram to be sent.  These structs stick around
21  * until slightly after the datagram is acknowledged.
22  *
23  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
24  * Each node has an OtherNode struct for every other node in the
25  * system.  The OtherNode struct contains:
26  *
27  *   send_queue   (all datagram-structs not yet transmitted)
28  *   send_window  (all datagram-structs transmitted but not ack'd)
29  *
30  * When an acknowledgement comes in, all packets in the send-window
31  * are either marked as acknowledged or pushed back into the send
32  * queue for retransmission.
33  *
34  * THE OUTGOING MESSAGE
35  *
36  * When you send or broadcast a message, the first thing the system
37  * does is system creates an OutgoingMsg struct to represent the
38  * operation.  The OutgoingMsg contains a very direct expression
39  * of what you want to do:
40  *
41  * OutgoingMsg:
42  *
43  *   size      --- size of message in bytes
44  *   data      --- pointer to the buffer containing the message
45  *   src       --- processor which sent the message
46  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
47  *   freemode  --- see below.
48  *   refcount  --- see below.
49  *
50  * The OutgoingMsg is kept around until the transmission is done, then
51  * it is garbage collected --- the refcount and freemode fields are
52  * to assist garbage collection.
53  *
54  * The freemode indicates which kind of buffer-management policy was
55  * used (sync, async, or freeing).  The sync policy is handled
56  * superficially by immediately converting sync sends into freeing
57  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
58  * (freeing).  If the freemode is 'F', then garbage collection
59  * involves freeing the data and the OutgoingMsg structure itself.  If
60  * the freemode is 'A', then the only cleanup is to change the
61  * freemode to 'X', a condition which is then detectable by
62  * CmiAsyncMsgSent.  In this case, the actual freeing of the
63  * OutgoingMsg is done by CmiReleaseCommHandle.
64  *
65  * When the transmission is initiated, the system computes how many
66  * datagrams need to be sent, total.  This number is stored in the
67  * refcount field.  Each time a datagram is delivered, the refcount
68  * is decremented, when it reaches zero, cleanup is performed.  There
69  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
70  * is a send (not a broadcast) and can be performed with shared
71  * memory, the entire datagram system is bypassed, the message is
72  * simply delivered and freed, not using the refcount mechanism at
73  * all.  Exception 2: If the message is a broadcast, then part of the
74  * broadcast that can be done via shared memory is performed prior to
75  * initiating the datagram/refcount system.
76  *
77  * DATAGRAM FORMATS AND MESSAGE FORMATS
78  *
79  * Datagrams have this format:
80  *
81  *   srcpe   (16 bits) --- source processor number.
82  *   magic   ( 8 bits) --- magic number to make sure DG is good.
83  *   dstrank ( 8 bits) --- destination processor rank.
84  *   seqno   (32 bits) --- packet sequence number.
85  *   data    (XX byte) --- user data.
86  *
87  * The only reason the srcpe is in there is because the receiver needs
88  * to know which receive window to use.  The dstrank field is needed
89  * because transmission is node-to-node.  Once the message is
90  * assembled by the node, it must be delivered to the appropriate PE.
91  * The dstrank field is used to encode certain special-case scenarios.
92  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
93  * and should be delivered to all processors in the node.  If the dstrank
94  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
95  * which case the srcpe is the number of the acknowledger, the seqno is
96  * always zero, and the user data is a list of the seqno's being
97  * acknowledged.  There may be other dstrank codes for special functions.
98  *
99  * To send a message, one chops it up into datagrams and stores those
100  * datagrams in a send-queue.  These outgoing datagrams aren't stored
101  * in the explicit format shown above.  Instead, they are stored as
102  * ImplicitDgrams, which contain the datagram header and a pointer to
103  * the user data (which is in the user message buffer, which is in the
104  * OutgoingMsg).  At transmission time these are combined together.
105
106  * The combination of the datagram header with the user's data is
107  * performed right in the user's message buffer.  Note that the
108  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
109  * of the user's message with a datagram header, sends the datagram
110  * straight from the user's message buffer, then restores the user's
111  * buffer to its original state.  There is a small problem with the
112  * first datagram of the message: one needs 64 bits of space to store
113  * the datagram header.  To make sure this space is there, we added a
114  * 64-bit unused space to the front of the Cmi message header.  In
115  * addition to this, we also add 32 bits to the Cmi message header
116  * to make room for a length-field, making it possible to identify
117  * message boundaries.
118  *
119  * CONCURRENCY CONTROL
120  *
121  * This has changed recently.
122  *
123  * EFFICIENCY NOTES
124  *
125  * The sender-side does little copying.  The async and freeing send
126  * routines do no copying at all.  The sync send routines copy the
127  * message, then use the freeing-send routines.  The other alternative
128  * is to not copy the message, and use the async send mechanism
129  * combined with a blocking wait.  Blocking wait seems like a bad
130  * idea, since it could take a VERY long time to get all those
131  * datagrams out the door.
132  *
133  * The receiver side, unfortunately, must copy.  To avoid copying,
134  * it would have to receive directly into a preallocated message buffer.
135  * Unfortunately, this can't work: there's no way to know how much
136  * memory to preallocate, and there's no way to know which datagram
137  * is coming next.  Thus, we receive into fixed-size (large) datagram
138  * buffers.  These are then inspected, and the messages extracted from
139  * them.
140  *
141  * Note that we are allocating a large number of structs: OutgoingMsg's,
142  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
143  * is a fixed-size structure.  Thus, we can do memory allocation by
144  * simply keeping a linked-list of unused structs around.  The only
145  * place where expensive memory allocation is performed is in the
146  * sync routines.
147  *
148  * Since the datagrams from one node to another are fully ordered,
149  * there is slightly more ordering than is needed: in theory, the
150  * datagrams of one message don't need to be ordered relative to the
151  * datagrams of another.  This was done to simplify the sequencing
152  * mechanisms: implementing a fully-ordered stream is much simpler
153  * than a partially-ordered one.  It also makes it possible to
154  * modularize, layering the message transmitter on top of the
155  * datagram-sequencer.  In other words, it was just easier this way.
156  * Hopefully, this won't cause serious degradation: LAN's rarely get
157  * datagrams out of order anyway.
158  *
159  * A potential efficiency problem is the lack of message-combining.
160  * One datagram could conceivably contain several messages.  This
161  * might be more efficient, it's not clear how much overhead is
162  * involved in sending a short datagram.  Message-combining isn't
163  * really ``integrated'' into the design of this software, but you
164  * could fudge it as follows.  Whenever you pull a short datagram from
165  * the send-queue, check the next one to see if it's also a short
166  * datagram.  If so, pack them together into a ``combined'' datagram.
167  * At the receive side, simply check for ``combined'' datagrams, and
168  * treat them as if they were simply two datagrams.  This would
169  * require extra copying.  I have no idea if this would be worthwhile.
170  *
171  *****************************************************************************/
172
173 /**
174  * @addtogroup NET
175  * @{
176  */
177
178 /*****************************************************************************
179  *
180  * Include Files
181  *
182  ****************************************************************************/
183
184 #define _GNU_SOURCE 1
185 #include <stdarg.h> /*<- was <varargs.h>*/
186
187 #define CMK_USE_PRINTF_HACK 0
188 #if CMK_USE_PRINTF_HACK
189 /*HACK: turn printf into CmiPrintf, by just defining our own
190 external symbol "printf".  This may be more trouble than it's worth,
191 since the only advantage is that it works properly with +syncprint.
192
193 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
194 because they don't call printf.  Has to be defined up here because we probably 
195 haven't properly guessed this compiler's prototype for "printf".
196 */
197 static void InternalPrintf(const char *f, va_list l);
198 int printf(const char *fmt, ...) {
199         int nChar;
200         va_list p; va_start(p, fmt);
201         InternalPrintf(fmt,p);
202         va_end(p);
203         return 10;
204 }
205 #endif
206
207
208 #include "converse.h"
209 #include "memory-isomalloc.h"
210
211 #include <stdio.h>
212 #include <stdlib.h>
213 #include <ctype.h>
214 #include <fcntl.h>
215 #include <errno.h>
216 #include <setjmp.h>
217 #include <signal.h>
218 #include <string.h>
219
220 /* define machine debug */
221 #include "machine.h"
222
223 /******************* Producer-Consumer Queues ************************/
224 #include "pcqueue.h"
225
226 #include "machine-smp.h"
227
228 #if CMK_USE_KQUEUE
229 #include <sys/event.h>
230 int _kq = -1;
231 #endif
232
233 #if CMK_USE_POLL
234 #include <poll.h>
235 #endif
236
237 #if CMK_USE_GM
238 #include "gm.h"
239 struct gm_port *gmport = NULL;
240 int  portFinish = 0;
241 #endif
242
243 #if CMK_USE_MX
244 #include "myriexpress.h"
245 mx_endpoint_t      endpoint;
246 mx_endpoint_addr_t endpoint_addr;
247 int MX_FILTER   =  123456;
248 static uint64_t Cmi_nic_id=0; /* Machine-specific identifier (MX-only) */
249 #endif
250
251 #if CMK_USE_AMMASSO
252   #include "clustercore/ccil_api.h"
253 #endif
254
255 #if CMK_MULTICORE
256 int Cmi_commthread = 0;
257 #endif
258
259 #include "conv-ccs.h"
260 #include "ccs-server.h"
261 #include "sockRoutines.h"
262
263 #if defined(_WIN32) && ! defined(__CYGWIN__)
264 /*For windows systems:*/
265 #  include <windows.h>
266 #  include <wincon.h>
267 #  include <sys/types.h>
268 #  include <sys/timeb.h>
269 #  define fdopen _fdopen
270 #  define SIGBUS -1  /*These signals don't exist in Win32*/
271 #  define SIGKILL -1
272 #  define SIGQUIT -1
273 /*#  define SIGTERM -1*/       /* VC++ ver 8 now has SIGTERM */
274
275 #else /*UNIX*/
276 #  include <pwd.h>
277 #  include <unistd.h>
278 #  include <fcntl.h>
279 #  include <sys/file.h>
280 #endif
281
282 #if CMK_PERSISTENT_COMM
283 #include "persist_impl.h"
284 #endif
285
286 #define PRINTBUFSIZE 16384
287
288 #ifdef __ONESIDED_IMPL
289 #ifdef __ONESIDED_NO_HARDWARE
290 int putSrcHandler;
291 int putDestHandler;
292 int getSrcHandler;
293 int getDestHandler;
294 #include "conv-onesided.c"
295 #endif
296 #endif
297
298 static void CommunicationServer(int withDelayMs, int where);
299
300 void CmiHandleImmediate();
301 extern int CmemInsideMem();
302 extern void CmemCallWhenMemAvail();
303 static void ConverseRunPE(int everReturn);
304 void CmiYield(void);
305 void ConverseCommonExit(void);
306
307 static unsigned int dataport=0;
308 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
309 static SOCKET       dataskt;
310
311 extern void TokenUpdatePeriodic();
312 extern void getAvailSysMem();
313
314 #define BROADCAST_SPANNING_FACTOR               4
315
316 /****************************************************************************
317  *
318  * Handling Errors
319  *
320  * Errors should be handled by printing a message on stderr and
321  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
322  * communication should be made.  The other processes will notice the
323  * abnormal termination and will deal with it.
324  *
325  * Rationale: if an error triggers an attempt to send a message,
326  * the attempt to send a message is likely to trigger another error,
327  * leading to an infinite loop and a process that spins instead of
328  * shutting down.
329  *
330  *****************************************************************************/
331
332 static int machine_initiated_shutdown=0;
333 static int already_in_signal_handler=0;
334
335 static void CmiDestoryLocks();
336
337 void CmiMachineExit();
338
339 #if CMK_USE_SYSVSHM /* define teardown function before use */
340 void tearDownSharedBuffers();
341 #endif 
342
343 static void machine_exit(int status)
344 {
345   MACHSTATE(3,"     machine_exit");
346   machine_initiated_shutdown=1;
347
348   CmiDestoryLocks();            /* destory locks to prevent dead locking */
349   EmergencyExit();
350
351 #if CMK_USE_GM
352   if (gmport) { 
353     gm_close(gmport); gmport = 0;
354     gm_finalize();
355   }
356 #endif
357 #if CMK_USE_SYSVSHM
358   tearDownSharedBuffers();
359 #endif
360   CmiMachineExit();
361   exit(status);
362 }
363
364 static void charmrun_abort(const char*);
365
366 static void KillEveryone(const char *msg)
367 {
368   charmrun_abort(msg);
369   machine_exit(1);
370 }
371
372 static void KillEveryoneCode(n)
373 int n;
374 {
375   char _s[100];
376   sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
377   charmrun_abort(_s);
378   machine_exit(1);
379 }
380
381 static void KillOnAllSigs(int sigNo)
382 {
383   const char *sig="unknown signal";
384   const char *suggestion="";
385   if (machine_initiated_shutdown ||
386       already_in_signal_handler) 
387         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
388   already_in_signal_handler=1;
389
390   if (CpvAccess(cmiArgDebugFlag)) {
391     CpdNotify(CPD_SIGNAL,sigNo);
392     CpdFreeze();
393   }
394   
395   CmiDestoryLocks();            /* destory locks */
396
397   if (sigNo==SIGSEGV) {
398      sig="segmentation violation";
399      suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).\n";
400   }
401   if (sigNo==SIGFPE) {
402      sig="floating point exception";
403      suggestion="Check for integer or floating-point division by zero.\n";
404   }
405   if (sigNo==SIGBUS) {
406      sig="bus error";
407      suggestion="Check for misaligned reads or writes to memory.\n";
408   }
409   if (sigNo==SIGILL) {
410      sig="illegal instruction";
411      suggestion="Check for calls to uninitialized function pointers.\n";
412   }
413   if (sigNo==SIGKILL) sig="caught signal KILL";
414   if (sigNo==SIGQUIT) sig="caught signal QUIT";
415   if (sigNo==SIGTERM) sig="caught signal TERM";
416   MACHSTATE1(5,"     Caught signal %s ",sig);
417 /*ifdef this part*/
418 #ifdef __FAULT__
419   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
420                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
421   }else{
422 #else
423         {
424 #endif
425    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
426         "Signal: %s\n",CmiMyPe(),sig);
427         if (0!=suggestion[0])
428                 CmiError("Suggestion: %s",suggestion);
429         CmiPrintStackTrace(1);
430         charmrun_abort(sig);
431         machine_exit(1);                
432         }       
433 }
434
435 static void machine_atexit_check(void)
436 {
437   if (!machine_initiated_shutdown)
438     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
439   printf("Program finished.\n");
440 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
441   fgetc(stdin);
442 #endif
443 }
444
445 #if !defined(_WIN32) || defined(__CYGWIN__)
446 static void HandleUserSignals(int signum)
447 {
448   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
449   CcdRaiseCondition(condnum);
450 }
451 #endif
452
453 static void PerrorExit(const char *msg)
454 {
455   perror(msg);
456   machine_exit(1);
457 }
458
459 /*****************************************************************************
460  *
461  *     Utility routines for network machine interface.
462  *
463  *****************************************************************************/
464
465 /*
466 Horrific #defines to hide the differences between select() and poll().
467  */
468 #if CMK_USE_POLL /*poll() version*/
469 # define CMK_PIPE_DECL(delayMs) \
470         struct pollfd fds[10]; \
471         int nFds_sto=0; int *nFds=&nFds_sto; \
472         int pollDelayMs=delayMs;
473 # define CMK_PIPE_SUB fds,nFds
474 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
475
476 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
477 # define CMK_PIPE_ADDREAD(rd_fd) \
478         do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
479 # define CMK_PIPE_ADDWRITE(wr_fd) \
480         do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
481 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
482 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
483
484 #elif CMK_USE_KQUEUE /* kqueue version */
485
486 # define CMK_PIPE_DECL(delayMs) \
487         if (_kq == -1) _kq = kqueue(); \
488     struct kevent ke_sto; \
489     struct kevent* ke = &ke_sto; \
490     struct timespec tmo; \
491     tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
492 # define CMK_PIPE_SUB ke
493 # define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)
494
495 # define CMK_PIPE_PARAM struct kevent* ke
496 # define CMK_PIPE_ADDREAD(rd_fd) \
497         do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
498                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
499 # define CMK_PIPE_ADDWRITE(wr_fd) \
500         do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
501                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
502 # define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
503 # define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)
504
505 #else /*select() version*/
506
507 # define CMK_PIPE_DECL(delayMs) \
508         fd_set rfds_sto,wfds_sto;\
509         fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
510         FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
511 # define CMK_PIPE_SUB rfds,wfds
512 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
513
514 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
515 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
516 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
517 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
518 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
519 #endif
520
521 static void CMK_PIPE_CHECKERR(void) {
522 #if defined(_WIN32) && !defined(__CYGWIN__)
523 /* Win32 socket seems to randomly return inexplicable errors
524 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
525         int err=WSAGetLastError();
526         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
527 o,err);
528 */
529 #else /*UNIX machine*/
530         if (errno!=EINTR)
531                 KillEveryone("Socket error in CheckSocketsReady!\n");
532 #endif
533 }
534
535
536 static void CmiStdoutFlush(void);
537 static int  CmiStdoutNeedsService(void);
538 static void CmiStdoutService(void);
539 static void CmiStdoutAdd(CMK_PIPE_PARAM);
540 static void CmiStdoutCheck(CMK_PIPE_PARAM);
541
542
543 double GetClock(void)
544 {
545 #if defined(_WIN32) && !defined(__CYGWIN__)
546   struct _timeb tv; 
547   _ftime(&tv);
548   return (tv.time * 1.0 + tv.millitm * 1.0E-3);
549 #else
550   struct timeval tv; int ok;
551   ok = gettimeofday(&tv, NULL);
552   if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
553   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
554 #endif
555 }
556
557 char *CopyMsg(char *msg, int len)
558 {
559   char *copy = (char *)CmiAlloc(len);
560   if (!copy)
561       fprintf(stderr, "Out of memory\n");
562   memcpy(copy, msg, len);
563   return copy;
564 }
565
566 /***********************************************************************
567  *
568  * Abort function:
569  *
570  ************************************************************************/
571
572 static int  Cmi_truecrash;
573 static int already_aborting=0;
574 void CmiAbort(const char *message)
575 {
576   if (already_aborting) machine_exit(1);
577   already_aborting=1;
578         {
579 /*       char str[100];
580          sprintf(str,"dead.%d",CmiMyNode());
581          FILE *fp = fopen(str,"w");
582          fprintf(fp,"%s",message);
583          fclose(fp);*/
584         }
585   MACHSTATE1(5,"CmiAbort(%s)",message);
586
587   /* if CharmDebug is attached simply try to send a message to it */
588   if (CpvAccess(cmiArgDebugFlag)) {
589     CpdNotify(CPD_ABORT, message);
590     CpdFreeze();
591   }
592   
593   /* CmiDestoryLocks();  */
594
595   {
596 /*    char str[22];
597     snprintf(str,18,"dead.%d",CmiMyPe());
598     FILE *fp = fopen(str,"w");
599     fprintf(fp,"Abort:%s\n",message);
600     fclose(fp);*/
601   }
602
603   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
604         "Reason: %s\n",CmiMyPe(),message);
605   CmiPrintStackTrace(0);
606   
607   /*Send off any remaining prints*/
608   CmiStdoutFlush();
609   
610   if(Cmi_truecrash) {
611     printf("CHARM++ FATAL ERROR: %s\n", message);
612     *(int *)NULL = 0; /*Write to null, causing bus error*/
613   } else {
614     charmrun_abort(message);
615     machine_exit(1);
616   }
617 }
618
619
620 /******************************************************************************
621  *
622  * CmiEnableAsyncIO
623  *
624  * The net and tcp versions use a bunch of unix processes talking to each
625  * other via file descriptors.  We need for a signal SIGIO to be generated
626  * each time a message arrives, making it possible to write a signal
627  * handler to handle the messages.  The vast majority of unixes can,
628  * in fact, do this.  However, there isn't any standard for how this is
629  * supposed to be done, so each version of UNIX has a different set of
630  * calls to turn this signal on.  So, there is like one version here for
631  * every major brand of UNIX.
632  *
633  *****************************************************************************/
634
635 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
636 #include <fcntl.h>
637 void CmiEnableAsyncIO(int fd)
638 {
639   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
640     CmiError("setting socket owner: %s\n", strerror(errno)) ;
641     exit(1);
642   }
643   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
644     CmiError("setting socket async: %s\n", strerror(errno)) ;
645     exit(1);
646   }
647 }
648 #else
649 void CmiEnableAsyncIO(int fd) { }
650 #endif
651
652 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
653 #if !defined(_WIN32) || defined(__CYGWIN__)
654 void CmiEnableNonblockingIO(int fd) {
655   int on=1;
656   if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
657     CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
658     exit(1);
659   }
660 }
661 #else
662 void CmiEnableNonblockingIO(int fd) { }
663 #endif
664
665
666 /******************************************************************************
667 *
668 * Configuration Data
669 *
670 * This data is all read in from the NETSTART variable (provided by the
671 * charmrun) and from the command-line arguments.  Once read in, it is never
672  * modified.
673  *
674  *****************************************************************************/
675
676 int               _Cmi_mynode;    /* Which address space am I */
677 int               _Cmi_mynodesize;/* Number of processors in my address space */
678 int               _Cmi_numnodes;  /* Total number of address spaces */
679 int               _Cmi_numpes;    /* Total number of processors */
680 static int        Cmi_nodestart; /* First processor in this address space */
681 static skt_ip_t   Cmi_self_IP;
682 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
683 static int        Cmi_charmrun_port;
684 static int        Cmi_charmrun_pid;
685 static int        Cmi_charmrun_fd=-1;
686
687 static int    Cmi_netpoll;
688 static int    Cmi_asyncio;
689 static int    Cmi_idlepoll;
690 static int    Cmi_syncprint;
691 static int Cmi_print_stats = 0;
692
693 #if ! CMK_SMP && ! defined(_WIN32)
694 /* parse forks only used in non-smp mode */
695 static void parse_forks(void) {
696   char *forkstr;
697   int nread;
698   int forks;
699   int i,pid;
700   forkstr=getenv("CmiMyForks");
701   if(forkstr!=0) { /* charmrun */
702         nread = sscanf(forkstr,"%d",&forks);
703         for(i=1;i<=forks;i++) { /* by default forks = 0 */ 
704                 pid=fork();
705                 if(pid<0) CmiAbort("Fork returned an error");
706                 if(pid==0) { /* forked process */
707                         /* reset mynode,pe & exit loop */
708                         _Cmi_mynode+=i;
709                         _Cmi_mype+=i;
710                         break;
711                 }
712         }
713   }
714 }
715 #endif
716
717 static void parse_netstart(void)
718 {
719   char *ns;
720   int nread;
721   int port;
722   ns = getenv("NETSTART");
723   if (ns!=0) 
724   {/*Read values set by Charmrun*/
725         char Cmi_charmrun_name[1024];
726         nread = sscanf(ns, "%d%s%d%d%d",
727                  &_Cmi_mynode,
728                  Cmi_charmrun_name, &Cmi_charmrun_port,
729                  &Cmi_charmrun_pid, &port);
730         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
731
732         if (nread!=5) {
733                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
734                 exit(1);
735         }
736   } else 
737   {/*No charmrun-- set flag values for standalone operation*/
738         _Cmi_mynode=0;
739         Cmi_charmrun_IP=_skt_invalid_ip;
740         Cmi_charmrun_port=0;
741         Cmi_charmrun_pid=0;
742         dataport = -1;
743   }
744 #if CMK_USE_IBVERBS | CMK_USE_IBUD
745         char *cmi_num_nodes = getenv("CmiNumNodes");
746         if(cmi_num_nodes != NULL){
747                 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
748         }
749 #endif  
750 }
751
752 static void extract_common_args(char **argv)
753 {
754   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
755     Cmi_print_stats = 1;
756 }
757
758 /* for SMP */
759 #include "machine-smp.c"
760
761 CsvDeclare(CmiNodeState, NodeState);
762
763 /* Immediate message support */
764 #define CMI_DEST_RANK(msg)      *(int *)(msg)
765 #include "immediate.c"
766
767 /******************************************************************************
768  *
769  * Packet Performance Logging
770  *
771  * This module is designed to give a detailed log of the packets and their
772  * acknowledgements, for performance tuning.  It can be disabled.
773  *
774  *****************************************************************************/
775
776 #define LOGGING 0
777
778 #if LOGGING
779
780 typedef struct logent {
781   double time;
782   int seqno;
783   int srcpe;
784   int dstpe;
785   int kind;
786 } *logent;
787
788
789 logent log;
790 int    log_pos;
791 int    log_wrap;
792
793 static void log_init(void)
794 {
795   log = (logent)malloc(50000 * sizeof(struct logent));
796   _MEMCHECK(log);
797   log_pos = 0;
798   log_wrap = 0;
799 }
800
801 static void log_done(void)
802 {
803   char logname[100]; FILE *f; int i, size;
804   sprintf(logname, "log.%d", _Cmi_mynode);
805   f = fopen(logname, "w");
806   if (f==0) KillEveryone("fopen problem");
807   if (log_wrap) size = 50000; else size=log_pos;
808   for (i=0; i<size; i++) {
809     logent ent = log+i;
810     fprintf(f, "%1.4f %d %c %d %d\n",
811             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
812   }
813   fclose(f);
814 }
815
816 void printLog(void)
817 {
818   char logname[100]; FILE *f; int i, j, size;
819   static int logged = 0;
820   if (logged)
821       return;
822   logged = 1;
823   CmiPrintf("Logging: %d\n", _Cmi_mynode);
824   sprintf(logname, "log.%d", _Cmi_mynode);
825   f = fopen(logname, "w");
826   if (f==0) KillEveryone("fopen problem");
827   for (i = 5000; i; i--)
828   {
829   /*for (i=0; i<size; i++) */
830     j = log_pos - i;
831     if (j < 0)
832     {
833         if (log_wrap)
834             j = 5000 + j;
835         else
836             j = 0;
837     };
838     {
839     logent ent = log+j;
840     fprintf(f, "%1.4f %d %c %d %d\n",
841             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
842     }
843   }
844   fclose(f);
845   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
846 }
847
848 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
849
850 #endif
851
852 #if !LOGGING
853
854 #define log_init() /*empty*/
855 #define log_done() /*empty*/
856 #define printLog() /*empty*/
857 #define LOG(t,s,k,d,q) /*empty*/
858
859 #endif
860
861 /******************************************************************************
862  *
863  * Node state
864  *
865  *****************************************************************************/
866
867
868 static CmiNodeLock    Cmi_scanf_mutex;
869 static double         Cmi_clock;
870 static double         Cmi_check_delay = 3.0;
871
872 /******************************************************************************
873  *
874  * OS Threads
875  * SMP implementation moved to machine-smp.c
876  *****************************************************************************/
877
878 /************************ No kernel SMP threads ***************/
879 #if CMK_SHARED_VARS_UNAVAILABLE
880
881 static volatile int memflag=0;
882 void CmiMemLock() { memflag++; }
883 void CmiMemUnlock() { memflag--; }
884
885 static volatile int comm_flag=0;
886 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
887 #ifndef MACHLOCK_DEBUG
888 #  define CmiCommLock() (comm_flag=1)
889 #  define CmiCommUnlock() (comm_flag=0)
890 #else /* Error-checking flag locks */
891 void CmiCommLock(void) {
892   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
893   comm_flag=1;
894 }
895 void CmiCommUnlock(void) {
896   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
897   comm_flag=0;
898 }
899 #endif
900
901 static struct CmiStateStruct Cmi_state;
902 int _Cmi_mype;
903 int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
904 #define CmiGetState() (&Cmi_state)
905 #define CmiGetStateN(n) (&Cmi_state)
906
907 void CmiYield(void) { sleep(0); }
908
909 static void CommunicationInterrupt(int ignored)
910 {
911   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
912   if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0)) 
913   { /* Already busy inside malloc, comm, or immediate messages */
914     MACHSTATE(5,"--SKIPPING SIGIO--");
915     return;
916   }
917   MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
918   {
919     /*Make sure any malloc's we do in here are NOT migratable:*/
920     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
921 /*    _Cmi_myrank=1; */
922     CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
923 /*    _Cmi_myrank=0; */
924     CmiIsomallocBlockListActivate(oldList);
925   }
926   MACHSTATE(2,"--END SIGIO--")
927 }
928
929 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
930
931 static void CmiStartThreads(char **argv)
932 {
933   MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
934   MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
935   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
936     KillEveryone
937       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
938   
939   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
940   _Cmi_mype = Cmi_nodestart;
941
942   /* Prepare Cpv's for immediate messages: */
943   _Cmi_myrank=1;
944   CommunicationServerInit();
945   _Cmi_myrank=0;
946   
947 #if !CMK_ASYNC_NOT_NEEDED
948   if (Cmi_asyncio)
949   {
950     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
951     if (!Cmi_netpoll) {
952       if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
953       if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
954     }
955 #if CMK_USE_GM || CMK_USE_MX
956       /* charmrun is serviced in interrupt for gm */
957     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
958 #endif
959   }
960 #endif
961 }
962
963 static void CmiDestoryLocks()
964 {
965   comm_flag = 0;
966   memflag = 0;
967 }
968
969 #endif
970
971 /*Network progress utility variables. Period controls the rate at
972   which the network poll is called */
973
974 CpvDeclare(unsigned , networkProgressCount);
975 int networkProgressPeriod;
976
977 CpvDeclare(void *, CmiLocalQueue);
978
979
980 #ifndef CmiMyPe
981 int CmiMyPe(void) 
982
983   return CmiGetState()->pe; 
984 }
985 #endif
986 #ifndef CmiMyRank
987 int CmiMyRank(void)
988 {
989   return CmiGetState()->rank;
990 }
991 #endif
992
993 CpvExtern(int,_charmEpoch);
994
995 /*Add a message to this processor's receive queue 
996   Must be called while holding comm. lock
997 */
998
999 extern double evacTime;
1000
1001 void CmiPushPE(int pe,void *msg)
1002 {
1003   CmiState cs=CmiGetStateN(pe);
1004         /*
1005                 FAULT_EVAC
1006         
1007         if(CpvAccess(_charmEpoch)&&!CmiNodeAlive(CmiMyPe())){
1008                 printf("[%d] Message after stop at %.6lf in %.6lf \n",CmiMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
1009         }*/
1010   MACHSTATE1(2,"Pushing message into %d's queue",pe);  
1011   MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
1012
1013 #if CMK_IMMEDIATE_MSG
1014   if (CmiIsImmediate(msg)) {
1015     CmiPushImmediateMsg(msg);
1016     return;
1017   }
1018 #endif
1019 #if !CMK_SMP_MULTIQ
1020   PCQueuePush(cs->recv,msg);
1021 #else
1022   PCQueuePush(cs->recv[CmiGetState()->myGrpIdx], msg);
1023 #endif
1024
1025 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1026   if (_Cmi_noprocforcommthread) 
1027 #endif
1028   CmiIdleLock_addMessage(&cs->idle);
1029 }
1030
1031 #if CMK_NODE_QUEUE_AVAILABLE
1032 /*Add a message to the node queue.  
1033   Must be called while holding comm. lock
1034 */
1035 static void CmiPushNode(void *msg)
1036 {
1037   CmiState cs=CmiGetStateN(0);
1038   
1039   MACHSTATE(2,"Pushing message into node queue");
1040   MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
1041   
1042 #if CMK_IMMEDIATE_MSG
1043   if (CmiIsImmediate(msg)) {
1044     MACHSTATE(2,"Pushing Immediate message into queue");
1045     CmiPushImmediateMsg(msg);
1046     return;
1047   }
1048 #endif 
1049 /* 
1050  * if CMK_SMP_MULTIQ is enabled, then PCQUEUE's push lock
1051  * may be disabled. In this case, the lock for node recv
1052  * queue has to be used.
1053  * */
1054 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1055   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1056 #endif
1057   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
1058 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1059   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1060 #endif
1061
1062   /*Silly: always try to wake up processor 0, so at least *somebody*
1063     will be awake to handle the message*/
1064 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1065   if (_Cmi_noprocforcommthread) 
1066 #endif
1067   CmiIdleLock_addMessage(&cs->idle);
1068 }
1069 #endif
1070
1071 /***************************************************************
1072  Communication with charmrun:
1073  We can send (ctrl_sendone) and receive (ctrl_getone)
1074  messages on a TCP socket connected to charmrun.
1075  This is used for printfs, CCS, etc; and also for
1076  killing ourselves if charmrun dies.
1077 */
1078
1079 /*This flag prevents simultanious outgoing
1080 messages on the charmrun socket.  It is protected
1081 by the commlock.*/
1082 static int Cmi_charmrun_fd_sendflag=0;
1083
1084 /* ctrl_sendone */
1085 static int sendone_abort_fn(int code,const char *msg) {
1086         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
1087         machine_exit(1);
1088         return -1;
1089 }
1090
1091 static void ctrl_sendone_nolock(const char *type,
1092                                 const char *data1,int dataLen1,
1093                                 const char *data2,int dataLen2)
1094 {
1095   const void *bufs[3]; int lens[3]; int nBuffers=0;
1096   ChMessageHeader hdr;
1097   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
1098   MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
1099   if (Cmi_charmrun_fd==-1) 
1100         charmrun_abort("ctrl_sendone called in standalone!\n");
1101   Cmi_charmrun_fd_sendflag=1;
1102   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
1103   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
1104   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
1105   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
1106   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
1107   Cmi_charmrun_fd_sendflag=0;
1108   skt_set_abort(oldAbort);
1109   MACHSTATE(2,"} ctrl_sendone_nolock");
1110 }
1111
1112 static void ctrl_sendone_locking(const char *type,
1113                                 const char *data1,int dataLen1,
1114                                 const char *data2,int dataLen2)
1115 {
1116   CmiCommLock();
1117   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
1118   CmiCommUnlock();
1119 }
1120
1121 #ifndef MEMORYUSAGE_OUTPUT
1122 #define MEMORYUSAGE_OUTPUT 0 
1123 #endif
1124 #if MEMORYUSAGE_OUTPUT 
1125 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
1126 static int memoryusage_counter;
1127 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
1128 #define memoryusage_output {\
1129   memoryusage_counter++;\
1130   if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
1131 #endif
1132
1133 static double Cmi_check_last;
1134
1135 /* if charmrun dies, we finish */
1136 static void pingCharmrun(void *ignored) 
1137 {
1138 #if MEMORYUSAGE_OUTPUT
1139   memoryusage_output;
1140   if(memoryusage_isOutput){
1141     memoryusage_counter = 0;
1142 #else
1143   {
1144 #endif 
1145
1146   double clock=GetClock();
1147   if (clock > Cmi_check_last + Cmi_check_delay) {
1148     MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1149     Cmi_check_last = clock; 
1150 #if CMK_USE_GM || CMK_USE_MX
1151     if (!Cmi_netpoll)  /* GM netpoll, charmrun service is done in interrupt */
1152 #endif
1153     CmiCommLockOrElse(return;); /*Already busy doing communication*/
1154     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1155     CmiCommLock();
1156     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1157     CmiCommUnlock();
1158   }
1159 #if 1
1160 #if CMK_USE_GM || CMK_USE_MX
1161   if (!Cmi_netpoll)
1162 #endif
1163   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1164 #endif
1165   }
1166 }
1167
1168 /* periodic charm ping, for gm and netpoll */
1169 static void pingCharmrunPeriodic(void *ignored)
1170 {
1171   pingCharmrun(ignored);
1172   CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1173 }
1174
1175 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
1176 static void charmrun_abort(const char *s)
1177 {
1178   if (Cmi_charmrun_fd==-1) {/*Standalone*/
1179         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1180 CmiPrintStackTrace(0);
1181         abort();
1182   } else {
1183         char msgBuf[80];
1184         skt_set_abort(ignore_further_errors);
1185         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1186         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
1187   }
1188 }
1189
1190 /* ctrl_getone */
1191
1192 #ifdef __FAULT__
1193 #include "machine-recover.c"
1194 #endif
1195
1196 static void node_addresses_store(ChMessage *msg);
1197
1198 static void ctrl_getone(void)
1199 {
1200   ChMessage msg;
1201   MACHSTATE(2,"ctrl_getone")
1202   MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
1203   ChMessage_recv(Cmi_charmrun_fd,&msg);
1204   MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1205
1206   if (strcmp(msg.header.type,"die")==0) {
1207     MACHSTATE(2,"ctrl_getone bye bye")
1208     fprintf(stderr,"aborting: %s\n",msg.data);
1209     log_done();
1210     ConverseCommonExit();
1211     machine_exit(0);
1212   } 
1213 #if CMK_CCS_AVAILABLE
1214   else if (strcmp(msg.header.type, "req_fw")==0) {
1215     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1216         /*Sadly, I *can't* do a:
1217       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1218         here, because I can't send converse messages in the
1219         communication thread.  I *can* poke this message into 
1220         any convenient processor's queue, though:  (OSL, 9/14/2000)
1221         */
1222     int pe=0;/*<- node-local processor number. Any one will do.*/
1223     void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1224     MACHSTATE(2,"Incoming CCS request");
1225     CmiPushPE(pe,cmsg);
1226   }
1227 #endif
1228 #ifdef __FAULT__        
1229   else if(strcmp(msg.header.type,"crashnode")==0) {
1230         crash_node_handle(&msg);
1231   }
1232   else if(strcmp(msg.header.type,"initnodetab")==0) {
1233         /** A processor crashed and got recreated. So charmrun sent 
1234           across the whole nodetable data to update this processor*/
1235         node_addresses_store(&msg);
1236         // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1237   }
1238 #endif
1239   else {
1240   /* We do not use KillEveryOne here because it calls CmiMyPe(),
1241    * which is not available to the communication thread on an SMP version.
1242    */
1243     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1244     machine_exit(1);
1245   }
1246   
1247   MACHSTATE(2,"ctrl_getone done")
1248   ChMessage_free(&msg);
1249 }
1250
1251 #if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
1252 /*Deliver this reply data to this reply socket.
1253   The data is forwarded to CCS server via charmrun.*/
1254 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
1255 {
1256   MACHSTATE(2,"Outgoing CCS reply");
1257   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
1258       repData,repLen);
1259   MACHSTATE(1,"Outgoing CCS reply away");
1260 }
1261 #endif
1262
1263 /*****************************************************************************
1264  *
1265  * CmiPrintf, CmiError, CmiScanf
1266  *
1267  *****************************************************************************/
1268 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1269 static void InternalPrintf(const char *f, va_list l)
1270 {
1271   ChMessage replymsg;
1272   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1273   CmiStdoutFlush();
1274   vsprintf(buffer, f, l);
1275   if(Cmi_syncprint) {
1276           CmiCommLock();
1277           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1278           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1279           ChMessage_free(&replymsg);
1280           CmiCommUnlock();
1281   } else {
1282           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1283   }
1284   InternalWriteToTerminal(0,buffer,strlen(buffer));
1285   CmiTmpFree(buffer);
1286 }
1287
1288 static void InternalError(const char *f, va_list l)
1289 {
1290   ChMessage replymsg;
1291   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1292   CmiStdoutFlush();
1293   vsprintf(buffer, f, l);
1294   if(Cmi_syncprint) {
1295           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1296           CmiCommLock();
1297           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1298           ChMessage_free(&replymsg);
1299           CmiCommUnlock();
1300   } else {
1301           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1302   }
1303   InternalWriteToTerminal(1,buffer,strlen(buffer));
1304   CmiTmpFree(buffer);
1305 }
1306
1307 static int InternalScanf(char *fmt, va_list l)
1308 {
1309   ChMessage replymsg;
1310   char *ptr[20];
1311   char *p; int nargs, i;
1312   nargs=0;
1313   p=fmt;
1314   while (*p) {
1315     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1316     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1317     if (p[0]=='%') { nargs++; p++; continue; }
1318     if (*p=='\n') *p=' '; p++;
1319   }
1320   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1321   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1322   CmiLock(Cmi_scanf_mutex);
1323   if (Cmi_charmrun_fd!=-1)
1324   {/*Send charmrun the format string*/
1325         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1326         /*Wait for the reply (characters to scan) from charmrun*/
1327         CmiCommLock();
1328         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1329         i = sscanf((char*)replymsg.data, fmt,
1330                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1331                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1332                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1333         ChMessage_free(&replymsg);
1334         CmiCommUnlock();
1335   } else
1336   {/*Just do the scanf normally*/
1337         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1338                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1339                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1340   }
1341   CmiUnlock(Cmi_scanf_mutex);
1342   return i;
1343 }
1344
1345 #if CMK_CMIPRINTF_IS_A_BUILTIN
1346
1347 /*New stdarg.h declarations*/
1348 void CmiPrintf(const char *fmt, ...)
1349 {
1350   CpdSystemEnter();
1351   {
1352   va_list p; va_start(p, fmt);
1353   if (Cmi_charmrun_fd!=-1)
1354     InternalPrintf(fmt, p);
1355   else
1356     vfprintf(stdout,fmt,p);
1357   va_end(p);
1358   }
1359   CpdSystemExit();
1360 }
1361
1362 void CmiError(const char *fmt, ...)
1363 {
1364   CpdSystemEnter();
1365   {
1366   va_list p; va_start (p, fmt);
1367   if (Cmi_charmrun_fd!=-1)
1368     InternalError(fmt, p);
1369   else
1370     vfprintf(stderr,fmt,p);
1371   va_end(p);
1372   }
1373   CpdSystemExit();
1374 }
1375
1376 int CmiScanf(const char *fmt, ...)
1377 {
1378   int i;
1379   CpdSystemEnter();
1380   {
1381   va_list p; va_start(p, fmt);
1382   i = InternalScanf((char *)fmt, p);
1383   va_end(p);
1384   }
1385   CpdSystemExit();
1386   return i;
1387 }
1388
1389 #endif
1390
1391 /***************************************************************************
1392  * Output redirection:
1393  *  When people don't use CkPrintf, like above, we'd still like to be able
1394  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
1395  * which lets us read the characters sent to stdout at our lesiure.
1396  ***************************************************************************/
1397
1398 /*Can read from stdout or stderr using these fd's*/
1399 static int readStdout[2]; 
1400 static int writeStdout[2]; /*The original stdout/stderr sockets*/ 
1401 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1402 #define readStdoutBufLen (16*1024)
1403 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1404 static int servicingStdout;
1405
1406 /*Initialization-- should only be called once per node*/
1407 static void CmiStdoutInit(void) {
1408         int i;
1409         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1410
1411 /*There's some way to do this same thing in windows, but I don't know how*/
1412 #if !defined(_WIN32) || defined(__CYGWIN__)
1413         /*Prevent buffering in stdio library:*/
1414         setbuf(stdout,NULL); setbuf(stderr,NULL);
1415
1416         /*Reopen stdout and stderr fd's as new pipes:*/
1417         for (i=0;i<2;i++) {
1418                 int pair[2];
1419                 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1420                 
1421                 /*First, save a copy of the original stdout*/
1422                 writeStdout[i]=dup(srcFd);
1423 #if 0
1424                 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1425                 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1426 #else
1427                /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1428                 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair)) 
1429                         {perror("building stdio redirection socketpair"); exit(1);}
1430 #endif
1431                 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1432                 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1433                 
1434 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
1435                 CmiEnableNonblockingIO(srcFd);
1436 #endif
1437 #if CMK_SHARED_VARS_UNAVAILABLE
1438                 if (Cmi_asyncio)
1439                 {
1440   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1441                         CmiEnableAsyncIO(readStdout[i]);
1442                 }
1443 #endif
1444         }
1445 #else
1446 /*Windows system-- just fake reads for now*/
1447 # ifndef read
1448 #  define read(x,y,z) 0
1449 # endif
1450 # ifndef write
1451 #  define write(x,y,z) 
1452 # endif
1453 #endif
1454 }
1455
1456 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1457 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1458 {
1459         write(writeStdout[isStdErr],str,len);   
1460 }
1461
1462 /*
1463   Service this particular stdout pipe.  
1464   Must hold comm. lock.
1465 */
1466 static void CmiStdoutServiceOne(int i) {
1467         int nBytes;
1468         const static char *cmdName[2]={"print","printerr"};
1469         servicingStdout=1;
1470         while(1) {
1471                 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1472                 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1473                 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1474                 if (nBytes<=0) break; /*Nothing to send*/
1475                 
1476                 /*Send these bytes off to charmrun*/
1477                 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1478                 nBytes++; /*Include zero-terminator in message to charmrun*/
1479                 
1480                 if (nBytes>=readStdoutBufLen-100) 
1481                 { /*We must have filled up our output pipe-- most output libraries
1482                    don't handle this well (e.g., glibc printf just drops the line).*/
1483                         
1484                         tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1485                                 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1486                         nBytes--; /*Remove terminator from user's data*/
1487                         tooMuchLen=strlen(tooMuchWarn)+1;
1488                 }
1489                 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1490                                     tooMuchWarn,tooMuchLen);
1491                 
1492                 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1493         }
1494         servicingStdout=0;
1495         serviceStdout[i]=0; /*This pipe is now serviced*/
1496 }
1497
1498 /*Service all stdout pipes, whether it looks like they need it
1499   or not.  Used when you aren't sure if select() has been called recently.
1500   Must hold comm. lock.
1501 */
1502 static void CmiStdoutServiceAll(void) {
1503         int i;
1504         for (i=0;i<2;i++) {
1505                 if (readStdout[i]==0) continue; /*Pipe not open*/
1506                 CmiStdoutServiceOne(i);
1507         }
1508 }
1509
1510 /*Service any outstanding stdout pipes.
1511   Must hold comm. lock.
1512 */
1513 static void CmiStdoutService(void) {
1514         CmiStdoutServiceAll();
1515 }
1516
1517 /*Add our pipes to the pile for select() or poll().
1518   Both can be called with or without the comm. lock.
1519 */
1520 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1521         int i;
1522         for (i=0;i<2;i++) {
1523                 if (readStdout[i]==0) continue; /*Pipe not open*/
1524                 CMK_PIPE_ADDREAD(readStdout[i]);
1525         }
1526 }
1527 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1528         int i;
1529         for (i=0;i<2;i++) {
1530                 if (readStdout[i]==0) continue; /*Pipe not open*/
1531                 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1532         }
1533 }
1534 static int CmiStdoutNeedsService(void) {
1535         return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1536 }
1537
1538 /*Called every few milliseconds to flush the stdout pipes*/
1539 static void CmiStdoutFlush(void) {
1540         if (servicingStdout) return; /* might be called by SIGALRM */
1541         CmiCommLockOrElse( return; )
1542         CmiCommLock();
1543         CmiStdoutServiceAll();
1544         CmiCommUnlock();
1545 }
1546
1547 /***************************************************************************
1548  * Message Delivery:
1549  *
1550  ***************************************************************************/
1551
1552 #include "machine-dgram.c"
1553
1554
1555 #ifndef CmiNodeFirst
1556 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1557 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1558 #endif
1559
1560 #ifndef CmiNodeOf
1561 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1562 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1563 #endif
1564
1565
1566 /*****************************************************************************
1567  *
1568  * node_addresses
1569  *
1570  *  These two functions fill the node-table.
1571  *
1572  *
1573  *   This node, like all others, first sends its own address to charmrun
1574  *   using this command:
1575  *
1576  *     Type: nodeinfo
1577  *     Data: Big-endian 4-byte ints
1578  *           <my-node #><Dataport>
1579  *
1580  *   When charmrun has all the addresses, he sends this table to me:
1581  *
1582  *     Type: nodes
1583  *     Data: Big-endian 4-byte ints
1584  *           <number of nodes n>
1585  *           <#PEs><IP><Dataport> Node 0
1586  *           <#PEs><IP><Dataport> Node 1
1587  *           ...
1588  *           <#PEs><IP><Dataport> Node n-1
1589  *
1590  *****************************************************************************/
1591
1592 #if CMK_USE_IBVERBS
1593 void copyInfiAddr(ChInfiAddr *qpList);
1594 #endif
1595
1596 #if CMK_IBVERBS_FAST_START
1597 static void send_partial_init()
1598 {
1599   ChMessageInt_t nodeNo = ChMessageInt_new(_Cmi_mynode);
1600         ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);
1601 }       
1602 #endif
1603
1604
1605 /*Note: node_addresses_obtain is called before starting
1606   threads, so no locks are needed (or valid!)*/
1607 static void node_addresses_obtain(char **argv)
1608 {
1609   ChMessage nodetabmsg; /* info about all nodes*/
1610   MACHSTATE(3,"node_addresses_obtain { ");
1611   if (Cmi_charmrun_fd==-1) 
1612   {/*Standalone-- fake a single-node nodetab message*/
1613         int npes=1;
1614         ChSingleNodeinfo *fakeTab;
1615         ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1616         fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
1617         CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1618 #if CMK_SHARED_VARS_UNAVAILABLE
1619         if (npes!=1) {
1620                 fprintf(stderr,
1621                         "To use multiple processors, you must run this program as:\n"
1622                         " > charmrun +p%d %s <args>\n"
1623                         "or build the %s-smp version of Charm++.\n",
1624                         npes,argv[0],CMK_MACHINE_NAME);
1625                 exit(1);
1626         }
1627 #else
1628         /* standalone smp version reads ppn */
1629         if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) || 
1630                CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
1631           npes = _Cmi_mynodesize;
1632 #endif
1633         /*This is a stupid hack: we expect the *number* of nodes
1634         followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1635         (which happens to have exactly that layout!) and stuff
1636         a 1 into the "node number" slot
1637         */
1638         fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1639         fakeTab->info.nPE=ChMessageInt_new(npes);
1640         fakeTab->info.dataport=ChMessageInt_new(0);
1641         fakeTab->info.IP=_skt_invalid_ip;
1642   }
1643   else 
1644   { /*Contact charmrun for machine info.*/
1645         ChSingleNodeinfo me;
1646
1647         me.nodeNo=ChMessageInt_new(_Cmi_mynode);
1648
1649 #if CMK_USE_IBVERBS
1650         {
1651                 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
1652                 me.info.qpList = malloc(qpListSize);
1653                 copyInfiAddr(me.info.qpList);
1654                 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
1655                 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
1656                 free(me.info.qpList);
1657         }
1658 #else
1659         /*The nPE and IP fields are set by charmrun--
1660           these values don't matter. */
1661         me.info.nPE=ChMessageInt_new(0);
1662         me.info.IP=_skt_invalid_ip;
1663         me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
1664 #ifdef CMK_USE_MX
1665         me.info.nic_id=ChMessageLong_new(Cmi_nic_id);
1666 #endif
1667 #if CMK_USE_IBUD
1668         me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
1669         me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
1670         me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
1671         MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",me.info.qp.lid,me.info.qp.qpn,me.info.qp.psn);
1672 #endif
1673         me.info.dataport=ChMessageInt_new(dataport);
1674
1675         /*Send our node info. to charmrun.
1676         CommLock hasn't been initialized yet-- 
1677         use non-locking version*/
1678         ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1679         MACHSTATE1(5,"send initnode - dataport:%d", dataport);
1680 #endif  //CMK_USE_IBVERBS
1681
1682         MACHSTATE(3,"initnode sent");
1683   
1684         /*We get the other node addresses from a message sent
1685           back via the charmrun control port.*/
1686         if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
1687                 CmiAbort("Timeout waiting for nodetab!\n");
1688         }
1689         MACHSTATE(2,"recv initnode {");
1690         ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1691         MACHSTATE(2,"} recv initnode");
1692   }
1693 //#if CMK_USE_IBVERBS   
1694 //#else
1695   node_addresses_store(&nodetabmsg);
1696   ChMessage_free(&nodetabmsg);
1697 //#endif        
1698   MACHSTATE(3,"} node_addresses_obtain ");
1699 }
1700
1701 #if CMK_NODE_QUEUE_AVAILABLE
1702
1703 /***********************************************************************
1704  * DeliverOutgoingNodeMessage()
1705  *
1706  * This function takes care of delivery of outgoing node messages from the
1707  * sender end. Broadcast messages are divided into sets of messages that 
1708  * are bound to the local node, and to remote nodes. For local
1709  * transmission, the messages are directly pushed into the recv
1710  * queues. For non-local transmission, the function DeliverViaNetwork()
1711  * is called
1712  ***********************************************************************/
1713 void DeliverOutgoingNodeMessage(OutgoingMsg ogm)
1714 {
1715   int i, rank, dst; OtherNode node;
1716
1717   dst = ogm->dst;
1718   switch (dst) {
1719   case NODE_BROADCAST_ALL:
1720     CmiPushNode(CopyMsg(ogm->data,ogm->size));
1721     /*case-fallthrough (no break)-- deliver to all other processors*/
1722   case NODE_BROADCAST_OTHERS:
1723 #if CMK_BROADCAST_SPANNING_TREE
1724     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1725 #elif CMK_BROADCAST_HYPERCUBE
1726     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1727 #else
1728     for (i=0; i<_Cmi_numnodes; i++)
1729       if (i!=_Cmi_mynode)
1730         DeliverViaNetwork(ogm, nodes + i, DGRAM_NODEMESSAGE, DGRAM_ROOTPE_MASK, 1);
1731 #endif
1732     GarbageCollectMsg(ogm);
1733     break;
1734   default:
1735     node = nodes+dst;
1736     rank=DGRAM_NODEMESSAGE;
1737     if (dst != _Cmi_mynode) {
1738       DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1739       GarbageCollectMsg(ogm);
1740     } else {
1741       if (ogm->freemode == 'A') {
1742         CmiPushNode(CopyMsg(ogm->data,ogm->size));
1743         ogm->freemode = 'X';
1744       } else {
1745         CmiPushNode(ogm->data);
1746         FreeOutgoingMsg(ogm);
1747       }
1748     }
1749   }
1750 }
1751
1752 #else
1753
1754 #define DeliverOutgoingNodeMessage(msg) DeliverOutgoingMessage(msg)
1755
1756 #endif
1757
1758 #if CMK_C_INLINE
1759 inline static
1760 #endif
1761 void DeliverViaNetworkOrPxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot,int copy){
1762 #if CMK_USE_SYSVSHM
1763         {
1764 #if SYSVSHM_STATS
1765         double _startValidTime = CmiWallTimer();
1766 #endif
1767         int ret=CmiValidSysvshm(ogm,node);
1768 #if SYSVSHM_STATS
1769         sysvshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1770 #endif                  
1771         MACHSTATE4(3,"Msg ogm %p size %d dst %d useSysvShm %d",ogm,ogm->size,ogm->dst,ret);
1772         if(ret){
1773                 CmiSendMessageSysvshm(ogm,node,rank,broot);
1774         }else{
1775                 DeliverViaNetwork(ogm, node, rank, broot,copy);
1776         }
1777         } 
1778 #elif CMK_USE_PXSHM
1779      {
1780 #if PXSHM_STATS
1781         double _startValidTime = CmiWallTimer();
1782 #endif
1783       int ret=CmiValidPxshm(ogm,node);
1784 #if PXSHM_STATS
1785         pxshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1786 #endif                  
1787       MACHSTATE4(3,"Msg ogm %p size %d dst %d usePxShm %d",ogm,ogm->size,ogm->dst,ret);
1788       if(ret){
1789          CmiSendMessagePxshm(ogm,node,rank,broot);
1790        }else{
1791          DeliverViaNetwork(ogm, node, rank, broot,copy);
1792        }
1793       } 
1794 #else
1795       DeliverViaNetwork(ogm, node, rank, broot, copy);
1796 #endif                  
1797         
1798 }
1799
1800
1801
1802 /***********************************************************************
1803  * DeliverOutgoingMessage()
1804  *
1805  * This function takes care of delivery of outgoing messages from the
1806  * sender end. Broadcast messages are divided into sets of messages that 
1807  * are bound to the local node, and to remote nodes. For local
1808  * transmission, the messages are directly pushed into the recv
1809  * queues. For non-local transmission, the function DeliverViaNetwork()
1810  * is called
1811  ***********************************************************************/
1812 int DeliverOutgoingMessage(OutgoingMsg ogm)
1813 {
1814   int i, rank, dst; OtherNode node;
1815         
1816   int network = 1;
1817
1818         
1819   dst = ogm->dst;
1820
1821   switch (dst) {
1822   case PE_BROADCAST_ALL:
1823 #if !CMK_SMP_NOT_RELAX_LOCK       
1824     CmiCommLock();
1825 #endif
1826     for (rank = 0; rank<_Cmi_mynodesize; rank++) {
1827       CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1828     }
1829 #if CMK_BROADCAST_SPANNING_TREE
1830     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1831 #elif CMK_BROADCAST_HYPERCUBE
1832     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1833 #else
1834     for (i=0; i<_Cmi_numnodes; i++)
1835       if (i!=_Cmi_mynode){
1836         /*FAULT_EVAC : is the target processor valid*/
1837         if(CmiNodeAlive(i)){
1838           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1839         }
1840       } 
1841 #endif
1842     GarbageCollectMsg(ogm);
1843 #if !CMK_SMP_NOT_RELAX_LOCK               
1844     CmiCommUnlock();
1845 #endif    
1846     break;
1847   case PE_BROADCAST_OTHERS:
1848 #if !CMK_SMP_NOT_RELAX_LOCK               
1849     CmiCommLock();
1850 #endif  
1851     for (rank = 0; rank<_Cmi_mynodesize; rank++)
1852       if (rank + Cmi_nodestart != ogm->src) {
1853         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1854       }
1855 #if CMK_BROADCAST_SPANNING_TREE
1856     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1857 #elif CMK_BROADCAST_HYPERCUBE
1858     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1859 #else
1860     for (i = 0; i<_Cmi_numnodes; i++)
1861       if (i!=_Cmi_mynode){
1862         /*FAULT_EVAC : is the target processor valid*/
1863         if(CmiNodeAlive(i)){
1864           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1865         }
1866       } 
1867 #endif
1868     GarbageCollectMsg(ogm);
1869 #if !CMK_SMP_NOT_RELAX_LOCK               
1870     CmiCommUnlock();
1871 #endif    
1872     break;
1873   default:
1874 #ifndef CMK_OPTIMIZE
1875     if (dst<0 || dst>=CmiNumPes())
1876       CmiAbort("Send to out-of-bounds processor!");
1877 #endif
1878     node = nodes_by_pe[dst];
1879     rank = dst - node->nodestart;
1880     if (node->nodestart != Cmi_nodestart) {
1881 #if !CMK_SMP_NOT_RELAX_LOCK                     
1882         CmiCommLock();
1883 #endif          
1884         DeliverViaNetworkOrPxshm(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1885         GarbageCollectMsg(ogm);
1886 #if !CMK_SMP_NOT_RELAX_LOCK                     
1887         CmiCommUnlock();
1888 #endif          
1889     } else {
1890       network = 0;
1891       if (ogm->freemode == 'A') {
1892         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1893         ogm->freemode = 'X';
1894       } else {
1895         CmiPushPE(rank, ogm->data);
1896         FreeOutgoingMsg(ogm);
1897       }
1898     }
1899   }
1900 #if CMK_MULTICORE
1901   network = 0;
1902 #endif
1903   return network;
1904 }
1905
1906
1907 /******************************************************************************
1908  *
1909  * CmiGetNonLocal
1910  *
1911  * The design of this system is that the communication thread does all the
1912  * work, to eliminate as many locking issues as possible.  This is the only
1913  * part of the code that happens in the receiver-thread.
1914  *
1915  * This operation is fairly cheap, it might be worthwhile to inline
1916  * the code into CmiDeliverMsgs to reduce function call overhead.
1917  *
1918  *****************************************************************************/
1919
1920 #if CMK_NODE_QUEUE_AVAILABLE
1921 char *CmiGetNonLocalNodeQ(void)
1922 {
1923   char *result = 0;
1924   if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1925     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1926     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1927     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1928   }
1929   return result;
1930 }
1931 #endif
1932
1933 void *CmiGetNonLocal(void)
1934 {
1935 #if CMK_SMP_MULTIQ
1936   int i;
1937 #endif
1938
1939   CmiState cs = CmiGetState();
1940   CmiIdleLock_checkMessage(&cs->idle);
1941
1942 #if !CMK_SMP_MULTIQ
1943   return (void *) PCQueuePop(cs->recv);
1944 #else
1945   void *retVal = NULL;
1946   for(i=cs->curPolledIdx; i<MULTIQ_GRPSIZE; i++){
1947     retVal = (void *)PCQueuePop(cs->recv[i]);
1948     if(retVal!=NULL) {
1949         cs->curPolledIdx = i+1;
1950         return retVal;
1951     }
1952   }
1953   cs->curPolledIdx=0;
1954   return NULL;
1955 #endif
1956 }
1957
1958
1959 /**
1960  * Set up an OutgoingMsg structure for this message.
1961  */
1962 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
1963   OutgoingMsg ogm;
1964   MallocOutgoingMsg(ogm);
1965   MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1966   ogm->size = size;
1967   ogm->data = data;
1968   ogm->src = cs->pe;
1969   ogm->dst = pe;
1970   ogm->freemode = freemode;
1971   ogm->refcount = 0;
1972   return (CmiCommHandle)ogm;    
1973 }
1974
1975 #if CMK_NODE_QUEUE_AVAILABLE
1976
1977 /******************************************************************************
1978  *
1979  * CmiGeneralNodeSend
1980  *
1981  * Description: This is a generic message sending routine. All the
1982  * converse message send functions are implemented in terms of this
1983  * function. (By setting appropriate flags (eg freemode) that tell
1984  * CmiGeneralSend() how exactly to handle the particular case of
1985  * message send)
1986  *
1987  *****************************************************************************/
1988
1989 CmiCommHandle CmiGeneralNodeSend(int node, int size, int freemode, char *data)
1990 {
1991   CmiState cs = CmiGetState(); OutgoingMsg ogm;
1992   MACHSTATE(1,"CmiGeneralNodeSend {");
1993
1994   if (freemode == 'S') {
1995     char *copy = (char *)CmiAlloc(size);
1996     if (!copy)
1997       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
1998     memcpy(copy, data, size);
1999     data = copy; freemode = 'F';
2000   }
2001
2002 #if CMK_IMMEDIATE_MSG
2003     /* execute the immediate message right away */
2004   if (node == CmiMyNode() && CmiIsImmediate(data)) {
2005     CmiPushImmediateMsg(data);
2006       /* only communication thread executes immediate messages in SMP */
2007     if (!_immRunning) CmiHandleImmediate();
2008     return;
2009   }
2010 #endif
2011
2012   CmiMsgHeaderSetLength(data, size);
2013   ogm=PrepareOutgoing(cs,node,size,freemode,data);
2014   CmiCommLock();
2015   DeliverOutgoingNodeMessage(ogm);
2016   CmiCommUnlock();
2017   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2018   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2019   MACHSTATE(1,"} CmiGeneralNodeSend");
2020   return (CmiCommHandle)ogm;
2021 }
2022
2023 #endif
2024
2025
2026 /******************************************************************************
2027  *
2028  * CmiGeneralSend
2029  *
2030  * Description: This is a generic message sending routine. All the
2031  * converse message send functions are implemented in terms of this
2032  * function. (By setting appropriate flags (eg freemode) that tell
2033  * CmiGeneralSend() how exactly to handle the particular case of
2034  * message send)
2035  *
2036  *****************************************************************************/
2037
2038 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
2039 {
2040   int sendonnetwork;
2041   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2042   MACHSTATE(1,"CmiGeneralSend {");
2043
2044   if (freemode == 'S') {
2045 #if CMK_USE_GM
2046     if (pe != cs->pe) {
2047       freemode = 'G';
2048     }
2049     else
2050 #endif
2051     {
2052     char *copy = (char *)CmiAlloc(size);
2053     if (!copy)
2054       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
2055     memcpy(copy, data, size);
2056     data = copy; freemode = 'F';
2057     }
2058   }
2059   
2060   if (pe == cs->pe) {
2061 #if ! CMK_SMP
2062     if (!_immRunning) /* CdsFifo_Enqueue, below, isn't SIGIO or thread safe.  
2063                       The SMP comm thread never gets here, because of the pe test. */
2064 #endif
2065     {
2066 #if CMK_IMMEDIATE_MSG
2067       /* execute the immediate message right away */
2068       /* but to avoid infinite recursive call, don't do this if _immRunning */
2069     if (CmiIsImmediate(data)) {
2070       CmiPushImmediateMsg(data);
2071       CmiHandleImmediate();
2072       return 0;
2073     }
2074 #endif
2075     CdsFifo_Enqueue(cs->localqueue, data);
2076     if (freemode == 'A') {
2077       MallocOutgoingMsg(ogm);
2078       ogm->freemode = 'X';
2079       return ogm;
2080     } else return 0;
2081     }
2082   }
2083
2084 #if CMK_PERSISTENT_COMM
2085   if (phs) {
2086       CmiAssert(phsSize == 1);
2087       CmiSendPersistentMsg(*phs, pe, size, data);
2088       return NULL;
2089   }
2090 #endif
2091
2092   CmiMsgHeaderSetLength(data, size);
2093   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
2094
2095   #if CMK_SMP_NOT_RELAX_LOCK  
2096   CmiCommLock();
2097 #endif  
2098   
2099   sendonnetwork = DeliverOutgoingMessage(ogm);
2100   
2101 #if CMK_SMP_NOT_RELAX_LOCK  
2102   CmiCommUnlock();
2103 #endif  
2104   
2105   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2106 #if CMK_USE_SYSVSHM
2107         CommunicationServerSysvshm();
2108 #elif CMK_USE_PXSHM
2109         CommunicationServerPxshm();
2110 #endif
2111 #if !CMK_SHARED_VARS_UNAVAILABLE
2112 #if !CMK_SMP_NOT_SKIP_COMMSERVER
2113   if (sendonnetwork!=0)   /* only call server when we send msg on network in SMP */
2114 #endif
2115 #endif
2116   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2117   MACHSTATE(1,"}  CmiGeneralSend");
2118   return (CmiCommHandle)ogm;
2119 }
2120
2121
2122 void CmiSyncSendFn(int p, int s, char *m)
2123
2124   CQdCreate(CpvAccess(cQdState), 1);
2125   CmiGeneralSend(p,s,'S',m); 
2126 }
2127
2128 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
2129
2130   CQdCreate(CpvAccess(cQdState), 1);
2131   return CmiGeneralSend(p,s,'A',m); 
2132 }
2133
2134 void CmiFreeSendFn(int p, int s, char *m)
2135
2136   CQdCreate(CpvAccess(cQdState), 1);
2137   CmiGeneralSend(p,s,'F',m); 
2138 }
2139
2140 void CmiSyncBroadcastFn(int s, char *m)
2141
2142   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2143   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); 
2144 }
2145
2146 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
2147
2148   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2149   return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); 
2150 }
2151
2152 void CmiFreeBroadcastFn(int s, char *m)
2153
2154   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
2155   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); 
2156 }
2157
2158 void CmiSyncBroadcastAllFn(int s, char *m)
2159
2160   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2161   CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); 
2162 }
2163
2164 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
2165
2166   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2167   return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); 
2168 }
2169
2170 void CmiFreeBroadcastAllFn(int s, char *m)
2171
2172   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2173   CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); 
2174 }
2175
2176 #if CMK_NODE_QUEUE_AVAILABLE
2177
2178 void CmiSyncNodeSendFn(int p, int s, char *m)
2179
2180   CQdCreate(CpvAccess(cQdState), 1);
2181   CmiGeneralNodeSend(p,s,'S',m); 
2182 }
2183
2184 CmiCommHandle CmiAsyncNodeSendFn(int p, int s, char *m)
2185
2186   CQdCreate(CpvAccess(cQdState), 1);
2187   return CmiGeneralNodeSend(p,s,'A',m); 
2188 }
2189
2190 void CmiFreeNodeSendFn(int p, int s, char *m)
2191
2192   CQdCreate(CpvAccess(cQdState), 1);
2193   CmiGeneralNodeSend(p,s,'F',m); 
2194 }
2195
2196 void CmiSyncNodeBroadcastFn(int s, char *m)
2197
2198   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2199   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'S',m); 
2200 }
2201
2202 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
2203
2204   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2205   return CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'A',m);
2206 }
2207
2208 void CmiFreeNodeBroadcastFn(int s, char *m)
2209
2210   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2211   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'F',m); 
2212 }
2213
2214 void CmiSyncNodeBroadcastAllFn(int s, char *m)
2215
2216   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2217   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'S',m); 
2218 }
2219
2220 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
2221
2222   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2223   return CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'A',m); 
2224 }
2225
2226 void CmiFreeNodeBroadcastAllFn(int s, char *m)
2227
2228   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2229   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'F',m); 
2230 }
2231 #endif
2232
2233 /******************************************************************************
2234  *
2235  * Comm Handle manipulation.
2236  *
2237  *****************************************************************************/
2238
2239 int CmiAsyncMsgSent(CmiCommHandle handle)
2240 {
2241   return (((OutgoingMsg)handle)->freemode == 'X');
2242 }
2243
2244 void CmiReleaseCommHandle(CmiCommHandle handle)
2245 {
2246   FreeOutgoingMsg(((OutgoingMsg)handle));
2247 }
2248
2249 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
2250
2251 /*****************************************************************************
2252  *
2253  * NET version List-Cast and Multicast Code
2254  *
2255  ****************************************************************************/
2256                                                                                 
2257 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
2258 {
2259   int i;
2260   for(i=0;i<npes;i++) {
2261     CmiReference(msg);
2262     CmiSyncSendAndFree(pes[i], len, msg);
2263   }
2264 }
2265                                                                                 
2266 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
2267 {
2268   CmiError("ListSend not implemented.");
2269   return (CmiCommHandle) 0;
2270 }
2271                                                                                 
2272 /* 
2273   because in all net versions, the message buffer after CmiSyncSendAndFree
2274   returns is not changed, we can use memory reference trick to avoid 
2275   memory copying here
2276 */
2277 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2278 {
2279   int i;
2280   for(i=0;i<npes;i++) {
2281     CmiReference(msg);
2282     CmiSyncSendAndFree(pes[i], len, msg);
2283   }
2284   CmiFree(msg);
2285 }
2286
2287 #endif
2288
2289 #if CMK_BROADCAST_SPANNING_TREE
2290 /*
2291   if root is 1, it is called from the broadcast root, only ogm is needed;
2292   if root is 0, it is called in the tree, ogm must be NULL and msg, size and startpe are needed
2293   note: function leaves msg buffer untouched
2294 */
2295 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int noderank)
2296 {
2297   CmiState cs = CmiGetState();
2298   int i;
2299
2300   if (root) startpe = _Cmi_mynode;
2301   else ogm = NULL;
2302
2303   CmiAssert(startpe>=0 && startpe<_Cmi_numnodes);
2304
2305   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
2306     int p = _Cmi_mynode-startpe;
2307     if (p<0) p+=_Cmi_numnodes;
2308     p = BROADCAST_SPANNING_FACTOR*p + i;
2309     if (p > _Cmi_numnodes - 1) break;
2310     p += startpe;
2311     p = p%_Cmi_numnodes;
2312     CmiAssert(p!=_Cmi_mynode);
2313     /* CmiPrintf("SendSpanningChildren: %d => %d\n", _Cmi_mynode, p); */
2314     if (!root && !ogm) ogm=PrepareOutgoing(cs, PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2315   //  DeliverViaNetwork(ogm, nodes + p, noderank, startpe, 1);
2316     DeliverViaNetworkOrPxshm(ogm, nodes+p, noderank, startpe, 1);
2317   }
2318   if (!root && ogm) GarbageCollectMsg(ogm);
2319 }
2320 #endif
2321
2322 #if CMK_BROADCAST_HYPERCUBE
2323 int log_of_2 (int i) {
2324   int m;
2325   for (m=0; i>(1<<m); ++m);
2326   return m;
2327 }
2328
2329 /* called from root - send msg along the hypercube in broadcast.
2330   note: function leaves msg buffer untouched
2331 */
2332 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int srcpe, int noderank)
2333 {
2334   CmiState cs = CmiGetState();
2335   int i, k, npes, tmp;
2336   int *dest_pes;
2337
2338   if (root) {
2339     msg = ogm->data;
2340     srcpe = CmiMyNode();
2341   }
2342   else ogm = NULL;
2343
2344   tmp = srcpe ^ CmiMyNode();
2345   k = log_of_2(CmiNumNodes()) + 2;
2346   if (tmp) {
2347      do {--k;} while (!(tmp>>k));
2348   }
2349
2350         MACHSTATE2(3,"Broadcast SendHypercube ogm %p size %d",ogm,size);
2351
2352   dest_pes = CmiTmpAlloc(sizeof(int)*(k+1));
2353   k--;
2354   npes = HypercubeGetBcastDestinations(CmiMyNode(), CmiNumNodes(), k, dest_pes);
2355   
2356   for (i = 0; i < npes; i++) {
2357     int p = dest_pes[i];
2358     /* CmiPrintf("SendHypercube: %d => %d (%d)\n", cs->pe, p, i); */
2359     if (!root && ! ogm) 
2360       ogm=PrepareOutgoing(cs ,PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2361     DeliverViaNetworkOrPxshm(ogm, nodes + p, noderank, CmiMyNode(), 1);
2362   }
2363   if (!root && ogm) GarbageCollectMsg(ogm);
2364   CmiTmpFree(dest_pes);
2365 }
2366 #endif
2367
2368 /*
2369 #if CMK_IMMEDIATE_MSG
2370 void CmiProbeImmediateMsg()
2371 {
2372   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2373 }
2374 #endif
2375 */
2376
2377 /* Network progress function is used to poll the network when for
2378    messages. This flushes receive buffers on some implementations*/ 
2379 void CmiMachineProgressImpl()
2380 {
2381 #if CMK_USE_SYSVSHM
2382         CommunicationServerSysvshm();
2383 #elif CMK_USE_PXSHM
2384         CommunicationServerPxshm();
2385 #endif
2386   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2387 }
2388
2389 /******************************************************************************
2390  *
2391  * Main code, Init, and Exit
2392  *
2393  *****************************************************************************/
2394 extern void CthInit(char **argv);
2395 extern void ConverseCommonInit(char **);
2396
2397 static char     **Cmi_argv;
2398 static char     **Cmi_argvcopy;
2399 static CmiStartFn Cmi_startfn;   /* The start function */
2400 static int        Cmi_usrsched;  /* Continue after start function finishes? */
2401
2402 static void ConverseRunPE(int everReturn)
2403 {
2404   CmiIdleState *s=CmiNotifyGetState();
2405   CmiState cs;
2406   char** CmiMyArgv;
2407   CmiNodeAllBarrier();
2408   cs = CmiGetState();
2409   CpvInitialize(void *,CmiLocalQueue);
2410   CpvAccess(CmiLocalQueue) = cs->localqueue;
2411
2412   /* all non 0 rank use the copied one while rank 0 will modify the actual argv */
2413   if (CmiMyRank())
2414     CmiMyArgv = CmiCopyArgs(Cmi_argvcopy);
2415   else
2416     CmiMyArgv = Cmi_argv;
2417   CthInit(CmiMyArgv);
2418
2419 #if CMK_USE_GM
2420   CmiCheckGmStatus();
2421 #endif
2422
2423   ConverseCommonInit(CmiMyArgv);
2424
2425   /* initialize the network progress counter*/
2426   /* Network progress function is used to poll the network when for
2427      messages. This flushes receive buffers on some  implementations*/ 
2428   CpvInitialize(int , networkProgressCount);
2429   CpvAccess(networkProgressCount) = 0;
2430
2431   /* better to show the status here */
2432   if (CmiMyPe() == 0) {
2433     if (Cmi_netpoll == 1) {
2434       CmiPrintf("Charm++: scheduler running in netpoll mode.\n");
2435     }
2436 #if CMK_SHARED_VARS_UNAVAILABLE
2437     else {
2438       if (CmiMemoryIs(CMI_MEMORY_IS_OS))
2439         CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
2440     }
2441 #endif
2442   }
2443 #if MEMORYUSAGE_OUTPUT
2444   memoryusage_counter = 0;
2445 #endif
2446 #if CMK_USE_GM || CMK_USE_MX
2447   if (Cmi_charmrun_fd != -1)
2448 #endif
2449   {
2450   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
2451       (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
2452   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
2453       (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
2454 #if CMK_USE_SYSVSHM
2455          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdleSysvshm, NULL);
2456          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdleSysvshm, NULL);
2457 #elif CMK_USE_PXSHM
2458                 //TODO: add pxshm notify idle
2459          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdlePxshm, NULL);
2460          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdlePxshm, NULL);
2461 #endif
2462   }
2463
2464 #if CMK_SHARED_VARS_UNAVAILABLE
2465   if (Cmi_netpoll) /*Repeatedly call CommServer*/
2466     CcdCallOnConditionKeep(CcdPERIODIC, 
2467         (CcdVoidFn) CommunicationPeriodic, NULL);
2468   else /*Only need this for retransmits*/
2469     CcdCallOnConditionKeep(CcdPERIODIC_10ms, 
2470         (CcdVoidFn) CommunicationPeriodic, NULL);
2471 #endif
2472
2473   if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
2474     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
2475 #if CMK_SHARED_VARS_UNAVAILABLE
2476     if (!Cmi_asyncio) {
2477     /* gm cannot live with setitimer */
2478     CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
2479     }
2480     else {
2481     /*Occasionally ping charmrun, to test if it's dead*/
2482     struct itimerval i;
2483     CmiSignal(SIGALRM, 0, 0, pingCharmrun);
2484 #if MEMORYUSAGE_OUTPUT
2485     i.it_interval.tv_sec = 0;
2486     i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2487     i.it_value.tv_sec = 0;
2488     i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2489 #else
2490     i.it_interval.tv_sec = 1;
2491     i.it_interval.tv_usec = 0;
2492     i.it_value.tv_sec = 1;
2493     i.it_value.tv_usec = 0;
2494 #endif
2495     setitimer(ITIMER_REAL, &i, NULL);
2496     }
2497
2498 #if ! CMK_USE_GM && ! CMK_USE_MX && ! CMK_USE_TCP && ! CMK_USE_IBVERBS
2499     /*Occasionally check for retransmissions, outgoing acks, etc.*/
2500     /*no need for GM case */
2501     CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
2502 #endif
2503 #endif
2504     
2505     /*Initialize the clock*/
2506     Cmi_clock=GetClock();
2507   }
2508
2509 #ifdef IGET_FLOWCONTROL 
2510   /* Call the function once to determine the amount of physical memory available */
2511   getAvailSysMem();
2512   /* Call the function to periodically call the token adapt function */
2513   CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
2514   CcdCallOnConditionKeep(CcdPERIODIC_10s,   // magic number of PERIOD 10s
2515         (CcdVoidFn) TokenUpdatePeriodic, NULL);
2516 #endif
2517   
2518 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
2519   srand((int)(1024.0*CmiWallTimer()));
2520   if (CmiMyPe()==0)
2521     CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
2522         CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
2523 #endif
2524
2525 #ifdef __ONESIDED_IMPL
2526 #ifdef __ONESIDED_NO_HARDWARE
2527   putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
2528   putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
2529   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2530   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2531 #endif
2532 #ifdef __ONESIDED_GM_HARDWARE
2533   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2534   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2535 #endif
2536 #endif
2537
2538   /* communication thread */
2539   if (CmiMyRank() == CmiMyNodeSize()) {
2540     if(!everReturn) Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2541     if (Cmi_charmrun_fd!=-1)
2542           while (1) CommunicationServer(5, COMM_SERVER_FROM_SMP);
2543   }
2544   else{
2545     if (!everReturn) {
2546       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2547       /* turn on immediate messages only now
2548        node barrier previously should take care of the node synchronization */
2549       _immediateReady = 1;
2550       if (Cmi_usrsched==0) CsdScheduler(-1);
2551       ConverseExit();
2552     }else{
2553       _immediateReady = 1;
2554     }
2555   }
2556 }
2557
2558 void ConverseExit(void)
2559 {
2560   MACHSTATE(2,"ConverseExit {");
2561   machine_initiated_shutdown=1;
2562   if (CmiMyRank()==0) {
2563     if(Cmi_print_stats)
2564       printNetStatistics();
2565     log_done();
2566   }
2567 #if CMK_USE_SYSVSHM
2568         CmiExitSysvshm();
2569 #elif CMK_USE_PXSHM
2570         CmiExitPxshm();
2571 #endif
2572   ConverseCommonExit();               /* should be called by every rank */
2573   CmiNodeBarrier();        /* single node SMP, make sure every rank is done */
2574   if (CmiMyRank()==0) CmiStdoutFlush();
2575   if (Cmi_charmrun_fd==-1) {
2576     if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
2577     else while (1) CmiYield();
2578   }
2579   else {
2580         ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
2581 #if CMK_SHARED_VARS_UNAVAILABLE
2582         Cmi_check_delay = 1.0;          /* speed up checking of charmrun */
2583         while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2584 #elif CMK_MULTICORE
2585         if (!Cmi_commthread && CmiMyRank()==0) {
2586           Cmi_check_delay = 1.0;        /* speed up checking of charmrun */
2587           while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2588         }
2589 #endif
2590   }
2591   MACHSTATE(2,"} ConverseExit");
2592
2593 /*Comm. thread will kill us.*/
2594   while (1) CmiYield();
2595 }
2596
2597 static void set_signals(void)
2598 {
2599   if(!Cmi_truecrash) {
2600     signal(SIGSEGV, KillOnAllSigs);
2601     signal(SIGFPE, KillOnAllSigs);
2602     signal(SIGILL, KillOnAllSigs);
2603     signal(SIGINT, KillOnAllSigs);
2604     signal(SIGTERM, KillOnAllSigs);
2605     signal(SIGABRT, KillOnAllSigs);
2606 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2607     signal(SIGQUIT, KillOnAllSigs);
2608     signal(SIGBUS, KillOnAllSigs);
2609 #     if CMK_HANDLE_SIGUSR
2610     signal(SIGUSR1, HandleUserSignals);
2611     signal(SIGUSR2, HandleUserSignals);
2612 #     endif
2613 #   endif /*UNIX*/
2614   }
2615 }
2616
2617 /*Socket idle function to use before addresses have been
2618   obtained.  During the real program, we idle with CmiYield.
2619 */
2620 static void obtain_idleFn(void) {sleep(0);}
2621
2622 static int net_default_skt_abort(int code,const char *msg)
2623 {
2624   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2625   machine_exit(1);
2626   return -1;
2627 }
2628
2629 #if MACHINE_DEBUG_LOG
2630 FILE *debugLog = NULL;
2631 #endif
2632
2633 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
2634 {
2635 #if MACHINE_DEBUG
2636   debugLog=NULL;
2637 #endif
2638 #if CMK_USE_HP_MAIN_FIX
2639 #if FOR_CPLUS
2640   _main(argc,argv);
2641 #endif
2642 #endif
2643   Cmi_startfn = fn; Cmi_usrsched = usc;
2644   Cmi_netpoll = 0;
2645 #if CMK_NETPOLL
2646   Cmi_netpoll = 1;
2647 #endif
2648 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2649   Cmi_idlepoll = 0;
2650 #else
2651   Cmi_idlepoll = 1;
2652 #endif
2653   Cmi_truecrash = 0;
2654   if (CmiGetArgFlagDesc(argv,"+truecrash","Do not install signal handlers") ||
2655       CmiGetArgFlagDesc(argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2656     /* netpoll disable signal */
2657   if (CmiGetArgFlagDesc(argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2658     /* idlepoll use poll instead if sleep when idle */
2659   if (CmiGetArgFlagDesc(argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2660     /* idlesleep use sleep instead if busywait when idle */
2661   if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2662   Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2663
2664   Cmi_asyncio = 1;
2665 #if CMK_ASYNC_NOT_NEEDED
2666   Cmi_asyncio = 0;
2667 #endif
2668   if (CmiGetArgFlagDesc(argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2669   if (CmiGetArgFlagDesc(argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2670 #if CMK_MULTICORE
2671   if (CmiGetArgFlagDesc(argv,"+commthread","Use communication thread")) {
2672     Cmi_commthread = 1;
2673 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2674     _Cmi_noprocforcommthread = 1;   /* worker thread go sleep */
2675 #endif
2676     if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2677   }
2678 #endif
2679
2680   skt_init();
2681   /* use special abort handler instead of default_skt_abort to 
2682      prevent exit trapped by atexit_check() due to the exit() call  */
2683   skt_set_abort(net_default_skt_abort);
2684   atexit(machine_atexit_check);
2685   parse_netstart();
2686 #if ! CMK_SMP && ! defined(_WIN32)
2687   /* only get forks in non-smp mode */
2688   parse_forks();
2689 #endif
2690   extract_args(argv);
2691   log_init();
2692   Cmi_scanf_mutex = CmiCreateLock();
2693
2694 #if MACHINE_DEBUG_LOG
2695   {
2696     char ln[200];
2697     sprintf(ln,"debugLog.%d",_Cmi_mynode);
2698     debugLog=fopen(ln,"w");
2699   }
2700 #endif
2701
2702     /* NOTE: can not acutally call timer before timerInit ! GZ */
2703   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2704
2705   skt_set_idle(obtain_idleFn);
2706   if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2707         set_signals();
2708 #if CMK_USE_TCP
2709         dataskt=skt_server(&dataport);
2710 #elif !CMK_USE_GM && !CMK_USE_MX
2711         dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2712 #else
2713           /* GM and MX do not need to create any socket for communication */
2714         dataskt=-1;
2715 #endif
2716         MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2717         Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2718         MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
2719         CmiStdoutInit();
2720   } else {/*Standalone operation*/
2721         printf("Charm++: standalone mode (not using charmrun)\n");
2722         dataskt=-1;
2723         Cmi_charmrun_fd=-1;
2724   }
2725
2726   CmiMachineInit(argv);
2727
2728   node_addresses_obtain(argv);
2729   MACHSTATE(5,"node_addresses_obtain done");
2730
2731 #if CMK_USE_IBVERBS
2732   if (Cmi_charmrun_fd==-1) CmiAbort("Fatal error: standalone mode is not supported in ibverbs. \n");
2733 #endif
2734
2735   CmiCommunicationInit(argv);
2736
2737 #if CMK_USE_SYSVSHM
2738   CmiInitSysvshm(argv);
2739 #elif CMK_USE_PXSHM
2740   CmiInitPxshm(argv);
2741 #endif
2742
2743   skt_set_idle(CmiYield);
2744   Cmi_check_delay = 1.0+0.25*_Cmi_numnodes;
2745
2746   if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2747       Cmi_check_delay=1.0e30;
2748
2749   CsvInitialize(CmiNodeState, NodeState);
2750   CmiNodeStateInit(&CsvAccess(NodeState));
2751  
2752
2753   /* Network progress function is used to poll the network when for
2754      messages. This flushes receive buffers on some  implementations*/ 
2755   networkProgressPeriod = 0;  
2756   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2757
2758   Cmi_argvcopy = CmiCopyArgs(argv);
2759   Cmi_argv = argv; 
2760
2761   CmiStartThreads(argv);
2762
2763 #if CMK_USE_AMMASSO
2764   CmiAmmassoOpenQueuePairs();
2765 #endif
2766
2767   ConverseRunPE(everReturn);
2768 }
2769
2770 #if CMK_PERSISTENT_COMM
2771
2772 int persistentSendMsgHandlerIdx;
2773
2774 static void sendPerMsgHandler(char *msg)
2775 {
2776   int msgSize;
2777   void *destAddr, *destSizeAddr;
2778
2779   msgSize = CmiMsgHeaderGetLength(msg);
2780   msgSize -= 2*sizeof(void *);
2781   destAddr = *(void **)(msg + msgSize);
2782   destSizeAddr = *(void **)(msg + msgSize + sizeof(void*));
2783 /*CmiPrintf("msgSize:%d destAddr:%p, destSizeAddr:%p\n", msgSize, destAddr, destSizeAddr);*/
2784   CmiSetHandler(msg, CmiGetXHandler(msg));
2785   *((int *)destSizeAddr) = msgSize;
2786   memcpy(destAddr, msg, msgSize);
2787 }
2788
2789 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
2790 {
2791   CmiAssert(h!=NULL);
2792   PersistentSendsTable *slot = (PersistentSendsTable *)h;
2793   CmiAssert(slot->used == 1);
2794   CmiAssert(slot->destPE == destPE);
2795   if (size > slot->sizeMax) {
2796     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
2797     CmiAbort("Abort: Invalid size\n");
2798   }
2799
2800 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destpe=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), slot->destPE, slot->destAddress, size);*/
2801
2802   if (slot->destAddress[0]) {
2803     int newsize = size + sizeof(void *)*2;
2804     char *newmsg = (char*)CmiAlloc(newsize);
2805     memcpy(newmsg, m, size);
2806     memcpy(newmsg+size, &slot->destAddress[0], sizeof(void *));
2807     memcpy(newmsg+size+sizeof(void*), &slot->destSizeAddress[0], sizeof(void *));
2808     CmiFree(m);
2809     CmiMsgHeaderSetLength(newmsg, size + sizeof(void *)*2);
2810     CmiSetXHandler(newmsg, CmiGetHandler(newmsg));
2811     CmiSetHandler(newmsg, persistentSendMsgHandlerIdx);
2812     phs = NULL; phsSize = 0;
2813     CmiSyncSendAndFree(slot->destPE, newsize, newmsg);
2814   }
2815   else {
2816 #if 1
2817     if (slot->messageBuf != NULL) {
2818       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
2819       CmiAbort("");
2820     }
2821     slot->messageBuf = m;
2822     slot->messageSize = size;
2823 #else
2824     /* normal send */
2825     PersistentHandle  *phs_tmp = phs;
2826     int phsSize_tmp = phsSize;
2827     phs = NULL; phsSize = 0;
2828     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
2829     CmiSyncSendAndFree(slot->destPE, size, m);
2830     phs = phs_tmp; phsSize = phsSize_tmp;
2831 #endif
2832   }
2833 }
2834
2835 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
2836 {
2837   char *dupmsg = (char *) CmiAlloc(size);
2838   memcpy(dupmsg, msg, size);
2839
2840   /*  CmiPrintf("Setting root to %d\n", 0); */
2841   if (CmiMyPe()==destPE) {
2842     CQdCreate(CpvAccess(cQdState), 1);
2843     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
2844   }
2845   else
2846     CmiSendPersistentMsg(h, destPE, size, dupmsg);
2847 }
2848
2849 /* called in PumpMsgs */
2850 int PumpPersistent()
2851 {
2852   PersistentReceivesTable *slot = persistentReceivesTableHead;
2853   int status = 0;
2854   while (slot) {
2855     unsigned int size = *(slot->recvSizePtr[0]);
2856     if (size > 0)
2857     {
2858       char *msg = slot->messagePtr[0];
2859 /*CmiPrintf("size: %d msg:%p %p\n", size, msg, slot->messagePtr);*/
2860
2861 #if 0
2862       void *dupmsg;
2863       dupmsg = CmiAlloc(size);
2864       
2865       _MEMCHECK(dupmsg);
2866       memcpy(dupmsg, msg, size);
2867       msg = dupmsg;
2868 #else
2869       /* return messagePtr directly and user MUST make sure not to delete it. */
2870       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
2871
2872       CmiReference(msg);
2873 #endif
2874
2875       CmiPushPE(CMI_DEST_RANK(msg), msg);
2876 #if CMK_BROADCAST_SPANNING_TREE
2877       if (CMI_BROADCAST_ROOT(msg))
2878           SendSpanningChildren(size, msg);
2879 #endif
2880       *(slot->recvSizePtr[0]) = 0;
2881       status = 1;
2882     }
2883     slot = slot->next;
2884   }
2885   return status;
2886 }
2887
2888 void *PerAlloc(int size)
2889 {
2890   return CmiAlloc(size);
2891 }
2892                                                                                 
2893 void PerFree(char *msg)
2894 {
2895     CmiFree(msg);
2896 }
2897
2898 void persist_machine_init() 
2899 {
2900   persistentSendMsgHandlerIdx =
2901        CmiRegisterHandler((CmiHandler)sendPerMsgHandler);
2902 }
2903
2904 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
2905 {
2906   int i;
2907   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
2908     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
2909     _MEMCHECK(buf);
2910     memset(buf, 0, maxBytes+sizeof(int)*2);
2911     slot->messagePtr[i] = buf;
2912     slot->recvSizePtr[i] = (unsigned int*)CmiAlloc(sizeof(unsigned int));
2913   }
2914   slot->sizeMax = maxBytes;
2915 }
2916
2917 #endif
2918
2919
2920 #if CMK_CELL
2921
2922 #include "spert_ppu.h"
2923
2924 void machine_OffloadAPIProgress() {
2925   CmiCommLock();
2926   OffloadAPIProgress();
2927   CmiCommUnlock();
2928 }
2929 #endif
2930
2931
2932
2933 /*@}*/