fix compilation error due to no return for net clang
[charm.git] / src / arch / net / machine.c
1
2 /** @file
3  * Basic NET implementation of Converse machine layer
4  * @ingroup NET
5  */
6
7 /** @defgroup NET
8  * NET implementation of machine layer, ethernet in particular
9  * @ingroup Machine
10  *
11  * THE DATAGRAM STREAM
12  *
13  * Messages are sent using UDP datagrams.  The sender allocates a
14  * struct for each datagram to be sent.  These structs stick around
15  * until slightly after the datagram is acknowledged.
16  *
17  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
18  * Each node has an OtherNode struct for every other node in the
19  * system.  The OtherNode struct contains:
20  *
21  *   send_queue   (all datagram-structs not yet transmitted)
22  *   send_window  (all datagram-structs transmitted but not ack'd)
23  *
24  * When an acknowledgement comes in, all packets in the send-window
25  * are either marked as acknowledged or pushed back into the send
26  * queue for retransmission.
27  *
28  * THE OUTGOING MESSAGE
29  *
30  * When you send or broadcast a message, the first thing the system
31  * does is system creates an OutgoingMsg struct to represent the
32  * operation.  The OutgoingMsg contains a very direct expression
33  * of what you want to do:
34  *
35  * OutgoingMsg:
36  *
37  *   size      --- size of message in bytes
38  *   data      --- pointer to the buffer containing the message
39  *   src       --- processor which sent the message
40  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
41  *   freemode  --- see below.
42  *   refcount  --- see below.
43  *
44  * The OutgoingMsg is kept around until the transmission is done, then
45  * it is garbage collected --- the refcount and freemode fields are
46  * to assist garbage collection.
47  *
48  * The freemode indicates which kind of buffer-management policy was
49  * used (sync, async, or freeing).  The sync policy is handled
50  * superficially by immediately converting sync sends into freeing
51  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
52  * (freeing).  If the freemode is 'F', then garbage collection
53  * involves freeing the data and the OutgoingMsg structure itself.  If
54  * the freemode is 'A', then the only cleanup is to change the
55  * freemode to 'X', a condition which is then detectable by
56  * CmiAsyncMsgSent.  In this case, the actual freeing of the
57  * OutgoingMsg is done by CmiReleaseCommHandle.
58  *
59  * When the transmission is initiated, the system computes how many
60  * datagrams need to be sent, total.  This number is stored in the
61  * refcount field.  Each time a datagram is delivered, the refcount
62  * is decremented, when it reaches zero, cleanup is performed.  There
63  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
64  * is a send (not a broadcast) and can be performed with shared
65  * memory, the entire datagram system is bypassed, the message is
66  * simply delivered and freed, not using the refcount mechanism at
67  * all.  Exception 2: If the message is a broadcast, then part of the
68  * broadcast that can be done via shared memory is performed prior to
69  * initiating the datagram/refcount system.
70  *
71  * DATAGRAM FORMATS AND MESSAGE FORMATS
72  *
73  * Datagrams have this format:
74  *
75  *   srcpe   (16 bits) --- source processor number.
76  *   magic   ( 8 bits) --- magic number to make sure DG is good.
77  *   dstrank ( 8 bits) --- destination processor rank.
78  *   seqno   (32 bits) --- packet sequence number.
79  *   data    (XX byte) --- user data.
80  *
81  * The only reason the srcpe is in there is because the receiver needs
82  * to know which receive window to use.  The dstrank field is needed
83  * because transmission is node-to-node.  Once the message is
84  * assembled by the node, it must be delivered to the appropriate PE.
85  * The dstrank field is used to encode certain special-case scenarios.
86  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
87  * and should be delivered to all processors in the node.  If the dstrank
88  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
89  * which case the srcpe is the number of the acknowledger, the seqno is
90  * always zero, and the user data is a list of the seqno's being
91  * acknowledged.  There may be other dstrank codes for special functions.
92  *
93  * To send a message, one chops it up into datagrams and stores those
94  * datagrams in a send-queue.  These outgoing datagrams aren't stored
95  * in the explicit format shown above.  Instead, they are stored as
96  * ImplicitDgrams, which contain the datagram header and a pointer to
97  * the user data (which is in the user message buffer, which is in the
98  * OutgoingMsg).  At transmission time these are combined together.
99
100  * The combination of the datagram header with the user's data is
101  * performed right in the user's message buffer.  Note that the
102  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
103  * of the user's message with a datagram header, sends the datagram
104  * straight from the user's message buffer, then restores the user's
105  * buffer to its original state.  There is a small problem with the
106  * first datagram of the message: one needs 64 bits of space to store
107  * the datagram header.  To make sure this space is there, we added a
108  * 64-bit unused space to the front of the Cmi message header.  In
109  * addition to this, we also add 32 bits to the Cmi message header
110  * to make room for a length-field, making it possible to identify
111  * message boundaries.
112  *
113  * CONCURRENCY CONTROL
114  *
115  * This has changed recently.
116  *
117  * EFFICIENCY NOTES
118  *
119  * The sender-side does little copying.  The async and freeing send
120  * routines do no copying at all.  The sync send routines copy the
121  * message, then use the freeing-send routines.  The other alternative
122  * is to not copy the message, and use the async send mechanism
123  * combined with a blocking wait.  Blocking wait seems like a bad
124  * idea, since it could take a VERY long time to get all those
125  * datagrams out the door.
126  *
127  * The receiver side, unfortunately, must copy.  To avoid copying,
128  * it would have to receive directly into a preallocated message buffer.
129  * Unfortunately, this can't work: there's no way to know how much
130  * memory to preallocate, and there's no way to know which datagram
131  * is coming next.  Thus, we receive into fixed-size (large) datagram
132  * buffers.  These are then inspected, and the messages extracted from
133  * them.
134  *
135  * Note that we are allocating a large number of structs: OutgoingMsg's,
136  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
137  * is a fixed-size structure.  Thus, we can do memory allocation by
138  * simply keeping a linked-list of unused structs around.  The only
139  * place where expensive memory allocation is performed is in the
140  * sync routines.
141  *
142  * Since the datagrams from one node to another are fully ordered,
143  * there is slightly more ordering than is needed: in theory, the
144  * datagrams of one message don't need to be ordered relative to the
145  * datagrams of another.  This was done to simplify the sequencing
146  * mechanisms: implementing a fully-ordered stream is much simpler
147  * than a partially-ordered one.  It also makes it possible to
148  * modularize, layering the message transmitter on top of the
149  * datagram-sequencer.  In other words, it was just easier this way.
150  * Hopefully, this won't cause serious degradation: LAN's rarely get
151  * datagrams out of order anyway.
152  *
153  * A potential efficiency problem is the lack of message-combining.
154  * One datagram could conceivably contain several messages.  This
155  * might be more efficient, it's not clear how much overhead is
156  * involved in sending a short datagram.  Message-combining isn't
157  * really ``integrated'' into the design of this software, but you
158  * could fudge it as follows.  Whenever you pull a short datagram from
159  * the send-queue, check the next one to see if it's also a short
160  * datagram.  If so, pack them together into a ``combined'' datagram.
161  * At the receive side, simply check for ``combined'' datagrams, and
162  * treat them as if they were simply two datagrams.  This would
163  * require extra copying.  I have no idea if this would be worthwhile.
164  *
165  *****************************************************************************/
166
167 /**
168  * @addtogroup NET
169  * @{
170  */
171
172 /*****************************************************************************
173  *
174  * Include Files
175  *
176  ****************************************************************************/
177
178 #define _GNU_SOURCE 1
179 #include <stdarg.h> /*<- was <varargs.h>*/
180
181 #define CMK_USE_PRINTF_HACK 0
182 #if CMK_USE_PRINTF_HACK
183 /*HACK: turn printf into CmiPrintf, by just defining our own
184 external symbol "printf".  This may be more trouble than it's worth,
185 since the only advantage is that it works properly with +syncprint.
186
187 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
188 because they don't call printf.  Has to be defined up here because we probably 
189 haven't properly guessed this compiler's prototype for "printf".
190 */
191 static void InternalPrintf(const char *f, va_list l);
192 int printf(const char *fmt, ...) {
193         int nChar;
194         va_list p; va_start(p, fmt);
195         InternalPrintf(fmt,p);
196         va_end(p);
197         return 10;
198 }
199 #endif
200
201
202 #include "converse.h"
203 #include "memory-isomalloc.h"
204
205 #include <stdio.h>
206 #include <stdlib.h>
207 #include <ctype.h>
208 #include <fcntl.h>
209 #include <errno.h>
210 #include <setjmp.h>
211 #include <signal.h>
212 #include <string.h>
213
214 /* define machine debug */
215 #include "machine.h"
216
217 /******************* Producer-Consumer Queues ************************/
218 #include "pcqueue.h"
219
220 #include "machine-smp.h"
221
222 #if CMK_USE_KQUEUE
223 #include <sys/event.h>
224 int _kq = -1;
225 #endif
226
227 #if CMK_USE_POLL
228 #include <poll.h>
229 #endif
230
231 #if CMK_USE_GM
232 #include "gm.h"
233 struct gm_port *gmport = NULL;
234 int  portFinish = 0;
235 #endif
236
237 #if CMK_USE_MX
238 #include "myriexpress.h"
239 mx_endpoint_t      endpoint;
240 mx_endpoint_addr_t endpoint_addr;
241 int MX_FILTER   =  123456;
242 static uint64_t Cmi_nic_id=0; /* Machine-specific identifier (MX-only) */
243 #endif
244
245 #if CMK_USE_AMMASSO
246   #include "clustercore/ccil_api.h"
247 #endif
248
249 #if CMK_MULTICORE
250 int Cmi_commthread = 0;
251 #endif
252
253 #include "conv-ccs.h"
254 #include "ccs-server.h"
255 #include "sockRoutines.h"
256
257 #if defined(_WIN32) && ! defined(__CYGWIN__)
258 /*For windows systems:*/
259 #  include <windows.h>
260 #  include <wincon.h>
261 #  include <sys/types.h>
262 #  include <sys/timeb.h>
263 #  define fdopen _fdopen
264 #  define SIGBUS -1  /*These signals don't exist in Win32*/
265 #  define SIGKILL -1
266 #  define SIGQUIT -1
267 /*#  define SIGTERM -1*/       /* VC++ ver 8 now has SIGTERM */
268
269 #else /*UNIX*/
270 #  include <pwd.h>
271 #  include <unistd.h>
272 #  include <fcntl.h>
273 #  include <sys/file.h>
274 #endif
275
276 #if CMK_PERSISTENT_COMM
277 #include "persist_impl.h"
278 #endif
279
280 #define PRINTBUFSIZE 16384
281
282 #ifdef __ONESIDED_IMPL
283 #ifdef __ONESIDED_NO_HARDWARE
284 int putSrcHandler;
285 int putDestHandler;
286 int getSrcHandler;
287 int getDestHandler;
288 #include "conv-onesided.c"
289 #endif
290 #endif
291
292
293 static void CommunicationServer(int withDelayMs, int where);
294
295 void CmiHandleImmediate();
296 extern int CmemInsideMem();
297 extern void CmemCallWhenMemAvail();
298 static void ConverseRunPE(int everReturn);
299 void CmiYield(void);
300 void ConverseCommonExit(void);
301
302 static unsigned int dataport=0;
303 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
304 static SOCKET       dataskt;
305
306 extern void TokenUpdatePeriodic();
307 extern void getAvailSysMem();
308
309 #define BROADCAST_SPANNING_FACTOR               4
310
311 /****************************************************************************
312  *
313  * Handling Errors
314  *
315  * Errors should be handled by printing a message on stderr and
316  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
317  * communication should be made.  The other processes will notice the
318  * abnormal termination and will deal with it.
319  *
320  * Rationale: if an error triggers an attempt to send a message,
321  * the attempt to send a message is likely to trigger another error,
322  * leading to an infinite loop and a process that spins instead of
323  * shutting down.
324  *
325  *****************************************************************************/
326
327 static int machine_initiated_shutdown=0;
328 static int already_in_signal_handler=0;
329
330 static void CmiDestroyLocks();
331
332 void CmiMachineExit();
333
334 #if CMK_USE_SYSVSHM /* define teardown function before use */
335 void tearDownSharedBuffers();
336 #endif 
337
338 static void machine_exit(int status)
339 {
340   MACHSTATE(3,"     machine_exit");
341   machine_initiated_shutdown=1;
342
343   CmiDestroyLocks();            /* destroy locks to prevent dead locking */
344   EmergencyExit();
345
346 #if CMK_USE_GM
347   if (gmport) { 
348     gm_close(gmport); gmport = 0;
349     gm_finalize();
350   }
351 #endif
352 #if CMK_USE_SYSVSHM
353   tearDownSharedBuffers();
354 #endif
355   CmiMachineExit();
356   exit(status);
357 }
358
359 static void charmrun_abort(const char*);
360
361 static void KillEveryone(const char *msg)
362 {
363   charmrun_abort(msg);
364   machine_exit(1);
365 }
366
367 static void KillEveryoneCode(n)
368 int n;
369 {
370   char _s[100];
371   sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
372   charmrun_abort(_s);
373   machine_exit(1);
374 }
375
376 CpvExtern(int, freezeModeFlag);
377
378 static void KillOnAllSigs(int sigNo)
379 {
380   const char *sig="unknown signal";
381   const char *suggestion="";
382   if (machine_initiated_shutdown ||
383       already_in_signal_handler) 
384         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
385   already_in_signal_handler=1;
386
387 #if CMK_CCS_AVAILABLE
388   if (CpvAccess(cmiArgDebugFlag)) {
389     int reply = 0;
390     CpdNotify(CPD_SIGNAL,sigNo);
391 #if ! CMK_BIGSIM_CHARM
392     CcsSendReplyNoError(4,&reply);/*Send an empty reply if not*/
393     CpvAccess(freezeModeFlag) = 1;
394     CpdFreezeModeScheduler();
395 #else
396     CpdFreeze();
397 #endif
398   }
399 #endif
400   
401   CmiDestroyLocks();
402
403   if (sigNo==SIGSEGV) {
404      sig="segmentation violation";
405      suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).\n";
406   }
407   if (sigNo==SIGFPE) {
408      sig="floating point exception";
409      suggestion="Check for integer or floating-point division by zero.\n";
410   }
411   if (sigNo==SIGBUS) {
412      sig="bus error";
413      suggestion="Check for misaligned reads or writes to memory.\n";
414   }
415   if (sigNo==SIGILL) {
416      sig="illegal instruction";
417      suggestion="Check for calls to uninitialized function pointers.\n";
418   }
419   if (sigNo==SIGKILL) sig="caught signal KILL";
420   if (sigNo==SIGQUIT) sig="caught signal QUIT";
421   if (sigNo==SIGTERM) sig="caught signal TERM";
422   MACHSTATE1(5,"     Caught signal %s ",sig);
423 /*ifdef this part*/
424 #ifdef __FAULT__
425   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
426                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
427   }else{
428 #else
429         {
430 #endif
431    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
432         "Signal: %s\n",CmiMyPe(),sig);
433         if (0!=suggestion[0])
434                 CmiError("Suggestion: %s",suggestion);
435         CmiPrintStackTrace(1);
436         charmrun_abort(sig);
437         machine_exit(1);                
438         }       
439 }
440
441 static void machine_atexit_check(void)
442 {
443   if (!machine_initiated_shutdown)
444     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
445   printf("Program finished.\n");
446 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
447   fgetc(stdin);
448 #endif
449 }
450
451 #if !defined(_WIN32) || defined(__CYGWIN__)
452 static void HandleUserSignals(int signum)
453 {
454   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
455   CcdRaiseCondition(condnum);
456 }
457 #endif
458
459 static void PerrorExit(const char *msg)
460 {
461   perror(msg);
462   machine_exit(1);
463 }
464
465 /*****************************************************************************
466  *
467  *     Utility routines for network machine interface.
468  *
469  *****************************************************************************/
470
471 /*
472 Horrific #defines to hide the differences between select() and poll().
473  */
474 #if CMK_USE_POLL /*poll() version*/
475 # define CMK_PIPE_DECL(delayMs) \
476         struct pollfd fds[10]; \
477         int nFds_sto=0; int *nFds=&nFds_sto; \
478         int pollDelayMs=delayMs;
479 # define CMK_PIPE_SUB fds,nFds
480 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
481
482 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
483 # define CMK_PIPE_ADDREAD(rd_fd) \
484         do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
485 # define CMK_PIPE_ADDWRITE(wr_fd) \
486         do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
487 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
488 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
489
490 #elif CMK_USE_KQUEUE /* kqueue version */
491
492 # define CMK_PIPE_DECL(delayMs) \
493         if (_kq == -1) _kq = kqueue(); \
494     struct kevent ke_sto; \
495     struct kevent* ke = &ke_sto; \
496     struct timespec tmo; \
497     tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
498 # define CMK_PIPE_SUB ke
499 # define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)
500
501 # define CMK_PIPE_PARAM struct kevent* ke
502 # define CMK_PIPE_ADDREAD(rd_fd) \
503         do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
504                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
505 # define CMK_PIPE_ADDWRITE(wr_fd) \
506         do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
507                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
508 # define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
509 # define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)
510
511 #else /*select() version*/
512
513 # define CMK_PIPE_DECL(delayMs) \
514         fd_set rfds_sto,wfds_sto;\
515         fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
516         FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
517 # define CMK_PIPE_SUB rfds,wfds
518 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
519
520 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
521 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
522 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
523 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
524 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
525 #endif
526
527 static void CMK_PIPE_CHECKERR(void) {
528 #if defined(_WIN32) && !defined(__CYGWIN__)
529 /* Win32 socket seems to randomly return inexplicable errors
530 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
531         int err=WSAGetLastError();
532         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
533 o,err);
534 */
535 #else /*UNIX machine*/
536         if (errno!=EINTR)
537                 KillEveryone("Socket error in CheckSocketsReady!\n");
538 #endif
539 }
540
541
542 static void CmiStdoutFlush(void);
543 static int  CmiStdoutNeedsService(void);
544 static void CmiStdoutService(void);
545 static void CmiStdoutAdd(CMK_PIPE_PARAM);
546 static void CmiStdoutCheck(CMK_PIPE_PARAM);
547
548
549 double GetClock(void)
550 {
551 #if defined(_WIN32) && !defined(__CYGWIN__)
552   struct _timeb tv; 
553   _ftime(&tv);
554   return (tv.time * 1.0 + tv.millitm * 1.0E-3);
555 #else
556   struct timeval tv; int ok;
557   ok = gettimeofday(&tv, NULL);
558   if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
559   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
560 #endif
561 }
562
563 char *CopyMsg(char *msg, int len)
564 {
565   char *copy = (char *)CmiAlloc(len);
566   if (!copy)
567       fprintf(stderr, "Out of memory\n");
568   memcpy(copy, msg, len);
569   return copy;
570 }
571
572 /***********************************************************************
573  *
574  * Abort function:
575  *
576  ************************************************************************/
577
578 static int  Cmi_truecrash;
579 static int already_aborting=0;
580 void CmiAbort(const char *message)
581 {
582   if (already_aborting) machine_exit(1);
583   already_aborting=1;
584         {
585 /*       char str[100];
586          sprintf(str,"dead.%d",CmiMyNode());
587          FILE *fp = fopen(str,"w");
588          fprintf(fp,"%s",message);
589          fclose(fp);*/
590         }
591   MACHSTATE1(5,"CmiAbort(%s)",message);
592
593 #if CMK_CCS_AVAILABLE
594   /* if CharmDebug is attached simply try to send a message to it */
595   if (CpvAccess(cmiArgDebugFlag)) {
596     CpdNotify(CPD_ABORT, message);
597     CpdFreeze();
598   }
599 #endif
600   
601   /* CmiDestroyLocks();  */
602
603   {
604 /*    char str[22];
605     snprintf(str,18,"dead.%d",CmiMyPe());
606     FILE *fp = fopen(str,"w");
607     fprintf(fp,"Abort:%s\n",message);
608     fclose(fp);*/
609   }
610
611   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
612         "Reason: %s\n",CmiMyPe(),message);
613   CmiPrintStackTrace(0);
614   
615   /*Send off any remaining prints*/
616   CmiStdoutFlush();
617   
618   if(Cmi_truecrash) {
619     printf("CHARM++ FATAL ERROR: %s\n", message);
620     *(int *)NULL = 0; /*Write to null, causing bus error*/
621   } else {
622     charmrun_abort(message);
623     machine_exit(1);
624   }
625 }
626
627
628 /******************************************************************************
629  *
630  * CmiEnableAsyncIO
631  *
632  * The net and tcp versions use a bunch of unix processes talking to each
633  * other via file descriptors.  We need for a signal SIGIO to be generated
634  * each time a message arrives, making it possible to write a signal
635  * handler to handle the messages.  The vast majority of unixes can,
636  * in fact, do this.  However, there isn't any standard for how this is
637  * supposed to be done, so each version of UNIX has a different set of
638  * calls to turn this signal on.  So, there is like one version here for
639  * every major brand of UNIX.
640  *
641  *****************************************************************************/
642
643 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
644 #include <fcntl.h>
645 void CmiEnableAsyncIO(int fd)
646 {
647   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
648     CmiError("setting socket owner: %s\n", strerror(errno)) ;
649     exit(1);
650   }
651   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
652     CmiError("setting socket async: %s\n", strerror(errno)) ;
653     exit(1);
654   }
655 }
656 #else
657 void CmiEnableAsyncIO(int fd) { }
658 #endif
659
660 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
661 #if !defined(_WIN32) || defined(__CYGWIN__)
662 void CmiEnableNonblockingIO(int fd) {
663   int on=1;
664   if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
665     CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
666     exit(1);
667   }
668 }
669 #else
670 void CmiEnableNonblockingIO(int fd) { }
671 #endif
672
673
674 /******************************************************************************
675 *
676 * Configuration Data
677 *
678 * This data is all read in from the NETSTART variable (provided by the
679 * charmrun) and from the command-line arguments.  Once read in, it is never
680  * modified.
681  *
682  *****************************************************************************/
683
684 int               _Cmi_mynode;    /* Which address space am I */
685 int               _Cmi_mynodesize;/* Number of processors in my address space */
686 int               _Cmi_numnodes;  /* Total number of address spaces */
687 int               _Cmi_numpes;    /* Total number of processors */
688 static int        Cmi_nodestart; /* First processor in this address space */
689 static skt_ip_t   Cmi_self_IP;
690 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
691 static int        Cmi_charmrun_port;
692 static int        Cmi_charmrun_pid;
693 static int        Cmi_charmrun_fd=-1;
694 /* Magic number to be used for sanity check in messege header */
695 static int                              Cmi_net_magic;
696
697 static int    Cmi_netpoll;
698 static int    Cmi_asyncio;
699 static int    Cmi_idlepoll;
700 static int    Cmi_syncprint;
701 static int Cmi_print_stats = 0;
702
703 #if ! defined(_WIN32)\r
704 /* parse forks only used in non-smp mode */
705 static void parse_forks(void) {
706   char *forkstr;
707   int nread;
708   int forks;
709   int i,pid;
710   forkstr=getenv("CmiMyForks");
711   if(forkstr!=0) { /* charmrun */
712         nread = sscanf(forkstr,"%d",&forks);
713         for(i=1;i<=forks;i++) { /* by default forks = 0 */ 
714                 pid=fork();
715                 if(pid<0) CmiAbort("Fork returned an error");
716                 if(pid==0) { /* forked process */
717                         /* reset mynode,pe & exit loop */
718                         _Cmi_mynode+=i;
719 #if ! CMK_SMP\r
720                         _Cmi_mype+=i;
721 #endif\r
722                         break;
723                 }
724         }
725   }
726 }
727 #endif
728 static void parse_magic(void)
729 {
730         char* nm;       
731         int nread;
732   nm = getenv("NETMAGIC");
733   if (nm!=0) 
734   {/*Read values set by Charmrun*/
735         nread = sscanf(nm, "%d",&Cmi_net_magic);
736         }
737 }
738 static void parse_netstart(void)
739 {
740   char *ns;
741   int nread;
742   int port;
743   ns = getenv("NETSTART");
744   if (ns!=0) 
745   {/*Read values set by Charmrun*/
746         char Cmi_charmrun_name[1024];
747         nread = sscanf(ns, "%d%s%d%d%d",
748                  &_Cmi_mynode,
749                  Cmi_charmrun_name, &Cmi_charmrun_port,
750                  &Cmi_charmrun_pid, &port);
751         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
752
753         if (nread!=5) {
754                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
755                 exit(1);
756         }
757   } else 
758   {/*No charmrun-- set flag values for standalone operation*/
759         _Cmi_mynode=0;
760         Cmi_charmrun_IP=_skt_invalid_ip;
761         Cmi_charmrun_port=0;
762         Cmi_charmrun_pid=0;
763         dataport = -1;
764   }
765 #if CMK_USE_IBVERBS | CMK_USE_IBUD
766     {
767         char *cmi_num_nodes = getenv("CmiNumNodes");
768         if(cmi_num_nodes != NULL){
769                 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
770         }
771     }
772 #endif  
773 }
774
775 static void extract_common_args(char **argv)
776 {
777   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
778     Cmi_print_stats = 1;
779 }
780
781 /* for SMP */
782 #include "machine-smp.c"
783
784 CsvDeclare(CmiNodeState, NodeState);
785
786 /* Immediate message support */
787 #define CMI_DEST_RANK(msg)      *(int *)(msg)
788 #include "immediate.c"
789
790 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
791 #include "machine-commthd-util.c"
792 #endif
793
794 /******************************************************************************
795  *
796  * Packet Performance Logging
797  *
798  * This module is designed to give a detailed log of the packets and their
799  * acknowledgements, for performance tuning.  It can be disabled.
800  *
801  *****************************************************************************/
802
803 #define LOGGING 0
804
805 #if LOGGING
806
807 typedef struct logent {
808   double time;
809   int seqno;
810   int srcpe;
811   int dstpe;
812   int kind;
813 } *logent;
814
815
816 logent log;
817 int    log_pos;
818 int    log_wrap;
819
820 static void log_init(void)
821 {
822   log = (logent)malloc(50000 * sizeof(struct logent));
823   _MEMCHECK(log);
824   log_pos = 0;
825   log_wrap = 0;
826 }
827
828 static void log_done(void)
829 {
830   char logname[100]; FILE *f; int i, size;
831   sprintf(logname, "log.%d", _Cmi_mynode);
832   f = fopen(logname, "w");
833   if (f==0) KillEveryone("fopen problem");
834   if (log_wrap) size = 50000; else size=log_pos;
835   for (i=0; i<size; i++) {
836     logent ent = log+i;
837     fprintf(f, "%1.4f %d %c %d %d\n",
838             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
839   }
840   fclose(f);
841 }
842
843 void printLog(void)
844 {
845   char logname[100]; FILE *f; int i, j, size;
846   static int logged = 0;
847   if (logged)
848       return;
849   logged = 1;
850   CmiPrintf("Logging: %d\n", _Cmi_mynode);
851   sprintf(logname, "log.%d", _Cmi_mynode);
852   f = fopen(logname, "w");
853   if (f==0) KillEveryone("fopen problem");
854   for (i = 5000; i; i--)
855   {
856   /*for (i=0; i<size; i++) */
857     j = log_pos - i;
858     if (j < 0)
859     {
860         if (log_wrap)
861             j = 5000 + j;
862         else
863             j = 0;
864     };
865     {
866     logent ent = log+j;
867     fprintf(f, "%1.4f %d %c %d %d\n",
868             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
869     }
870   }
871   fclose(f);
872   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
873 }
874
875 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
876
877 #endif
878
879 #if !LOGGING
880
881 #define log_init() /*empty*/
882 #define log_done() /*empty*/
883 #define printLog() /*empty*/
884 #define LOG(t,s,k,d,q) /*empty*/
885
886 #endif
887
888 /******************************************************************************
889  *
890  * Node state
891  *
892  *****************************************************************************/
893
894
895 static CmiNodeLock    Cmi_scanf_mutex;
896 static double         Cmi_clock;
897 static double         Cmi_check_delay = 3.0;
898
899 /******************************************************************************
900  *
901  * OS Threads
902  * SMP implementation moved to machine-smp.c
903  *****************************************************************************/
904
905 /************************ No kernel SMP threads ***************/
906 #if CMK_SHARED_VARS_UNAVAILABLE
907
908 static volatile int memflag=0;
909 void CmiMemLock() { memflag++; }
910 void CmiMemUnlock() { memflag--; }
911
912 static volatile int comm_flag=0;
913 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
914 #ifndef MACHLOCK_DEBUG
915 #  define CmiCommLock() (comm_flag=1)
916 #  define CmiCommUnlock() (comm_flag=0)
917 #else /* Error-checking flag locks */
918 void CmiCommLock(void) {
919   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
920   comm_flag=1;
921 }
922 void CmiCommUnlock(void) {
923   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
924   comm_flag=0;
925 }
926 #endif
927
928 static struct CmiStateStruct Cmi_state;
929 int _Cmi_mype;
930 int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
931 #define CmiGetState() (&Cmi_state)
932 #define CmiGetStateN(n) (&Cmi_state)
933
934 void CmiYield(void) { sleep(0); }
935
936 static void CommunicationInterrupt(int ignored)
937 {
938   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
939   if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0)) 
940   { /* Already busy inside malloc, comm, or immediate messages */
941     MACHSTATE(5,"--SKIPPING SIGIO--");
942     return;
943   }
944   MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
945   {
946     /*Make sure any malloc's we do in here are NOT migratable:*/
947     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
948 /*    _Cmi_myrank=1; */
949     CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
950 /*    _Cmi_myrank=0; */
951     CmiIsomallocBlockListActivate(oldList);
952   }
953   MACHSTATE(2,"--END SIGIO--")
954 }
955
956 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
957
958 static void CmiStartThreads(char **argv)
959 {
960   MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
961   MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
962   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
963     KillEveryone
964       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
965   
966   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
967   _Cmi_mype = Cmi_nodestart;
968
969   /* Prepare Cpv's for immediate messages: */
970   _Cmi_myrank=1;
971   CommunicationServerInit();
972   _Cmi_myrank=0;
973   
974 #if !CMK_ASYNC_NOT_NEEDED
975   if (Cmi_asyncio)
976   {
977     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
978     if (!Cmi_netpoll) {
979       if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
980       if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
981     }
982 #if CMK_USE_GM || CMK_USE_MX
983       /* charmrun is serviced in interrupt for gm */
984     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
985 #endif
986   }
987 #endif
988 }
989
990 static void CmiDestroyLocks()
991 {
992   comm_flag = 0;
993   memflag = 0;
994 }
995
996 #endif
997
998 /*Network progress utility variables. Period controls the rate at
999   which the network poll is called */
1000
1001 CpvDeclare(unsigned , networkProgressCount);
1002 int networkProgressPeriod;
1003
1004 CpvDeclare(void *, CmiLocalQueue);
1005
1006
1007 #ifndef CmiMyPe
1008 int CmiMyPe() 
1009
1010   return CmiGetState()->pe; 
1011 }
1012 #endif
1013 #ifndef CmiMyRank
1014 int CmiMyRank()
1015 {
1016   return CmiGetState()->rank;
1017 }
1018 #endif
1019
1020 CpvExtern(int,_charmEpoch);
1021
1022 /*Add a message to this processor's receive queue 
1023   Must be called while holding comm. lock
1024 */
1025
1026 extern double evacTime;
1027
1028 void CmiPushPE(int pe,void *msg)
1029 {
1030   CmiState cs=CmiGetStateN(pe);
1031         /*
1032                 FAULT_EVAC
1033         
1034         if(CpvAccess(_charmEpoch)&&!CmiNodeAlive(CmiMyPe())){
1035                 printf("[%d] Message after stop at %.6lf in %.6lf \n",CmiMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
1036         }*/
1037   MACHSTATE1(2,"Pushing message into %d's queue",pe);  
1038   MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
1039
1040 #if CMK_IMMEDIATE_MSG
1041   if (CmiIsImmediate(msg)) {
1042     CmiPushImmediateMsg(msg);
1043     return;
1044   }
1045 #endif
1046 #if !CMK_SMP_MULTIQ
1047   PCQueuePush(cs->recv,msg);
1048 #else
1049   PCQueuePush(cs->recv[CmiGetState()->myGrpIdx], msg);
1050 #endif
1051
1052 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1053   if (_Cmi_sleepOnIdle)
1054 #endif
1055   CmiIdleLock_addMessage(&cs->idle);
1056 }
1057
1058 #if CMK_NODE_QUEUE_AVAILABLE
1059 /*Add a message to the node queue.  
1060   Must be called while holding comm. lock
1061 */
1062 static void CmiPushNode(void *msg)
1063 {
1064   CmiState cs=CmiGetStateN(0);
1065   
1066   MACHSTATE(2,"Pushing message into node queue");
1067   MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
1068   
1069 #if CMK_IMMEDIATE_MSG
1070   if (CmiIsImmediate(msg)) {
1071     MACHSTATE(2,"Pushing Immediate message into queue");
1072     CmiPushImmediateMsg(msg);
1073     return;
1074   }
1075 #endif 
1076 /* 
1077  * if CMK_SMP_MULTIQ is enabled, then PCQUEUE's push lock
1078  * may be disabled. In this case, the lock for node recv
1079  * queue has to be used.
1080  * */
1081 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1082   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1083 #endif
1084   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
1085 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1086   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1087 #endif
1088
1089   /*Silly: always try to wake up processor 0, so at least *somebody*
1090     will be awake to handle the message*/
1091 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1092   if (_Cmi_sleepOnIdle)
1093 #endif
1094   CmiIdleLock_addMessage(&cs->idle);
1095 }
1096 #endif
1097
1098 /***************************************************************
1099  Communication with charmrun:
1100  We can send (ctrl_sendone) and receive (ctrl_getone)
1101  messages on a TCP socket connected to charmrun.
1102  This is used for printfs, CCS, etc; and also for
1103  killing ourselves if charmrun dies.
1104 */
1105
1106 /*This flag prevents simultanious outgoing
1107 messages on the charmrun socket.  It is protected
1108 by the commlock.*/
1109 static int Cmi_charmrun_fd_sendflag=0;
1110
1111 /* ctrl_sendone */
1112 static int sendone_abort_fn(int code,const char *msg) {
1113         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
1114         machine_exit(1);
1115         return -1;
1116 }
1117
1118 static void ctrl_sendone_nolock(const char *type,
1119                                 const char *data1,int dataLen1,
1120                                 const char *data2,int dataLen2)
1121 {
1122   const void *bufs[3]; int lens[3]; int nBuffers=0;
1123   ChMessageHeader hdr;
1124   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
1125   MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
1126   if (Cmi_charmrun_fd==-1) 
1127         charmrun_abort("ctrl_sendone called in standalone!\n");
1128   Cmi_charmrun_fd_sendflag=1;
1129   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
1130   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
1131   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
1132   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
1133   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
1134   Cmi_charmrun_fd_sendflag=0;
1135   skt_set_abort(oldAbort);
1136   MACHSTATE(2,"} ctrl_sendone_nolock");
1137 }
1138
1139 static void ctrl_sendone_locking(const char *type,
1140                                 const char *data1,int dataLen1,
1141                                 const char *data2,int dataLen2)
1142 {
1143   CmiCommLock();
1144   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
1145   CmiCommUnlock();
1146 }
1147
1148 #ifndef MEMORYUSAGE_OUTPUT
1149 #define MEMORYUSAGE_OUTPUT 0 
1150 #endif
1151 #if MEMORYUSAGE_OUTPUT 
1152 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
1153 static int memoryusage_counter;
1154 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
1155 #define memoryusage_output {\
1156   memoryusage_counter++;\
1157   if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
1158 #endif
1159
1160 static double Cmi_check_last;
1161
1162 /* if charmrun dies, we finish */
1163 static void pingCharmrun(void *ignored) 
1164 {
1165 #if MEMORYUSAGE_OUTPUT
1166   memoryusage_output;
1167   if(memoryusage_isOutput){
1168     memoryusage_counter = 0;
1169 #else
1170   {
1171 #endif 
1172
1173   double clock=GetClock();
1174   if (clock > Cmi_check_last + Cmi_check_delay) {
1175     MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1176     Cmi_check_last = clock; 
1177 #if CMK_USE_GM || CMK_USE_MX
1178     if (!Cmi_netpoll)  /* GM netpoll, charmrun service is done in interrupt */
1179 #endif
1180     CmiCommLockOrElse(return;); /*Already busy doing communication*/
1181     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1182     CmiCommLock();
1183     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1184     CmiCommUnlock();
1185   }
1186 #if 1
1187 #if CMK_USE_GM || CMK_USE_MX
1188   if (!Cmi_netpoll)
1189 #endif
1190   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1191 #endif
1192   }
1193 }
1194
1195 /* periodic charm ping, for gm and netpoll */
1196 static void pingCharmrunPeriodic(void *ignored)
1197 {
1198   pingCharmrun(ignored);
1199   CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1200 }
1201
1202 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
1203 static void charmrun_abort(const char *s)
1204 {
1205   if (Cmi_charmrun_fd==-1) {/*Standalone*/
1206         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1207 CmiPrintStackTrace(0);
1208         abort();
1209   } else {
1210         char msgBuf[80];
1211         skt_set_abort(ignore_further_errors);
1212         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1213         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
1214   }
1215 }
1216
1217 /* ctrl_getone */
1218
1219 #ifdef __FAULT__
1220 #include "machine-recover.c"
1221 #endif
1222
1223 static void node_addresses_store(ChMessage *msg);
1224
1225 static int barrierReceived = 0;
1226
1227 static void ctrl_getone(void)
1228 {
1229   ChMessage msg;
1230   MACHSTATE(2,"ctrl_getone")
1231   MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
1232   ChMessage_recv(Cmi_charmrun_fd,&msg);
1233   MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1234
1235   if (strcmp(msg.header.type,"die")==0) {
1236     MACHSTATE(2,"ctrl_getone bye bye")
1237     fprintf(stderr,"aborting: %s\n",msg.data);
1238     log_done();
1239     ConverseCommonExit();
1240     machine_exit(0);
1241   } 
1242 #if CMK_CCS_AVAILABLE
1243   else if (strcmp(msg.header.type, "req_fw")==0) {
1244     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1245         /*Sadly, I *can't* do a:
1246       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1247         here, because I can't send converse messages in the
1248         communication thread.  I *can* poke this message into 
1249         any convenient processor's queue, though:  (OSL, 9/14/2000)
1250         */
1251     int pe=0;/*<- node-local processor number. Any one will do.*/
1252     void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1253     MACHSTATE(2,"Incoming CCS request");
1254     if (cmsg!=NULL) CmiPushPE(pe,cmsg);
1255   }
1256 #endif
1257 #ifdef __FAULT__        
1258   else if(strcmp(msg.header.type,"crashnode")==0) {
1259         crash_node_handle(&msg);
1260   }
1261   else if(strcmp(msg.header.type,"initnodetab")==0) {
1262         /** A processor crashed and got recreated. So charmrun sent 
1263           across the whole nodetable data to update this processor*/
1264         node_addresses_store(&msg);
1265         // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1266   }
1267 #endif
1268   else if(strcmp(msg.header.type,"barrier")==0) {
1269         barrierReceived = 1;
1270   }
1271   else if(strcmp(msg.header.type,"barrier0")==0) {
1272         barrierReceived = 2;
1273   }
1274   else {
1275   /* We do not use KillEveryOne here because it calls CmiMyPe(),
1276    * which is not available to the communication thread on an SMP version.
1277    */
1278     /* CmiPrintf("Unknown message: %s\n", msg.header.type); */
1279     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1280     machine_exit(1);
1281   }
1282   
1283   MACHSTATE(2,"ctrl_getone done")
1284   ChMessage_free(&msg);
1285 }
1286
1287 #if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
1288 /*Deliver this reply data to this reply socket.
1289   The data is forwarded to CCS server via charmrun.*/
1290 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
1291 {
1292   MACHSTATE(2,"Outgoing CCS reply");
1293   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
1294       repData,repLen);
1295   MACHSTATE(1,"Outgoing CCS reply away");
1296 }
1297 #endif
1298
1299 /*****************************************************************************
1300  *
1301  * CmiPrintf, CmiError, CmiScanf
1302  *
1303  *****************************************************************************/
1304 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1305 static void InternalPrintf(const char *f, va_list l)
1306 {
1307   ChMessage replymsg;
1308   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1309   CmiStdoutFlush();
1310   vsprintf(buffer, f, l);
1311   if(Cmi_syncprint) {
1312           CmiCommLock();
1313           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1314           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1315           ChMessage_free(&replymsg);
1316           CmiCommUnlock();
1317   } else {
1318           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1319   }
1320   InternalWriteToTerminal(0,buffer,strlen(buffer));
1321   CmiTmpFree(buffer);
1322 }
1323
1324 static void InternalError(const char *f, va_list l)
1325 {
1326   ChMessage replymsg;
1327   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1328   CmiStdoutFlush();
1329   vsprintf(buffer, f, l);
1330   if(Cmi_syncprint) {
1331           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1332           CmiCommLock();
1333           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1334           ChMessage_free(&replymsg);
1335           CmiCommUnlock();
1336   } else {
1337           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1338   }
1339   InternalWriteToTerminal(1,buffer,strlen(buffer));
1340   CmiTmpFree(buffer);
1341 }
1342
1343 static int InternalScanf(char *fmt, va_list l)
1344 {
1345   ChMessage replymsg;
1346   char *ptr[20];
1347   char *p; int nargs, i;
1348   nargs=0;
1349   p=fmt;
1350   while (*p) {
1351     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1352     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1353     if (p[0]=='%') { nargs++; p++; continue; }
1354     if (*p=='\n') *p=' '; p++;
1355   }
1356   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1357   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1358   CmiLock(Cmi_scanf_mutex);
1359   if (Cmi_charmrun_fd!=-1)
1360   {/*Send charmrun the format string*/
1361         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1362         /*Wait for the reply (characters to scan) from charmrun*/
1363         CmiCommLock();
1364         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1365         i = sscanf((char*)replymsg.data, fmt,
1366                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1367                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1368                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1369         ChMessage_free(&replymsg);
1370         CmiCommUnlock();
1371   } else
1372   {/*Just do the scanf normally*/
1373         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1374                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1375                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1376   }
1377   CmiUnlock(Cmi_scanf_mutex);
1378   return i;
1379 }
1380
1381 #if CMK_CMIPRINTF_IS_A_BUILTIN
1382
1383 /*New stdarg.h declarations*/
1384 void CmiPrintf(const char *fmt, ...)
1385 {
1386   CpdSystemEnter();
1387   {
1388   va_list p; va_start(p, fmt);
1389   if (Cmi_charmrun_fd!=-1)
1390     InternalPrintf(fmt, p);
1391   else
1392     vfprintf(stdout,fmt,p);
1393   va_end(p);
1394   }
1395   CpdSystemExit();
1396 }
1397
1398 void CmiError(const char *fmt, ...)
1399 {
1400   CpdSystemEnter();
1401   {
1402   va_list p; va_start (p, fmt);
1403   if (Cmi_charmrun_fd!=-1)
1404     InternalError(fmt, p);
1405   else
1406     vfprintf(stderr,fmt,p);
1407   va_end(p);
1408   }
1409   CpdSystemExit();
1410 }
1411
1412 int CmiScanf(const char *fmt, ...)
1413 {
1414   int i;
1415   CpdSystemEnter();
1416   {
1417   va_list p; va_start(p, fmt);
1418   i = InternalScanf((char *)fmt, p);
1419   va_end(p);
1420   }
1421   CpdSystemExit();
1422   return i;
1423 }
1424
1425 #endif
1426
1427 /***************************************************************************
1428  * Output redirection:
1429  *  When people don't use CkPrintf, like above, we'd still like to be able
1430  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
1431  * which lets us read the characters sent to stdout at our lesiure.
1432  ***************************************************************************/
1433
1434 /*Can read from stdout or stderr using these fd's*/
1435 static int readStdout[2]; 
1436 static int writeStdout[2]; /*The original stdout/stderr sockets*/ 
1437 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1438 #define readStdoutBufLen (16*1024)
1439 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1440 static int servicingStdout;
1441
1442 /*Initialization-- should only be called once per node*/
1443 static void CmiStdoutInit(void) {
1444         int i;
1445         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1446
1447 /*There's some way to do this same thing in windows, but I don't know how*/
1448 #if !defined(_WIN32) || defined(__CYGWIN__)
1449         /*Prevent buffering in stdio library:*/
1450         setbuf(stdout,NULL); setbuf(stderr,NULL);
1451
1452         /*Reopen stdout and stderr fd's as new pipes:*/
1453         for (i=0;i<2;i++) {
1454                 int pair[2];
1455                 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1456                 
1457                 /*First, save a copy of the original stdout*/
1458                 writeStdout[i]=dup(srcFd);
1459 #if 0
1460                 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1461                 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1462 #else
1463                /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1464                 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair)) 
1465                         {perror("building stdio redirection socketpair"); exit(1);}
1466 #endif
1467                 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1468                 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1469                 
1470 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
1471                 CmiEnableNonblockingIO(srcFd);
1472 #endif
1473 #if CMK_SHARED_VARS_UNAVAILABLE
1474                 if (Cmi_asyncio)
1475                 {
1476   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1477                         CmiEnableAsyncIO(readStdout[i]);
1478                 }
1479 #endif
1480         }
1481 #else
1482 /*Windows system-- just fake reads for now*/
1483 # ifndef read
1484 #  define read(x,y,z) 0
1485 # endif
1486 # ifndef write
1487 #  define write(x,y,z) 
1488 # endif
1489 #endif
1490 }
1491
1492 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1493 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1494 {
1495         write(writeStdout[isStdErr],str,len);   
1496 }
1497
1498 /*
1499   Service this particular stdout pipe.  
1500   Must hold comm. lock.
1501 */
1502 static void CmiStdoutServiceOne(int i) {
1503         int nBytes;
1504         const static char *cmdName[2]={"print","printerr"};
1505         servicingStdout=1;
1506         while(1) {
1507                 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1508                 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1509                 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1510                 if (nBytes<=0) break; /*Nothing to send*/
1511                 
1512                 /*Send these bytes off to charmrun*/
1513                 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1514                 nBytes++; /*Include zero-terminator in message to charmrun*/
1515                 
1516                 if (nBytes>=readStdoutBufLen-100) 
1517                 { /*We must have filled up our output pipe-- most output libraries
1518                    don't handle this well (e.g., glibc printf just drops the line).*/
1519                         
1520                         tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1521                                 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1522                         nBytes--; /*Remove terminator from user's data*/
1523                         tooMuchLen=strlen(tooMuchWarn)+1;
1524                 }
1525                 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1526                                     tooMuchWarn,tooMuchLen);
1527                 
1528                 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1529         }
1530         servicingStdout=0;
1531         serviceStdout[i]=0; /*This pipe is now serviced*/
1532 }
1533
1534 /*Service all stdout pipes, whether it looks like they need it
1535   or not.  Used when you aren't sure if select() has been called recently.
1536   Must hold comm. lock.
1537 */
1538 static void CmiStdoutServiceAll(void) {
1539         int i;
1540         for (i=0;i<2;i++) {
1541                 if (readStdout[i]==0) continue; /*Pipe not open*/
1542                 CmiStdoutServiceOne(i);
1543         }
1544 }
1545
1546 /*Service any outstanding stdout pipes.
1547   Must hold comm. lock.
1548 */
1549 static void CmiStdoutService(void) {
1550         CmiStdoutServiceAll();
1551 }
1552
1553 /*Add our pipes to the pile for select() or poll().
1554   Both can be called with or without the comm. lock.
1555 */
1556 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1557         int i;
1558         for (i=0;i<2;i++) {
1559                 if (readStdout[i]==0) continue; /*Pipe not open*/
1560                 CMK_PIPE_ADDREAD(readStdout[i]);
1561         }
1562 }
1563 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1564         int i;
1565         for (i=0;i<2;i++) {
1566                 if (readStdout[i]==0) continue; /*Pipe not open*/
1567                 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1568         }
1569 }
1570 static int CmiStdoutNeedsService(void) {
1571         return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1572 }
1573
1574 /*Called every few milliseconds to flush the stdout pipes*/
1575 static void CmiStdoutFlush(void) {
1576         if (servicingStdout) return; /* might be called by SIGALRM */
1577         CmiCommLockOrElse( return; )
1578         CmiCommLock();
1579         CmiStdoutServiceAll();
1580         CmiCommUnlock();
1581 }
1582
1583 /***************************************************************************
1584  * Message Delivery:
1585  *
1586  ***************************************************************************/
1587
1588 #include "machine-dgram.c"
1589
1590
1591 #ifndef CmiNodeFirst
1592 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1593 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1594 #endif
1595
1596 #ifndef CmiNodeOf
1597 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1598 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1599 #endif
1600
1601
1602 /*****************************************************************************
1603  *
1604  * node_addresses
1605  *
1606  *  These two functions fill the node-table.
1607  *
1608  *
1609  *   This node, like all others, first sends its own address to charmrun
1610  *   using this command:
1611  *
1612  *     Type: nodeinfo
1613  *     Data: Big-endian 4-byte ints
1614  *           <my-node #><Dataport>
1615  *
1616  *   When charmrun has all the addresses, he sends this table to me:
1617  *
1618  *     Type: nodes
1619  *     Data: Big-endian 4-byte ints
1620  *           <number of nodes n>
1621  *           <#PEs><IP><Dataport> Node 0
1622  *           <#PEs><IP><Dataport> Node 1
1623  *           ...
1624  *           <#PEs><IP><Dataport> Node n-1
1625  *
1626  *****************************************************************************/
1627
1628 #if CMK_USE_IBVERBS
1629 void copyInfiAddr(ChInfiAddr *qpList);
1630 #endif
1631
1632 #if CMK_IBVERBS_FAST_START
1633 static void send_partial_init()
1634 {
1635   ChMessageInt_t nodeNo = ChMessageInt_new(_Cmi_mynode);
1636         ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);
1637 }       
1638 #endif
1639
1640
1641 /*Note: node_addresses_obtain is called before starting
1642   threads, so no locks are needed (or valid!)*/
1643 static void node_addresses_obtain(char **argv)
1644 {
1645   ChMessage nodetabmsg; /* info about all nodes*/
1646   MACHSTATE(3,"node_addresses_obtain { ");
1647   if (Cmi_charmrun_fd==-1) 
1648   {/*Standalone-- fake a single-node nodetab message*/
1649         int npes=1;
1650         ChSingleNodeinfo *fakeTab;
1651         ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1652         fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
1653         CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1654 #if CMK_SHARED_VARS_UNAVAILABLE
1655         if (npes!=1) {
1656                 fprintf(stderr,
1657                         "To use multiple processors, you must run this program as:\n"
1658                         " > charmrun +p%d %s <args>\n"
1659                         "or build the %s-smp version of Charm++.\n",
1660                         npes,argv[0],CMK_MACHINE_NAME);
1661                 exit(1);
1662         }
1663 #else
1664         /* standalone smp version reads ppn */
1665         if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) || 
1666                CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
1667           npes = _Cmi_mynodesize;
1668 #endif
1669         /*This is a stupid hack: we expect the *number* of nodes
1670         followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1671         (which happens to have exactly that layout!) and stuff
1672         a 1 into the "node number" slot
1673         */
1674         fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1675         fakeTab->info.nPE=ChMessageInt_new(npes);
1676         fakeTab->info.dataport=ChMessageInt_new(0);
1677         fakeTab->info.IP=_skt_invalid_ip;
1678   }
1679   else 
1680   { /*Contact charmrun for machine info.*/
1681         ChSingleNodeinfo me;
1682
1683         me.nodeNo=ChMessageInt_new(_Cmi_mynode);
1684
1685 #if CMK_USE_IBVERBS
1686         {
1687                 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
1688                 me.info.qpList = malloc(qpListSize);
1689                 copyInfiAddr(me.info.qpList);
1690                 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
1691                 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
1692                 free(me.info.qpList);
1693         }
1694 #else
1695         /*The nPE fields are set by charmrun--
1696           these values don't matter. 
1697           Set IP in case it is mpiexec mode where charmrun does not have IP yet
1698         */
1699         me.info.nPE=ChMessageInt_new(0);
1700         /* me.info.IP=_skt_invalid_ip; */
1701         me.info.IP=skt_innode_my_ip();
1702         me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
1703 #ifdef CMK_USE_MX
1704         me.info.nic_id=ChMessageLong_new(Cmi_nic_id);
1705 #endif
1706 #if CMK_USE_IBUD
1707         me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
1708         me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
1709         me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
1710         MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",me.info.qp.lid,me.info.qp.qpn,me.info.qp.psn);
1711 #endif
1712         me.info.dataport=ChMessageInt_new(dataport);
1713
1714         /*Send our node info. to charmrun.
1715         CommLock hasn't been initialized yet-- 
1716         use non-locking version*/
1717         ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1718         MACHSTATE1(5,"send initnode - dataport:%d", dataport);
1719 #endif  //CMK_USE_IBVERBS
1720
1721         MACHSTATE(3,"initnode sent");
1722   
1723         /*We get the other node addresses from a message sent
1724           back via the charmrun control port.*/
1725         if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
1726                 CmiAbort("Timeout waiting for nodetab!\n");
1727         }
1728         MACHSTATE(2,"recv initnode {");
1729         ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1730         MACHSTATE(2,"} recv initnode");
1731   }
1732 //#if CMK_USE_IBVERBS   
1733 //#else
1734   node_addresses_store(&nodetabmsg);
1735   ChMessage_free(&nodetabmsg);
1736 //#endif        
1737   MACHSTATE(3,"} node_addresses_obtain ");
1738 }
1739
1740 #if CMK_NODE_QUEUE_AVAILABLE
1741
1742 /***********************************************************************
1743  * DeliverOutgoingNodeMessage()
1744  *
1745  * This function takes care of delivery of outgoing node messages from the
1746  * sender end. Broadcast messages are divided into sets of messages that 
1747  * are bound to the local node, and to remote nodes. For local
1748  * transmission, the messages are directly pushed into the recv
1749  * queues. For non-local transmission, the function DeliverViaNetwork()
1750  * is called
1751  ***********************************************************************/
1752 void DeliverOutgoingNodeMessage(OutgoingMsg ogm)
1753 {
1754   int i, rank, dst; OtherNode node;
1755
1756   dst = ogm->dst;
1757   switch (dst) {
1758   case NODE_BROADCAST_ALL:
1759     CmiPushNode(CopyMsg(ogm->data,ogm->size));
1760     /*case-fallthrough (no break)-- deliver to all other processors*/
1761   case NODE_BROADCAST_OTHERS:
1762 #if CMK_BROADCAST_SPANNING_TREE
1763     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1764 #elif CMK_BROADCAST_HYPERCUBE
1765     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1766 #else
1767     for (i=0; i<_Cmi_numnodes; i++)
1768       if (i!=_Cmi_mynode)
1769         DeliverViaNetwork(ogm, nodes + i, DGRAM_NODEMESSAGE, DGRAM_ROOTPE_MASK, 1);
1770 #endif
1771     GarbageCollectMsg(ogm);
1772     break;
1773   default:
1774     node = nodes+dst;
1775     rank=DGRAM_NODEMESSAGE;
1776     if (dst != _Cmi_mynode) {
1777       DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1778       GarbageCollectMsg(ogm);
1779     } else {
1780       if (ogm->freemode == 'A') {
1781         CmiPushNode(CopyMsg(ogm->data,ogm->size));
1782         ogm->freemode = 'X';
1783       } else {
1784         CmiPushNode(ogm->data);
1785         FreeOutgoingMsg(ogm);
1786       }
1787     }
1788   }
1789 }
1790
1791 #else
1792
1793 #define DeliverOutgoingNodeMessage(msg) DeliverOutgoingMessage(msg)
1794
1795 #endif
1796
1797 #if CMK_C_INLINE
1798 inline static
1799 #endif
1800 void DeliverViaNetworkOrPxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot,int copy){
1801 #if CMK_USE_SYSVSHM
1802         {
1803 #if SYSVSHM_STATS
1804         double _startValidTime = CmiWallTimer();
1805 #endif
1806         int ret=CmiValidSysvshm(ogm,node);
1807 #if SYSVSHM_STATS
1808         sysvshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1809 #endif                  
1810         MACHSTATE4(3,"Msg ogm %p size %d dst %d useSysvShm %d",ogm,ogm->size,ogm->dst,ret);
1811         if(ret){
1812                 CmiSendMessageSysvshm(ogm,node,rank,broot);
1813         }else{
1814                 DeliverViaNetwork(ogm, node, rank, broot,copy);
1815         }
1816         } 
1817 #elif CMK_USE_PXSHM
1818      {
1819 #if PXSHM_STATS
1820         double _startValidTime = CmiWallTimer();
1821 #endif
1822       int ret=CmiValidPxshm(ogm,node);
1823 #if PXSHM_STATS
1824         pxshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1825 #endif                  
1826       MACHSTATE4(3,"Msg ogm %p size %d dst %d usePxShm %d",ogm,ogm->size,ogm->dst,ret);
1827       if(ret){
1828          CmiSendMessagePxshm(ogm,node,rank,broot);
1829        }else{
1830          DeliverViaNetwork(ogm, node, rank, broot,copy);
1831        }
1832       } 
1833 #else
1834       DeliverViaNetwork(ogm, node, rank, broot, copy);
1835 #endif                  
1836         
1837 }
1838
1839
1840
1841 /***********************************************************************
1842  * DeliverOutgoingMessage()
1843  *
1844  * This function takes care of delivery of outgoing messages from the
1845  * sender end. Broadcast messages are divided into sets of messages that 
1846  * are bound to the local node, and to remote nodes. For local
1847  * transmission, the messages are directly pushed into the recv
1848  * queues. For non-local transmission, the function DeliverViaNetwork()
1849  * is called
1850  ***********************************************************************/
1851 int DeliverOutgoingMessage(OutgoingMsg ogm)
1852 {
1853   int i, rank, dst; OtherNode node;
1854         
1855   int network = 1;
1856
1857         
1858   dst = ogm->dst;
1859
1860   switch (dst) {
1861   case PE_BROADCAST_ALL:
1862 #if !CMK_SMP_NOT_RELAX_LOCK       
1863     CmiCommLock();
1864 #endif
1865     for (rank = 0; rank<_Cmi_mynodesize; rank++) {
1866       CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1867     }
1868 #if CMK_BROADCAST_SPANNING_TREE
1869     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1870 #elif CMK_BROADCAST_HYPERCUBE
1871     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1872 #else
1873     for (i=0; i<_Cmi_numnodes; i++)
1874       if (i!=_Cmi_mynode){
1875         /*FAULT_EVAC : is the target processor valid*/
1876         if(CmiNodeAlive(i)){
1877           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1878         }
1879       } 
1880 #endif
1881     GarbageCollectMsg(ogm);
1882 #if !CMK_SMP_NOT_RELAX_LOCK               
1883     CmiCommUnlock();
1884 #endif    
1885     break;
1886   case PE_BROADCAST_OTHERS:
1887 #if !CMK_SMP_NOT_RELAX_LOCK               
1888     CmiCommLock();
1889 #endif  
1890     for (rank = 0; rank<_Cmi_mynodesize; rank++)
1891       if (rank + Cmi_nodestart != ogm->src) {
1892         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1893       }
1894 #if CMK_BROADCAST_SPANNING_TREE
1895     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1896 #elif CMK_BROADCAST_HYPERCUBE
1897     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1898 #else
1899     for (i = 0; i<_Cmi_numnodes; i++)
1900       if (i!=_Cmi_mynode){
1901         /*FAULT_EVAC : is the target processor valid*/
1902         if(CmiNodeAlive(i)){
1903           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1904         }
1905       } 
1906 #endif
1907     GarbageCollectMsg(ogm);
1908 #if !CMK_SMP_NOT_RELAX_LOCK               
1909     CmiCommUnlock();
1910 #endif    
1911     break;
1912   default:
1913 #ifndef CMK_OPTIMIZE
1914     if (dst<0 || dst>=CmiNumPes())
1915       CmiAbort("Send to out-of-bounds processor!");
1916 #endif
1917     node = nodes_by_pe[dst];
1918     rank = dst - node->nodestart;
1919     if (node->nodestart != Cmi_nodestart) {
1920 #if !CMK_SMP_NOT_RELAX_LOCK                     
1921         CmiCommLock();
1922 #endif          
1923         DeliverViaNetworkOrPxshm(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1924         GarbageCollectMsg(ogm);
1925 #if !CMK_SMP_NOT_RELAX_LOCK                     
1926         CmiCommUnlock();
1927 #endif          
1928     } else {
1929       network = 0;
1930       if (ogm->freemode == 'A') {
1931         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1932         ogm->freemode = 'X';
1933       } else {
1934         CmiPushPE(rank, ogm->data);
1935         FreeOutgoingMsg(ogm);
1936       }
1937     }
1938   }
1939 #if CMK_MULTICORE
1940   network = 0;
1941 #endif
1942   return network;
1943 }
1944
1945
1946 /******************************************************************************
1947  *
1948  * CmiGetNonLocal
1949  *
1950  * The design of this system is that the communication thread does all the
1951  * work, to eliminate as many locking issues as possible.  This is the only
1952  * part of the code that happens in the receiver-thread.
1953  *
1954  * This operation is fairly cheap, it might be worthwhile to inline
1955  * the code into CmiDeliverMsgs to reduce function call overhead.
1956  *
1957  *****************************************************************************/
1958
1959 #if CMK_NODE_QUEUE_AVAILABLE
1960 char *CmiGetNonLocalNodeQ(void)
1961 {
1962   char *result = 0;
1963   if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1964     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1965     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1966     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1967   }
1968   return result;
1969 }
1970 #endif
1971
1972 void *CmiGetNonLocal(void)
1973 {
1974 #if CMK_SMP_MULTIQ
1975   int i;
1976 #endif
1977
1978   CmiState cs = CmiGetState();
1979   CmiIdleLock_checkMessage(&cs->idle);
1980
1981 #if !CMK_SMP_MULTIQ
1982   return (void *) PCQueuePop(cs->recv);
1983 #else
1984   void *retVal = NULL;
1985   for(i=cs->curPolledIdx; i<MULTIQ_GRPSIZE; i++){
1986     retVal = (void *)PCQueuePop(cs->recv[i]);
1987     if(retVal!=NULL) {
1988         cs->curPolledIdx = i+1;
1989         return retVal;
1990     }
1991   }
1992   cs->curPolledIdx=0;
1993   return NULL;
1994 #endif
1995 }
1996
1997
1998 /**
1999  * Set up an OutgoingMsg structure for this message.
2000  */
2001 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
2002   OutgoingMsg ogm;
2003   MallocOutgoingMsg(ogm);
2004   MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
2005   ogm->size = size;
2006   ogm->data = data;
2007   ogm->src = cs->pe;
2008   ogm->dst = pe;
2009   ogm->freemode = freemode;
2010   ogm->refcount = 0;
2011   return (CmiCommHandle)ogm;    
2012 }
2013
2014 #if CMK_NODE_QUEUE_AVAILABLE
2015
2016 /******************************************************************************
2017  *
2018  * CmiGeneralNodeSend
2019  *
2020  * Description: This is a generic message sending routine. All the
2021  * converse message send functions are implemented in terms of this
2022  * function. (By setting appropriate flags (eg freemode) that tell
2023  * CmiGeneralSend() how exactly to handle the particular case of
2024  * message send)
2025  *
2026  *****************************************************************************/
2027
2028 CmiCommHandle CmiGeneralNodeSend(int node, int size, int freemode, char *data)
2029 {
2030   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2031   MACHSTATE(1,"CmiGeneralNodeSend {");
2032
2033   if (freemode == 'S') {
2034     char *copy = (char *)CmiAlloc(size);
2035     if (!copy)
2036       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
2037     memcpy(copy, data, size);
2038     data = copy; freemode = 'F';
2039   }
2040
2041 #if CMK_IMMEDIATE_MSG
2042     /* execute the immediate message right away */
2043   if (node == CmiMyNode() && CmiIsImmediate(data)) {
2044     CmiPushImmediateMsg(data);
2045       /* only communication thread executes immediate messages in SMP */
2046     if (!_immRunning) CmiHandleImmediate();
2047     return NULL;
2048   }
2049 #endif
2050
2051   CmiMsgHeaderSetLength(data, size);
2052   ogm=PrepareOutgoing(cs,node,size,freemode,data);
2053   CmiCommLock();
2054   DeliverOutgoingNodeMessage(ogm);
2055   CmiCommUnlock();
2056   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2057   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2058   MACHSTATE(1,"} CmiGeneralNodeSend");
2059   return (CmiCommHandle)ogm;
2060 }
2061
2062 #endif
2063
2064
2065 /******************************************************************************
2066  *
2067  * CmiGeneralSend
2068  *
2069  * Description: This is a generic message sending routine. All the
2070  * converse message send functions are implemented in terms of this
2071  * function. (By setting appropriate flags (eg freemode) that tell
2072  * CmiGeneralSend() how exactly to handle the particular case of
2073  * message send)
2074  *
2075  *****************************************************************************/
2076
2077 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
2078 {
2079   int sendonnetwork;
2080   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2081   MACHSTATE(1,"CmiGeneralSend {");
2082
2083   if (freemode == 'S') {
2084 #if CMK_USE_GM
2085     if (pe != cs->pe) {
2086       freemode = 'G';
2087     }
2088     else
2089 #endif
2090     {
2091     char *copy = (char *)CmiAlloc(size);
2092     if (!copy)
2093       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
2094     memcpy(copy, data, size);
2095     data = copy; freemode = 'F';
2096     }
2097   }
2098   
2099   if (pe == cs->pe) {
2100 #if ! CMK_SMP
2101     if (!_immRunning) /* CdsFifo_Enqueue, below, isn't SIGIO or thread safe.  
2102                       The SMP comm thread never gets here, because of the pe test. */
2103 #endif
2104     {
2105 #if CMK_IMMEDIATE_MSG
2106       /* execute the immediate message right away */
2107       /* but to avoid infinite recursive call, don't do this if _immRunning */
2108     if (CmiIsImmediate(data)) {
2109       CmiPushImmediateMsg(data);
2110       CmiHandleImmediate();
2111       return 0;
2112     }
2113 #endif
2114     CdsFifo_Enqueue(cs->localqueue, data);
2115     if (freemode == 'A') {
2116       MallocOutgoingMsg(ogm);
2117       ogm->freemode = 'X';
2118       return ogm;
2119     } else return 0;
2120     }
2121   }
2122
2123 #if CMK_PERSISTENT_COMM
2124   if (phs) {
2125       CmiAssert(phsSize == 1);
2126       CmiSendPersistentMsg(*phs, pe, size, data);
2127       return NULL;
2128   }
2129 #endif
2130
2131   CmiMsgHeaderSetLength(data, size);
2132   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
2133
2134   #if CMK_SMP_NOT_RELAX_LOCK  
2135   CmiCommLock();
2136 #endif  
2137   
2138   sendonnetwork = DeliverOutgoingMessage(ogm);
2139   
2140 #if CMK_SMP_NOT_RELAX_LOCK  
2141   CmiCommUnlock();
2142 #endif  
2143   
2144   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2145 #if CMK_USE_SYSVSHM
2146         CommunicationServerSysvshm();
2147 #elif CMK_USE_PXSHM
2148         CommunicationServerPxshm();
2149 #endif
2150 #if !CMK_SHARED_VARS_UNAVAILABLE
2151 #if !CMK_SMP_NOT_SKIP_COMMSERVER
2152   if (sendonnetwork!=0)   /* only call server when we send msg on network in SMP */
2153 #endif
2154 #endif
2155   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2156   MACHSTATE(1,"}  CmiGeneralSend");
2157   return (CmiCommHandle)ogm;
2158 }
2159
2160
2161 void CmiSyncSendFn(int p, int s, char *m)
2162
2163   CQdCreate(CpvAccess(cQdState), 1);
2164   CmiGeneralSend(p,s,'S',m); 
2165 }
2166
2167 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
2168
2169   CQdCreate(CpvAccess(cQdState), 1);
2170   return CmiGeneralSend(p,s,'A',m); 
2171 }
2172
2173 void CmiFreeSendFn(int p, int s, char *m)
2174
2175   CQdCreate(CpvAccess(cQdState), 1);
2176   CmiGeneralSend(p,s,'F',m); 
2177 }
2178
2179 void CmiSyncBroadcastFn(int s, char *m)
2180
2181   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2182   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); 
2183 }
2184
2185 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
2186
2187   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2188   return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); 
2189 }
2190
2191 void CmiFreeBroadcastFn(int s, char *m)
2192
2193   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
2194   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); 
2195 }
2196
2197 void CmiSyncBroadcastAllFn(int s, char *m)
2198
2199   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2200   CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); 
2201 }
2202
2203 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
2204
2205   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2206   return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); 
2207 }
2208
2209 void CmiFreeBroadcastAllFn(int s, char *m)
2210
2211   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2212   CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); 
2213 }
2214
2215 #if CMK_NODE_QUEUE_AVAILABLE
2216
2217 void CmiSyncNodeSendFn(int p, int s, char *m)
2218
2219   CQdCreate(CpvAccess(cQdState), 1);
2220   CmiGeneralNodeSend(p,s,'S',m); 
2221 }
2222
2223 CmiCommHandle CmiAsyncNodeSendFn(int p, int s, char *m)
2224
2225   CQdCreate(CpvAccess(cQdState), 1);
2226   return CmiGeneralNodeSend(p,s,'A',m); 
2227 }
2228
2229 void CmiFreeNodeSendFn(int p, int s, char *m)
2230
2231   CQdCreate(CpvAccess(cQdState), 1);
2232   CmiGeneralNodeSend(p,s,'F',m); 
2233 }
2234
2235 void CmiSyncNodeBroadcastFn(int s, char *m)
2236
2237   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2238   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'S',m); 
2239 }
2240
2241 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
2242
2243   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2244   return CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'A',m);
2245 }
2246
2247 void CmiFreeNodeBroadcastFn(int s, char *m)
2248
2249   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2250   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'F',m); 
2251 }
2252
2253 void CmiSyncNodeBroadcastAllFn(int s, char *m)
2254
2255   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2256   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'S',m); 
2257 }
2258
2259 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
2260
2261   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2262   return CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'A',m); 
2263 }
2264
2265 void CmiFreeNodeBroadcastAllFn(int s, char *m)
2266
2267   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2268   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'F',m); 
2269 }
2270 #endif
2271
2272 /******************************************************************************
2273  *
2274  * Comm Handle manipulation.
2275  *
2276  *****************************************************************************/
2277
2278 int CmiAsyncMsgSent(CmiCommHandle handle)
2279 {
2280   return (((OutgoingMsg)handle)->freemode == 'X');
2281 }
2282
2283 void CmiReleaseCommHandle(CmiCommHandle handle)
2284 {
2285   FreeOutgoingMsg(((OutgoingMsg)handle));
2286 }
2287
2288 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
2289
2290 /*****************************************************************************
2291  *
2292  * NET version List-Cast and Multicast Code
2293  *
2294  ****************************************************************************/
2295                                                                                 
2296 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
2297 {
2298   int i;
2299   for(i=0;i<npes;i++) {
2300     CmiReference(msg);
2301     CmiSyncSendAndFree(pes[i], len, msg);
2302   }
2303 }
2304                                                                                 
2305 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
2306 {
2307   CmiError("ListSend not implemented.");
2308   return (CmiCommHandle) 0;
2309 }
2310                                                                                 
2311 /* 
2312   because in all net versions, the message buffer after CmiSyncSendAndFree
2313   returns is not changed, we can use memory reference trick to avoid 
2314   memory copying here
2315 */
2316 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2317 {
2318   int i;
2319   for(i=0;i<npes;i++) {
2320     CmiReference(msg);
2321     CmiSyncSendAndFree(pes[i], len, msg);
2322   }
2323   CmiFree(msg);
2324 }
2325
2326 #endif
2327
2328 #if CMK_BROADCAST_SPANNING_TREE
2329 /*
2330   if root is 1, it is called from the broadcast root, only ogm is needed;
2331   if root is 0, it is called in the tree, ogm must be NULL and msg, size and startpe are needed
2332   note: function leaves msg buffer untouched
2333 */
2334 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int noderank)
2335 {
2336   CmiState cs = CmiGetState();
2337   int i;
2338
2339   if (root) startpe = _Cmi_mynode;
2340   else ogm = NULL;
2341
2342   CmiAssert(startpe>=0 && startpe<_Cmi_numnodes);
2343
2344   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
2345     int p = _Cmi_mynode-startpe;
2346     if (p<0) p+=_Cmi_numnodes;
2347     p = BROADCAST_SPANNING_FACTOR*p + i;
2348     if (p > _Cmi_numnodes - 1) break;
2349     p += startpe;
2350     p = p%_Cmi_numnodes;
2351     CmiAssert(p!=_Cmi_mynode);
2352     /* CmiPrintf("SendSpanningChildren: %d => %d\n", _Cmi_mynode, p); */
2353     if (!root && !ogm) ogm=PrepareOutgoing(cs, PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2354   //  DeliverViaNetwork(ogm, nodes + p, noderank, startpe, 1);
2355     DeliverViaNetworkOrPxshm(ogm, nodes+p, noderank, startpe, 1);
2356   }
2357   if (!root && ogm) GarbageCollectMsg(ogm);
2358 }
2359 #endif
2360
2361 #if CMK_BROADCAST_HYPERCUBE
2362 int log_of_2 (int i) {
2363   int m;
2364   for (m=0; i>(1<<m); ++m);
2365   return m;
2366 }
2367
2368 /* called from root - send msg along the hypercube in broadcast.
2369   note: function leaves msg buffer untouched
2370 */
2371 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int srcpe, int noderank)
2372 {
2373   CmiState cs = CmiGetState();
2374   int i, k, npes, tmp;
2375   int *dest_pes;
2376
2377   if (root) {
2378     msg = ogm->data;
2379     srcpe = CmiMyNode();
2380   }
2381   else ogm = NULL;
2382
2383   tmp = srcpe ^ CmiMyNode();
2384   k = log_of_2(CmiNumNodes()) + 2;
2385   if (tmp) {
2386      do {--k;} while (!(tmp>>k));
2387   }
2388
2389         MACHSTATE2(3,"Broadcast SendHypercube ogm %p size %d",ogm,size);
2390
2391   dest_pes = CmiTmpAlloc(sizeof(int)*(k+1));
2392   k--;
2393   npes = HypercubeGetBcastDestinations(CmiMyNode(), CmiNumNodes(), k, dest_pes);
2394   
2395   for (i = 0; i < npes; i++) {
2396     int p = dest_pes[i];
2397     /* CmiPrintf("SendHypercube: %d => %d (%d)\n", cs->pe, p, i); */
2398     if (!root && ! ogm) 
2399       ogm=PrepareOutgoing(cs ,PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2400     DeliverViaNetworkOrPxshm(ogm, nodes + p, noderank, CmiMyNode(), 1);
2401   }
2402   if (!root && ogm) GarbageCollectMsg(ogm);
2403   CmiTmpFree(dest_pes);
2404 }
2405 #endif
2406
2407 /*
2408 #if CMK_IMMEDIATE_MSG
2409 void CmiProbeImmediateMsg()
2410 {
2411   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2412 }
2413 #endif
2414 */
2415
2416 /* Network progress function is used to poll the network when for
2417    messages. This flushes receive buffers on some implementations*/ 
2418 void CmiMachineProgressImpl()
2419 {
2420 #if CMK_USE_SYSVSHM
2421         CommunicationServerSysvshm();
2422 #elif CMK_USE_PXSHM
2423         CommunicationServerPxshm();
2424 #endif
2425   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2426 }
2427
2428 /******************************************************************************
2429  *
2430  * Main code, Init, and Exit
2431  *
2432  *****************************************************************************/
2433
2434 #if CMK_BARRIER_USE_COMMON_CODE
2435
2436 /* happen at node level */
2437 /* must be called on every PE including communication processors */
2438 int CmiBarrier()
2439 {
2440   int len, size, i;
2441   int status;
2442   int numnodes = CmiNumNodes();
2443   static int barrier_phase = 0;
2444
2445   if (Cmi_charmrun_fd == -1) return 0;                // standalone
2446   if (numnodes == 1) {
2447     CmiNodeAllBarrier();
2448     return 0;
2449   }
2450
2451   if (CmiMyRank() == 0) {
2452     ctrl_sendone_locking("barrier",NULL,0,NULL,0);
2453     while (barrierReceived != 1) {
2454       CmiCommLock();
2455       ctrl_getone();
2456       CmiCommUnlock();
2457     }
2458     barrierReceived = 0;
2459     barrier_phase ++;
2460   }
2461
2462   CmiNodeAllBarrier();
2463   /* printf("[%d] OUT of barrier %d \n", CmiMyPe(), barrier_phase); */
2464   return 0;
2465 }
2466
2467
2468 int CmiBarrierZero()
2469 {
2470   int i;
2471   int numnodes = CmiNumNodes();
2472   ChMessage msg;
2473
2474   if (Cmi_charmrun_fd == -1) return 0;                // standalone
2475   if (numnodes == 1) {
2476     CmiNodeAllBarrier();
2477     return 0;
2478   }
2479
2480   if (CmiMyRank() == 0) {
2481     char str[64];
2482     sprintf(str, "%d", CmiMyNode());
2483     ctrl_sendone_locking("barrier0",str,strlen(str)+1,NULL,0);
2484     if (CmiMyNode() == 0) {
2485       while (barrierReceived != 2) {
2486         CmiCommLock();
2487         ctrl_getone();
2488         CmiCommUnlock();
2489       }
2490       barrierReceived = 0;
2491     }
2492   }
2493
2494   CmiNodeAllBarrier();
2495   return 0;
2496 }
2497
2498 #endif
2499
2500 /******************************************************************************
2501  *
2502  * Main code, Init, and Exit
2503  *
2504  *****************************************************************************/
2505 extern void CthInit(char **argv);
2506 extern void ConverseCommonInit(char **);
2507
2508 static char     **Cmi_argv;
2509 static char     **Cmi_argvcopy;
2510 static CmiStartFn Cmi_startfn;   /* The start function */
2511 static int        Cmi_usrsched;  /* Continue after start function finishes? */
2512
2513 static void ConverseRunPE(int everReturn)
2514 {
2515   CmiIdleState *s=CmiNotifyGetState();
2516   CmiState cs;
2517   char** CmiMyArgv;
2518   CmiNodeAllBarrier();
2519   cs = CmiGetState();
2520   CpvInitialize(void *,CmiLocalQueue);
2521   CpvAccess(CmiLocalQueue) = cs->localqueue;
2522
2523   /* all non 0 rank use the copied one while rank 0 will modify the actual argv */
2524   if (CmiMyRank())
2525     CmiMyArgv = CmiCopyArgs(Cmi_argvcopy);
2526   else
2527     CmiMyArgv = Cmi_argv;
2528   CthInit(CmiMyArgv);
2529
2530 #if CMK_USE_GM
2531   CmiCheckGmStatus();
2532 #endif
2533
2534   ConverseCommonInit(CmiMyArgv);
2535
2536   /* initialize the network progress counter*/
2537   /* Network progress function is used to poll the network when for
2538      messages. This flushes receive buffers on some  implementations*/ 
2539   CpvInitialize(int , networkProgressCount);
2540   CpvAccess(networkProgressCount) = 0;
2541
2542   /* better to show the status here */
2543   if (CmiMyPe() == 0) {
2544     if (Cmi_netpoll == 1) {
2545       CmiPrintf("Charm++> scheduler running in netpoll mode.\n");
2546     }
2547 #if CMK_SHARED_VARS_UNAVAILABLE
2548     else {
2549       if (CmiMemoryIs(CMI_MEMORY_IS_OS))
2550         CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
2551     }
2552 #endif
2553   }
2554
2555 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
2556   CmiInitNotifyCommThdScheme();
2557 #endif
2558
2559 #if MEMORYUSAGE_OUTPUT
2560   memoryusage_counter = 0;
2561 #endif
2562 #if CMK_USE_GM || CMK_USE_MX
2563   if (Cmi_charmrun_fd != -1)
2564 #endif
2565   {
2566   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
2567       (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
2568   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
2569       (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
2570 #if CMK_USE_SYSVSHM
2571          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdleSysvshm, NULL);
2572          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdleSysvshm, NULL);
2573 #elif CMK_USE_PXSHM
2574                 //TODO: add pxshm notify idle
2575          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdlePxshm, NULL);
2576          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdlePxshm, NULL);
2577 #endif
2578   }
2579
2580 #if CMK_SHARED_VARS_UNAVAILABLE
2581   if (Cmi_netpoll) /*Repeatedly call CommServer*/
2582     CcdCallOnConditionKeep(CcdPERIODIC, 
2583         (CcdVoidFn) CommunicationPeriodic, NULL);
2584   else /*Only need this for retransmits*/
2585     CcdCallOnConditionKeep(CcdPERIODIC_10ms, 
2586         (CcdVoidFn) CommunicationPeriodic, NULL);
2587 #endif
2588
2589   if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
2590     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
2591 #if CMK_SHARED_VARS_UNAVAILABLE
2592     if (!Cmi_asyncio) {
2593     /* gm cannot live with setitimer */
2594     CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
2595     }
2596     else {
2597     /*Occasionally ping charmrun, to test if it's dead*/
2598     struct itimerval i;
2599     CmiSignal(SIGALRM, 0, 0, pingCharmrun);
2600 #if MEMORYUSAGE_OUTPUT
2601     i.it_interval.tv_sec = 0;
2602     i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2603     i.it_value.tv_sec = 0;
2604     i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2605 #else
2606     i.it_interval.tv_sec = 1;
2607     i.it_interval.tv_usec = 0;
2608     i.it_value.tv_sec = 1;
2609     i.it_value.tv_usec = 0;
2610 #endif
2611     setitimer(ITIMER_REAL, &i, NULL);
2612     }
2613
2614 #if ! CMK_USE_GM && ! CMK_USE_MX && ! CMK_USE_TCP && ! CMK_USE_IBVERBS
2615     /*Occasionally check for retransmissions, outgoing acks, etc.*/
2616     /*no need for GM case */
2617     CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
2618 #endif
2619 #endif
2620     
2621     /*Initialize the clock*/
2622     Cmi_clock=GetClock();
2623   }
2624
2625 #ifdef IGET_FLOWCONTROL 
2626   /* Call the function once to determine the amount of physical memory available */
2627   getAvailSysMem();
2628   /* Call the function to periodically call the token adapt function */
2629   CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
2630   CcdCallOnConditionKeep(CcdPERIODIC_10s,   // magic number of PERIOD 10s
2631         (CcdVoidFn) TokenUpdatePeriodic, NULL);
2632 #endif
2633   
2634 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
2635   srand((int)(1024.0*CmiWallTimer()));
2636   if (CmiMyPe()==0)
2637     CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
2638         CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
2639 #endif
2640
2641 #ifdef __ONESIDED_IMPL
2642 #ifdef __ONESIDED_NO_HARDWARE
2643   putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
2644   putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
2645   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2646   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2647 #endif
2648 #ifdef __ONESIDED_GM_HARDWARE
2649   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2650   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2651 #endif
2652 #endif
2653
2654   /* communication thread */
2655   if (CmiMyRank() == CmiMyNodeSize()) {
2656     if(!everReturn) Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2657     if (Cmi_charmrun_fd!=-1)
2658           while (1) CommunicationServer(5, COMM_SERVER_FROM_SMP);
2659   }
2660   else{
2661     if (!everReturn) {
2662       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2663       /* turn on immediate messages only now
2664        node barrier previously should take care of the node synchronization */
2665       _immediateReady = 1;
2666       if (Cmi_usrsched==0) CsdScheduler(-1);
2667       ConverseExit();
2668     }else{
2669       _immediateReady = 1;
2670     }
2671   }
2672 }
2673
2674 void ConverseExit(void)
2675 {
2676   MACHSTATE(2,"ConverseExit {");
2677   machine_initiated_shutdown=1;
2678   if (CmiMyRank()==0) {
2679     if(Cmi_print_stats)
2680       printNetStatistics();
2681     log_done();
2682   }
2683 #if CMK_USE_SYSVSHM
2684         CmiExitSysvshm();
2685 #elif CMK_USE_PXSHM
2686         CmiExitPxshm();
2687 #endif
2688   ConverseCommonExit();               /* should be called by every rank */
2689   CmiNodeBarrier();        /* single node SMP, make sure every rank is done */
2690   if (CmiMyRank()==0) CmiStdoutFlush();
2691   if (Cmi_charmrun_fd==-1) {
2692     if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
2693     else while (1) CmiYield();
2694   }
2695   else {
2696         ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
2697 #if CMK_SHARED_VARS_UNAVAILABLE
2698         Cmi_check_delay = 1.0;          /* speed up checking of charmrun */
2699         while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2700 #elif CMK_MULTICORE
2701         if (!Cmi_commthread && CmiMyRank()==0) {
2702           Cmi_check_delay = 1.0;        /* speed up checking of charmrun */
2703           while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2704         }
2705 #endif
2706   }
2707   MACHSTATE(2,"} ConverseExit");
2708
2709 /*Comm. thread will kill us.*/
2710   while (1) CmiYield();
2711 }
2712
2713 static void set_signals(void)
2714 {
2715   if(!Cmi_truecrash) {
2716     signal(SIGSEGV, KillOnAllSigs);
2717     signal(SIGFPE, KillOnAllSigs);
2718     signal(SIGILL, KillOnAllSigs);
2719     signal(SIGINT, KillOnAllSigs);
2720     signal(SIGTERM, KillOnAllSigs);
2721     signal(SIGABRT, KillOnAllSigs);
2722 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2723     signal(SIGQUIT, KillOnAllSigs);
2724     signal(SIGBUS, KillOnAllSigs);
2725 #     if CMK_HANDLE_SIGUSR
2726     signal(SIGUSR1, HandleUserSignals);
2727     signal(SIGUSR2, HandleUserSignals);
2728 #     endif
2729 #   endif /*UNIX*/
2730   }
2731 }
2732
2733 /*Socket idle function to use before addresses have been
2734   obtained.  During the real program, we idle with CmiYield.
2735 */
2736 static void obtain_idleFn(void) {sleep(0);}
2737
2738 static int net_default_skt_abort(int code,const char *msg)
2739 {
2740   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2741   machine_exit(1);
2742   return -1;
2743 }
2744
2745 #if MACHINE_DEBUG_LOG
2746 FILE *debugLog = NULL;
2747 #endif
2748
2749 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
2750 {
2751 #if MACHINE_DEBUG
2752   debugLog=NULL;
2753 #endif
2754 #if CMK_USE_HP_MAIN_FIX
2755 #if FOR_CPLUS
2756   _main(argc,argv);
2757 #endif
2758 #endif
2759   Cmi_startfn = fn; Cmi_usrsched = usc;
2760   Cmi_netpoll = 0;
2761 #if CMK_NETPOLL
2762   Cmi_netpoll = 1;
2763 #endif
2764 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2765   Cmi_idlepoll = 0;
2766 #else
2767   Cmi_idlepoll = 1;
2768 #endif
2769   Cmi_truecrash = 0;
2770   if (CmiGetArgFlagDesc(argv,"+truecrash","Do not install signal handlers") ||
2771       CmiGetArgFlagDesc(argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2772     /* netpoll disable signal */
2773   if (CmiGetArgFlagDesc(argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2774   if (CmiGetArgFlagDesc(argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
2775     /* idlepoll use poll instead if sleep when idle */
2776   if (CmiGetArgFlagDesc(argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2777     /* idlesleep use sleep instead if busywait when idle */
2778   if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2779   Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2780
2781   Cmi_asyncio = 1;
2782 #if CMK_ASYNC_NOT_NEEDED
2783   Cmi_asyncio = 0;
2784 #endif
2785   if (CmiGetArgFlagDesc(argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2786   if (CmiGetArgFlagDesc(argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2787 #if CMK_MULTICORE
2788   if (CmiGetArgFlagDesc(argv,"+commthread","Use communication thread")) {
2789     Cmi_commthread = 1;
2790 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2791     _Cmi_sleepOnIdle = 1;   /* worker thread go sleep */
2792 #endif
2793     if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2794   }
2795 #endif
2796
2797   skt_init();
2798   /* use special abort handler instead of default_skt_abort to 
2799      prevent exit trapped by atexit_check() due to the exit() call  */
2800   skt_set_abort(net_default_skt_abort);
2801   atexit(machine_atexit_check);
2802   parse_netstart();
2803   parse_magic();
2804 #if ! defined(_WIN32)\r
2805   /* only get forks in non-smp mode */
2806   parse_forks();
2807 #endif
2808   extract_args(argv);
2809   log_init();
2810   Cmi_scanf_mutex = CmiCreateLock();
2811
2812 #if MACHINE_DEBUG_LOG
2813   {
2814     char ln[200];
2815     sprintf(ln,"debugLog.%d",_Cmi_mynode);
2816     debugLog=fopen(ln,"w");
2817   }
2818 #endif
2819
2820     /* NOTE: can not acutally call timer before timerInit ! GZ */
2821   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2822
2823   skt_set_idle(obtain_idleFn);
2824   if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2825         set_signals();
2826 #if CMK_USE_TCP
2827         dataskt=skt_server(&dataport);
2828 #elif !CMK_USE_GM && !CMK_USE_MX
2829         dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2830 #else
2831           /* GM and MX do not need to create any socket for communication */
2832         dataskt=-1;
2833 #endif
2834         MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2835         Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2836         MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
2837         skt_tcp_no_nagle(Cmi_charmrun_fd);
2838         CmiStdoutInit();
2839   } else {/*Standalone operation*/
2840         printf("Charm++: standalone mode (not using charmrun)\n");
2841         dataskt=-1;
2842         Cmi_charmrun_fd=-1;
2843   }
2844
2845   CmiMachineInit(argv);
2846
2847   node_addresses_obtain(argv);
2848   MACHSTATE(5,"node_addresses_obtain done");
2849
2850   CmiCommunicationInit(argv);
2851
2852 #if CMK_USE_SYSVSHM
2853   CmiInitSysvshm(argv);
2854 #elif CMK_USE_PXSHM
2855   CmiInitPxshm(argv);
2856 #endif
2857
2858   skt_set_idle(CmiYield);
2859   Cmi_check_delay = 1.0+0.25*_Cmi_numnodes;
2860
2861   if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2862       Cmi_check_delay=1.0e30;
2863
2864   CsvInitialize(CmiNodeState, NodeState);
2865   CmiNodeStateInit(&CsvAccess(NodeState));
2866  
2867 #if CMK_SMP && CMK_LEVERAGE_COMMTHREAD
2868   CsvInitialize(PCQueue, notifyCommThdMsgBuffer);
2869 #endif
2870
2871   /* Network progress function is used to poll the network when for
2872      messages. This flushes receive buffers on some  implementations*/ 
2873   networkProgressPeriod = 0;  
2874   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2875
2876   Cmi_argvcopy = CmiCopyArgs(argv);
2877   Cmi_argv = argv; 
2878
2879   CmiStartThreads(argv);
2880
2881 #if CMK_USE_AMMASSO
2882   CmiAmmassoOpenQueuePairs();
2883 #endif
2884
2885   ConverseRunPE(everReturn);
2886 }
2887
2888 #if CMK_PERSISTENT_COMM
2889
2890 int persistentSendMsgHandlerIdx;
2891
2892 static void sendPerMsgHandler(char *msg)
2893 {
2894   int msgSize;
2895   void *destAddr, *destSizeAddr;
2896   int ep;
2897
2898   msgSize = CmiMsgHeaderGetLength(msg);
2899   msgSize -= (2*sizeof(void *)+sizeof(int));
2900   ep = *(int*)(msg+msgSize);
2901   destAddr = *(void **)(msg + msgSize + sizeof(int));
2902   destSizeAddr = *(void **)(msg + msgSize + sizeof(int) + sizeof(void*));
2903 /*CmiPrintf("msgSize:%d destAddr:%p, destSizeAddr:%p\n", msgSize, destAddr, destSizeAddr);*/
2904   CmiSetHandler(msg, ep);
2905   *((int *)destSizeAddr) = msgSize;
2906   memcpy(destAddr, msg, msgSize);
2907 }
2908
2909 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
2910 {
2911   CmiAssert(h!=NULL);
2912   PersistentSendsTable *slot = (PersistentSendsTable *)h;
2913   CmiAssert(slot->used == 1);
2914   CmiAssert(slot->destPE == destPE);
2915   if (size > slot->sizeMax) {
2916     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
2917     CmiAbort("Abort: Invalid size\n");
2918   }
2919
2920 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destpe=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), slot->destPE, slot->destAddress, size);*/
2921
2922   if (slot->destAddress[0]) {
2923     int oldep = CmiGetHandler(m);
2924     int newsize = size + sizeof(void *)*2 + sizeof(int);
2925     char *newmsg = (char*)CmiAlloc(newsize);
2926     memcpy(newmsg, m, size);
2927     memcpy(newmsg+size, &oldep, sizeof(int));
2928     memcpy(newmsg+size+sizeof(int), &slot->destAddress[0], sizeof(void *));
2929     memcpy(newmsg+size+sizeof(int)+sizeof(void*), &slot->destSizeAddress[0], sizeof(void *));
2930     CmiFree(m);
2931     CmiMsgHeaderSetLength(newmsg, newsize);
2932     CmiSetHandler(newmsg, persistentSendMsgHandlerIdx);
2933     phs = NULL; phsSize = 0;
2934     CmiSyncSendAndFree(slot->destPE, newsize, newmsg);
2935   }
2936   else {
2937 #if 1
2938     /* buffer until ready */
2939     if (slot->messageBuf != NULL) {
2940       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
2941       CmiAbort("");
2942     }
2943     slot->messageBuf = m;
2944     slot->messageSize = size;
2945 #else
2946     /* normal send */
2947     PersistentHandle  *phs_tmp = phs;
2948     int phsSize_tmp = phsSize;
2949     phs = NULL; phsSize = 0;
2950     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
2951     CmiSyncSendAndFree(slot->destPE, size, m);
2952     phs = phs_tmp; phsSize = phsSize_tmp;
2953 #endif
2954   }
2955 }
2956
2957 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
2958 {
2959   char *dupmsg = (char *) CmiAlloc(size);
2960   memcpy(dupmsg, msg, size);
2961
2962   /*  CmiPrintf("Setting root to %d\n", 0); */
2963   if (CmiMyPe()==destPE) {
2964     CQdCreate(CpvAccess(cQdState), 1);
2965     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
2966   }
2967   else
2968     CmiSendPersistentMsg(h, destPE, size, dupmsg);
2969 }
2970
2971 /* called in PumpMsgs */
2972 int PumpPersistent()
2973 {
2974   PersistentReceivesTable *slot = persistentReceivesTableHead;
2975   int status = 0;
2976   while (slot) {
2977     unsigned int size = *(slot->recvSizePtr[0]);
2978     if (size > 0)
2979     {
2980       char *msg = slot->messagePtr[0];
2981 /*CmiPrintf("[%d] size: %d rank:%d msg:%p %p\n", CmiMyPe(), size, CMI_DEST_RANK(msg), msg, slot->messagePtr);*/
2982
2983 #if 0
2984       void *dupmsg;
2985       dupmsg = CmiAlloc(size);
2986       
2987       _MEMCHECK(dupmsg);
2988       memcpy(dupmsg, msg, size);
2989       msg = dupmsg;
2990 #else
2991       /* return messagePtr directly and user MUST make sure not to delete it. */
2992       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
2993
2994       CmiReference(msg);
2995 #endif
2996
2997       CmiPushPE(CMI_DEST_RANK(msg), msg);
2998 #if CMK_BROADCAST_SPANNING_TREE
2999       if (CMI_BROADCAST_ROOT(msg))
3000           SendSpanningChildren(size, msg);
3001 #endif
3002       *(slot->recvSizePtr[0]) = 0;
3003       status = 1;
3004     }
3005     slot = slot->next;
3006   }
3007   return status;
3008 }
3009
3010 void *PerAlloc(int size)
3011 {
3012   return CmiAlloc(size);
3013 }
3014                                                                                 
3015 void PerFree(char *msg)
3016 {
3017     CmiFree(msg);
3018 }
3019
3020 void persist_machine_init() 
3021 {
3022   persistentSendMsgHandlerIdx =
3023        CmiRegisterHandler((CmiHandler)sendPerMsgHandler);
3024 }
3025
3026 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
3027 {
3028   int i;
3029   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
3030     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
3031     _MEMCHECK(buf);
3032     memset(buf, 0, maxBytes+sizeof(int)*2);
3033     slot->messagePtr[i] = buf;
3034     slot->recvSizePtr[i] = (unsigned int*)CmiAlloc(sizeof(unsigned int));
3035     *(slot->recvSizePtr[0]) = 0;
3036   }
3037   slot->sizeMax = maxBytes;
3038 }
3039
3040 #endif
3041
3042
3043 #if CMK_CELL
3044
3045 #include "spert_ppu.h"
3046
3047 void machine_OffloadAPIProgress() {
3048   CmiCommLock();
3049   OffloadAPIProgress();
3050   CmiCommUnlock();
3051 }
3052 #endif
3053
3054
3055
3056 /*@}*/