guard cmiArgDebugFlag with CCS macro
[charm.git] / src / arch / net / machine.c
1
2 /** @file
3  * Basic NET implementation of Converse machine layer
4  * @ingroup NET
5  */
6
7 /** @defgroup NET
8  * NET implementation of machine layer, ethernet in particular
9  * @ingroup Machine
10  *
11  * THE DATAGRAM STREAM
12  *
13  * Messages are sent using UDP datagrams.  The sender allocates a
14  * struct for each datagram to be sent.  These structs stick around
15  * until slightly after the datagram is acknowledged.
16  *
17  * Datagrams are transmitted node-to-node (as opposed to pe-to-pe).
18  * Each node has an OtherNode struct for every other node in the
19  * system.  The OtherNode struct contains:
20  *
21  *   send_queue   (all datagram-structs not yet transmitted)
22  *   send_window  (all datagram-structs transmitted but not ack'd)
23  *
24  * When an acknowledgement comes in, all packets in the send-window
25  * are either marked as acknowledged or pushed back into the send
26  * queue for retransmission.
27  *
28  * THE OUTGOING MESSAGE
29  *
30  * When you send or broadcast a message, the first thing the system
31  * does is system creates an OutgoingMsg struct to represent the
32  * operation.  The OutgoingMsg contains a very direct expression
33  * of what you want to do:
34  *
35  * OutgoingMsg:
36  *
37  *   size      --- size of message in bytes
38  *   data      --- pointer to the buffer containing the message
39  *   src       --- processor which sent the message
40  *   dst       --- destination processor (-1=broadcast, -2=broadcast all)
41  *   freemode  --- see below.
42  *   refcount  --- see below.
43  *
44  * The OutgoingMsg is kept around until the transmission is done, then
45  * it is garbage collected --- the refcount and freemode fields are
46  * to assist garbage collection.
47  *
48  * The freemode indicates which kind of buffer-management policy was
49  * used (sync, async, or freeing).  The sync policy is handled
50  * superficially by immediately converting sync sends into freeing
51  * sends.  Thus, the freemode can either be 'A' (async) or 'F'
52  * (freeing).  If the freemode is 'F', then garbage collection
53  * involves freeing the data and the OutgoingMsg structure itself.  If
54  * the freemode is 'A', then the only cleanup is to change the
55  * freemode to 'X', a condition which is then detectable by
56  * CmiAsyncMsgSent.  In this case, the actual freeing of the
57  * OutgoingMsg is done by CmiReleaseCommHandle.
58  *
59  * When the transmission is initiated, the system computes how many
60  * datagrams need to be sent, total.  This number is stored in the
61  * refcount field.  Each time a datagram is delivered, the refcount
62  * is decremented, when it reaches zero, cleanup is performed.  There
63  * are two exceptions to this rule.  Exception 1: if the OutgoingMsg
64  * is a send (not a broadcast) and can be performed with shared
65  * memory, the entire datagram system is bypassed, the message is
66  * simply delivered and freed, not using the refcount mechanism at
67  * all.  Exception 2: If the message is a broadcast, then part of the
68  * broadcast that can be done via shared memory is performed prior to
69  * initiating the datagram/refcount system.
70  *
71  * DATAGRAM FORMATS AND MESSAGE FORMATS
72  *
73  * Datagrams have this format:
74  *
75  *   srcpe   (16 bits) --- source processor number.
76  *   magic   ( 8 bits) --- magic number to make sure DG is good.
77  *   dstrank ( 8 bits) --- destination processor rank.
78  *   seqno   (32 bits) --- packet sequence number.
79  *   data    (XX byte) --- user data.
80  *
81  * The only reason the srcpe is in there is because the receiver needs
82  * to know which receive window to use.  The dstrank field is needed
83  * because transmission is node-to-node.  Once the message is
84  * assembled by the node, it must be delivered to the appropriate PE.
85  * The dstrank field is used to encode certain special-case scenarios.
86  * If the dstrank is DGRAM_BROADCAST, the transmission is a broadcast,
87  * and should be delivered to all processors in the node.  If the dstrank
88  * is DGRAM_ACKNOWLEDGE, the datagram is an acknowledgement datagram, in
89  * which case the srcpe is the number of the acknowledger, the seqno is
90  * always zero, and the user data is a list of the seqno's being
91  * acknowledged.  There may be other dstrank codes for special functions.
92  *
93  * To send a message, one chops it up into datagrams and stores those
94  * datagrams in a send-queue.  These outgoing datagrams aren't stored
95  * in the explicit format shown above.  Instead, they are stored as
96  * ImplicitDgrams, which contain the datagram header and a pointer to
97  * the user data (which is in the user message buffer, which is in the
98  * OutgoingMsg).  At transmission time these are combined together.
99
100  * The combination of the datagram header with the user's data is
101  * performed right in the user's message buffer.  Note that the
102  * datagram header is exactly 64 bits.  One simply overwrites 64 bits
103  * of the user's message with a datagram header, sends the datagram
104  * straight from the user's message buffer, then restores the user's
105  * buffer to its original state.  There is a small problem with the
106  * first datagram of the message: one needs 64 bits of space to store
107  * the datagram header.  To make sure this space is there, we added a
108  * 64-bit unused space to the front of the Cmi message header.  In
109  * addition to this, we also add 32 bits to the Cmi message header
110  * to make room for a length-field, making it possible to identify
111  * message boundaries.
112  *
113  * CONCURRENCY CONTROL
114  *
115  * This has changed recently.
116  *
117  * EFFICIENCY NOTES
118  *
119  * The sender-side does little copying.  The async and freeing send
120  * routines do no copying at all.  The sync send routines copy the
121  * message, then use the freeing-send routines.  The other alternative
122  * is to not copy the message, and use the async send mechanism
123  * combined with a blocking wait.  Blocking wait seems like a bad
124  * idea, since it could take a VERY long time to get all those
125  * datagrams out the door.
126  *
127  * The receiver side, unfortunately, must copy.  To avoid copying,
128  * it would have to receive directly into a preallocated message buffer.
129  * Unfortunately, this can't work: there's no way to know how much
130  * memory to preallocate, and there's no way to know which datagram
131  * is coming next.  Thus, we receive into fixed-size (large) datagram
132  * buffers.  These are then inspected, and the messages extracted from
133  * them.
134  *
135  * Note that we are allocating a large number of structs: OutgoingMsg's,
136  * ImplicitDgrams, ExplicitDgrams.  By design, each of these structs
137  * is a fixed-size structure.  Thus, we can do memory allocation by
138  * simply keeping a linked-list of unused structs around.  The only
139  * place where expensive memory allocation is performed is in the
140  * sync routines.
141  *
142  * Since the datagrams from one node to another are fully ordered,
143  * there is slightly more ordering than is needed: in theory, the
144  * datagrams of one message don't need to be ordered relative to the
145  * datagrams of another.  This was done to simplify the sequencing
146  * mechanisms: implementing a fully-ordered stream is much simpler
147  * than a partially-ordered one.  It also makes it possible to
148  * modularize, layering the message transmitter on top of the
149  * datagram-sequencer.  In other words, it was just easier this way.
150  * Hopefully, this won't cause serious degradation: LAN's rarely get
151  * datagrams out of order anyway.
152  *
153  * A potential efficiency problem is the lack of message-combining.
154  * One datagram could conceivably contain several messages.  This
155  * might be more efficient, it's not clear how much overhead is
156  * involved in sending a short datagram.  Message-combining isn't
157  * really ``integrated'' into the design of this software, but you
158  * could fudge it as follows.  Whenever you pull a short datagram from
159  * the send-queue, check the next one to see if it's also a short
160  * datagram.  If so, pack them together into a ``combined'' datagram.
161  * At the receive side, simply check for ``combined'' datagrams, and
162  * treat them as if they were simply two datagrams.  This would
163  * require extra copying.  I have no idea if this would be worthwhile.
164  *
165  *****************************************************************************/
166
167 /**
168  * @addtogroup NET
169  * @{
170  */
171
172 /*****************************************************************************
173  *
174  * Include Files
175  *
176  ****************************************************************************/
177
178 #define _GNU_SOURCE 1
179 #include <stdarg.h> /*<- was <varargs.h>*/
180
181 #define CMK_USE_PRINTF_HACK 0
182 #if CMK_USE_PRINTF_HACK
183 /*HACK: turn printf into CmiPrintf, by just defining our own
184 external symbol "printf".  This may be more trouble than it's worth,
185 since the only advantage is that it works properly with +syncprint.
186
187 This version *won't* work with fprintf(stdout,...) or C++ or Fortran I/O,
188 because they don't call printf.  Has to be defined up here because we probably 
189 haven't properly guessed this compiler's prototype for "printf".
190 */
191 static void InternalPrintf(const char *f, va_list l);
192 int printf(const char *fmt, ...) {
193         int nChar;
194         va_list p; va_start(p, fmt);
195         InternalPrintf(fmt,p);
196         va_end(p);
197         return 10;
198 }
199 #endif
200
201
202 #include "converse.h"
203 #include "memory-isomalloc.h"
204
205 #include <stdio.h>
206 #include <stdlib.h>
207 #include <ctype.h>
208 #include <fcntl.h>
209 #include <errno.h>
210 #include <setjmp.h>
211 #include <signal.h>
212 #include <string.h>
213
214 /* define machine debug */
215 #include "machine.h"
216
217 /******************* Producer-Consumer Queues ************************/
218 #include "pcqueue.h"
219
220 #include "machine-smp.h"
221
222 #if CMK_USE_KQUEUE
223 #include <sys/event.h>
224 int _kq = -1;
225 #endif
226
227 #if CMK_USE_POLL
228 #include <poll.h>
229 #endif
230
231 #if CMK_USE_GM
232 #include "gm.h"
233 struct gm_port *gmport = NULL;
234 int  portFinish = 0;
235 #endif
236
237 #if CMK_USE_MX
238 #include "myriexpress.h"
239 mx_endpoint_t      endpoint;
240 mx_endpoint_addr_t endpoint_addr;
241 int MX_FILTER   =  123456;
242 static uint64_t Cmi_nic_id=0; /* Machine-specific identifier (MX-only) */
243 #endif
244
245 #if CMK_USE_AMMASSO
246   #include "clustercore/ccil_api.h"
247 #endif
248
249 #if CMK_MULTICORE
250 int Cmi_commthread = 0;
251 #endif
252
253 #include "conv-ccs.h"
254 #include "ccs-server.h"
255 #include "sockRoutines.h"
256
257 #if defined(_WIN32) && ! defined(__CYGWIN__)
258 /*For windows systems:*/
259 #  include <windows.h>
260 #  include <wincon.h>
261 #  include <sys/types.h>
262 #  include <sys/timeb.h>
263 #  define fdopen _fdopen
264 #  define SIGBUS -1  /*These signals don't exist in Win32*/
265 #  define SIGKILL -1
266 #  define SIGQUIT -1
267 /*#  define SIGTERM -1*/       /* VC++ ver 8 now has SIGTERM */
268
269 #else /*UNIX*/
270 #  include <pwd.h>
271 #  include <unistd.h>
272 #  include <fcntl.h>
273 #  include <sys/file.h>
274 #endif
275
276 #if CMK_PERSISTENT_COMM
277 #include "persist_impl.h"
278 #endif
279
280 #define PRINTBUFSIZE 16384
281
282 #ifdef __ONESIDED_IMPL
283 #ifdef __ONESIDED_NO_HARDWARE
284 int putSrcHandler;
285 int putDestHandler;
286 int getSrcHandler;
287 int getDestHandler;
288 #include "conv-onesided.c"
289 #endif
290 #endif
291
292 static void CommunicationServer(int withDelayMs, int where);
293
294 void CmiHandleImmediate();
295 extern int CmemInsideMem();
296 extern void CmemCallWhenMemAvail();
297 static void ConverseRunPE(int everReturn);
298 void CmiYield(void);
299 void ConverseCommonExit(void);
300
301 static unsigned int dataport=0;
302 static int Cmi_mach_id=0; /* Machine-specific identifier (GM-only) */
303 static SOCKET       dataskt;
304
305 extern void TokenUpdatePeriodic();
306 extern void getAvailSysMem();
307
308 #define BROADCAST_SPANNING_FACTOR               4
309
310 /****************************************************************************
311  *
312  * Handling Errors
313  *
314  * Errors should be handled by printing a message on stderr and
315  * calling exit(1).  Nothing should be sent to charmrun, no attempt at
316  * communication should be made.  The other processes will notice the
317  * abnormal termination and will deal with it.
318  *
319  * Rationale: if an error triggers an attempt to send a message,
320  * the attempt to send a message is likely to trigger another error,
321  * leading to an infinite loop and a process that spins instead of
322  * shutting down.
323  *
324  *****************************************************************************/
325
326 static int machine_initiated_shutdown=0;
327 static int already_in_signal_handler=0;
328
329 static void CmiDestoryLocks();
330
331 void CmiMachineExit();
332
333 #if CMK_USE_SYSVSHM /* define teardown function before use */
334 void tearDownSharedBuffers();
335 #endif 
336
337 static void machine_exit(int status)
338 {
339   MACHSTATE(3,"     machine_exit");
340   machine_initiated_shutdown=1;
341
342   CmiDestoryLocks();            /* destory locks to prevent dead locking */
343   EmergencyExit();
344
345 #if CMK_USE_GM
346   if (gmport) { 
347     gm_close(gmport); gmport = 0;
348     gm_finalize();
349   }
350 #endif
351 #if CMK_USE_SYSVSHM
352   tearDownSharedBuffers();
353 #endif
354   CmiMachineExit();
355   exit(status);
356 }
357
358 static void charmrun_abort(const char*);
359
360 static void KillEveryone(const char *msg)
361 {
362   charmrun_abort(msg);
363   machine_exit(1);
364 }
365
366 static void KillEveryoneCode(n)
367 int n;
368 {
369   char _s[100];
370   sprintf(_s, "[%d] Fatal error #%d\n", CmiMyPe(), n);
371   charmrun_abort(_s);
372   machine_exit(1);
373 }
374
375 CpvExtern(int, freezeModeFlag);
376
377 static void KillOnAllSigs(int sigNo)
378 {
379   const char *sig="unknown signal";
380   const char *suggestion="";
381   if (machine_initiated_shutdown ||
382       already_in_signal_handler) 
383         machine_exit(1); /*Don't infinite loop if there's a signal during a signal handler-- just die.*/
384   already_in_signal_handler=1;
385
386 #if CMK_CCS_AVAILABLE
387   if (CpvAccess(cmiArgDebugFlag)) {
388     int reply = 0;
389     CpdNotify(CPD_SIGNAL,sigNo);
390 #if ! CMK_BLUEGENE_CHARM
391     CcsSendReplyNoError(4,&reply);/*Send an empty reply if not*/
392     CpvAccess(freezeModeFlag) = 1;
393     CpdFreezeModeScheduler();
394 #else
395     CpdFreeze();
396 #endif
397   }
398 #endif
399   
400   CmiDestoryLocks();            /* destory locks */
401
402   if (sigNo==SIGSEGV) {
403      sig="segmentation violation";
404      suggestion="Try running with '++debug', or linking with '-memory paranoid' (memory paranoid requires '+netpoll' at runtime).\n";
405   }
406   if (sigNo==SIGFPE) {
407      sig="floating point exception";
408      suggestion="Check for integer or floating-point division by zero.\n";
409   }
410   if (sigNo==SIGBUS) {
411      sig="bus error";
412      suggestion="Check for misaligned reads or writes to memory.\n";
413   }
414   if (sigNo==SIGILL) {
415      sig="illegal instruction";
416      suggestion="Check for calls to uninitialized function pointers.\n";
417   }
418   if (sigNo==SIGKILL) sig="caught signal KILL";
419   if (sigNo==SIGQUIT) sig="caught signal QUIT";
420   if (sigNo==SIGTERM) sig="caught signal TERM";
421   MACHSTATE1(5,"     Caught signal %s ",sig);
422 /*ifdef this part*/
423 #ifdef __FAULT__
424   if(sigNo == SIGKILL || sigNo == SIGQUIT || sigNo == SIGTERM){
425                 CmiPrintf("[%d] Caught but ignoring signal\n",CmiMyPe());
426   }else{
427 #else
428         {
429 #endif
430    CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
431         "Signal: %s\n",CmiMyPe(),sig);
432         if (0!=suggestion[0])
433                 CmiError("Suggestion: %s",suggestion);
434         CmiPrintStackTrace(1);
435         charmrun_abort(sig);
436         machine_exit(1);                
437         }       
438 }
439
440 static void machine_atexit_check(void)
441 {
442   if (!machine_initiated_shutdown)
443     CmiAbort("unexpected call to exit by user program. Must use CkExit, not exit!");
444   printf("Program finished.\n");
445 #if 0 /*Wait for the user to press any key (for Win32 debugging)*/
446   fgetc(stdin);
447 #endif
448 }
449
450 #if !defined(_WIN32) || defined(__CYGWIN__)
451 static void HandleUserSignals(int signum)
452 {
453   int condnum = ((signum==SIGUSR1) ? CcdSIGUSR1 : CcdSIGUSR2);
454   CcdRaiseCondition(condnum);
455 }
456 #endif
457
458 static void PerrorExit(const char *msg)
459 {
460   perror(msg);
461   machine_exit(1);
462 }
463
464 /*****************************************************************************
465  *
466  *     Utility routines for network machine interface.
467  *
468  *****************************************************************************/
469
470 /*
471 Horrific #defines to hide the differences between select() and poll().
472  */
473 #if CMK_USE_POLL /*poll() version*/
474 # define CMK_PIPE_DECL(delayMs) \
475         struct pollfd fds[10]; \
476         int nFds_sto=0; int *nFds=&nFds_sto; \
477         int pollDelayMs=delayMs;
478 # define CMK_PIPE_SUB fds,nFds
479 # define CMK_PIPE_CALL() poll(fds, *nFds, pollDelayMs); *nFds=0
480
481 # define CMK_PIPE_PARAM struct pollfd *fds,int *nFds
482 # define CMK_PIPE_ADDREAD(rd_fd) \
483         do {fds[*nFds].fd=rd_fd; fds[*nFds].events=POLLIN; (*nFds)++;} while(0)
484 # define CMK_PIPE_ADDWRITE(wr_fd) \
485         do {fds[*nFds].fd=wr_fd; fds[*nFds].events=POLLOUT; (*nFds)++;} while(0)
486 # define CMK_PIPE_CHECKREAD(rd_fd) fds[(*nFds)++].revents&POLLIN
487 # define CMK_PIPE_CHECKWRITE(wr_fd) fds[(*nFds)++].revents&POLLOUT
488
489 #elif CMK_USE_KQUEUE /* kqueue version */
490
491 # define CMK_PIPE_DECL(delayMs) \
492         if (_kq == -1) _kq = kqueue(); \
493     struct kevent ke_sto; \
494     struct kevent* ke = &ke_sto; \
495     struct timespec tmo; \
496     tmo.tv_sec = 0; tmo.tv_nsec = delayMs*1e6;
497 # define CMK_PIPE_SUB ke
498 # define CMK_PIPE_CALL() kevent(_kq, NULL, 0, ke, 1, &tmo)
499
500 # define CMK_PIPE_PARAM struct kevent* ke
501 # define CMK_PIPE_ADDREAD(rd_fd) \
502         do { EV_SET(ke, rd_fd, EVFILT_READ, EV_ADD, 0, 10, NULL); \
503                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
504 # define CMK_PIPE_ADDWRITE(wr_fd) \
505         do { EV_SET(ke, wr_fd, EVFILT_WRITE, EV_ADD, 0, 10, NULL); \
506                 kevent(_kq, ke, 1, NULL, 0, NULL); memset(ke, 0, sizeof(ke));} while(0)
507 # define CMK_PIPE_CHECKREAD(rd_fd) (ke->ident == rd_fd && ke->filter == EVFILT_READ)
508 # define CMK_PIPE_CHECKWRITE(wr_fd) (ke->ident == wr_fd && ke->filter == EVFILT_WRITE)
509
510 #else /*select() version*/
511
512 # define CMK_PIPE_DECL(delayMs) \
513         fd_set rfds_sto,wfds_sto;\
514         fd_set *rfds=&rfds_sto,*wfds=&wfds_sto; struct timeval tmo; \
515         FD_ZERO(rfds); FD_ZERO(wfds);tmo.tv_sec=0; tmo.tv_usec=1000*delayMs;
516 # define CMK_PIPE_SUB rfds,wfds
517 # define CMK_PIPE_CALL() select(FD_SETSIZE, rfds, wfds, NULL, &tmo)
518
519 # define CMK_PIPE_PARAM fd_set *rfds,fd_set *wfds
520 # define CMK_PIPE_ADDREAD(rd_fd) FD_SET(rd_fd,rfds)
521 # define CMK_PIPE_ADDWRITE(wr_fd) FD_SET(wr_fd,wfds)
522 # define CMK_PIPE_CHECKREAD(rd_fd) FD_ISSET(rd_fd,rfds)
523 # define CMK_PIPE_CHECKWRITE(wr_fd) FD_ISSET(wr_fd,wfds)
524 #endif
525
526 static void CMK_PIPE_CHECKERR(void) {
527 #if defined(_WIN32) && !defined(__CYGWIN__)
528 /* Win32 socket seems to randomly return inexplicable errors
529 here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK. 
530         int err=WSAGetLastError();
531         CmiPrintf("(%d)Select returns -1; errno=%d, WSAerr=%d\n",withDelayMs,errn
532 o,err);
533 */
534 #else /*UNIX machine*/
535         if (errno!=EINTR)
536                 KillEveryone("Socket error in CheckSocketsReady!\n");
537 #endif
538 }
539
540
541 static void CmiStdoutFlush(void);
542 static int  CmiStdoutNeedsService(void);
543 static void CmiStdoutService(void);
544 static void CmiStdoutAdd(CMK_PIPE_PARAM);
545 static void CmiStdoutCheck(CMK_PIPE_PARAM);
546
547
548 double GetClock(void)
549 {
550 #if defined(_WIN32) && !defined(__CYGWIN__)
551   struct _timeb tv; 
552   _ftime(&tv);
553   return (tv.time * 1.0 + tv.millitm * 1.0E-3);
554 #else
555   struct timeval tv; int ok;
556   ok = gettimeofday(&tv, NULL);
557   if (ok<0) { perror("gettimeofday"); KillEveryoneCode(9343112); }
558   return (tv.tv_sec * 1.0 + tv.tv_usec * 1.0E-6);
559 #endif
560 }
561
562 char *CopyMsg(char *msg, int len)
563 {
564   char *copy = (char *)CmiAlloc(len);
565   if (!copy)
566       fprintf(stderr, "Out of memory\n");
567   memcpy(copy, msg, len);
568   return copy;
569 }
570
571 /***********************************************************************
572  *
573  * Abort function:
574  *
575  ************************************************************************/
576
577 static int  Cmi_truecrash;
578 static int already_aborting=0;
579 void CmiAbort(const char *message)
580 {
581   if (already_aborting) machine_exit(1);
582   already_aborting=1;
583         {
584 /*       char str[100];
585          sprintf(str,"dead.%d",CmiMyNode());
586          FILE *fp = fopen(str,"w");
587          fprintf(fp,"%s",message);
588          fclose(fp);*/
589         }
590   MACHSTATE1(5,"CmiAbort(%s)",message);
591
592 #if CMK_CCS_AVAILABLE
593   /* if CharmDebug is attached simply try to send a message to it */
594   if (CpvAccess(cmiArgDebugFlag)) {
595     CpdNotify(CPD_ABORT, message);
596     CpdFreeze();
597   }
598 #endif
599   
600   /* CmiDestoryLocks();  */
601
602   {
603 /*    char str[22];
604     snprintf(str,18,"dead.%d",CmiMyPe());
605     FILE *fp = fopen(str,"w");
606     fprintf(fp,"Abort:%s\n",message);
607     fclose(fp);*/
608   }
609
610   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
611         "Reason: %s\n",CmiMyPe(),message);
612   CmiPrintStackTrace(0);
613   
614   /*Send off any remaining prints*/
615   CmiStdoutFlush();
616   
617   if(Cmi_truecrash) {
618     printf("CHARM++ FATAL ERROR: %s\n", message);
619     *(int *)NULL = 0; /*Write to null, causing bus error*/
620   } else {
621     charmrun_abort(message);
622     machine_exit(1);
623   }
624 }
625
626
627 /******************************************************************************
628  *
629  * CmiEnableAsyncIO
630  *
631  * The net and tcp versions use a bunch of unix processes talking to each
632  * other via file descriptors.  We need for a signal SIGIO to be generated
633  * each time a message arrives, making it possible to write a signal
634  * handler to handle the messages.  The vast majority of unixes can,
635  * in fact, do this.  However, there isn't any standard for how this is
636  * supposed to be done, so each version of UNIX has a different set of
637  * calls to turn this signal on.  So, there is like one version here for
638  * every major brand of UNIX.
639  *
640  *****************************************************************************/
641
642 #if CMK_ASYNC_USE_F_SETFL_AND_F_SETOWN
643 #include <fcntl.h>
644 void CmiEnableAsyncIO(int fd)
645 {
646   if ( fcntl(fd, F_SETOWN, getpid()) < 0 ) {
647     CmiError("setting socket owner: %s\n", strerror(errno)) ;
648     exit(1);
649   }
650   if ( fcntl(fd, F_SETFL, FASYNC) < 0 ) {
651     CmiError("setting socket async: %s\n", strerror(errno)) ;
652     exit(1);
653   }
654 }
655 #else
656 void CmiEnableAsyncIO(int fd) { }
657 #endif
658
659 /* We should probably have a set of "CMK_NONBLOCK_USE_..." defines here:*/
660 #if !defined(_WIN32) || defined(__CYGWIN__)
661 void CmiEnableNonblockingIO(int fd) {
662   int on=1;
663   if (fcntl(fd,F_SETFL,O_NONBLOCK,&on)<0) {
664     CmiError("setting nonblocking IO: %s\n", strerror(errno)) ;
665     exit(1);
666   }
667 }
668 #else
669 void CmiEnableNonblockingIO(int fd) { }
670 #endif
671
672
673 /******************************************************************************
674 *
675 * Configuration Data
676 *
677 * This data is all read in from the NETSTART variable (provided by the
678 * charmrun) and from the command-line arguments.  Once read in, it is never
679  * modified.
680  *
681  *****************************************************************************/
682
683 int               _Cmi_mynode;    /* Which address space am I */
684 int               _Cmi_mynodesize;/* Number of processors in my address space */
685 int               _Cmi_numnodes;  /* Total number of address spaces */
686 int               _Cmi_numpes;    /* Total number of processors */
687 static int        Cmi_nodestart; /* First processor in this address space */
688 static skt_ip_t   Cmi_self_IP;
689 static skt_ip_t   Cmi_charmrun_IP; /*Address of charmrun machine*/
690 static int        Cmi_charmrun_port;
691 static int        Cmi_charmrun_pid;
692 static int        Cmi_charmrun_fd=-1;
693 /* Magic number to be used for sanity check in messege header */
694 static int                              Cmi_net_magic;
695
696 static int    Cmi_netpoll;
697 static int    Cmi_asyncio;
698 static int    Cmi_idlepoll;
699 static int    Cmi_syncprint;
700 static int Cmi_print_stats = 0;
701
702 #if ! CMK_SMP && ! defined(_WIN32)
703 /* parse forks only used in non-smp mode */
704 static void parse_forks(void) {
705   char *forkstr;
706   int nread;
707   int forks;
708   int i,pid;
709   forkstr=getenv("CmiMyForks");
710   if(forkstr!=0) { /* charmrun */
711         nread = sscanf(forkstr,"%d",&forks);
712         for(i=1;i<=forks;i++) { /* by default forks = 0 */ 
713                 pid=fork();
714                 if(pid<0) CmiAbort("Fork returned an error");
715                 if(pid==0) { /* forked process */
716                         /* reset mynode,pe & exit loop */
717                         _Cmi_mynode+=i;
718                         _Cmi_mype+=i;
719                         break;
720                 }
721         }
722   }
723 }
724 #endif
725 static void parse_magic(void)
726 {
727         char* nm;       
728         int nread;
729   nm = getenv("NETMAGIC");
730   if (nm!=0) 
731   {/*Read values set by Charmrun*/
732         nread = sscanf(nm, "%d",&Cmi_net_magic);
733         }
734 }
735 static void parse_netstart(void)
736 {
737   char *ns;
738   int nread;
739   int port;
740   ns = getenv("NETSTART");
741   if (ns!=0) 
742   {/*Read values set by Charmrun*/
743         char Cmi_charmrun_name[1024];
744         nread = sscanf(ns, "%d%s%d%d%d",
745                  &_Cmi_mynode,
746                  Cmi_charmrun_name, &Cmi_charmrun_port,
747                  &Cmi_charmrun_pid, &port);
748         Cmi_charmrun_IP=skt_lookup_ip(Cmi_charmrun_name);
749
750         if (nread!=5) {
751                 fprintf(stderr,"Error parsing NETSTART '%s'\n",ns);
752                 exit(1);
753         }
754   } else 
755   {/*No charmrun-- set flag values for standalone operation*/
756         _Cmi_mynode=0;
757         Cmi_charmrun_IP=_skt_invalid_ip;
758         Cmi_charmrun_port=0;
759         Cmi_charmrun_pid=0;
760         dataport = -1;
761   }
762 #if CMK_USE_IBVERBS | CMK_USE_IBUD
763         char *cmi_num_nodes = getenv("CmiNumNodes");
764         if(cmi_num_nodes != NULL){
765                 sscanf(cmi_num_nodes,"%d",&_Cmi_numnodes);
766         }
767 #endif  
768 }
769
770 static void extract_common_args(char **argv)
771 {
772   if (CmiGetArgFlagDesc(argv,"+stats","Print network statistics at shutdown"))
773     Cmi_print_stats = 1;
774 }
775
776 /* for SMP */
777 #include "machine-smp.c"
778
779 CsvDeclare(CmiNodeState, NodeState);
780
781 /* Immediate message support */
782 #define CMI_DEST_RANK(msg)      *(int *)(msg)
783 #include "immediate.c"
784
785 /******************************************************************************
786  *
787  * Packet Performance Logging
788  *
789  * This module is designed to give a detailed log of the packets and their
790  * acknowledgements, for performance tuning.  It can be disabled.
791  *
792  *****************************************************************************/
793
794 #define LOGGING 0
795
796 #if LOGGING
797
798 typedef struct logent {
799   double time;
800   int seqno;
801   int srcpe;
802   int dstpe;
803   int kind;
804 } *logent;
805
806
807 logent log;
808 int    log_pos;
809 int    log_wrap;
810
811 static void log_init(void)
812 {
813   log = (logent)malloc(50000 * sizeof(struct logent));
814   _MEMCHECK(log);
815   log_pos = 0;
816   log_wrap = 0;
817 }
818
819 static void log_done(void)
820 {
821   char logname[100]; FILE *f; int i, size;
822   sprintf(logname, "log.%d", _Cmi_mynode);
823   f = fopen(logname, "w");
824   if (f==0) KillEveryone("fopen problem");
825   if (log_wrap) size = 50000; else size=log_pos;
826   for (i=0; i<size; i++) {
827     logent ent = log+i;
828     fprintf(f, "%1.4f %d %c %d %d\n",
829             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
830   }
831   fclose(f);
832 }
833
834 void printLog(void)
835 {
836   char logname[100]; FILE *f; int i, j, size;
837   static int logged = 0;
838   if (logged)
839       return;
840   logged = 1;
841   CmiPrintf("Logging: %d\n", _Cmi_mynode);
842   sprintf(logname, "log.%d", _Cmi_mynode);
843   f = fopen(logname, "w");
844   if (f==0) KillEveryone("fopen problem");
845   for (i = 5000; i; i--)
846   {
847   /*for (i=0; i<size; i++) */
848     j = log_pos - i;
849     if (j < 0)
850     {
851         if (log_wrap)
852             j = 5000 + j;
853         else
854             j = 0;
855     };
856     {
857     logent ent = log+j;
858     fprintf(f, "%1.4f %d %c %d %d\n",
859             ent->time, ent->srcpe, ent->kind, ent->dstpe, ent->seqno);
860     }
861   }
862   fclose(f);
863   CmiPrintf("Done Logging: %d\n", _Cmi_mynode);
864 }
865
866 #define LOG(t,s,k,d,q) { if (log_pos==50000) { log_pos=0; log_wrap=1;} { logent ent=log+log_pos; ent->time=t; ent->srcpe=s; ent->kind=k; ent->dstpe=d; ent->seqno=q; log_pos++; }}
867
868 #endif
869
870 #if !LOGGING
871
872 #define log_init() /*empty*/
873 #define log_done() /*empty*/
874 #define printLog() /*empty*/
875 #define LOG(t,s,k,d,q) /*empty*/
876
877 #endif
878
879 /******************************************************************************
880  *
881  * Node state
882  *
883  *****************************************************************************/
884
885
886 static CmiNodeLock    Cmi_scanf_mutex;
887 static double         Cmi_clock;
888 static double         Cmi_check_delay = 3.0;
889
890 /******************************************************************************
891  *
892  * OS Threads
893  * SMP implementation moved to machine-smp.c
894  *****************************************************************************/
895
896 /************************ No kernel SMP threads ***************/
897 #if CMK_SHARED_VARS_UNAVAILABLE
898
899 static volatile int memflag=0;
900 void CmiMemLock() { memflag++; }
901 void CmiMemUnlock() { memflag--; }
902
903 static volatile int comm_flag=0;
904 #define CmiCommLockOrElse(dothis) if (comm_flag!=0) dothis
905 #ifndef MACHLOCK_DEBUG
906 #  define CmiCommLock() (comm_flag=1)
907 #  define CmiCommUnlock() (comm_flag=0)
908 #else /* Error-checking flag locks */
909 void CmiCommLock(void) {
910   MACHLOCK_ASSERT(!comm_flag,"CmiCommLock");
911   comm_flag=1;
912 }
913 void CmiCommUnlock(void) {
914   MACHLOCK_ASSERT(comm_flag,"CmiCommUnlock");
915   comm_flag=0;
916 }
917 #endif
918
919 static struct CmiStateStruct Cmi_state;
920 int _Cmi_mype;
921 int _Cmi_myrank=0; /* Normally zero; only 1 during SIGIO handling */
922 #define CmiGetState() (&Cmi_state)
923 #define CmiGetStateN(n) (&Cmi_state)
924
925 void CmiYield(void) { sleep(0); }
926
927 static void CommunicationInterrupt(int ignored)
928 {
929   MACHLOCK_ASSERT(!_Cmi_myrank,"CommunicationInterrupt");
930   if (memflag || comm_flag || _immRunning || CmiCheckImmediateLock(0)) 
931   { /* Already busy inside malloc, comm, or immediate messages */
932     MACHSTATE(5,"--SKIPPING SIGIO--");
933     return;
934   }
935   MACHSTATE1(2,"--BEGIN SIGIO comm_flag: %d--", comm_flag)
936   {
937     /*Make sure any malloc's we do in here are NOT migratable:*/
938     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
939 /*    _Cmi_myrank=1; */
940     CommunicationServer(0, COMM_SERVER_FROM_INTERRUPT);  /* from interrupt */
941 /*    _Cmi_myrank=0; */
942     CmiIsomallocBlockListActivate(oldList);
943   }
944   MACHSTATE(2,"--END SIGIO--")
945 }
946
947 extern void CmiSignal(int sig1, int sig2, int sig3, void (*handler)());
948
949 static void CmiStartThreads(char **argv)
950 {
951   MACHSTATE2(3,"_Cmi_numpes %d _Cmi_numnodes %d",_Cmi_numpes,_Cmi_numnodes);
952   MACHSTATE1(3,"_Cmi_mynodesize %d",_Cmi_mynodesize);
953   if ((_Cmi_numpes != _Cmi_numnodes) || (_Cmi_mynodesize != 1))
954     KillEveryone
955       ("Multiple cpus unavailable, don't use cpus directive in nodesfile.\n");
956   
957   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
958   _Cmi_mype = Cmi_nodestart;
959
960   /* Prepare Cpv's for immediate messages: */
961   _Cmi_myrank=1;
962   CommunicationServerInit();
963   _Cmi_myrank=0;
964   
965 #if !CMK_ASYNC_NOT_NEEDED
966   if (Cmi_asyncio)
967   {
968     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
969     if (!Cmi_netpoll) {
970       if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
971       if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
972     }
973 #if CMK_USE_GM || CMK_USE_MX
974       /* charmrun is serviced in interrupt for gm */
975     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
976 #endif
977   }
978 #endif
979 }
980
981 static void CmiDestoryLocks()
982 {
983   comm_flag = 0;
984   memflag = 0;
985 }
986
987 #endif
988
989 /*Network progress utility variables. Period controls the rate at
990   which the network poll is called */
991
992 CpvDeclare(unsigned , networkProgressCount);
993 int networkProgressPeriod;
994
995 CpvDeclare(void *, CmiLocalQueue);
996
997
998 #ifndef CmiMyPe
999 int CmiMyPe(void) 
1000
1001   return CmiGetState()->pe; 
1002 }
1003 #endif
1004 #ifndef CmiMyRank
1005 int CmiMyRank(void)
1006 {
1007   return CmiGetState()->rank;
1008 }
1009 #endif
1010
1011 CpvExtern(int,_charmEpoch);
1012
1013 /*Add a message to this processor's receive queue 
1014   Must be called while holding comm. lock
1015 */
1016
1017 extern double evacTime;
1018
1019 void CmiPushPE(int pe,void *msg)
1020 {
1021   CmiState cs=CmiGetStateN(pe);
1022         /*
1023                 FAULT_EVAC
1024         
1025         if(CpvAccess(_charmEpoch)&&!CmiNodeAlive(CmiMyPe())){
1026                 printf("[%d] Message after stop at %.6lf in %.6lf \n",CmiMyPe(),CmiWallTimer(),CmiWallTimer()-evacTime);
1027         }*/
1028   MACHSTATE1(2,"Pushing message into %d's queue",pe);  
1029   MACHLOCK_ASSERT(comm_flag,"CmiPushPE")
1030
1031 #if CMK_IMMEDIATE_MSG
1032   if (CmiIsImmediate(msg)) {
1033     CmiPushImmediateMsg(msg);
1034     return;
1035   }
1036 #endif
1037 #if !CMK_SMP_MULTIQ
1038   PCQueuePush(cs->recv,msg);
1039 #else
1040   PCQueuePush(cs->recv[CmiGetState()->myGrpIdx], msg);
1041 #endif
1042
1043 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1044   if (_Cmi_noprocforcommthread) 
1045 #endif
1046   CmiIdleLock_addMessage(&cs->idle);
1047 }
1048
1049 #if CMK_NODE_QUEUE_AVAILABLE
1050 /*Add a message to the node queue.  
1051   Must be called while holding comm. lock
1052 */
1053 static void CmiPushNode(void *msg)
1054 {
1055   CmiState cs=CmiGetStateN(0);
1056   
1057   MACHSTATE(2,"Pushing message into node queue");
1058   MACHLOCK_ASSERT(comm_flag,"CmiPushNode")
1059   
1060 #if CMK_IMMEDIATE_MSG
1061   if (CmiIsImmediate(msg)) {
1062     MACHSTATE(2,"Pushing Immediate message into queue");
1063     CmiPushImmediateMsg(msg);
1064     return;
1065   }
1066 #endif 
1067 /* 
1068  * if CMK_SMP_MULTIQ is enabled, then PCQUEUE's push lock
1069  * may be disabled. In this case, the lock for node recv
1070  * queue has to be used.
1071  * */
1072 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1073   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1074 #endif
1075   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
1076 #if CMK_SMP_MULTIQ && !CMK_PCQUEUE_PUSH_LOCK
1077   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1078 #endif
1079
1080   /*Silly: always try to wake up processor 0, so at least *somebody*
1081     will be awake to handle the message*/
1082 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
1083   if (_Cmi_noprocforcommthread) 
1084 #endif
1085   CmiIdleLock_addMessage(&cs->idle);
1086 }
1087 #endif
1088
1089 /***************************************************************
1090  Communication with charmrun:
1091  We can send (ctrl_sendone) and receive (ctrl_getone)
1092  messages on a TCP socket connected to charmrun.
1093  This is used for printfs, CCS, etc; and also for
1094  killing ourselves if charmrun dies.
1095 */
1096
1097 /*This flag prevents simultanious outgoing
1098 messages on the charmrun socket.  It is protected
1099 by the commlock.*/
1100 static int Cmi_charmrun_fd_sendflag=0;
1101
1102 /* ctrl_sendone */
1103 static int sendone_abort_fn(int code,const char *msg) {
1104         fprintf(stderr,"Socket error %d in ctrl_sendone! %s\n",code,msg);
1105         machine_exit(1);
1106         return -1;
1107 }
1108
1109 static void ctrl_sendone_nolock(const char *type,
1110                                 const char *data1,int dataLen1,
1111                                 const char *data2,int dataLen2)
1112 {
1113   const void *bufs[3]; int lens[3]; int nBuffers=0;
1114   ChMessageHeader hdr;
1115   skt_abortFn oldAbort=skt_set_abort(sendone_abort_fn);
1116   MACHSTATE1(2,"ctrl_sendone_nolock { type=%s", type);
1117   if (Cmi_charmrun_fd==-1) 
1118         charmrun_abort("ctrl_sendone called in standalone!\n");
1119   Cmi_charmrun_fd_sendflag=1;
1120   ChMessageHeader_new(type,dataLen1+dataLen2,&hdr);
1121   bufs[nBuffers]=&hdr; lens[nBuffers]=sizeof(hdr); nBuffers++;
1122   if (dataLen1>0) {bufs[nBuffers]=data1; lens[nBuffers]=dataLen1; nBuffers++;}
1123   if (dataLen2>0) {bufs[nBuffers]=data2; lens[nBuffers]=dataLen2; nBuffers++;}
1124   skt_sendV(Cmi_charmrun_fd,nBuffers,bufs,lens);
1125   Cmi_charmrun_fd_sendflag=0;
1126   skt_set_abort(oldAbort);
1127   MACHSTATE(2,"} ctrl_sendone_nolock");
1128 }
1129
1130 static void ctrl_sendone_locking(const char *type,
1131                                 const char *data1,int dataLen1,
1132                                 const char *data2,int dataLen2)
1133 {
1134   CmiCommLock();
1135   ctrl_sendone_nolock(type,data1,dataLen1,data2,dataLen2);
1136   CmiCommUnlock();
1137 }
1138
1139 #ifndef MEMORYUSAGE_OUTPUT
1140 #define MEMORYUSAGE_OUTPUT 0 
1141 #endif
1142 #if MEMORYUSAGE_OUTPUT 
1143 #define MEMORYUSAGE_OUTPUT_FREQ 10 //how many prints in a second
1144 static int memoryusage_counter;
1145 #define memoryusage_isOutput ((memoryusage_counter%MEMORYUSAGE_OUTPUT_FREQ)==0)
1146 #define memoryusage_output {\
1147   memoryusage_counter++;\
1148   if(CmiMyPe()==0) printf("-- %d %f %ld --\n", CmiMyPe(), GetClock(), CmiMemoryUsage());}
1149 #endif
1150
1151 static double Cmi_check_last;
1152
1153 /* if charmrun dies, we finish */
1154 static void pingCharmrun(void *ignored) 
1155 {
1156 #if MEMORYUSAGE_OUTPUT
1157   memoryusage_output;
1158   if(memoryusage_isOutput){
1159     memoryusage_counter = 0;
1160 #else
1161   {
1162 #endif 
1163
1164   double clock=GetClock();
1165   if (clock > Cmi_check_last + Cmi_check_delay) {
1166     MACHSTATE1(3,"CommunicationsClock pinging charmrun Cmi_charmrun_fd_sendflag=%d", Cmi_charmrun_fd_sendflag);
1167     Cmi_check_last = clock; 
1168 #if CMK_USE_GM || CMK_USE_MX
1169     if (!Cmi_netpoll)  /* GM netpoll, charmrun service is done in interrupt */
1170 #endif
1171     CmiCommLockOrElse(return;); /*Already busy doing communication*/
1172     if (Cmi_charmrun_fd_sendflag) return; /*Busy talking to charmrun*/
1173     CmiCommLock();
1174     ctrl_sendone_nolock("ping",NULL,0,NULL,0); /*Charmrun may have died*/
1175     CmiCommUnlock();
1176   }
1177 #if 1
1178 #if CMK_USE_GM || CMK_USE_MX
1179   if (!Cmi_netpoll)
1180 #endif
1181   CmiStdoutFlush(); /*Make sure stdout buffer hasn't filled up*/
1182 #endif
1183   }
1184 }
1185
1186 /* periodic charm ping, for gm and netpoll */
1187 static void pingCharmrunPeriodic(void *ignored)
1188 {
1189   pingCharmrun(ignored);
1190   CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
1191 }
1192
1193 static int ignore_further_errors(int c,const char *msg) {machine_exit(2);return -1;}
1194 static void charmrun_abort(const char *s)
1195 {
1196   if (Cmi_charmrun_fd==-1) {/*Standalone*/
1197         fprintf(stderr,"Charm++ fatal error:\n%s\n",s);
1198 CmiPrintStackTrace(0);
1199         abort();
1200   } else {
1201         char msgBuf[80];
1202         skt_set_abort(ignore_further_errors);
1203         sprintf(msgBuf,"Fatal error on PE %d> ",CmiMyPe());
1204         ctrl_sendone_nolock("abort",msgBuf,strlen(msgBuf),s,strlen(s)+1);
1205   }
1206 }
1207
1208 /* ctrl_getone */
1209
1210 #ifdef __FAULT__
1211 #include "machine-recover.c"
1212 #endif
1213
1214 static void node_addresses_store(ChMessage *msg);
1215
1216 static int barrierReceived = 0;
1217
1218 static void ctrl_getone(void)
1219 {
1220   ChMessage msg;
1221   MACHSTATE(2,"ctrl_getone")
1222   MACHLOCK_ASSERT(comm_flag,"ctrl_getone")
1223   ChMessage_recv(Cmi_charmrun_fd,&msg);
1224   MACHSTATE1(2,"ctrl_getone recv one '%s'", msg.header.type);
1225
1226   if (strcmp(msg.header.type,"die")==0) {
1227     MACHSTATE(2,"ctrl_getone bye bye")
1228     fprintf(stderr,"aborting: %s\n",msg.data);
1229     log_done();
1230     ConverseCommonExit();
1231     machine_exit(0);
1232   } 
1233 #if CMK_CCS_AVAILABLE
1234   else if (strcmp(msg.header.type, "req_fw")==0) {
1235     CcsImplHeader *hdr=(CcsImplHeader *)msg.data;
1236         /*Sadly, I *can't* do a:
1237       CcsImpl_netRequest(hdr,msg.data+sizeof(CcsImplHeader));
1238         here, because I can't send converse messages in the
1239         communication thread.  I *can* poke this message into 
1240         any convenient processor's queue, though:  (OSL, 9/14/2000)
1241         */
1242     int pe=0;/*<- node-local processor number. Any one will do.*/
1243     void *cmsg=(void *)CcsImpl_ccs2converse(hdr,msg.data+sizeof(CcsImplHeader),NULL);
1244     MACHSTATE(2,"Incoming CCS request");
1245     if (cmsg!=NULL) CmiPushPE(pe,cmsg);
1246   }
1247 #endif
1248 #ifdef __FAULT__        
1249   else if(strcmp(msg.header.type,"crashnode")==0) {
1250         crash_node_handle(&msg);
1251   }
1252   else if(strcmp(msg.header.type,"initnodetab")==0) {
1253         /** A processor crashed and got recreated. So charmrun sent 
1254           across the whole nodetable data to update this processor*/
1255         node_addresses_store(&msg);
1256         // fprintf(stdout,"nodetable added %d\n",CmiMyPe());
1257   }
1258 #endif
1259   else if(strcmp(msg.header.type,"barrier")==0) {
1260         barrierReceived = 1;
1261   }
1262   else if(strcmp(msg.header.type,"barrier0")==0) {
1263         barrierReceived = 2;
1264   }
1265   else {
1266   /* We do not use KillEveryOne here because it calls CmiMyPe(),
1267    * which is not available to the communication thread on an SMP version.
1268    */
1269     /* CmiPrintf("Unknown message: %s\n", msg.header.type); */
1270     charmrun_abort("ERROR> Unrecognized message from charmrun.\n");
1271     machine_exit(1);
1272   }
1273   
1274   MACHSTATE(2,"ctrl_getone done")
1275   ChMessage_free(&msg);
1276 }
1277
1278 #if CMK_CCS_AVAILABLE && !NODE_0_IS_CONVHOST
1279 /*Deliver this reply data to this reply socket.
1280   The data is forwarded to CCS server via charmrun.*/
1281 void CcsImpl_reply(CcsImplHeader *hdr,int repLen,const void *repData)
1282 {
1283   MACHSTATE(2,"Outgoing CCS reply");
1284   ctrl_sendone_locking("reply_fw",(const char *)hdr,sizeof(CcsImplHeader),
1285       repData,repLen);
1286   MACHSTATE(1,"Outgoing CCS reply away");
1287 }
1288 #endif
1289
1290 /*****************************************************************************
1291  *
1292  * CmiPrintf, CmiError, CmiScanf
1293  *
1294  *****************************************************************************/
1295 static void InternalWriteToTerminal(int isStdErr,const char *str,int len);
1296 static void InternalPrintf(const char *f, va_list l)
1297 {
1298   ChMessage replymsg;
1299   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1300   CmiStdoutFlush();
1301   vsprintf(buffer, f, l);
1302   if(Cmi_syncprint) {
1303           CmiCommLock();
1304           ctrl_sendone_nolock("printsyn", buffer,strlen(buffer)+1,NULL,0);
1305           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1306           ChMessage_free(&replymsg);
1307           CmiCommUnlock();
1308   } else {
1309           ctrl_sendone_locking("print", buffer,strlen(buffer)+1,NULL,0);
1310   }
1311   InternalWriteToTerminal(0,buffer,strlen(buffer));
1312   CmiTmpFree(buffer);
1313 }
1314
1315 static void InternalError(const char *f, va_list l)
1316 {
1317   ChMessage replymsg;
1318   char *buffer = CmiTmpAlloc(PRINTBUFSIZE);
1319   CmiStdoutFlush();
1320   vsprintf(buffer, f, l);
1321   if(Cmi_syncprint) {
1322           ctrl_sendone_locking("printerrsyn", buffer,strlen(buffer)+1,NULL,0);
1323           CmiCommLock();
1324           ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1325           ChMessage_free(&replymsg);
1326           CmiCommUnlock();
1327   } else {
1328           ctrl_sendone_locking("printerr", buffer,strlen(buffer)+1,NULL,0);
1329   }
1330   InternalWriteToTerminal(1,buffer,strlen(buffer));
1331   CmiTmpFree(buffer);
1332 }
1333
1334 static int InternalScanf(char *fmt, va_list l)
1335 {
1336   ChMessage replymsg;
1337   char *ptr[20];
1338   char *p; int nargs, i;
1339   nargs=0;
1340   p=fmt;
1341   while (*p) {
1342     if ((p[0]=='%')&&(p[1]=='*')) { p+=2; continue; }
1343     if ((p[0]=='%')&&(p[1]=='%')) { p+=2; continue; }
1344     if (p[0]=='%') { nargs++; p++; continue; }
1345     if (*p=='\n') *p=' '; p++;
1346   }
1347   if (nargs > 18) KillEveryone("CmiScanf only does 18 args.\n");
1348   for (i=0; i<nargs; i++) ptr[i]=va_arg(l, char *);
1349   CmiLock(Cmi_scanf_mutex);
1350   if (Cmi_charmrun_fd!=-1)
1351   {/*Send charmrun the format string*/
1352         ctrl_sendone_locking("scanf", fmt, strlen(fmt)+1,NULL,0);
1353         /*Wait for the reply (characters to scan) from charmrun*/
1354         CmiCommLock();
1355         ChMessage_recv(Cmi_charmrun_fd,&replymsg);
1356         i = sscanf((char*)replymsg.data, fmt,
1357                      ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1358                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1359                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1360         ChMessage_free(&replymsg);
1361         CmiCommUnlock();
1362   } else
1363   {/*Just do the scanf normally*/
1364         i=scanf(fmt, ptr[ 0], ptr[ 1], ptr[ 2], ptr[ 3], ptr[ 4], ptr[ 5],
1365                      ptr[ 6], ptr[ 7], ptr[ 8], ptr[ 9], ptr[10], ptr[11],
1366                      ptr[12], ptr[13], ptr[14], ptr[15], ptr[16], ptr[17]);
1367   }
1368   CmiUnlock(Cmi_scanf_mutex);
1369   return i;
1370 }
1371
1372 #if CMK_CMIPRINTF_IS_A_BUILTIN
1373
1374 /*New stdarg.h declarations*/
1375 void CmiPrintf(const char *fmt, ...)
1376 {
1377   CpdSystemEnter();
1378   {
1379   va_list p; va_start(p, fmt);
1380   if (Cmi_charmrun_fd!=-1)
1381     InternalPrintf(fmt, p);
1382   else
1383     vfprintf(stdout,fmt,p);
1384   va_end(p);
1385   }
1386   CpdSystemExit();
1387 }
1388
1389 void CmiError(const char *fmt, ...)
1390 {
1391   CpdSystemEnter();
1392   {
1393   va_list p; va_start (p, fmt);
1394   if (Cmi_charmrun_fd!=-1)
1395     InternalError(fmt, p);
1396   else
1397     vfprintf(stderr,fmt,p);
1398   va_end(p);
1399   }
1400   CpdSystemExit();
1401 }
1402
1403 int CmiScanf(const char *fmt, ...)
1404 {
1405   int i;
1406   CpdSystemEnter();
1407   {
1408   va_list p; va_start(p, fmt);
1409   i = InternalScanf((char *)fmt, p);
1410   va_end(p);
1411   }
1412   CpdSystemExit();
1413   return i;
1414 }
1415
1416 #endif
1417
1418 /***************************************************************************
1419  * Output redirection:
1420  *  When people don't use CkPrintf, like above, we'd still like to be able
1421  * to collect their output.  Thus we make a pipe and dup2 it to stdout,
1422  * which lets us read the characters sent to stdout at our lesiure.
1423  ***************************************************************************/
1424
1425 /*Can read from stdout or stderr using these fd's*/
1426 static int readStdout[2]; 
1427 static int writeStdout[2]; /*The original stdout/stderr sockets*/ 
1428 static int serviceStdout[2]; /*(bool) Normally zero; one if service needed.*/
1429 #define readStdoutBufLen (16*1024)
1430 static char readStdoutBuf[readStdoutBufLen+1]; /*Protected by comm. lock*/
1431 static int servicingStdout;
1432
1433 /*Initialization-- should only be called once per node*/
1434 static void CmiStdoutInit(void) {
1435         int i;
1436         if (Cmi_charmrun_fd==-1) return; /* standalone mode */
1437
1438 /*There's some way to do this same thing in windows, but I don't know how*/
1439 #if !defined(_WIN32) || defined(__CYGWIN__)
1440         /*Prevent buffering in stdio library:*/
1441         setbuf(stdout,NULL); setbuf(stderr,NULL);
1442
1443         /*Reopen stdout and stderr fd's as new pipes:*/
1444         for (i=0;i<2;i++) {
1445                 int pair[2];
1446                 int srcFd=1+i; /* 1 is stdout; 2 is stderr */
1447                 
1448                 /*First, save a copy of the original stdout*/
1449                 writeStdout[i]=dup(srcFd);
1450 #if 0
1451                 /*Build a pipe to connect to stdout (4kb buffer, but no SIGIO...)*/
1452                 if (-1==pipe(pair)) {perror("building stdio redirection pipe"); exit(1);}
1453 #else
1454                /* UNIX socket (16kb default buffer, and works with SIGIO!) */
1455                 if (-1==socketpair(PF_UNIX,SOCK_STREAM,0,pair)) 
1456                         {perror("building stdio redirection socketpair"); exit(1);}
1457 #endif
1458                 readStdout[i]=pair[0]; /*We get the read end of pipe*/
1459                 if (-1==dup2(pair[1],srcFd)) {perror("dup2 redirection pipe"); exit(1);}
1460                 
1461 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
1462                 CmiEnableNonblockingIO(srcFd);
1463 #endif
1464 #if CMK_SHARED_VARS_UNAVAILABLE
1465                 if (Cmi_asyncio)
1466                 {
1467   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
1468                         CmiEnableAsyncIO(readStdout[i]);
1469                 }
1470 #endif
1471         }
1472 #else
1473 /*Windows system-- just fake reads for now*/
1474 # ifndef read
1475 #  define read(x,y,z) 0
1476 # endif
1477 # ifndef write
1478 #  define write(x,y,z) 
1479 # endif
1480 #endif
1481 }
1482
1483 /*Sends data to original stdout (e.g., for ++debug or ++in-xterm)*/
1484 static void InternalWriteToTerminal(int isStdErr,const char *str,int len)
1485 {
1486         write(writeStdout[isStdErr],str,len);   
1487 }
1488
1489 /*
1490   Service this particular stdout pipe.  
1491   Must hold comm. lock.
1492 */
1493 static void CmiStdoutServiceOne(int i) {
1494         int nBytes;
1495         const static char *cmdName[2]={"print","printerr"};
1496         servicingStdout=1;
1497         while(1) {
1498                 const char *tooMuchWarn=NULL; int tooMuchLen=0;
1499                 if (!skt_select1(readStdout[i],0)) break; /*Nothing to read*/
1500                 nBytes=read(readStdout[i],readStdoutBuf,readStdoutBufLen);
1501                 if (nBytes<=0) break; /*Nothing to send*/
1502                 
1503                 /*Send these bytes off to charmrun*/
1504                 readStdoutBuf[nBytes]=0; /*Zero-terminate read string*/
1505                 nBytes++; /*Include zero-terminator in message to charmrun*/
1506                 
1507                 if (nBytes>=readStdoutBufLen-100) 
1508                 { /*We must have filled up our output pipe-- most output libraries
1509                    don't handle this well (e.g., glibc printf just drops the line).*/
1510                         
1511                         tooMuchWarn="\nWARNING: Too much output at once-- possible output discontinuity!\n"
1512                                 "Use CkPrintf to avoid discontinuity (and this warning).\n\n";
1513                         nBytes--; /*Remove terminator from user's data*/
1514                         tooMuchLen=strlen(tooMuchWarn)+1;
1515                 }
1516                 ctrl_sendone_nolock(cmdName[i],readStdoutBuf,nBytes,
1517                                     tooMuchWarn,tooMuchLen);
1518                 
1519                 InternalWriteToTerminal(i,readStdoutBuf,nBytes);
1520         }
1521         servicingStdout=0;
1522         serviceStdout[i]=0; /*This pipe is now serviced*/
1523 }
1524
1525 /*Service all stdout pipes, whether it looks like they need it
1526   or not.  Used when you aren't sure if select() has been called recently.
1527   Must hold comm. lock.
1528 */
1529 static void CmiStdoutServiceAll(void) {
1530         int i;
1531         for (i=0;i<2;i++) {
1532                 if (readStdout[i]==0) continue; /*Pipe not open*/
1533                 CmiStdoutServiceOne(i);
1534         }
1535 }
1536
1537 /*Service any outstanding stdout pipes.
1538   Must hold comm. lock.
1539 */
1540 static void CmiStdoutService(void) {
1541         CmiStdoutServiceAll();
1542 }
1543
1544 /*Add our pipes to the pile for select() or poll().
1545   Both can be called with or without the comm. lock.
1546 */
1547 static void CmiStdoutAdd(CMK_PIPE_PARAM) {
1548         int i;
1549         for (i=0;i<2;i++) {
1550                 if (readStdout[i]==0) continue; /*Pipe not open*/
1551                 CMK_PIPE_ADDREAD(readStdout[i]);
1552         }
1553 }
1554 static void CmiStdoutCheck(CMK_PIPE_PARAM) {
1555         int i;
1556         for (i=0;i<2;i++) {
1557                 if (readStdout[i]==0) continue; /*Pipe not open*/
1558                 if (CMK_PIPE_CHECKREAD(readStdout[i])) serviceStdout[i]=1;
1559         }
1560 }
1561 static int CmiStdoutNeedsService(void) {
1562         return (serviceStdout[0]!=0 || serviceStdout[1]!=0);
1563 }
1564
1565 /*Called every few milliseconds to flush the stdout pipes*/
1566 static void CmiStdoutFlush(void) {
1567         if (servicingStdout) return; /* might be called by SIGALRM */
1568         CmiCommLockOrElse( return; )
1569         CmiCommLock();
1570         CmiStdoutServiceAll();
1571         CmiCommUnlock();
1572 }
1573
1574 /***************************************************************************
1575  * Message Delivery:
1576  *
1577  ***************************************************************************/
1578
1579 #include "machine-dgram.c"
1580
1581
1582 #ifndef CmiNodeFirst
1583 int CmiNodeFirst(int node) { return nodes[node].nodestart; }
1584 int CmiNodeSize(int node)  { return nodes[node].nodesize; }
1585 #endif
1586
1587 #ifndef CmiNodeOf
1588 int CmiNodeOf(int pe)      { return (nodes_by_pe[pe] - nodes); }
1589 int CmiRankOf(int pe)      { return pe - (nodes_by_pe[pe]->nodestart); }
1590 #endif
1591
1592
1593 /*****************************************************************************
1594  *
1595  * node_addresses
1596  *
1597  *  These two functions fill the node-table.
1598  *
1599  *
1600  *   This node, like all others, first sends its own address to charmrun
1601  *   using this command:
1602  *
1603  *     Type: nodeinfo
1604  *     Data: Big-endian 4-byte ints
1605  *           <my-node #><Dataport>
1606  *
1607  *   When charmrun has all the addresses, he sends this table to me:
1608  *
1609  *     Type: nodes
1610  *     Data: Big-endian 4-byte ints
1611  *           <number of nodes n>
1612  *           <#PEs><IP><Dataport> Node 0
1613  *           <#PEs><IP><Dataport> Node 1
1614  *           ...
1615  *           <#PEs><IP><Dataport> Node n-1
1616  *
1617  *****************************************************************************/
1618
1619 #if CMK_USE_IBVERBS
1620 void copyInfiAddr(ChInfiAddr *qpList);
1621 #endif
1622
1623 #if CMK_IBVERBS_FAST_START
1624 static void send_partial_init()
1625 {
1626   ChMessageInt_t nodeNo = ChMessageInt_new(_Cmi_mynode);
1627         ctrl_sendone_nolock("partinit",(const char *)&(nodeNo),sizeof(nodeNo),NULL,0);
1628 }       
1629 #endif
1630
1631
1632 /*Note: node_addresses_obtain is called before starting
1633   threads, so no locks are needed (or valid!)*/
1634 static void node_addresses_obtain(char **argv)
1635 {
1636   ChMessage nodetabmsg; /* info about all nodes*/
1637   MACHSTATE(3,"node_addresses_obtain { ");
1638   if (Cmi_charmrun_fd==-1) 
1639   {/*Standalone-- fake a single-node nodetab message*/
1640         int npes=1;
1641         ChSingleNodeinfo *fakeTab;
1642         ChMessage_new("nodeinfo",sizeof(ChSingleNodeinfo),&nodetabmsg);
1643         fakeTab=(ChSingleNodeinfo *)(nodetabmsg.data);
1644         CmiGetArgIntDesc(argv,"+p",&npes,"Set the number of processes to create");
1645 #if CMK_SHARED_VARS_UNAVAILABLE
1646         if (npes!=1) {
1647                 fprintf(stderr,
1648                         "To use multiple processors, you must run this program as:\n"
1649                         " > charmrun +p%d %s <args>\n"
1650                         "or build the %s-smp version of Charm++.\n",
1651                         npes,argv[0],CMK_MACHINE_NAME);
1652                 exit(1);
1653         }
1654 #else
1655         /* standalone smp version reads ppn */
1656         if (CmiGetArgInt(argv, "+ppn", &_Cmi_mynodesize) || 
1657                CmiGetArgInt(argv, "++ppn", &_Cmi_mynodesize) )
1658           npes = _Cmi_mynodesize;
1659 #endif
1660         /*This is a stupid hack: we expect the *number* of nodes
1661         followed by ChNodeinfo structs; so we use a ChSingleNodeinfo
1662         (which happens to have exactly that layout!) and stuff
1663         a 1 into the "node number" slot
1664         */
1665         fakeTab->nodeNo=ChMessageInt_new(1); /* <- hack */
1666         fakeTab->info.nPE=ChMessageInt_new(npes);
1667         fakeTab->info.dataport=ChMessageInt_new(0);
1668         fakeTab->info.IP=_skt_invalid_ip;
1669   }
1670   else 
1671   { /*Contact charmrun for machine info.*/
1672         ChSingleNodeinfo me;
1673
1674         me.nodeNo=ChMessageInt_new(_Cmi_mynode);
1675
1676 #if CMK_USE_IBVERBS
1677         {
1678                 int qpListSize = (_Cmi_numnodes-1)*sizeof(ChInfiAddr);
1679                 me.info.qpList = malloc(qpListSize);
1680                 copyInfiAddr(me.info.qpList);
1681                 MACHSTATE1(3,"me.info.qpList created and copied size %d bytes",qpListSize);
1682                 ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),(const char *)me.info.qpList,qpListSize);
1683                 free(me.info.qpList);
1684         }
1685 #else
1686         /*The nPE fields are set by charmrun--
1687           these values don't matter. 
1688           Set IP in case it is mpiexec mode where charmrun does not have IP yet
1689         */
1690         me.info.nPE=ChMessageInt_new(0);
1691         /* me.info.IP=_skt_invalid_ip; */
1692         me.info.IP=skt_innode_my_ip();
1693         me.info.mach_id=ChMessageInt_new(Cmi_mach_id);
1694 #ifdef CMK_USE_MX
1695         me.info.nic_id=ChMessageLong_new(Cmi_nic_id);
1696 #endif
1697 #if CMK_USE_IBUD
1698         me.info.qp.lid=ChMessageInt_new(context->localAddr.lid);
1699         me.info.qp.qpn=ChMessageInt_new(context->localAddr.qpn);
1700         me.info.qp.psn=ChMessageInt_new(context->localAddr.psn);
1701         MACHSTATE3(3,"IBUD Information lid=%i qpn=%i psn=%i\n",me.info.qp.lid,me.info.qp.qpn,me.info.qp.psn);
1702 #endif
1703         me.info.dataport=ChMessageInt_new(dataport);
1704
1705         /*Send our node info. to charmrun.
1706         CommLock hasn't been initialized yet-- 
1707         use non-locking version*/
1708         ctrl_sendone_nolock("initnode",(const char *)&me,sizeof(me),NULL,0);
1709         MACHSTATE1(5,"send initnode - dataport:%d", dataport);
1710 #endif  //CMK_USE_IBVERBS
1711
1712         MACHSTATE(3,"initnode sent");
1713   
1714         /*We get the other node addresses from a message sent
1715           back via the charmrun control port.*/
1716         if (!skt_select1(Cmi_charmrun_fd,1200*1000)){
1717                 CmiAbort("Timeout waiting for nodetab!\n");
1718         }
1719         MACHSTATE(2,"recv initnode {");
1720         ChMessage_recv(Cmi_charmrun_fd,&nodetabmsg);
1721         MACHSTATE(2,"} recv initnode");
1722   }
1723 //#if CMK_USE_IBVERBS   
1724 //#else
1725   node_addresses_store(&nodetabmsg);
1726   ChMessage_free(&nodetabmsg);
1727 //#endif        
1728   MACHSTATE(3,"} node_addresses_obtain ");
1729 }
1730
1731 #if CMK_NODE_QUEUE_AVAILABLE
1732
1733 /***********************************************************************
1734  * DeliverOutgoingNodeMessage()
1735  *
1736  * This function takes care of delivery of outgoing node messages from the
1737  * sender end. Broadcast messages are divided into sets of messages that 
1738  * are bound to the local node, and to remote nodes. For local
1739  * transmission, the messages are directly pushed into the recv
1740  * queues. For non-local transmission, the function DeliverViaNetwork()
1741  * is called
1742  ***********************************************************************/
1743 void DeliverOutgoingNodeMessage(OutgoingMsg ogm)
1744 {
1745   int i, rank, dst; OtherNode node;
1746
1747   dst = ogm->dst;
1748   switch (dst) {
1749   case NODE_BROADCAST_ALL:
1750     CmiPushNode(CopyMsg(ogm->data,ogm->size));
1751     /*case-fallthrough (no break)-- deliver to all other processors*/
1752   case NODE_BROADCAST_OTHERS:
1753 #if CMK_BROADCAST_SPANNING_TREE
1754     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1755 #elif CMK_BROADCAST_HYPERCUBE
1756     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_NODEBROADCAST);
1757 #else
1758     for (i=0; i<_Cmi_numnodes; i++)
1759       if (i!=_Cmi_mynode)
1760         DeliverViaNetwork(ogm, nodes + i, DGRAM_NODEMESSAGE, DGRAM_ROOTPE_MASK, 1);
1761 #endif
1762     GarbageCollectMsg(ogm);
1763     break;
1764   default:
1765     node = nodes+dst;
1766     rank=DGRAM_NODEMESSAGE;
1767     if (dst != _Cmi_mynode) {
1768       DeliverViaNetwork(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1769       GarbageCollectMsg(ogm);
1770     } else {
1771       if (ogm->freemode == 'A') {
1772         CmiPushNode(CopyMsg(ogm->data,ogm->size));
1773         ogm->freemode = 'X';
1774       } else {
1775         CmiPushNode(ogm->data);
1776         FreeOutgoingMsg(ogm);
1777       }
1778     }
1779   }
1780 }
1781
1782 #else
1783
1784 #define DeliverOutgoingNodeMessage(msg) DeliverOutgoingMessage(msg)
1785
1786 #endif
1787
1788 #if CMK_C_INLINE
1789 inline static
1790 #endif
1791 void DeliverViaNetworkOrPxshm(OutgoingMsg ogm,OtherNode node,int rank,unsigned int broot,int copy){
1792 #if CMK_USE_SYSVSHM
1793         {
1794 #if SYSVSHM_STATS
1795         double _startValidTime = CmiWallTimer();
1796 #endif
1797         int ret=CmiValidSysvshm(ogm,node);
1798 #if SYSVSHM_STATS
1799         sysvshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1800 #endif                  
1801         MACHSTATE4(3,"Msg ogm %p size %d dst %d useSysvShm %d",ogm,ogm->size,ogm->dst,ret);
1802         if(ret){
1803                 CmiSendMessageSysvshm(ogm,node,rank,broot);
1804         }else{
1805                 DeliverViaNetwork(ogm, node, rank, broot,copy);
1806         }
1807         } 
1808 #elif CMK_USE_PXSHM
1809      {
1810 #if PXSHM_STATS
1811         double _startValidTime = CmiWallTimer();
1812 #endif
1813       int ret=CmiValidPxshm(ogm,node);
1814 #if PXSHM_STATS
1815         pxshmContext->validCheckTime += CmiWallTimer() - _startValidTime;
1816 #endif                  
1817       MACHSTATE4(3,"Msg ogm %p size %d dst %d usePxShm %d",ogm,ogm->size,ogm->dst,ret);
1818       if(ret){
1819          CmiSendMessagePxshm(ogm,node,rank,broot);
1820        }else{
1821          DeliverViaNetwork(ogm, node, rank, broot,copy);
1822        }
1823       } 
1824 #else
1825       DeliverViaNetwork(ogm, node, rank, broot, copy);
1826 #endif                  
1827         
1828 }
1829
1830
1831
1832 /***********************************************************************
1833  * DeliverOutgoingMessage()
1834  *
1835  * This function takes care of delivery of outgoing messages from the
1836  * sender end. Broadcast messages are divided into sets of messages that 
1837  * are bound to the local node, and to remote nodes. For local
1838  * transmission, the messages are directly pushed into the recv
1839  * queues. For non-local transmission, the function DeliverViaNetwork()
1840  * is called
1841  ***********************************************************************/
1842 int DeliverOutgoingMessage(OutgoingMsg ogm)
1843 {
1844   int i, rank, dst; OtherNode node;
1845         
1846   int network = 1;
1847
1848         
1849   dst = ogm->dst;
1850
1851   switch (dst) {
1852   case PE_BROADCAST_ALL:
1853 #if !CMK_SMP_NOT_RELAX_LOCK       
1854     CmiCommLock();
1855 #endif
1856     for (rank = 0; rank<_Cmi_mynodesize; rank++) {
1857       CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1858     }
1859 #if CMK_BROADCAST_SPANNING_TREE
1860     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1861 #elif CMK_BROADCAST_HYPERCUBE
1862     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1863 #else
1864     for (i=0; i<_Cmi_numnodes; i++)
1865       if (i!=_Cmi_mynode){
1866         /*FAULT_EVAC : is the target processor valid*/
1867         if(CmiNodeAlive(i)){
1868           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1869         }
1870       } 
1871 #endif
1872     GarbageCollectMsg(ogm);
1873 #if !CMK_SMP_NOT_RELAX_LOCK               
1874     CmiCommUnlock();
1875 #endif    
1876     break;
1877   case PE_BROADCAST_OTHERS:
1878 #if !CMK_SMP_NOT_RELAX_LOCK               
1879     CmiCommLock();
1880 #endif  
1881     for (rank = 0; rank<_Cmi_mynodesize; rank++)
1882       if (rank + Cmi_nodestart != ogm->src) {
1883         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1884       }
1885 #if CMK_BROADCAST_SPANNING_TREE
1886     SendSpanningChildren(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1887 #elif CMK_BROADCAST_HYPERCUBE
1888     SendHypercube(ogm, 1, 0, NULL, 0, DGRAM_BROADCAST);
1889 #else
1890     for (i = 0; i<_Cmi_numnodes; i++)
1891       if (i!=_Cmi_mynode){
1892         /*FAULT_EVAC : is the target processor valid*/
1893         if(CmiNodeAlive(i)){
1894           DeliverViaNetworkOrPxshm(ogm, nodes+i, DGRAM_BROADCAST, DGRAM_ROOTPE_MASK, 1);
1895         }
1896       } 
1897 #endif
1898     GarbageCollectMsg(ogm);
1899 #if !CMK_SMP_NOT_RELAX_LOCK               
1900     CmiCommUnlock();
1901 #endif    
1902     break;
1903   default:
1904 #ifndef CMK_OPTIMIZE
1905     if (dst<0 || dst>=CmiNumPes())
1906       CmiAbort("Send to out-of-bounds processor!");
1907 #endif
1908     node = nodes_by_pe[dst];
1909     rank = dst - node->nodestart;
1910     if (node->nodestart != Cmi_nodestart) {
1911 #if !CMK_SMP_NOT_RELAX_LOCK                     
1912         CmiCommLock();
1913 #endif          
1914         DeliverViaNetworkOrPxshm(ogm, node, rank, DGRAM_ROOTPE_MASK, 0);
1915         GarbageCollectMsg(ogm);
1916 #if !CMK_SMP_NOT_RELAX_LOCK                     
1917         CmiCommUnlock();
1918 #endif          
1919     } else {
1920       network = 0;
1921       if (ogm->freemode == 'A') {
1922         CmiPushPE(rank,CopyMsg(ogm->data,ogm->size));
1923         ogm->freemode = 'X';
1924       } else {
1925         CmiPushPE(rank, ogm->data);
1926         FreeOutgoingMsg(ogm);
1927       }
1928     }
1929   }
1930 #if CMK_MULTICORE
1931   network = 0;
1932 #endif
1933   return network;
1934 }
1935
1936
1937 /******************************************************************************
1938  *
1939  * CmiGetNonLocal
1940  *
1941  * The design of this system is that the communication thread does all the
1942  * work, to eliminate as many locking issues as possible.  This is the only
1943  * part of the code that happens in the receiver-thread.
1944  *
1945  * This operation is fairly cheap, it might be worthwhile to inline
1946  * the code into CmiDeliverMsgs to reduce function call overhead.
1947  *
1948  *****************************************************************************/
1949
1950 #if CMK_NODE_QUEUE_AVAILABLE
1951 char *CmiGetNonLocalNodeQ(void)
1952 {
1953   char *result = 0;
1954   if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {
1955     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1956     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1957     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1958   }
1959   return result;
1960 }
1961 #endif
1962
1963 void *CmiGetNonLocal(void)
1964 {
1965 #if CMK_SMP_MULTIQ
1966   int i;
1967 #endif
1968
1969   CmiState cs = CmiGetState();
1970   CmiIdleLock_checkMessage(&cs->idle);
1971
1972 #if !CMK_SMP_MULTIQ
1973   return (void *) PCQueuePop(cs->recv);
1974 #else
1975   void *retVal = NULL;
1976   for(i=cs->curPolledIdx; i<MULTIQ_GRPSIZE; i++){
1977     retVal = (void *)PCQueuePop(cs->recv[i]);
1978     if(retVal!=NULL) {
1979         cs->curPolledIdx = i+1;
1980         return retVal;
1981     }
1982   }
1983   cs->curPolledIdx=0;
1984   return NULL;
1985 #endif
1986 }
1987
1988
1989 /**
1990  * Set up an OutgoingMsg structure for this message.
1991  */
1992 static OutgoingMsg PrepareOutgoing(CmiState cs,int pe,int size,int freemode,char *data) {
1993   OutgoingMsg ogm;
1994   MallocOutgoingMsg(ogm);
1995   MACHSTATE2(2,"Preparing outgoing message for pe %d, size %d",pe,size);
1996   ogm->size = size;
1997   ogm->data = data;
1998   ogm->src = cs->pe;
1999   ogm->dst = pe;
2000   ogm->freemode = freemode;
2001   ogm->refcount = 0;
2002   return (CmiCommHandle)ogm;    
2003 }
2004
2005 #if CMK_NODE_QUEUE_AVAILABLE
2006
2007 /******************************************************************************
2008  *
2009  * CmiGeneralNodeSend
2010  *
2011  * Description: This is a generic message sending routine. All the
2012  * converse message send functions are implemented in terms of this
2013  * function. (By setting appropriate flags (eg freemode) that tell
2014  * CmiGeneralSend() how exactly to handle the particular case of
2015  * message send)
2016  *
2017  *****************************************************************************/
2018
2019 CmiCommHandle CmiGeneralNodeSend(int node, int size, int freemode, char *data)
2020 {
2021   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2022   MACHSTATE(1,"CmiGeneralNodeSend {");
2023
2024   if (freemode == 'S') {
2025     char *copy = (char *)CmiAlloc(size);
2026     if (!copy)
2027       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
2028     memcpy(copy, data, size);
2029     data = copy; freemode = 'F';
2030   }
2031
2032 #if CMK_IMMEDIATE_MSG
2033     /* execute the immediate message right away */
2034   if (node == CmiMyNode() && CmiIsImmediate(data)) {
2035     CmiPushImmediateMsg(data);
2036       /* only communication thread executes immediate messages in SMP */
2037     if (!_immRunning) CmiHandleImmediate();
2038     return;
2039   }
2040 #endif
2041
2042   CmiMsgHeaderSetLength(data, size);
2043   ogm=PrepareOutgoing(cs,node,size,freemode,data);
2044   CmiCommLock();
2045   DeliverOutgoingNodeMessage(ogm);
2046   CmiCommUnlock();
2047   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2048   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2049   MACHSTATE(1,"} CmiGeneralNodeSend");
2050   return (CmiCommHandle)ogm;
2051 }
2052
2053 #endif
2054
2055
2056 /******************************************************************************
2057  *
2058  * CmiGeneralSend
2059  *
2060  * Description: This is a generic message sending routine. All the
2061  * converse message send functions are implemented in terms of this
2062  * function. (By setting appropriate flags (eg freemode) that tell
2063  * CmiGeneralSend() how exactly to handle the particular case of
2064  * message send)
2065  *
2066  *****************************************************************************/
2067
2068 CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
2069 {
2070   int sendonnetwork;
2071   CmiState cs = CmiGetState(); OutgoingMsg ogm;
2072   MACHSTATE(1,"CmiGeneralSend {");
2073
2074   if (freemode == 'S') {
2075 #if CMK_USE_GM
2076     if (pe != cs->pe) {
2077       freemode = 'G';
2078     }
2079     else
2080 #endif
2081     {
2082     char *copy = (char *)CmiAlloc(size);
2083     if (!copy)
2084       fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
2085     memcpy(copy, data, size);
2086     data = copy; freemode = 'F';
2087     }
2088   }
2089   
2090   if (pe == cs->pe) {
2091 #if ! CMK_SMP
2092     if (!_immRunning) /* CdsFifo_Enqueue, below, isn't SIGIO or thread safe.  
2093                       The SMP comm thread never gets here, because of the pe test. */
2094 #endif
2095     {
2096 #if CMK_IMMEDIATE_MSG
2097       /* execute the immediate message right away */
2098       /* but to avoid infinite recursive call, don't do this if _immRunning */
2099     if (CmiIsImmediate(data)) {
2100       CmiPushImmediateMsg(data);
2101       CmiHandleImmediate();
2102       return 0;
2103     }
2104 #endif
2105     CdsFifo_Enqueue(cs->localqueue, data);
2106     if (freemode == 'A') {
2107       MallocOutgoingMsg(ogm);
2108       ogm->freemode = 'X';
2109       return ogm;
2110     } else return 0;
2111     }
2112   }
2113
2114 #if CMK_PERSISTENT_COMM
2115   if (phs) {
2116       CmiAssert(phsSize == 1);
2117       CmiSendPersistentMsg(*phs, pe, size, data);
2118       return NULL;
2119   }
2120 #endif
2121
2122   CmiMsgHeaderSetLength(data, size);
2123   ogm=PrepareOutgoing(cs,pe,size,freemode,data);
2124
2125   #if CMK_SMP_NOT_RELAX_LOCK  
2126   CmiCommLock();
2127 #endif  
2128   
2129   sendonnetwork = DeliverOutgoingMessage(ogm);
2130   
2131 #if CMK_SMP_NOT_RELAX_LOCK  
2132   CmiCommUnlock();
2133 #endif  
2134   
2135   /* Check if any packets have arrived recently (preserves kernel network buffers). */
2136 #if CMK_USE_SYSVSHM
2137         CommunicationServerSysvshm();
2138 #elif CMK_USE_PXSHM
2139         CommunicationServerPxshm();
2140 #endif
2141 #if !CMK_SHARED_VARS_UNAVAILABLE
2142 #if !CMK_SMP_NOT_SKIP_COMMSERVER
2143   if (sendonnetwork!=0)   /* only call server when we send msg on network in SMP */
2144 #endif
2145 #endif
2146   CommunicationServer(0, COMM_SERVER_FROM_WORKER);
2147   MACHSTATE(1,"}  CmiGeneralSend");
2148   return (CmiCommHandle)ogm;
2149 }
2150
2151
2152 void CmiSyncSendFn(int p, int s, char *m)
2153
2154   CQdCreate(CpvAccess(cQdState), 1);
2155   CmiGeneralSend(p,s,'S',m); 
2156 }
2157
2158 CmiCommHandle CmiAsyncSendFn(int p, int s, char *m)
2159
2160   CQdCreate(CpvAccess(cQdState), 1);
2161   return CmiGeneralSend(p,s,'A',m); 
2162 }
2163
2164 void CmiFreeSendFn(int p, int s, char *m)
2165
2166   CQdCreate(CpvAccess(cQdState), 1);
2167   CmiGeneralSend(p,s,'F',m); 
2168 }
2169
2170 void CmiSyncBroadcastFn(int s, char *m)
2171
2172   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2173   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'S',m); 
2174 }
2175
2176 CmiCommHandle CmiAsyncBroadcastFn(int s, char *m)
2177
2178   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1); 
2179   return CmiGeneralSend(PE_BROADCAST_OTHERS,s,'A',m); 
2180 }
2181
2182 void CmiFreeBroadcastFn(int s, char *m)
2183
2184   CQdCreate(CpvAccess(cQdState), CmiNumPes()-1);
2185   CmiGeneralSend(PE_BROADCAST_OTHERS,s,'F',m); 
2186 }
2187
2188 void CmiSyncBroadcastAllFn(int s, char *m)
2189
2190   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2191   CmiGeneralSend(PE_BROADCAST_ALL,s,'S',m); 
2192 }
2193
2194 CmiCommHandle CmiAsyncBroadcastAllFn(int s, char *m)
2195
2196   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2197   return CmiGeneralSend(PE_BROADCAST_ALL,s,'A',m); 
2198 }
2199
2200 void CmiFreeBroadcastAllFn(int s, char *m)
2201
2202   CQdCreate(CpvAccess(cQdState), CmiNumPes()); 
2203   CmiGeneralSend(PE_BROADCAST_ALL,s,'F',m); 
2204 }
2205
2206 #if CMK_NODE_QUEUE_AVAILABLE
2207
2208 void CmiSyncNodeSendFn(int p, int s, char *m)
2209
2210   CQdCreate(CpvAccess(cQdState), 1);
2211   CmiGeneralNodeSend(p,s,'S',m); 
2212 }
2213
2214 CmiCommHandle CmiAsyncNodeSendFn(int p, int s, char *m)
2215
2216   CQdCreate(CpvAccess(cQdState), 1);
2217   return CmiGeneralNodeSend(p,s,'A',m); 
2218 }
2219
2220 void CmiFreeNodeSendFn(int p, int s, char *m)
2221
2222   CQdCreate(CpvAccess(cQdState), 1);
2223   CmiGeneralNodeSend(p,s,'F',m); 
2224 }
2225
2226 void CmiSyncNodeBroadcastFn(int s, char *m)
2227
2228   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2229   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'S',m); 
2230 }
2231
2232 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
2233
2234   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2235   return CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'A',m);
2236 }
2237
2238 void CmiFreeNodeBroadcastFn(int s, char *m)
2239
2240   CQdCreate(CpvAccess(cQdState), CmiNumNodes()-1);
2241   CmiGeneralNodeSend(NODE_BROADCAST_OTHERS,s,'F',m); 
2242 }
2243
2244 void CmiSyncNodeBroadcastAllFn(int s, char *m)
2245
2246   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2247   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'S',m); 
2248 }
2249
2250 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
2251
2252   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2253   return CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'A',m); 
2254 }
2255
2256 void CmiFreeNodeBroadcastAllFn(int s, char *m)
2257
2258   CQdCreate(CpvAccess(cQdState), CmiNumNodes());
2259   CmiGeneralNodeSend(NODE_BROADCAST_ALL,s,'F',m); 
2260 }
2261 #endif
2262
2263 /******************************************************************************
2264  *
2265  * Comm Handle manipulation.
2266  *
2267  *****************************************************************************/
2268
2269 int CmiAsyncMsgSent(CmiCommHandle handle)
2270 {
2271   return (((OutgoingMsg)handle)->freemode == 'X');
2272 }
2273
2274 void CmiReleaseCommHandle(CmiCommHandle handle)
2275 {
2276   FreeOutgoingMsg(((OutgoingMsg)handle));
2277 }
2278
2279 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
2280
2281 /*****************************************************************************
2282  *
2283  * NET version List-Cast and Multicast Code
2284  *
2285  ****************************************************************************/
2286                                                                                 
2287 void CmiSyncListSendFn(int npes, int *pes, int len, char *msg)
2288 {
2289   int i;
2290   for(i=0;i<npes;i++) {
2291     CmiReference(msg);
2292     CmiSyncSendAndFree(pes[i], len, msg);
2293   }
2294 }
2295                                                                                 
2296 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int len, char *msg)
2297 {
2298   CmiError("ListSend not implemented.");
2299   return (CmiCommHandle) 0;
2300 }
2301                                                                                 
2302 /* 
2303   because in all net versions, the message buffer after CmiSyncSendAndFree
2304   returns is not changed, we can use memory reference trick to avoid 
2305   memory copying here
2306 */
2307 void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
2308 {
2309   int i;
2310   for(i=0;i<npes;i++) {
2311     CmiReference(msg);
2312     CmiSyncSendAndFree(pes[i], len, msg);
2313   }
2314   CmiFree(msg);
2315 }
2316
2317 #endif
2318
2319 #if CMK_BROADCAST_SPANNING_TREE
2320 /*
2321   if root is 1, it is called from the broadcast root, only ogm is needed;
2322   if root is 0, it is called in the tree, ogm must be NULL and msg, size and startpe are needed
2323   note: function leaves msg buffer untouched
2324 */
2325 void SendSpanningChildren(OutgoingMsg ogm, int root, int size, char *msg, unsigned int startpe, int noderank)
2326 {
2327   CmiState cs = CmiGetState();
2328   int i;
2329
2330   if (root) startpe = _Cmi_mynode;
2331   else ogm = NULL;
2332
2333   CmiAssert(startpe>=0 && startpe<_Cmi_numnodes);
2334
2335   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
2336     int p = _Cmi_mynode-startpe;
2337     if (p<0) p+=_Cmi_numnodes;
2338     p = BROADCAST_SPANNING_FACTOR*p + i;
2339     if (p > _Cmi_numnodes - 1) break;
2340     p += startpe;
2341     p = p%_Cmi_numnodes;
2342     CmiAssert(p!=_Cmi_mynode);
2343     /* CmiPrintf("SendSpanningChildren: %d => %d\n", _Cmi_mynode, p); */
2344     if (!root && !ogm) ogm=PrepareOutgoing(cs, PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2345   //  DeliverViaNetwork(ogm, nodes + p, noderank, startpe, 1);
2346     DeliverViaNetworkOrPxshm(ogm, nodes+p, noderank, startpe, 1);
2347   }
2348   if (!root && ogm) GarbageCollectMsg(ogm);
2349 }
2350 #endif
2351
2352 #if CMK_BROADCAST_HYPERCUBE
2353 int log_of_2 (int i) {
2354   int m;
2355   for (m=0; i>(1<<m); ++m);
2356   return m;
2357 }
2358
2359 /* called from root - send msg along the hypercube in broadcast.
2360   note: function leaves msg buffer untouched
2361 */
2362 void SendHypercube(OutgoingMsg ogm, int root, int size, char *msg, unsigned int srcpe, int noderank)
2363 {
2364   CmiState cs = CmiGetState();
2365   int i, k, npes, tmp;
2366   int *dest_pes;
2367
2368   if (root) {
2369     msg = ogm->data;
2370     srcpe = CmiMyNode();
2371   }
2372   else ogm = NULL;
2373
2374   tmp = srcpe ^ CmiMyNode();
2375   k = log_of_2(CmiNumNodes()) + 2;
2376   if (tmp) {
2377      do {--k;} while (!(tmp>>k));
2378   }
2379
2380         MACHSTATE2(3,"Broadcast SendHypercube ogm %p size %d",ogm,size);
2381
2382   dest_pes = CmiTmpAlloc(sizeof(int)*(k+1));
2383   k--;
2384   npes = HypercubeGetBcastDestinations(CmiMyNode(), CmiNumNodes(), k, dest_pes);
2385   
2386   for (i = 0; i < npes; i++) {
2387     int p = dest_pes[i];
2388     /* CmiPrintf("SendHypercube: %d => %d (%d)\n", cs->pe, p, i); */
2389     if (!root && ! ogm) 
2390       ogm=PrepareOutgoing(cs ,PE_BROADCAST_OTHERS, size,'F',CopyMsg(msg, size));
2391     DeliverViaNetworkOrPxshm(ogm, nodes + p, noderank, CmiMyNode(), 1);
2392   }
2393   if (!root && ogm) GarbageCollectMsg(ogm);
2394   CmiTmpFree(dest_pes);
2395 }
2396 #endif
2397
2398 /*
2399 #if CMK_IMMEDIATE_MSG
2400 void CmiProbeImmediateMsg()
2401 {
2402   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2403 }
2404 #endif
2405 */
2406
2407 /* Network progress function is used to poll the network when for
2408    messages. This flushes receive buffers on some implementations*/ 
2409 void CmiMachineProgressImpl()
2410 {
2411 #if CMK_USE_SYSVSHM
2412         CommunicationServerSysvshm();
2413 #elif CMK_USE_PXSHM
2414         CommunicationServerPxshm();
2415 #endif
2416   CommunicationServer(0, COMM_SERVER_FROM_SMP);
2417 }
2418
2419 /******************************************************************************
2420  *
2421  * Main code, Init, and Exit
2422  *
2423  *****************************************************************************/
2424
2425 #if CMK_BARRIER_USE_COMMON_CODE
2426
2427 /* happen at node level */
2428 /* must be called on every PE including communication processors */
2429 int CmiBarrier()
2430 {
2431   int len, size, i;
2432   int status;
2433   int numnodes = CmiNumNodes();
2434   static int barrier_phase = 0;
2435
2436   if (Cmi_charmrun_fd == -1) return 0;                // standalone
2437   if (numnodes == 1) {
2438     CmiNodeAllBarrier();
2439     return 0;
2440   }
2441
2442   if (CmiMyRank() == 0) {
2443     ctrl_sendone_locking("barrier",NULL,0,NULL,0);
2444     while (barrierReceived != 1) {
2445       CmiCommLock();
2446       ctrl_getone();
2447       CmiCommUnlock();
2448     }
2449     barrierReceived = 0;
2450     barrier_phase ++;
2451   }
2452
2453   CmiNodeAllBarrier();
2454   /* printf("[%d] OUT of barrier %d \n", CmiMyPe(), barrier_phase); */
2455   return 0;
2456 }
2457
2458
2459 int CmiBarrierZero()
2460 {
2461   int i;
2462   int numnodes = CmiNumNodes();
2463   ChMessage msg;
2464
2465   if (Cmi_charmrun_fd == -1) return 0;                // standalone
2466   if (numnodes == 1) {
2467     CmiNodeAllBarrier();
2468     return 0;
2469   }
2470
2471   if (CmiMyRank() == 0) {
2472     char str[64];
2473     sprintf(str, "%d", CmiMyNode());
2474     ctrl_sendone_locking("barrier0",str,strlen(str)+1,NULL,0);
2475     if (CmiMyNode() == 0) {
2476       while (barrierReceived != 2) {
2477         CmiCommLock();
2478         ctrl_getone();
2479         CmiCommUnlock();
2480       }
2481       barrierReceived = 0;
2482     }
2483   }
2484
2485   CmiNodeAllBarrier();
2486   return 0;
2487 }
2488
2489 #endif
2490
2491 /******************************************************************************
2492  *
2493  * Main code, Init, and Exit
2494  *
2495  *****************************************************************************/
2496 extern void CthInit(char **argv);
2497 extern void ConverseCommonInit(char **);
2498
2499 static char     **Cmi_argv;
2500 static char     **Cmi_argvcopy;
2501 static CmiStartFn Cmi_startfn;   /* The start function */
2502 static int        Cmi_usrsched;  /* Continue after start function finishes? */
2503
2504 static void ConverseRunPE(int everReturn)
2505 {
2506   CmiIdleState *s=CmiNotifyGetState();
2507   CmiState cs;
2508   char** CmiMyArgv;
2509   CmiNodeAllBarrier();
2510   cs = CmiGetState();
2511   CpvInitialize(void *,CmiLocalQueue);
2512   CpvAccess(CmiLocalQueue) = cs->localqueue;
2513
2514   /* all non 0 rank use the copied one while rank 0 will modify the actual argv */
2515   if (CmiMyRank())
2516     CmiMyArgv = CmiCopyArgs(Cmi_argvcopy);
2517   else
2518     CmiMyArgv = Cmi_argv;
2519   CthInit(CmiMyArgv);
2520
2521 #if CMK_USE_GM
2522   CmiCheckGmStatus();
2523 #endif
2524
2525   ConverseCommonInit(CmiMyArgv);
2526
2527   /* initialize the network progress counter*/
2528   /* Network progress function is used to poll the network when for
2529      messages. This flushes receive buffers on some  implementations*/ 
2530   CpvInitialize(int , networkProgressCount);
2531   CpvAccess(networkProgressCount) = 0;
2532
2533   /* better to show the status here */
2534   if (CmiMyPe() == 0) {
2535     if (Cmi_netpoll == 1) {
2536       CmiPrintf("Charm++> scheduler running in netpoll mode.\n");
2537     }
2538 #if CMK_SHARED_VARS_UNAVAILABLE
2539     else {
2540       if (CmiMemoryIs(CMI_MEMORY_IS_OS))
2541         CmiAbort("Charm++ Fatal Error: interrupt mode does not work with default system memory allocator. Run with +netpoll to disable the interrupt.");
2542     }
2543 #endif
2544   }
2545 #if MEMORYUSAGE_OUTPUT
2546   memoryusage_counter = 0;
2547 #endif
2548 #if CMK_USE_GM || CMK_USE_MX
2549   if (Cmi_charmrun_fd != -1)
2550 #endif
2551   {
2552   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
2553       (CcdVoidFn) CmiNotifyBeginIdle, (void *) s);
2554   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
2555       (CcdVoidFn) CmiNotifyStillIdle, (void *) s);
2556 #if CMK_USE_SYSVSHM
2557          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdleSysvshm, NULL);
2558          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdleSysvshm, NULL);
2559 #elif CMK_USE_PXSHM
2560                 //TODO: add pxshm notify idle
2561          CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn) CmiNotifyBeginIdlePxshm, NULL);
2562          CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn) CmiNotifyStillIdlePxshm, NULL);
2563 #endif
2564   }
2565
2566 #if CMK_SHARED_VARS_UNAVAILABLE
2567   if (Cmi_netpoll) /*Repeatedly call CommServer*/
2568     CcdCallOnConditionKeep(CcdPERIODIC, 
2569         (CcdVoidFn) CommunicationPeriodic, NULL);
2570   else /*Only need this for retransmits*/
2571     CcdCallOnConditionKeep(CcdPERIODIC_10ms, 
2572         (CcdVoidFn) CommunicationPeriodic, NULL);
2573 #endif
2574
2575   if (CmiMyRank()==0 && Cmi_charmrun_fd!=-1) {
2576     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CmiStdoutFlush, NULL);
2577 #if CMK_SHARED_VARS_UNAVAILABLE
2578     if (!Cmi_asyncio) {
2579     /* gm cannot live with setitimer */
2580     CcdCallFnAfter((CcdVoidFn)pingCharmrunPeriodic,NULL,1000);
2581     }
2582     else {
2583     /*Occasionally ping charmrun, to test if it's dead*/
2584     struct itimerval i;
2585     CmiSignal(SIGALRM, 0, 0, pingCharmrun);
2586 #if MEMORYUSAGE_OUTPUT
2587     i.it_interval.tv_sec = 0;
2588     i.it_interval.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2589     i.it_value.tv_sec = 0;
2590     i.it_value.tv_usec = 1000000/MEMORYUSAGE_OUTPUT_FREQ;
2591 #else
2592     i.it_interval.tv_sec = 1;
2593     i.it_interval.tv_usec = 0;
2594     i.it_value.tv_sec = 1;
2595     i.it_value.tv_usec = 0;
2596 #endif
2597     setitimer(ITIMER_REAL, &i, NULL);
2598     }
2599
2600 #if ! CMK_USE_GM && ! CMK_USE_MX && ! CMK_USE_TCP && ! CMK_USE_IBVERBS
2601     /*Occasionally check for retransmissions, outgoing acks, etc.*/
2602     /*no need for GM case */
2603     CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
2604 #endif
2605 #endif
2606     
2607     /*Initialize the clock*/
2608     Cmi_clock=GetClock();
2609   }
2610
2611 #ifdef IGET_FLOWCONTROL 
2612   /* Call the function once to determine the amount of physical memory available */
2613   getAvailSysMem();
2614   /* Call the function to periodically call the token adapt function */
2615   CcdCallFnAfter((CcdVoidFn)TokenUpdatePeriodic, NULL, 2000); // magic number of 2000ms
2616   CcdCallOnConditionKeep(CcdPERIODIC_10s,   // magic number of PERIOD 10s
2617         (CcdVoidFn) TokenUpdatePeriodic, NULL);
2618 #endif
2619   
2620 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
2621   srand((int)(1024.0*CmiWallTimer()));
2622   if (CmiMyPe()==0)
2623     CmiPrintf("Charm++: Machine layer will randomly corrupt every %d'th message (rand %d)\n",
2624         CMK_RANDOMLY_CORRUPT_MESSAGES,rand());
2625 #endif
2626
2627 #ifdef __ONESIDED_IMPL
2628 #ifdef __ONESIDED_NO_HARDWARE
2629   putSrcHandler = CmiRegisterHandler((CmiHandler)handlePutSrc);
2630   putDestHandler = CmiRegisterHandler((CmiHandler)handlePutDest);
2631   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2632   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2633 #endif
2634 #ifdef __ONESIDED_GM_HARDWARE
2635   getSrcHandler = CmiRegisterHandler((CmiHandler)handleGetSrc);
2636   getDestHandler = CmiRegisterHandler((CmiHandler)handleGetDest);
2637 #endif
2638 #endif
2639
2640   /* communication thread */
2641   if (CmiMyRank() == CmiMyNodeSize()) {
2642     if(!everReturn) Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2643     if (Cmi_charmrun_fd!=-1)
2644           while (1) CommunicationServer(5, COMM_SERVER_FROM_SMP);
2645   }
2646   else{
2647     if (!everReturn) {
2648       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2649       /* turn on immediate messages only now
2650        node barrier previously should take care of the node synchronization */
2651       _immediateReady = 1;
2652       if (Cmi_usrsched==0) CsdScheduler(-1);
2653       ConverseExit();
2654     }else{
2655       _immediateReady = 1;
2656     }
2657   }
2658 }
2659
2660 void ConverseExit(void)
2661 {
2662   MACHSTATE(2,"ConverseExit {");
2663   machine_initiated_shutdown=1;
2664   if (CmiMyRank()==0) {
2665     if(Cmi_print_stats)
2666       printNetStatistics();
2667     log_done();
2668   }
2669 #if CMK_USE_SYSVSHM
2670         CmiExitSysvshm();
2671 #elif CMK_USE_PXSHM
2672         CmiExitPxshm();
2673 #endif
2674   ConverseCommonExit();               /* should be called by every rank */
2675   CmiNodeBarrier();        /* single node SMP, make sure every rank is done */
2676   if (CmiMyRank()==0) CmiStdoutFlush();
2677   if (Cmi_charmrun_fd==-1) {
2678     if (CmiMyRank() == 0) exit(0); /*Standalone version-- just leave*/
2679     else while (1) CmiYield();
2680   }
2681   else {
2682         ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away, every PE needs to report */
2683 #if CMK_SHARED_VARS_UNAVAILABLE
2684         Cmi_check_delay = 1.0;          /* speed up checking of charmrun */
2685         while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2686 #elif CMK_MULTICORE
2687         if (!Cmi_commthread && CmiMyRank()==0) {
2688           Cmi_check_delay = 1.0;        /* speed up checking of charmrun */
2689           while (1) CommunicationServer(500, COMM_SERVER_FROM_WORKER);
2690         }
2691 #endif
2692   }
2693   MACHSTATE(2,"} ConverseExit");
2694
2695 /*Comm. thread will kill us.*/
2696   while (1) CmiYield();
2697 }
2698
2699 static void set_signals(void)
2700 {
2701   if(!Cmi_truecrash) {
2702     signal(SIGSEGV, KillOnAllSigs);
2703     signal(SIGFPE, KillOnAllSigs);
2704     signal(SIGILL, KillOnAllSigs);
2705     signal(SIGINT, KillOnAllSigs);
2706     signal(SIGTERM, KillOnAllSigs);
2707     signal(SIGABRT, KillOnAllSigs);
2708 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2709     signal(SIGQUIT, KillOnAllSigs);
2710     signal(SIGBUS, KillOnAllSigs);
2711 #     if CMK_HANDLE_SIGUSR
2712     signal(SIGUSR1, HandleUserSignals);
2713     signal(SIGUSR2, HandleUserSignals);
2714 #     endif
2715 #   endif /*UNIX*/
2716   }
2717 }
2718
2719 /*Socket idle function to use before addresses have been
2720   obtained.  During the real program, we idle with CmiYield.
2721 */
2722 static void obtain_idleFn(void) {sleep(0);}
2723
2724 static int net_default_skt_abort(int code,const char *msg)
2725 {
2726   fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
2727   machine_exit(1);
2728   return -1;
2729 }
2730
2731 #if MACHINE_DEBUG_LOG
2732 FILE *debugLog = NULL;
2733 #endif
2734
2735 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
2736 {
2737 #if MACHINE_DEBUG
2738   debugLog=NULL;
2739 #endif
2740 #if CMK_USE_HP_MAIN_FIX
2741 #if FOR_CPLUS
2742   _main(argc,argv);
2743 #endif
2744 #endif
2745   Cmi_startfn = fn; Cmi_usrsched = usc;
2746   Cmi_netpoll = 0;
2747 #if CMK_NETPOLL
2748   Cmi_netpoll = 1;
2749 #endif
2750 #if CMK_WHEN_PROCESSOR_IDLE_USLEEP
2751   Cmi_idlepoll = 0;
2752 #else
2753   Cmi_idlepoll = 1;
2754 #endif
2755   Cmi_truecrash = 0;
2756   if (CmiGetArgFlagDesc(argv,"+truecrash","Do not install signal handlers") ||
2757       CmiGetArgFlagDesc(argv,"++debug",NULL /*meaning: don't show this*/)) Cmi_truecrash = 1;
2758     /* netpoll disable signal */
2759   if (CmiGetArgFlagDesc(argv,"+netpoll","Do not use SIGIO--poll instead")) Cmi_netpoll = 1;
2760   if (CmiGetArgFlagDesc(argv,"+netint","Use SIGIO")) Cmi_netpoll = 0;
2761     /* idlepoll use poll instead if sleep when idle */
2762   if (CmiGetArgFlagDesc(argv,"+idlepoll","Do not sleep when idle")) Cmi_idlepoll = 1;
2763     /* idlesleep use sleep instead if busywait when idle */
2764   if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
2765   Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
2766
2767   Cmi_asyncio = 1;
2768 #if CMK_ASYNC_NOT_NEEDED
2769   Cmi_asyncio = 0;
2770 #endif
2771   if (CmiGetArgFlagDesc(argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
2772   if (CmiGetArgFlagDesc(argv,"+asynciooff","Don not use async IO")) Cmi_asyncio = 0;
2773 #if CMK_MULTICORE
2774   if (CmiGetArgFlagDesc(argv,"+commthread","Use communication thread")) {
2775     Cmi_commthread = 1;
2776 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
2777     _Cmi_noprocforcommthread = 1;   /* worker thread go sleep */
2778 #endif
2779     if (CmiMyPe() == 0) CmiPrintf("Charm++> communication thread is launched in multicore version. \n");
2780   }
2781 #endif
2782
2783   skt_init();
2784   /* use special abort handler instead of default_skt_abort to 
2785      prevent exit trapped by atexit_check() due to the exit() call  */
2786   skt_set_abort(net_default_skt_abort);
2787   atexit(machine_atexit_check);
2788   parse_netstart();
2789   parse_magic();
2790 #if ! CMK_SMP && ! defined(_WIN32)
2791   /* only get forks in non-smp mode */
2792   parse_forks();
2793 #endif
2794   extract_args(argv);
2795   log_init();
2796   Cmi_scanf_mutex = CmiCreateLock();
2797
2798 #if MACHINE_DEBUG_LOG
2799   {
2800     char ln[200];
2801     sprintf(ln,"debugLog.%d",_Cmi_mynode);
2802     debugLog=fopen(ln,"w");
2803   }
2804 #endif
2805
2806     /* NOTE: can not acutally call timer before timerInit ! GZ */
2807   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
2808
2809   skt_set_idle(obtain_idleFn);
2810   if (!skt_ip_match(Cmi_charmrun_IP,_skt_invalid_ip)) {
2811         set_signals();
2812 #if CMK_USE_TCP
2813         dataskt=skt_server(&dataport);
2814 #elif !CMK_USE_GM && !CMK_USE_MX
2815         dataskt=skt_datagram(&dataport, Cmi_os_buffer_size);
2816 #else
2817           /* GM and MX do not need to create any socket for communication */
2818         dataskt=-1;
2819 #endif
2820         MACHSTATE2(5,"skt_connect at dataskt:%d Cmi_charmrun_port:%d",dataskt, Cmi_charmrun_port);
2821         Cmi_charmrun_fd = skt_connect(Cmi_charmrun_IP, Cmi_charmrun_port, 1800);
2822         MACHSTATE2(5,"Opened connection to charmrun at socket %d, dataport=%d", Cmi_charmrun_fd, dataport);
2823         skt_tcp_no_nagle(Cmi_charmrun_fd);
2824         CmiStdoutInit();
2825   } else {/*Standalone operation*/
2826         printf("Charm++: standalone mode (not using charmrun)\n");
2827         dataskt=-1;
2828         Cmi_charmrun_fd=-1;
2829   }
2830
2831   CmiMachineInit(argv);
2832
2833   node_addresses_obtain(argv);
2834   MACHSTATE(5,"node_addresses_obtain done");
2835
2836   CmiCommunicationInit(argv);
2837
2838 #if CMK_USE_SYSVSHM
2839   CmiInitSysvshm(argv);
2840 #elif CMK_USE_PXSHM
2841   CmiInitPxshm(argv);
2842 #endif
2843
2844   skt_set_idle(CmiYield);
2845   Cmi_check_delay = 1.0+0.25*_Cmi_numnodes;
2846
2847   if (Cmi_charmrun_fd==-1) /*Don't bother with check in standalone mode*/
2848       Cmi_check_delay=1.0e30;
2849
2850   CsvInitialize(CmiNodeState, NodeState);
2851   CmiNodeStateInit(&CsvAccess(NodeState));
2852  
2853
2854   /* Network progress function is used to poll the network when for
2855      messages. This flushes receive buffers on some  implementations*/ 
2856   networkProgressPeriod = 0;  
2857   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2858
2859   Cmi_argvcopy = CmiCopyArgs(argv);
2860   Cmi_argv = argv; 
2861
2862   CmiStartThreads(argv);
2863
2864 #if CMK_USE_AMMASSO
2865   CmiAmmassoOpenQueuePairs();
2866 #endif
2867
2868   ConverseRunPE(everReturn);
2869 }
2870
2871 #if CMK_PERSISTENT_COMM
2872
2873 int persistentSendMsgHandlerIdx;
2874
2875 static void sendPerMsgHandler(char *msg)
2876 {
2877   int msgSize;
2878   void *destAddr, *destSizeAddr;
2879   int ep;
2880
2881   msgSize = CmiMsgHeaderGetLength(msg);
2882   msgSize -= (2*sizeof(void *)+sizeof(int));
2883   ep = *(int*)(msg+msgSize);
2884   destAddr = *(void **)(msg + msgSize + sizeof(int));
2885   destSizeAddr = *(void **)(msg + msgSize + sizeof(int) + sizeof(void*));
2886 /*CmiPrintf("msgSize:%d destAddr:%p, destSizeAddr:%p\n", msgSize, destAddr, destSizeAddr);*/
2887   CmiSetHandler(msg, ep);
2888   *((int *)destSizeAddr) = msgSize;
2889   memcpy(destAddr, msg, msgSize);
2890 }
2891
2892 void CmiSendPersistentMsg(PersistentHandle h, int destPE, int size, void *m)
2893 {
2894   CmiAssert(h!=NULL);
2895   PersistentSendsTable *slot = (PersistentSendsTable *)h;
2896   CmiAssert(slot->used == 1);
2897   CmiAssert(slot->destPE == destPE);
2898   if (size > slot->sizeMax) {
2899     CmiPrintf("size: %d sizeMax: %d\n", size, slot->sizeMax);
2900     CmiAbort("Abort: Invalid size\n");
2901   }
2902
2903 /*CmiPrintf("[%d] CmiSendPersistentMsg h=%p hdl=%d destpe=%d destAddress=%p size=%d\n", CmiMyPe(), *phs, CmiGetHandler(m), slot->destPE, slot->destAddress, size);*/
2904
2905   if (slot->destAddress[0]) {
2906     int oldep = CmiGetHandler(m);
2907     int newsize = size + sizeof(void *)*2 + sizeof(int);
2908     char *newmsg = (char*)CmiAlloc(newsize);
2909     memcpy(newmsg, m, size);
2910     memcpy(newmsg+size, &oldep, sizeof(int));
2911     memcpy(newmsg+size+sizeof(int), &slot->destAddress[0], sizeof(void *));
2912     memcpy(newmsg+size+sizeof(int)+sizeof(void*), &slot->destSizeAddress[0], sizeof(void *));
2913     CmiFree(m);
2914     CmiMsgHeaderSetLength(newmsg, newsize);
2915     CmiSetHandler(newmsg, persistentSendMsgHandlerIdx);
2916     phs = NULL; phsSize = 0;
2917     CmiSyncSendAndFree(slot->destPE, newsize, newmsg);
2918   }
2919   else {
2920 #if 1
2921     /* buffer until ready */
2922     if (slot->messageBuf != NULL) {
2923       CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
2924       CmiAbort("");
2925     }
2926     slot->messageBuf = m;
2927     slot->messageSize = size;
2928 #else
2929     /* normal send */
2930     PersistentHandle  *phs_tmp = phs;
2931     int phsSize_tmp = phsSize;
2932     phs = NULL; phsSize = 0;
2933     CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
2934     CmiSyncSendAndFree(slot->destPE, size, m);
2935     phs = phs_tmp; phsSize = phsSize_tmp;
2936 #endif
2937   }
2938 }
2939
2940 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
2941 {
2942   char *dupmsg = (char *) CmiAlloc(size);
2943   memcpy(dupmsg, msg, size);
2944
2945   /*  CmiPrintf("Setting root to %d\n", 0); */
2946   if (CmiMyPe()==destPE) {
2947     CQdCreate(CpvAccess(cQdState), 1);
2948     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
2949   }
2950   else
2951     CmiSendPersistentMsg(h, destPE, size, dupmsg);
2952 }
2953
2954 /* called in PumpMsgs */
2955 int PumpPersistent()
2956 {
2957   PersistentReceivesTable *slot = persistentReceivesTableHead;
2958   int status = 0;
2959   while (slot) {
2960     unsigned int size = *(slot->recvSizePtr[0]);
2961     if (size > 0)
2962     {
2963       char *msg = slot->messagePtr[0];
2964 /*CmiPrintf("[%d] size: %d rank:%d msg:%p %p\n", CmiMyPe(), size, CMI_DEST_RANK(msg), msg, slot->messagePtr);*/
2965
2966 #if 0
2967       void *dupmsg;
2968       dupmsg = CmiAlloc(size);
2969       
2970       _MEMCHECK(dupmsg);
2971       memcpy(dupmsg, msg, size);
2972       msg = dupmsg;
2973 #else
2974       /* return messagePtr directly and user MUST make sure not to delete it. */
2975       /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
2976
2977       CmiReference(msg);
2978 #endif
2979
2980       CmiPushPE(CMI_DEST_RANK(msg), msg);
2981 #if CMK_BROADCAST_SPANNING_TREE
2982       if (CMI_BROADCAST_ROOT(msg))
2983           SendSpanningChildren(size, msg);
2984 #endif
2985       *(slot->recvSizePtr[0]) = 0;
2986       status = 1;
2987     }
2988     slot = slot->next;
2989   }
2990   return status;
2991 }
2992
2993 void *PerAlloc(int size)
2994 {
2995   return CmiAlloc(size);
2996 }
2997                                                                                 
2998 void PerFree(char *msg)
2999 {
3000     CmiFree(msg);
3001 }
3002
3003 void persist_machine_init() 
3004 {
3005   persistentSendMsgHandlerIdx =
3006        CmiRegisterHandler((CmiHandler)sendPerMsgHandler);
3007 }
3008
3009 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
3010 {
3011   int i;
3012   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
3013     char *buf = PerAlloc(maxBytes+sizeof(int)*2);
3014     _MEMCHECK(buf);
3015     memset(buf, 0, maxBytes+sizeof(int)*2);
3016     slot->messagePtr[i] = buf;
3017     slot->recvSizePtr[i] = (unsigned int*)CmiAlloc(sizeof(unsigned int));
3018     *(slot->recvSizePtr[0]) = 0;
3019   }
3020   slot->sizeMax = maxBytes;
3021 }
3022
3023 #endif
3024
3025
3026 #if CMK_CELL
3027
3028 #include "spert_ppu.h"
3029
3030 void machine_OffloadAPIProgress() {
3031   CmiCommLock();
3032   OffloadAPIProgress();
3033   CmiCommUnlock();
3034 }
3035 #endif
3036
3037
3038
3039 /*@}*/