Changed the note for tracing comm op to differentiate the ops
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #undef CMI_MPI_TRACE_USEREVENTS
61 #define CMI_MPI_TRACE_USEREVENTS 1
62 #endif
63
64 #include "machine.h"
65
66 #include "pcqueue.h"
67
68 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
69
70 #if CMK_BLUEGENEL
71 #define MAX_QLEN 8
72 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
73 #else
74 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
75 #define MAX_QLEN 200
76 #endif
77
78 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && ! CMK_TRACE_IN_CHARM
79 CpvStaticDeclare(double, projTraceStart);
80 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
81 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
82 #else
83 # define  START_EVENT()
84 # define  END_EVENT(x)
85 #endif
86
87 /*
88     To reduce the buffer used in broadcast and distribute the load from
89   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
90   spanning tree broadcast algorithm.
91     This will use the fourth short in message as an indicator of spanning tree
92   root.
93 */
94 #if CMK_SMP
95 #define CMK_BROADCAST_SPANNING_TREE    0
96 #else
97 #define CMK_BROADCAST_SPANNING_TREE    1
98 #define CMK_BROADCAST_HYPERCUBE        0
99 #endif
100
101 #define BROADCAST_SPANNING_FACTOR      4
102
103 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
104 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
105
106 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
107 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
108
109 /* FIXME: need a random number that everyone agrees ! */
110 #define CHARM_MAGIC_NUMBER               126
111
112 #if !CMK_OPTIMIZE
113 static int checksum_flag = 0;
114 #define CMI_SET_CHECKSUM(msg, len)      \
115         if (checksum_flag)  {   \
116           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
117           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
118         }
119 #define CMI_CHECK_CHECKSUM(msg, len)    \
120         if (checksum_flag)      \
121           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
122             CmiAbort("Fatal error: checksum doesn't agree!\n");
123 #else
124 #define CMI_SET_CHECKSUM(msg, len)
125 #define CMI_CHECK_CHECKSUM(msg, len)
126 #endif
127
128 #if CMK_BROADCAST_SPANNING_TREE
129 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
130 #else
131 #  define CMI_SET_BROADCAST_ROOT(msg, root)
132 #endif
133
134 #if CMK_BROADCAST_HYPERCUBE
135 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
136 #else
137 #  define CMI_SET_CYCLE(msg, cycle)
138 #endif
139
140
141 /** 
142     If MPI_POST_RECV is defined, we provide default values for size 
143     and number of posted recieves. If MPI_POST_RECV_COUNT is set
144     then a default value for MPI_POST_RECV_SIZE is used if not specified
145     by the user.
146 */
147 #ifdef MPI_POST_RECV
148 #define MPI_POST_RECV_COUNT 10
149 #undef MPI_POST_RECV
150 #endif
151 #if MPI_POST_RECV_COUNT > 0
152 #warning "Using MPI posted receives which have not yet been tested"
153 #ifndef MPI_POST_RECV_SIZE
154 #define MPI_POST_RECV_SIZE 200
155 #endif
156 /* #undef  MPI_POST_RECV_DEBUG  */
157 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
158 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
159 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
160 CpvDeclare(char*,CmiPostedRecvBuffers);
161 #endif
162
163 /*
164  to avoid MPI's in order delivery, changing MPI Tag all the time
165 */
166 #define TAG     1375
167
168 #if MPI_POST_RECV_COUNT > 0
169 #define POST_RECV_TAG TAG+1
170 #define BARRIER_ZERO_TAG TAG
171 #else
172 #define BARRIER_ZERO_TAG     1375
173 #endif
174
175 #include <signal.h>
176 void (*signal_int)(int);
177
178 /*
179 static int mpi_tag = TAG;
180 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
181 */
182
183 static int        _thread_provided = -1;
184 int               _Cmi_numpes;
185 int               _Cmi_mynode;    /* Which address space am I */
186 int               _Cmi_mynodesize;/* Number of processors in my address space */
187 int               _Cmi_numnodes;  /* Total number of address spaces */
188 int               _Cmi_numpes;    /* Total number of processors */
189 static int        Cmi_nodestart; /* First processor in this address space */
190 CpvDeclare(void*, CmiLocalQueue);
191
192 /*Network progress utility variables. Period controls the rate at
193   which the network poll is called */
194 CpvDeclare(unsigned , networkProgressCount);
195 int networkProgressPeriod;
196
197 int               idleblock = 0;
198
199 #define BLK_LEN  512
200
201 #if CMK_NODE_QUEUE_AVAILABLE
202 #define DGRAM_NODEMESSAGE   (0xFB)
203
204 #define NODE_BROADCAST_OTHERS (-1)
205 #define NODE_BROADCAST_ALL    (-2)
206 #endif
207
208 #if 0
209 static void **recdQueue_blk;
210 static unsigned int recdQueue_blk_len;
211 static unsigned int recdQueue_first;
212 static unsigned int recdQueue_len;
213 static void recdQueueInit(void);
214 static void recdQueueAddToBack(void *element);
215 static void *recdQueueRemoveFromFront(void);
216 #endif
217
218 static void ConverseRunPE(int everReturn);
219 static void CommunicationServer(int sleepTime);
220 static void CommunicationServerThread(int sleepTime);
221
222 typedef struct msg_list {
223      char *msg;
224      struct msg_list *next;
225      int size, destpe;
226
227 #if CMK_SMP_TRACE_COMMTHREAD
228         int srcpe;
229 #endif  
230         
231      MPI_Request req;
232 } SMSG_LIST;
233
234 int MsgQueueLen=0;
235 static int request_max;
236
237 static SMSG_LIST *sent_msgs=0;
238 static SMSG_LIST *end_sent=0;
239
240 static int Cmi_dim;
241
242 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
243
244 #if NODE_0_IS_CONVHOST
245 int inside_comm = 0;
246 #endif
247
248 void CmiAbort(const char *message);
249 static void PerrorExit(const char *msg);
250
251 void SendSpanningChildren(int size, char *msg);
252 void SendHypercube(int size, char *msg);
253
254 static void PerrorExit(const char *msg)
255 {
256   perror(msg);
257   exit(1);
258 }
259
260 extern unsigned char computeCheckSum(unsigned char *data, int len);
261
262 /**************************  TIMER FUNCTIONS **************************/
263
264 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
265
266 /* MPI calls are not threadsafe, even the timer on some machines */
267 static CmiNodeLock  timerLock = 0;
268 static double starttimer = 0;
269 static int _is_global = 0;
270
271 int CmiTimerIsSynchronized()
272 {
273   int  flag;
274   void *v;
275
276   /*  check if it using synchronized timer */
277   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
278     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
279   if (flag) {
280     _is_global = *(int*)v;
281     if (_is_global && CmiMyPe() == 0)
282       printf("Charm++> MPI timer is synchronized!\n");
283   }
284   return _is_global;
285 }
286
287 void CmiTimerInit()
288 {
289   _is_global = CmiTimerIsSynchronized();
290
291   if (_is_global) {
292     if (CmiMyRank() == 0) {
293       double minTimer;
294 #if CMK_TIMER_USE_XT3_DCLOCK
295       starttimer = dclock();
296 #else
297       starttimer = MPI_Wtime();
298 #endif
299
300       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
301                                   MPI_COMM_WORLD );
302       starttimer = minTimer;
303     }
304   }
305   else {  /* we don't have a synchronous timer, set our own start time */
306     CmiBarrier();
307     CmiBarrier();
308     CmiBarrier();
309 #if CMK_TIMER_USE_XT3_DCLOCK
310     starttimer = dclock();
311 #else
312     starttimer = MPI_Wtime();
313 #endif
314   }
315
316 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
317   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
318     timerLock = CmiCreateLock();
319 #endif
320   CmiNodeAllBarrier();          /* for smp */
321 }
322
323 /**
324  * Since the timerLock is never created, and is
325  * always NULL, then all the if-condition inside
326  * the timer functions could be disabled right
327  * now in the case of SMP. --Chao Mei
328  */
329 double CmiTimer(void)
330 {
331   double t;
332 #if 0 && CMK_SMP
333   if (timerLock) CmiLock(timerLock);
334 #endif
335 #if CMK_TIMER_USE_XT3_DCLOCK
336   t = dclock() - starttimer;
337 #else
338   t = MPI_Wtime() - starttimer;
339 #endif
340
341 #if 0 && CMK_SMP
342   if (timerLock) CmiUnlock(timerLock);
343 #endif
344
345   return t;
346 }
347
348 double CmiWallTimer(void)
349 {
350   double t;
351 #if 0 && CMK_SMP
352   if (timerLock) CmiLock(timerLock);
353 #endif
354 #if CMK_TIMER_USE_XT3_DCLOCK
355   t = dclock() - starttimer;
356 #else
357   t = MPI_Wtime() - starttimer;
358 #endif
359 #if 0 && CMK_SMP
360   if (timerLock) CmiUnlock(timerLock);
361 #endif
362   return t;
363 }
364
365 double CmiCpuTimer(void)
366 {
367   double t;
368 #if 0 && CMK_SMP
369   if (timerLock) CmiLock(timerLock);
370 #endif
371 #if CMK_TIMER_USE_XT3_DCLOCK
372   t = dclock() - starttimer;
373 #else
374   t = MPI_Wtime() - starttimer;
375 #endif
376 #if 0 && CMK_SMP
377   if (timerLock) CmiUnlock(timerLock);
378 #endif
379   return t;
380 }
381
382 #endif
383
384 /* must be called on all ranks including comm thread in SMP */
385 int CmiBarrier()
386 {
387 #if CMK_SMP
388     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
389   CmiNodeAllBarrier();
390   if (CmiMyRank() == CmiMyNodeSize()) 
391 #else
392   if (CmiMyRank() == 0) 
393 #endif
394   {
395 /**
396  *  The call of CmiBarrier is usually before the initialization
397  *  of trace module of Charm++, therefore, the START_EVENT
398  *  and END_EVENT are disabled here. -Chao Mei
399  */     
400     /*START_EVENT();*/
401
402     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
403         CmiAbort("Timernit: MPI_Barrier failed!\n");
404
405     /*END_EVENT(10);*/
406   }
407   CmiNodeAllBarrier();
408   return 0;
409 }
410
411 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
412 int CmiBarrierZero()
413 {
414   int i;
415 #if CMK_SMP
416   if (CmiMyRank() == CmiMyNodeSize()) 
417 #else
418   if (CmiMyRank() == 0) 
419 #endif
420   {
421     char msg[1];
422     MPI_Status sts;
423     if (CmiMyNode() == 0)  {
424       for (i=0; i<CmiNumNodes()-1; i++) {
425          START_EVENT();
426
427          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
428             CmiPrintf("MPI_Recv failed!\n");
429
430          END_EVENT(30);
431       }
432     }
433     else {
434       START_EVENT();
435
436       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
437          printf("MPI_Send failed!\n");
438
439       END_EVENT(20);
440     }
441   }
442   CmiNodeAllBarrier();
443   return 0;
444 }
445
446 typedef struct ProcState {
447 #if MULTI_SENDQUEUE
448 PCQueue      sendMsgBuf;       /* per processor message sending queue */
449 #endif
450 CmiNodeLock  recvLock;              /* for cs->recv */
451 } ProcState;
452
453 static ProcState  *procState;
454
455 #if CMK_SMP
456
457 #if !MULTI_SENDQUEUE
458 static PCQueue sendMsgBuf;
459 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
460 #endif
461
462 #endif
463
464 /************************************************************
465  *
466  * Processor state structure
467  *
468  ************************************************************/
469
470 /* fake Cmi_charmrun_fd */
471 static int Cmi_charmrun_fd = 0;
472 #include "machine-smp.c"
473
474 CsvDeclare(CmiNodeState, NodeState);
475
476 #include "immediate.c"
477
478 #if CMK_SHARED_VARS_UNAVAILABLE
479 /************ non SMP **************/
480 static struct CmiStateStruct Cmi_state;
481 int _Cmi_mype;
482 int _Cmi_myrank;
483
484 void CmiMemLock() {}
485 void CmiMemUnlock() {}
486
487 #define CmiGetState() (&Cmi_state)
488 #define CmiGetStateN(n) (&Cmi_state)
489
490 void CmiYield(void) { sleep(0); }
491
492 static void CmiStartThreads(char **argv)
493 {
494   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
495   _Cmi_mype = Cmi_nodestart;
496   _Cmi_myrank = 0;
497 }
498 #endif  /* non smp */
499
500 /*Add a message to this processor's receive queue, pe is a rank */
501 void CmiPushPE(int pe,void *msg)
502 {
503   CmiState cs = CmiGetStateN(pe);
504   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
505 #if CMK_IMMEDIATE_MSG
506   if (CmiIsImmediate(msg)) {
507 /*
508 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
509     CmiHandleMessage(msg);
510 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
511 */
512     /**(CmiUInt2 *)msg = pe;*/
513     CMI_DEST_RANK(msg) = pe;
514     CmiPushImmediateMsg(msg);
515     return;
516   }
517 #endif
518
519 #if CMK_SMP
520   CmiLock(procState[pe].recvLock);
521 #endif
522   PCQueuePush(cs->recv,msg);
523 #if CMK_SMP
524   CmiUnlock(procState[pe].recvLock);
525 #endif
526   CmiIdleLock_addMessage(&cs->idle);
527   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
528 }
529
530 #if CMK_NODE_QUEUE_AVAILABLE
531 /*Add a message to this processor's receive queue */
532 static void CmiPushNode(void *msg)
533 {
534   MACHSTATE(3,"Pushing message into NodeRecv queue");
535 #if CMK_IMMEDIATE_MSG
536   if (CmiIsImmediate(msg)) {
537     CMI_DEST_RANK(msg) = 0;
538     CmiPushImmediateMsg(msg);
539     return;
540   }
541 #endif
542   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
543   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
544   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
545   {
546   CmiState cs=CmiGetStateN(0);
547   CmiIdleLock_addMessage(&cs->idle);
548   }
549 }
550 #endif
551
552 #ifndef CmiMyPe
553 int CmiMyPe(void)
554 {
555   return CmiGetState()->pe;
556 }
557 #endif
558
559 #ifndef CmiMyRank
560 int CmiMyRank(void)
561 {
562   return CmiGetState()->rank;
563 }
564 #endif
565
566 #ifndef CmiNodeFirst
567 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
568 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
569 #endif
570
571 #ifndef CmiNodeOf
572 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
573 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
574 #endif
575
576 static size_t CmiAllAsyncMsgsSent(void)
577 {
578    SMSG_LIST *msg_tmp = sent_msgs;
579    MPI_Status sts;
580    int done;
581
582    while(msg_tmp!=0) {
583     done = 0;
584     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
585       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
586     if(!done)
587       return 0;
588     msg_tmp = msg_tmp->next;
589 /*    MsgQueueLen--; ????? */
590    }
591    return 1;
592 }
593
594 int CmiAsyncMsgSent(CmiCommHandle c) {
595
596   SMSG_LIST *msg_tmp = sent_msgs;
597   int done;
598   MPI_Status sts;
599
600   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
601     msg_tmp = msg_tmp->next;
602   if(msg_tmp) {
603     done = 0;
604     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
605       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
606     return ((done)?1:0);
607   } else {
608     return 1;
609   }
610 }
611
612 void CmiReleaseCommHandle(CmiCommHandle c)
613 {
614   return;
615 }
616
617 #if CMK_BLUEGENEL
618 extern void MPID_Progress_test();
619 #endif
620
621 void CmiReleaseSentMessages(void)
622 {
623   SMSG_LIST *msg_tmp=sent_msgs;
624   SMSG_LIST *prev=0;
625   SMSG_LIST *temp;
626   int done;
627   MPI_Status sts;
628
629 #if CMK_BLUEGENEL
630   MPID_Progress_test();
631 #endif
632
633   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
634   while(msg_tmp!=0) {
635     done =0;
636 #if CMK_SMP_TRACE_COMMTHREAD
637     double startT = CmiWallTimer();
638 #endif
639     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
640       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
641     if(done) {
642       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
643       MsgQueueLen--;
644       /* Release the message */
645       temp = msg_tmp->next;
646       if(prev==0)  /* first message */
647         sent_msgs = temp;
648       else
649         prev->next = temp;
650       CmiFree(msg_tmp->msg);
651       CmiFree(msg_tmp);
652       msg_tmp = temp;
653     } else {
654       prev = msg_tmp;
655       msg_tmp = msg_tmp->next;
656     }
657 #if CMK_SMP_TRACE_COMMTHREAD
658     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
659 #endif
660   }
661   end_sent = prev;
662   MACHSTATE(2,"} CmiReleaseSentMessages end");
663 }
664
665 int PumpMsgs(void)
666 {
667   int nbytes, flg, res;
668   char *msg;
669   MPI_Status sts;
670   int recd=0;
671
672 #if CMI_EXERT_RECV_CAP
673   int recvCnt=0;
674 #endif
675         
676 #if CMK_BLUEGENEL
677   MPID_Progress_test();
678 #endif
679
680   MACHSTATE(2,"PumpMsgs begin {");
681
682         
683   while(1) {
684 #if CMI_EXERT_RECV_CAP
685         if(recvCnt==RECV_CAP) break;
686 #endif
687           
688     /* First check posted recvs then do  probe unmatched outstanding messages */
689 #if MPI_POST_RECV_COUNT > 0 
690     int completed_index=-1;
691     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
692         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
693     if(flg){
694         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
695             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
696
697         recd = 1;
698         msg = (char *) CmiAlloc(nbytes);
699         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
700         /* and repost the recv */
701
702         START_EVENT();
703
704         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
705             MPI_POST_RECV_SIZE,
706             MPI_BYTE,
707             MPI_ANY_SOURCE,
708             POST_RECV_TAG,
709             MPI_COMM_WORLD,
710             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
711                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
712
713         END_EVENT(50);
714
715         CpvAccess(Cmi_posted_recv_total)++;
716     }
717     else {
718         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
719         if(res != MPI_SUCCESS)
720         CmiAbort("MPI_Iprobe failed\n");
721         if(!flg) break;
722         recd = 1;
723         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
724         msg = (char *) CmiAlloc(nbytes);
725
726         START_EVENT();
727
728         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
729             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
730
731         END_EVENT(30);
732
733         CpvAccess(Cmi_unposted_recv_total)++;
734     }
735 #else
736     /* Original version */
737 #if CMK_SMP_TRACE_COMMTHREAD
738   double startT = CmiWallTimer(); 
739 #endif
740     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
741     if(res != MPI_SUCCESS)
742       CmiAbort("MPI_Iprobe failed\n");
743
744     if(!flg) break;
745 #if CMK_SMP_TRACE_COMMTHREAD
746     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
747 #endif
748
749     recd = 1;
750     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
751     msg = (char *) CmiAlloc(nbytes);
752     
753     START_EVENT();
754
755     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
756       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
757
758     /*END_EVENT(30);*/
759
760 #endif
761
762 #if CMK_SMP_TRACE_COMMTHREAD
763         traceBeginCommOp(msg);
764         traceChangeLastTimestamp(CpvAccess(projTraceStart));
765         traceEndCommOp(msg);
766         char tmp[32];
767         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
768         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
769 #endif  
770         
771     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
772     CMI_CHECK_CHECKSUM(msg, nbytes);
773     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
774       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
775       CmiFree(msg);
776       CmiAbort("Abort!\n");
777       continue;
778     }
779 #if CMK_NODE_QUEUE_AVAILABLE
780     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
781       CmiPushNode(msg);
782     else
783 #endif
784       CmiPushPE(CMI_DEST_RANK(msg), msg);
785
786 #if CMK_BROADCAST_SPANNING_TREE
787     if (CMI_BROADCAST_ROOT(msg))
788       SendSpanningChildren(nbytes, msg);
789 #elif CMK_BROADCAST_HYPERCUBE
790     if (CMI_GET_CYCLE(msg))
791       SendHypercube(nbytes, msg);
792 #endif
793         
794 #if CMI_EXERT_RECV_CAP
795         recvCnt++;
796 #endif  
797   }
798
799   
800 #if CMK_IMMEDIATE_MSG && !CMK_SMP
801   CmiHandleImmediate();
802 #endif
803   
804   MACHSTATE(2,"} PumpMsgs end ");
805   return recd;
806 }
807
808 /* blocking version */
809 static void PumpMsgsBlocking(void)
810 {
811   static int maxbytes = 20000000;
812   static char *buf = NULL;
813   int nbytes, flg;
814   MPI_Status sts;
815   char *msg;
816   int recd=0;
817
818   if (!PCQueueEmpty(CmiGetState()->recv)) return;
819   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
820   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
821   if (sent_msgs)  return;
822
823 #if 0
824   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
825 #endif
826
827   if (buf == NULL) {
828     buf = (char *) CmiAlloc(maxbytes);
829     _MEMCHECK(buf);
830   }
831
832
833 #if MPI_POST_RECV_COUNT > 0
834 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
835 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
836 #endif
837
838   START_EVENT();
839
840   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
841       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
842
843   /*END_EVENT(30);*/
844     
845    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
846    msg = (char *) CmiAlloc(nbytes);
847    memcpy(msg, buf, nbytes);
848
849 #if CMK_SMP_TRACE_COMMTHREAD
850         traceBeginCommOp(msg);
851         traceChangeLastTimestamp(CpvAccess(projTraceStart));
852         traceEndCommOp(msg);
853         char tmp[32];
854         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
855         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
856 #endif
857   
858 #if CMK_NODE_QUEUE_AVAILABLE
859    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
860       CmiPushNode(msg);
861    else
862 #endif
863       CmiPushPE(CMI_DEST_RANK(msg), msg);
864
865 #if CMK_BROADCAST_SPANNING_TREE
866    if (CMI_BROADCAST_ROOT(msg))
867       SendSpanningChildren(nbytes, msg);
868 #elif CMK_BROADCAST_HYPERCUBE
869    if (CMI_GET_CYCLE(msg))
870       SendHypercube(nbytes, msg);
871 #endif
872 }
873
874 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
875
876 #if CMK_SMP
877
878 static int inexit = 0;
879 static CmiNodeLock  exitLock = 0;
880
881 static int MsgQueueEmpty()
882 {
883   int i;
884 #if MULTI_SENDQUEUE
885   for (i=0; i<_Cmi_mynodesize; i++)
886     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
887 #else
888   return PCQueueEmpty(sendMsgBuf);
889 #endif
890   return 1;
891 }
892
893 static int SendMsgBuf();
894
895 /* test if all processors recv queues are empty */
896 static int RecvQueueEmpty()
897 {
898   int i;
899   for (i=0; i<_Cmi_mynodesize; i++) {
900     CmiState cs=CmiGetStateN(i);
901     if (!PCQueueEmpty(cs->recv)) return 0;
902   }
903   return 1;
904 }
905
906 /**
907 CommunicationServer calls MPI to send messages in the queues and probe message from network.
908 */
909
910 #define REPORT_COMM_METRICS 0
911 #if REPORT_COMM_METRICS
912 static double pumptime = 0.0;
913 static double releasetime = 0.0;
914 static double sendtime = 0.0;
915 #endif
916
917 static void CommunicationServer(int sleepTime)
918 {
919   int static count=0;
920 /*
921   count ++;
922   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
923 */
924 #if REPORT_COMM_METRICS
925   double t1, t2, t3, t4;
926   t1 = CmiWallTimer();
927 #endif
928   PumpMsgs();
929 #if REPORT_COMM_METRICS
930   t2 = CmiWallTimer();
931 #endif
932   CmiReleaseSentMessages();
933 #if REPORT_COMM_METRICS
934   t3 = CmiWallTimer();
935 #endif
936   SendMsgBuf();
937 #if REPORT_COMM_METRICS
938   t4 = CmiWallTimer();
939   pumptime += (t2-t1);
940   releasetime += (t3-t2);
941   sendtime += (t4-t3);
942 #endif
943 /*
944   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
945 */
946   if (inexit == CmiMyNodeSize()) {
947     MACHSTATE(2, "CommunicationServer exiting {");
948 #if 0
949     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
950 #endif
951     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
952       CmiReleaseSentMessages();
953       SendMsgBuf();
954       PumpMsgs();
955     }
956     MACHSTATE(2, "CommunicationServer barrier begin {");
957
958     START_EVENT();
959
960     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
961       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
962
963     END_EVENT(10);
964
965     MACHSTATE(2, "} CommunicationServer barrier end");
966 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
967     if (CmiMyNode() == 0){
968       CmiPrintf("End of program\n");
969     }
970 #endif
971     MACHSTATE(2, "} CommunicationServer EXIT");
972
973     ConverseCommonExit();   
974 #if REPORT_COMM_METRICS
975     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
976 #endif
977
978 #if ! CMK_AUTOBUILD
979     signal(SIGINT, signal_int);
980     MPI_Finalize();
981     #endif
982     exit(0);
983   }
984 }
985
986 #endif
987
988 static void CommunicationServerThread(int sleepTime)
989 {
990 #if CMK_SMP
991   CommunicationServer(sleepTime);
992 #endif
993 #if CMK_IMMEDIATE_MSG
994   CmiHandleImmediate();
995 #endif
996 }
997
998 #if CMK_NODE_QUEUE_AVAILABLE
999 char *CmiGetNonLocalNodeQ(void)
1000 {
1001   CmiState cs = CmiGetState();
1002   char *result = 0;
1003   CmiIdleLock_checkMessage(&cs->idle);
1004 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1005     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1006     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1007     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1008     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1009     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1010 /*  }  */
1011   return result;
1012 }
1013 #endif
1014
1015 void *CmiGetNonLocal(void)
1016 {
1017   static int count=0;
1018   CmiState cs = CmiGetState();
1019   void *msg;
1020
1021 #if ! CMK_SMP
1022   if (CmiNumPes() == 1) return NULL;
1023 #endif
1024
1025   CmiIdleLock_checkMessage(&cs->idle);
1026   /* although it seems that lock is not needed, I found it crashes very often
1027      on mpi-smp without lock */
1028
1029 #if ! CMK_SMP
1030   CmiReleaseSentMessages();
1031   PumpMsgs();
1032 #endif
1033
1034   /* CmiLock(procState[cs->rank].recvLock); */
1035   msg =  PCQueuePop(cs->recv);
1036   /* CmiUnlock(procState[cs->rank].recvLock); */
1037
1038 /*
1039   if (msg) {
1040     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1041   else {
1042     count++;
1043     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1044   }
1045 */
1046 #if ! CMK_SMP
1047   if (no_outstanding_sends) {
1048     while (MsgQueueLen>0) {
1049       CmiReleaseSentMessages();
1050       PumpMsgs();
1051     }
1052   }
1053
1054   if(!msg) {
1055     CmiReleaseSentMessages();
1056     if (PumpMsgs())
1057       return  PCQueuePop(cs->recv);
1058     else
1059       return 0;
1060   }
1061 #endif
1062   return msg;
1063 }
1064
1065 /* called in non-smp mode */
1066 void CmiNotifyIdle(void)
1067 {
1068   CmiReleaseSentMessages();
1069   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1070 }
1071
1072
1073 /********************************************************
1074     The call to probe immediate messages has been renamed to
1075     CmiMachineProgressImpl
1076 ******************************************************/
1077 /* user call to handle immediate message, only useful in non SMP version
1078    using polling method to schedule message.
1079 */
1080 /*
1081 #if CMK_IMMEDIATE_MSG
1082 void CmiProbeImmediateMsg()
1083 {
1084 #if !CMK_SMP
1085   PumpMsgs();
1086   CmiHandleImmediate();
1087 #endif
1088 }
1089 #endif
1090 */
1091
1092 /* Network progress function is used to poll the network when for
1093    messages. This flushes receive buffers on some  implementations*/
1094 #if CMK_MACHINE_PROGRESS_DEFINED
1095 void CmiMachineProgressImpl()
1096 {
1097 #if !CMK_SMP
1098     PumpMsgs();
1099 #if CMK_IMMEDIATE_MSG
1100     CmiHandleImmediate();
1101 #endif
1102 #else
1103     /*Not implemented yet. Communication server does not seem to be
1104       thread safe, so only communication thread call it */
1105     if (CmiMyRank() == CmiMyNodeSize())
1106         CommunicationServerThread(0);
1107 #endif
1108 }
1109 #endif
1110
1111 /********************* MESSAGE SEND FUNCTIONS ******************/
1112
1113 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1114
1115 static void CmiSendSelf(char *msg)
1116 {
1117 #if CMK_IMMEDIATE_MSG
1118     if (CmiIsImmediate(msg)) {
1119       /* CmiBecomeNonImmediate(msg); */
1120       CmiPushImmediateMsg(msg);
1121       CmiHandleImmediate();
1122       return;
1123     }
1124 #endif
1125     CQdCreate(CpvAccess(cQdState), 1);
1126     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1127 }
1128
1129 void CmiSyncSendFn(int destPE, int size, char *msg)
1130 {
1131   CmiState cs = CmiGetState();
1132   char *dupmsg = (char *) CmiAlloc(size);
1133   memcpy(dupmsg, msg, size);
1134
1135   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1136
1137   if (cs->pe==destPE) {
1138     CmiSendSelf(dupmsg);
1139   }
1140   else
1141     CmiAsyncSendFn_(destPE, size, dupmsg);
1142 }
1143
1144 #if CMK_SMP
1145
1146 /* called by communication thread in SMP */
1147 static int SendMsgBuf()
1148 {
1149   SMSG_LIST *msg_tmp;
1150   char *msg;
1151   int node, rank, size;
1152   int i;
1153   int sent = 0;
1154
1155 #if CMI_EXERT_SEND_CAP
1156         int sentCnt = 0;
1157 #endif  
1158         
1159   MACHSTATE(2,"SendMsgBuf begin {");
1160 #if MULTI_SENDQUEUE
1161   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1162   {
1163     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1164     {
1165       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1166 #else
1167     /* single message sending queue */
1168     /* CmiLock(sendMsgBufLock); */
1169     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1170     /* CmiUnlock(sendMsgBufLock); */
1171     while (NULL != msg_tmp)
1172     {
1173 #endif
1174       node = msg_tmp->destpe;
1175       size = msg_tmp->size;
1176       msg = msg_tmp->msg;
1177       msg_tmp->next = 0;
1178       while (MsgQueueLen > request_max) {
1179         CmiReleaseSentMessages();
1180         PumpMsgs();
1181       }
1182       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1183       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1184       CMI_SET_CHECKSUM(msg, size);
1185
1186 #if MPI_POST_RECV_COUNT > 0
1187         if(size <= MPI_POST_RECV_SIZE){
1188
1189           START_EVENT();
1190           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1191                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1192
1193           STOP_EVENT(40);
1194         }
1195         else {
1196             START_EVENT();
1197             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1198                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1199             STOP_EVENT(40);
1200         }
1201 #else
1202         START_EVENT();
1203         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1204             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1205         /*END_EVENT(40);*/
1206 #endif
1207         
1208 #if CMK_SMP_TRACE_COMMTHREAD
1209                 traceBeginCommOp(msg);
1210                 traceChangeLastTimestamp(CpvAccess(projTraceStart));
1211                 traceEndCommOp(msg);
1212                 char tmp[64];
1213                 sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1214                 traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1215 #endif
1216                 
1217                 
1218       MACHSTATE(3,"}MPI_send end");
1219       MsgQueueLen++;
1220       if(sent_msgs==0)
1221         sent_msgs = msg_tmp;
1222       else
1223         end_sent->next = msg_tmp;
1224       end_sent = msg_tmp;
1225       sent=1;
1226           
1227 #if CMI_EXERT_SEND_CAP    
1228           if(++sentCnt == SEND_CAP) break;
1229 #endif    
1230           
1231 #if ! MULTI_SENDQUEUE
1232       /* CmiLock(sendMsgBufLock); */
1233       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1234       /* CmiUnlock(sendMsgBufLock); */
1235 #endif
1236     }
1237 #if MULTI_SENDQUEUE
1238   }
1239 #endif
1240   MACHSTATE(2,"}SendMsgBuf end ");
1241   return sent;
1242 }
1243
1244 void EnqueueMsg(void *m, int size, int node)
1245 {
1246   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1247   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1248   msg_tmp->msg = m;
1249   msg_tmp->size = size;
1250   msg_tmp->destpe = node;
1251         
1252 #if CMK_SMP_TRACE_COMMTHREAD
1253         msg_tmp->srcpe = CmiMyPe();
1254 #endif  
1255
1256 #if MULTI_SENDQUEUE
1257   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1258 #else
1259   CmiLock(sendMsgBufLock);
1260   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1261   CmiUnlock(sendMsgBufLock);
1262 #endif
1263         
1264   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1265 }
1266
1267 #endif
1268
1269 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1270 {
1271   CmiState cs = CmiGetState();
1272   SMSG_LIST *msg_tmp;
1273   CmiUInt2  rank, node;
1274
1275   if(destPE == cs->pe) {
1276     char *dupmsg = (char *) CmiAlloc(size);
1277     memcpy(dupmsg, msg, size);
1278     CmiSendSelf(dupmsg);
1279     return 0;
1280   }
1281   CQdCreate(CpvAccess(cQdState), 1);
1282 #if CMK_SMP
1283   node = CmiNodeOf(destPE);
1284   rank = CmiRankOf(destPE);
1285   if (node == CmiMyNode())  {
1286     CmiPushPE(rank, msg);
1287     return 0;
1288   }
1289   CMI_DEST_RANK(msg) = rank;
1290   EnqueueMsg(msg, size, node);
1291   return 0;
1292 #else
1293   /* non smp */
1294   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1295   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1296   msg_tmp->msg = msg;
1297   msg_tmp->next = 0;
1298   while (MsgQueueLen > request_max) {
1299         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1300         CmiReleaseSentMessages();
1301         PumpMsgs();
1302   }
1303   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1304   CMI_SET_CHECKSUM(msg, size);
1305
1306 #if MPI_POST_RECV_COUNT > 0
1307         if(size <= MPI_POST_RECV_SIZE){
1308
1309           START_EVENT();
1310           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1311                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1312           END_EVENT(40);
1313         }
1314         else {
1315           START_EVENT();
1316           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1317                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1318           END_EVENT(40);
1319         }
1320 #else
1321   START_EVENT();
1322   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1323     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1324   END_EVENT(40);
1325 #endif
1326
1327   MsgQueueLen++;
1328   if(sent_msgs==0)
1329     sent_msgs = msg_tmp;
1330   else
1331     end_sent->next = msg_tmp;
1332   end_sent = msg_tmp;
1333   return (CmiCommHandle) &(msg_tmp->req);
1334 #endif              /* non-smp */
1335 }
1336
1337 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1338 {
1339   CMI_SET_BROADCAST_ROOT(msg, 0);
1340   CmiAsyncSendFn_(destPE, size, msg);
1341 }
1342
1343 void CmiFreeSendFn(int destPE, int size, char *msg)
1344 {
1345   CmiState cs = CmiGetState();
1346   CMI_SET_BROADCAST_ROOT(msg, 0);
1347
1348   if (cs->pe==destPE) {
1349     CmiSendSelf(msg);
1350   } else {
1351     CmiAsyncSendFn_(destPE, size, msg);
1352   }
1353 }
1354
1355 /*********************** BROADCAST FUNCTIONS **********************/
1356
1357 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1358 void CmiSyncSendFn1(int destPE, int size, char *msg)
1359 {
1360   CmiState cs = CmiGetState();
1361   char *dupmsg = (char *) CmiAlloc(size);
1362   memcpy(dupmsg, msg, size);
1363   if (cs->pe==destPE)
1364     CmiSendSelf(dupmsg);
1365   else
1366     CmiAsyncSendFn_(destPE, size, dupmsg);
1367 }
1368
1369 /* send msg to its spanning children in broadcast. G. Zheng */
1370 void SendSpanningChildren(int size, char *msg)
1371 {
1372   CmiState cs = CmiGetState();
1373   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1374   int i;
1375
1376   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1377
1378   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1379     int p = cs->pe-startpe;
1380     if (p<0) p+=_Cmi_numpes;
1381     p = BROADCAST_SPANNING_FACTOR*p + i;
1382     if (p > _Cmi_numpes - 1) break;
1383     p += startpe;
1384     p = p%_Cmi_numpes;
1385     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1386     CmiSyncSendFn1(p, size, msg);
1387   }
1388 }
1389
1390 #include <math.h>
1391
1392 /* send msg along the hypercube in broadcast. (Sameer) */
1393 void SendHypercube(int size, char *msg)
1394 {
1395   CmiState cs = CmiGetState();
1396   int curcycle = CMI_GET_CYCLE(msg);
1397   int i;
1398
1399   double logp = CmiNumPes();
1400   logp = log(logp)/log(2.0);
1401   logp = ceil(logp);
1402
1403   /*  CmiPrintf("In hypercube\n"); */
1404
1405   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1406
1407   for (i = curcycle; i < logp; i++) {
1408     int p = cs->pe ^ (1 << i);
1409
1410     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1411
1412     if(p < CmiNumPes()) {
1413       CMI_SET_CYCLE(msg, i + 1);
1414       CmiSyncSendFn1(p, size, msg);
1415     }
1416   }
1417 }
1418
1419 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1420 {
1421   CmiState cs = CmiGetState();
1422 #if CMK_BROADCAST_SPANNING_TREE
1423   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1424   SendSpanningChildren(size, msg);
1425
1426 #elif CMK_BROADCAST_HYPERCUBE
1427   CMI_SET_CYCLE(msg, 0);
1428   SendHypercube(size, msg);
1429
1430 #else
1431   int i;
1432
1433   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1434     CmiSyncSendFn(i, size,msg) ;
1435   for ( i=0; i<cs->pe; i++ )
1436     CmiSyncSendFn(i, size,msg) ;
1437 #endif
1438
1439   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1440 }
1441
1442
1443 /*  FIXME: luckily async is never used  G. Zheng */
1444 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1445 {
1446   CmiState cs = CmiGetState();
1447   int i ;
1448
1449   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1450     CmiAsyncSendFn(i,size,msg) ;
1451   for ( i=0; i<cs->pe; i++ )
1452     CmiAsyncSendFn(i,size,msg) ;
1453
1454   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1455   CmiAbort("CmiAsyncBroadcastFn should never be called");
1456   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1457 }
1458
1459 void CmiFreeBroadcastFn(int size, char *msg)
1460 {
1461    CmiSyncBroadcastFn(size,msg);
1462    CmiFree(msg);
1463 }
1464
1465 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1466 {
1467
1468 #if CMK_BROADCAST_SPANNING_TREE
1469   CmiState cs = CmiGetState();
1470   CmiSyncSendFn(cs->pe, size,msg) ;
1471   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1472   SendSpanningChildren(size, msg);
1473
1474 #elif CMK_BROADCAST_HYPERCUBE
1475   CmiState cs = CmiGetState();
1476   CmiSyncSendFn(cs->pe, size,msg) ;
1477   CMI_SET_CYCLE(msg, 0);
1478   SendHypercube(size, msg);
1479
1480 #else
1481     int i ;
1482
1483   for ( i=0; i<_Cmi_numpes; i++ )
1484     CmiSyncSendFn(i,size,msg) ;
1485 #endif
1486
1487   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1488 }
1489
1490 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1491 {
1492   int i ;
1493
1494   for ( i=1; i<_Cmi_numpes; i++ )
1495     CmiAsyncSendFn(i,size,msg) ;
1496
1497   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1498
1499   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1500 }
1501
1502 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1503 {
1504
1505 #if CMK_BROADCAST_SPANNING_TREE
1506   CmiState cs = CmiGetState();
1507   CmiSyncSendFn(cs->pe, size,msg) ;
1508   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1509   SendSpanningChildren(size, msg);
1510
1511 #elif CMK_BROADCAST_HYPERCUBE
1512   CmiState cs = CmiGetState();
1513   CmiSyncSendFn(cs->pe, size,msg) ;
1514   CMI_SET_CYCLE(msg, 0);
1515   SendHypercube(size, msg);
1516
1517 #else
1518   int i ;
1519
1520   for ( i=0; i<_Cmi_numpes; i++ )
1521     CmiSyncSendFn(i,size,msg) ;
1522 #endif
1523   CmiFree(msg) ;
1524   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1525 }
1526
1527 #if CMK_NODE_QUEUE_AVAILABLE
1528
1529 static void CmiSendNodeSelf(char *msg)
1530 {
1531 #if CMK_IMMEDIATE_MSG
1532 #if 0
1533     if (CmiIsImmediate(msg) && !_immRunning) {
1534       /*CmiHandleImmediateMessage(msg); */
1535       CmiPushImmediateMsg(msg);
1536       CmiHandleImmediate();
1537       return;
1538     }
1539 #endif
1540     if (CmiIsImmediate(msg))
1541     {
1542       CmiPushImmediateMsg(msg);
1543       if (!_immRunning) CmiHandleImmediate();
1544       return;
1545     }
1546 #endif
1547     CQdCreate(CpvAccess(cQdState), 1);
1548     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1549     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1550     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1551 }
1552
1553 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1554 {
1555   int i;
1556   SMSG_LIST *msg_tmp;
1557   char *dupmsg;
1558
1559   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1560   switch (dstNode) {
1561   case NODE_BROADCAST_ALL:
1562     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1563   case NODE_BROADCAST_OTHERS:
1564     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1565     for (i=0; i<_Cmi_numnodes; i++)
1566       if (i!=_Cmi_mynode) {
1567         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1568       }
1569     break;
1570   default:
1571     dupmsg = (char *)CmiCopyMsg(msg,size);
1572     if(dstNode == _Cmi_mynode) {
1573       CmiSendNodeSelf(dupmsg);
1574     }
1575     else {
1576       CQdCreate(CpvAccess(cQdState), 1);
1577       EnqueueMsg(dupmsg, size, dstNode);
1578     }
1579   }
1580   return 0;
1581 }
1582
1583 void CmiSyncNodeSendFn(int p, int s, char *m)
1584 {
1585   CmiAsyncNodeSendFn(p, s, m);
1586 }
1587
1588 /* need */
1589 void CmiFreeNodeSendFn(int p, int s, char *m)
1590 {
1591   CmiAsyncNodeSendFn(p, s, m);
1592   CmiFree(m);
1593 }
1594
1595 /* need */
1596 void CmiSyncNodeBroadcastFn(int s, char *m)
1597 {
1598   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1599 }
1600
1601 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1602 {
1603 }
1604
1605 /* need */
1606 void CmiFreeNodeBroadcastFn(int s, char *m)
1607 {
1608   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1609   CmiFree(m);
1610 }
1611
1612 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1613 {
1614   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1615 }
1616
1617 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1618 {
1619   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1620 }
1621
1622 /* need */
1623 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1624 {
1625   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1626   CmiFree(m);
1627 }
1628 #endif
1629
1630 /************************** MAIN ***********************************/
1631 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1632
1633 void ConverseExit(void)
1634 {
1635 #if ! CMK_SMP
1636   while(!CmiAllAsyncMsgsSent()) {
1637     PumpMsgs();
1638     CmiReleaseSentMessages();
1639   }
1640   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1641     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1642
1643   ConverseCommonExit();
1644 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1645   if (CmiMyPe() == 0){
1646     CmiPrintf("End of program\n");
1647 #if MPI_POST_RECV_COUNT > 0
1648     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1649 #endif
1650 }
1651 #endif
1652 #if ! CMK_AUTOBUILD
1653   signal(SIGINT, signal_int);
1654   MPI_Finalize();
1655 #endif
1656   exit(0);
1657
1658 #else
1659     /* SMP version, communication thread will exit */
1660   ConverseCommonExit();
1661   /* atomic increment */
1662   CmiLock(exitLock);
1663   inexit++;
1664   CmiUnlock(exitLock);
1665   while (1) CmiYield();
1666 #endif
1667 }
1668
1669 static void registerMPITraceEvents() {
1670 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1671     traceRegisterUserEvent("MPI_Barrier", 10);
1672     traceRegisterUserEvent("MPI_Send", 20);
1673     traceRegisterUserEvent("MPI_Recv", 30);
1674     traceRegisterUserEvent("MPI_Isend", 40);
1675     traceRegisterUserEvent("MPI_Irecv", 50);
1676     traceRegisterUserEvent("MPI_Test", 60);
1677     traceRegisterUserEvent("MPI_Iprobe", 70);
1678 #endif
1679 }
1680
1681
1682 static char     **Cmi_argv;
1683 static char     **Cmi_argvcopy;
1684 static CmiStartFn Cmi_startfn;   /* The start function */
1685 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1686
1687 typedef struct {
1688   int sleepMs; /*Milliseconds to sleep while idle*/
1689   int nIdles; /*Number of times we've been idle in a row*/
1690   CmiState cs; /*Machine state*/
1691 } CmiIdleState;
1692
1693 static CmiIdleState *CmiNotifyGetState(void)
1694 {
1695   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1696   s->sleepMs=0;
1697   s->nIdles=0;
1698   s->cs=CmiGetState();
1699   return s;
1700 }
1701
1702 static void CmiNotifyBeginIdle(CmiIdleState *s)
1703 {
1704   s->sleepMs=0;
1705   s->nIdles=0;
1706 }
1707
1708 static void CmiNotifyStillIdle(CmiIdleState *s)
1709 {
1710 #if ! CMK_SMP
1711   CmiReleaseSentMessages();
1712   PumpMsgs();
1713 #else
1714 /*  CmiYield();  */
1715 #endif
1716
1717 #if 1
1718   {
1719   int nSpins=20; /*Number of times to spin before sleeping*/
1720   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1721   s->nIdles++;
1722   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1723     s->sleepMs+=2;
1724     if (s->sleepMs>10) s->sleepMs=10;
1725   }
1726   /*Comm. thread will listen on sockets-- just sleep*/
1727   if (s->sleepMs>0) {
1728     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1729     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1730     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1731   }
1732   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1733   }
1734 #endif
1735 }
1736
1737 #if MACHINE_DEBUG_LOG
1738 FILE *debugLog = NULL;
1739 #endif
1740
1741 static int machine_exit_idx;
1742 static void machine_exit(char *m) {
1743   EmergencyExit();
1744   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1745   fflush(stdout);
1746   CmiNodeBarrier();
1747   if (CmiMyRank() == 0) {
1748     MPI_Barrier(MPI_COMM_WORLD);
1749     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1750     MPI_Abort(MPI_COMM_WORLD, 1);
1751   } else {
1752     while (1) CmiYield();
1753   }
1754 }
1755
1756 static void KillOnAllSigs(int sigNo) {
1757   static int already_in_signal_handler = 0;
1758   char *m;
1759   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1760   already_in_signal_handler = 1;
1761 #if CMK_CCS_AVAILABLE
1762   if (CpvAccess(cmiArgDebugFlag)) {
1763     CpdNotify(CPD_SIGNAL, sigNo);
1764     CpdFreeze();
1765   }
1766 #endif
1767   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1768       "Signal: %d\n",CmiMyPe(),sigNo);
1769   CmiPrintStackTrace(1);
1770
1771   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1772   CmiSetHandler(m, machine_exit_idx);
1773   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1774   machine_exit(m);
1775 }
1776
1777 static void ConverseRunPE(int everReturn)
1778 {
1779   CmiIdleState *s=CmiNotifyGetState();
1780   CmiState cs;
1781   char** CmiMyArgv;
1782
1783   CmiNodeAllBarrier();
1784
1785   cs = CmiGetState();
1786   CpvInitialize(void *,CmiLocalQueue);
1787   CpvAccess(CmiLocalQueue) = cs->localqueue;
1788
1789   if (CmiMyRank())
1790     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1791   else
1792     CmiMyArgv=Cmi_argv;
1793
1794   CthInit(CmiMyArgv);
1795
1796   ConverseCommonInit(CmiMyArgv);
1797   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1798
1799 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1800   CpvInitialize(double, projTraceStart);
1801   /* only PE 0 needs to care about registration (to generate sts file). */
1802   if (CmiMyPe() == 0) {
1803     registerMachineUserEventsFunction(&registerMPITraceEvents);
1804   }
1805 #endif
1806
1807   /* initialize the network progress counter*/
1808   /* Network progress function is used to poll the network when for
1809      messages. This flushes receive buffers on some  implementations*/
1810   CpvInitialize(int , networkProgressCount);
1811   CpvAccess(networkProgressCount) = 0;
1812
1813 #if CMK_SMP
1814   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1815   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1816 #else
1817   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1818 #endif
1819
1820 #if MACHINE_DEBUG_LOG
1821   if (CmiMyRank() == 0) {
1822     char ln[200];
1823     sprintf(ln,"debugLog.%d",CmiMyNode());
1824     debugLog=fopen(ln,"w");
1825   }
1826 #endif
1827
1828   /* Converse initialization finishes, immediate messages can be processed.
1829      node barrier previously should take care of the node synchronization */
1830   _immediateReady = 1;
1831
1832   /* communication thread */
1833   if (CmiMyRank() == CmiMyNodeSize()) {
1834     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1835     while (1) CommunicationServerThread(5);
1836   }
1837   else {  /* worker thread */
1838   if (!everReturn) {
1839     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1840     if (Cmi_usrsched==0) CsdScheduler(-1);
1841     ConverseExit();
1842   }
1843   }
1844 }
1845
1846 static char *thread_level_tostring(int thread_level)
1847 {
1848 #if CMK_MPI_INIT_THREAD
1849   switch (thread_level) {
1850   case MPI_THREAD_SINGLE:
1851       return "MPI_THREAD_SINGLE";
1852   case MPI_THREAD_FUNNELED:
1853       return "MPI_THREAD_FUNNELED";
1854   case MPI_THREAD_SERIALIZED:
1855       return "MPI_THREAD_SERIALIZED";
1856   case MPI_THREAD_MULTIPLE :
1857       return "MPI_THREAD_MULTIPLE ";
1858   default: {
1859       char *str = (char*)malloc(5);
1860       sprintf(str,"%d", thread_level);
1861       return str;
1862       }
1863   }
1864   return  "unknown";
1865 #else
1866   char *str = (char*)malloc(5);
1867   sprintf(str,"%d", thread_level);
1868   return str;
1869 #endif
1870 }
1871
1872 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1873 {
1874   int n,i;
1875   int ver, subver;
1876   int provided;
1877   int thread_level;
1878
1879 #if MACHINE_DEBUG
1880   debugLog=NULL;
1881 #endif
1882 #if CMK_USE_HP_MAIN_FIX
1883 #if FOR_CPLUS
1884   _main(argc,argv);
1885 #endif
1886 #endif
1887
1888 #if CMK_MPI_INIT_THREAD
1889 #if CMK_SMP
1890   thread_level = MPI_THREAD_FUNNELED;
1891 #else
1892   thread_level = MPI_THREAD_SINGLE;
1893 #endif
1894   MPI_Init_thread(&argc, &argv, thread_level, &provided);
1895   _thread_provided = provided;
1896 #else
1897   MPI_Init(&argc, &argv);
1898   thread_level = 0;
1899   provided = -1;
1900 #endif
1901   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
1902   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
1903
1904   MPI_Get_version(&ver, &subver);
1905   if (_Cmi_mynode == 0) {
1906     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1907   }
1908
1909   /* processor per node */
1910   _Cmi_mynodesize = 1;
1911   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
1912     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
1913 #if ! CMK_SMP
1914   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
1915     CmiAbort("+ppn cannot be used in non SMP version!\n");
1916 #endif
1917   idleblock = CmiGetArgFlag(argv, "+idleblocking");
1918   if (idleblock && _Cmi_mynode == 0) {
1919     printf("Charm++: Running in idle blocking mode.\n");
1920   }
1921
1922   /* setup signal handlers */
1923   signal(SIGSEGV, KillOnAllSigs);
1924   signal(SIGFPE, KillOnAllSigs);
1925   signal(SIGILL, KillOnAllSigs);
1926   signal_int = signal(SIGINT, KillOnAllSigs);
1927   signal(SIGTERM, KillOnAllSigs);
1928   signal(SIGABRT, KillOnAllSigs);
1929 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1930   signal(SIGQUIT, KillOnAllSigs);
1931   signal(SIGBUS, KillOnAllSigs);
1932 /*#     if CMK_HANDLE_SIGUSR
1933   signal(SIGUSR1, HandleUserSignals);
1934   signal(SIGUSR2, HandleUserSignals);
1935 #     endif*/
1936 #   endif /*UNIX*/
1937   
1938 #if CMK_NO_OUTSTANDING_SENDS
1939   no_outstanding_sends=1;
1940 #endif
1941   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1942     no_outstanding_sends = 1;
1943     if (_Cmi_mynode == 0)
1944       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1945         no_outstanding_sends?"":" not");
1946   }
1947   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
1948   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
1949   Cmi_argvcopy = CmiCopyArgs(argv);
1950   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
1951   /* find dim = log2(numpes), to pretend we are a hypercube */
1952   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
1953     Cmi_dim++ ;
1954  /* CmiSpanTreeInit();*/
1955   request_max=MAX_QLEN;
1956   CmiGetArgInt(argv,"+requestmax",&request_max);
1957   /*printf("request max=%d\n", request_max);*/
1958
1959   /* checksum flag */
1960   if (CmiGetArgFlag(argv,"+checksum")) {
1961 #if !CMK_OPTIMIZE
1962     checksum_flag = 1;
1963     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1964 #else
1965     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1966 #endif
1967   }
1968
1969   {
1970   int debug = CmiGetArgFlag(argv,"++debug");
1971   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
1972   if (debug || debug_no_pause)
1973   {   /*Pause so user has a chance to start and attach debugger*/
1974 #if CMK_HAS_GETPID
1975     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
1976     fflush(stdout);
1977     if (!debug_no_pause)
1978       sleep(15);
1979 #else
1980     printf("++debug ignored.\n");
1981 #endif
1982   }
1983   }
1984
1985 #if MPI_POST_RECV_COUNT > 0
1986
1987   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1988   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1989   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
1990   CpvInitialize(char*,CmiPostedRecvBuffers);
1991
1992     /* Post some extra recvs to help out with incoming messages */
1993     /* On some MPIs the messages are unexpected and thus slow */
1994
1995     /* An array of request handles for posted recvs */
1996     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1997
1998     /* An array of buffers for posted recvs */
1999     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2000
2001     /* Post Recvs */
2002     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2003         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2004                     MPI_POST_RECV_SIZE,
2005                     MPI_BYTE,
2006                     MPI_ANY_SOURCE,
2007                     POST_RECV_TAG,
2008                     MPI_COMM_WORLD,
2009                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2010           CmiAbort("MPI_Irecv failed\n");
2011     }
2012
2013 #endif
2014
2015
2016
2017   /* CmiTimerInit(); */
2018
2019 #if 0
2020   CthInit(argv);
2021   ConverseCommonInit(argv);
2022
2023   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2024   if (initret==0) {
2025     fn(CmiGetArgc(argv), argv);
2026     if (usched==0) CsdScheduler(-1);
2027     ConverseExit();
2028   }
2029 #endif
2030
2031   CsvInitialize(CmiNodeState, NodeState);
2032   CmiNodeStateInit(&CsvAccess(NodeState));
2033
2034   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2035
2036   for (i=0; i<_Cmi_mynodesize+1; i++) {
2037 #if MULTI_SENDQUEUE
2038     procState[i].sendMsgBuf = PCQueueCreate();
2039 #endif
2040     procState[i].recvLock = CmiCreateLock();
2041   }
2042 #if CMK_SMP
2043 #if !MULTI_SENDQUEUE
2044   sendMsgBuf = PCQueueCreate();
2045   sendMsgBufLock = CmiCreateLock();
2046 #endif
2047   exitLock = CmiCreateLock();            /* exit count lock */
2048 #endif
2049
2050   /* Network progress function is used to poll the network when for
2051      messages. This flushes receive buffers on some  implementations*/
2052   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2053   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2054
2055   CmiStartThreads(argv);
2056   ConverseRunPE(initret);
2057 }
2058
2059 /***********************************************************************
2060  *
2061  * Abort function:
2062  *
2063  ************************************************************************/
2064
2065 void CmiAbort(const char *message)
2066 {
2067   char *m;
2068   /* if CharmDebug is attached simply try to send a message to it */
2069 #if CMK_CCS_AVAILABLE
2070   if (CpvAccess(cmiArgDebugFlag)) {
2071     CpdNotify(CPD_ABORT, message);
2072     CpdFreeze();
2073   }
2074 #endif  
2075   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2076         "Reason: %s\n",CmiMyPe(),message);
2077  /*  CmiError(message); */
2078   CmiPrintStackTrace(0);
2079   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2080   CmiSetHandler(m, machine_exit_idx);
2081   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2082   machine_exit(m);
2083   /* Program never reaches here */
2084   MPI_Abort(MPI_COMM_WORLD, 1);
2085 }
2086
2087
2088 #if 0
2089
2090 /* ****************************************************************** */
2091 /*    The following internal functions implement recd msg queue       */
2092 /* ****************************************************************** */
2093
2094 static void ** AllocBlock(unsigned int len)
2095 {
2096   void ** blk;
2097
2098   blk=(void **)CmiAlloc(len*sizeof(void *));
2099   if(blk==(void **)0) {
2100     CmiError("Cannot Allocate Memory!\n");
2101     MPI_Abort(MPI_COMM_WORLD, 1);
2102   }
2103   return blk;
2104 }
2105
2106 static void
2107 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2108 {
2109   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2110   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2111 }
2112
2113 void recdQueueInit(void)
2114 {
2115   recdQueue_blk = AllocBlock(BLK_LEN);
2116   recdQueue_blk_len = BLK_LEN;
2117   recdQueue_first = 0;
2118   recdQueue_len = 0;
2119 }
2120
2121 void recdQueueAddToBack(void *element)
2122 {
2123 #if NODE_0_IS_CONVHOST
2124   inside_comm = 1;
2125 #endif
2126   if(recdQueue_len==recdQueue_blk_len) {
2127     void **blk;
2128     recdQueue_blk_len *= 3;
2129     blk = AllocBlock(recdQueue_blk_len);
2130     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2131     CmiFree(recdQueue_blk);
2132     recdQueue_blk = blk;
2133     recdQueue_first = 0;
2134   }
2135   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2136 #if NODE_0_IS_CONVHOST
2137   inside_comm = 0;
2138 #endif
2139 }
2140
2141
2142 void * recdQueueRemoveFromFront(void)
2143 {
2144   if(recdQueue_len) {
2145     void *element;
2146     element = recdQueue_blk[recdQueue_first++];
2147     recdQueue_first %= recdQueue_blk_len;
2148     recdQueue_len--;
2149     return element;
2150   }
2151   return 0;
2152 }
2153
2154 #endif
2155
2156 /*@}*/