Fixed a linking error if neither the spanning tree nor the hypercube broadcast is...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #include "machine.h"
66
67 #include "pcqueue.h"
68
69 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
70
71 #if CMK_BLUEGENEL
72 #define MAX_QLEN 8
73 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
74 #else
75 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
76 #define MAX_QLEN 200
77 #endif
78
79 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && ! CMK_TRACE_IN_CHARM
80 CpvStaticDeclare(double, projTraceStart);
81 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
82 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
83 #else
84 # define  START_EVENT()
85 # define  END_EVENT(x)
86 #endif
87
88 /*
89     To reduce the buffer used in broadcast and distribute the load from
90   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
91   spanning tree broadcast algorithm.
92     This will use the fourth short in message as an indicator of spanning tree
93   root.
94 */
95 #define CMK_BROADCAST_SPANNING_TREE    1
96 #define CMK_BROADCAST_HYPERCUBE        0
97
98 #define BROADCAST_SPANNING_FACTOR      4
99 /* The number of children used when a msg is broadcast inside a node */
100 #define BROADCAST_SPANNING_INTRA_FACTOR      8
101
102 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
103 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
104
105 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
106 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
107
108 /* FIXME: need a random number that everyone agrees ! */
109 #define CHARM_MAGIC_NUMBER               126
110
111 #if !CMK_OPTIMIZE
112 static int checksum_flag = 0;
113 #define CMI_SET_CHECKSUM(msg, len)      \
114         if (checksum_flag)  {   \
115           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
116           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
117         }
118 #define CMI_CHECK_CHECKSUM(msg, len)    \
119         if (checksum_flag)      \
120           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
121             CmiAbort("Fatal error: checksum doesn't agree!\n");
122 #else
123 #define CMI_SET_CHECKSUM(msg, len)
124 #define CMI_CHECK_CHECKSUM(msg, len)
125 #endif
126
127 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
128 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
129 #else
130 #define CMI_SET_BROADCAST_ROOT(msg, root) 
131 #endif
132
133
134 /** 
135     If MPI_POST_RECV is defined, we provide default values for size 
136     and number of posted recieves. If MPI_POST_RECV_COUNT is set
137     then a default value for MPI_POST_RECV_SIZE is used if not specified
138     by the user.
139 */
140 #ifdef MPI_POST_RECV
141 #define MPI_POST_RECV_COUNT 10
142 #undef MPI_POST_RECV
143 #endif
144 #if MPI_POST_RECV_COUNT > 0
145 #warning "Using MPI posted receives which have not yet been tested"
146 #ifndef MPI_POST_RECV_SIZE
147 #define MPI_POST_RECV_SIZE 200
148 #endif
149 /* #undef  MPI_POST_RECV_DEBUG  */
150 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
151 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
152 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
153 CpvDeclare(char*,CmiPostedRecvBuffers);
154 #endif
155
156 /*
157  to avoid MPI's in order delivery, changing MPI Tag all the time
158 */
159 #define TAG     1375
160
161 #if MPI_POST_RECV_COUNT > 0
162 #define POST_RECV_TAG TAG+1
163 #define BARRIER_ZERO_TAG TAG
164 #else
165 #define BARRIER_ZERO_TAG     1375
166 #endif
167
168 #include <signal.h>
169 void (*signal_int)(int);
170
171 /*
172 static int mpi_tag = TAG;
173 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
174 */
175
176 static int        _thread_provided = -1;
177 int               _Cmi_numpes;
178 int               _Cmi_mynode;    /* Which address space am I */
179 int               _Cmi_mynodesize;/* Number of processors in my address space */
180 int               _Cmi_numnodes;  /* Total number of address spaces */
181 int               _Cmi_numpes;    /* Total number of processors */
182 static int        Cmi_nodestart; /* First processor in this address space */
183 CpvDeclare(void*, CmiLocalQueue);
184
185 /*Network progress utility variables. Period controls the rate at
186   which the network poll is called */
187 CpvDeclare(unsigned , networkProgressCount);
188 int networkProgressPeriod;
189
190 int               idleblock = 0;
191
192 #define BLK_LEN  512
193
194 #if CMK_NODE_QUEUE_AVAILABLE
195 #define DGRAM_NODEMESSAGE   (0xFB)
196
197 #define NODE_BROADCAST_OTHERS (-1)
198 #define NODE_BROADCAST_ALL    (-2)
199 #endif
200
201 #if 0
202 static void **recdQueue_blk;
203 static unsigned int recdQueue_blk_len;
204 static unsigned int recdQueue_first;
205 static unsigned int recdQueue_len;
206 static void recdQueueInit(void);
207 static void recdQueueAddToBack(void *element);
208 static void *recdQueueRemoveFromFront(void);
209 #endif
210
211 static void ConverseRunPE(int everReturn);
212 static void CommunicationServer(int sleepTime);
213 static void CommunicationServerThread(int sleepTime);
214
215 typedef struct msg_list {
216      char *msg;
217      struct msg_list *next;
218      int size, destpe;
219
220 #if CMK_SMP_TRACE_COMMTHREAD
221         int srcpe;
222 #endif  
223         
224      MPI_Request req;
225 } SMSG_LIST;
226
227 int MsgQueueLen=0;
228 static int request_max;
229
230 static SMSG_LIST *sent_msgs=0;
231 static SMSG_LIST *end_sent=0;
232
233 static int Cmi_dim;
234
235 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
236
237 #if NODE_0_IS_CONVHOST
238 int inside_comm = 0;
239 #endif
240
241 void CmiAbort(const char *message);
242 static void PerrorExit(const char *msg);
243
244 void SendSpanningChildren(int size, char *msg);
245 void SendHypercube(int size, char *msg);
246
247 static void PerrorExit(const char *msg)
248 {
249   perror(msg);
250   exit(1);
251 }
252
253 extern unsigned char computeCheckSum(unsigned char *data, int len);
254
255 /**************************  TIMER FUNCTIONS **************************/
256
257 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
258
259 /* MPI calls are not threadsafe, even the timer on some machines */
260 static CmiNodeLock  timerLock = 0;
261 static double starttimer = 0;
262 static int _is_global = 0;
263
264 int CmiTimerIsSynchronized()
265 {
266   int  flag;
267   void *v;
268
269   /*  check if it using synchronized timer */
270   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
271     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
272   if (flag) {
273     _is_global = *(int*)v;
274     if (_is_global && CmiMyPe() == 0)
275       printf("Charm++> MPI timer is synchronized!\n");
276   }
277   return _is_global;
278 }
279
280 void CmiTimerInit()
281 {
282   _is_global = CmiTimerIsSynchronized();
283
284   if (_is_global) {
285     if (CmiMyRank() == 0) {
286       double minTimer;
287 #if CMK_TIMER_USE_XT3_DCLOCK
288       starttimer = dclock();
289 #else
290       starttimer = MPI_Wtime();
291 #endif
292
293       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
294                                   MPI_COMM_WORLD );
295       starttimer = minTimer;
296     }
297   }
298   else {  /* we don't have a synchronous timer, set our own start time */
299     CmiBarrier();
300     CmiBarrier();
301     CmiBarrier();
302 #if CMK_TIMER_USE_XT3_DCLOCK
303     starttimer = dclock();
304 #else
305     starttimer = MPI_Wtime();
306 #endif
307   }
308
309 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
310   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
311     timerLock = CmiCreateLock();
312 #endif
313   CmiNodeAllBarrier();          /* for smp */
314 }
315
316 /**
317  * Since the timerLock is never created, and is
318  * always NULL, then all the if-condition inside
319  * the timer functions could be disabled right
320  * now in the case of SMP. --Chao Mei
321  */
322 double CmiTimer(void)
323 {
324   double t;
325 #if 0 && CMK_SMP
326   if (timerLock) CmiLock(timerLock);
327 #endif
328 #if CMK_TIMER_USE_XT3_DCLOCK
329   t = dclock() - starttimer;
330 #else
331   t = MPI_Wtime() - starttimer;
332 #endif
333
334 #if 0 && CMK_SMP
335   if (timerLock) CmiUnlock(timerLock);
336 #endif
337
338   return t;
339 }
340
341 double CmiWallTimer(void)
342 {
343   double t;
344 #if 0 && CMK_SMP
345   if (timerLock) CmiLock(timerLock);
346 #endif
347 #if CMK_TIMER_USE_XT3_DCLOCK
348   t = dclock() - starttimer;
349 #else
350   t = MPI_Wtime() - starttimer;
351 #endif
352 #if 0 && CMK_SMP
353   if (timerLock) CmiUnlock(timerLock);
354 #endif
355   return t;
356 }
357
358 double CmiCpuTimer(void)
359 {
360   double t;
361 #if 0 && CMK_SMP
362   if (timerLock) CmiLock(timerLock);
363 #endif
364 #if CMK_TIMER_USE_XT3_DCLOCK
365   t = dclock() - starttimer;
366 #else
367   t = MPI_Wtime() - starttimer;
368 #endif
369 #if 0 && CMK_SMP
370   if (timerLock) CmiUnlock(timerLock);
371 #endif
372   return t;
373 }
374
375 #endif
376
377 /* must be called on all ranks including comm thread in SMP */
378 int CmiBarrier()
379 {
380 #if CMK_SMP
381     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
382   CmiNodeAllBarrier();
383   if (CmiMyRank() == CmiMyNodeSize()) 
384 #else
385   if (CmiMyRank() == 0) 
386 #endif
387   {
388 /**
389  *  The call of CmiBarrier is usually before the initialization
390  *  of trace module of Charm++, therefore, the START_EVENT
391  *  and END_EVENT are disabled here. -Chao Mei
392  */     
393     /*START_EVENT();*/
394
395     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
396         CmiAbort("Timernit: MPI_Barrier failed!\n");
397
398     /*END_EVENT(10);*/
399   }
400   CmiNodeAllBarrier();
401   return 0;
402 }
403
404 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
405 int CmiBarrierZero()
406 {
407   int i;
408 #if CMK_SMP
409   if (CmiMyRank() == CmiMyNodeSize()) 
410 #else
411   if (CmiMyRank() == 0) 
412 #endif
413   {
414     char msg[1];
415     MPI_Status sts;
416     if (CmiMyNode() == 0)  {
417       for (i=0; i<CmiNumNodes()-1; i++) {
418          START_EVENT();
419
420          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
421             CmiPrintf("MPI_Recv failed!\n");
422
423          END_EVENT(30);
424       }
425     }
426     else {
427       START_EVENT();
428
429       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
430          printf("MPI_Send failed!\n");
431
432       END_EVENT(20);
433     }
434   }
435   CmiNodeAllBarrier();
436   return 0;
437 }
438
439 typedef struct ProcState {
440 #if MULTI_SENDQUEUE
441 PCQueue      sendMsgBuf;       /* per processor message sending queue */
442 #endif
443 CmiNodeLock  recvLock;              /* for cs->recv */
444 } ProcState;
445
446 static ProcState  *procState;
447
448 #if CMK_SMP
449
450 #if !MULTI_SENDQUEUE
451 static PCQueue sendMsgBuf;
452 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
453 #endif
454
455 #endif
456
457 /************************************************************
458  *
459  * Processor state structure
460  *
461  ************************************************************/
462
463 /* fake Cmi_charmrun_fd */
464 static int Cmi_charmrun_fd = 0;
465 #include "machine-smp.c"
466
467 CsvDeclare(CmiNodeState, NodeState);
468
469 #include "immediate.c"
470
471 #if CMK_SHARED_VARS_UNAVAILABLE
472 /************ non SMP **************/
473 static struct CmiStateStruct Cmi_state;
474 int _Cmi_mype;
475 int _Cmi_myrank;
476
477 void CmiMemLock() {}
478 void CmiMemUnlock() {}
479
480 #define CmiGetState() (&Cmi_state)
481 #define CmiGetStateN(n) (&Cmi_state)
482
483 void CmiYield(void) { sleep(0); }
484
485 static void CmiStartThreads(char **argv)
486 {
487   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
488   _Cmi_mype = Cmi_nodestart;
489   _Cmi_myrank = 0;
490 }
491 #endif  /* non smp */
492
493 /*Add a message to this processor's receive queue, pe is a rank */
494 void CmiPushPE(int pe,void *msg)
495 {
496   CmiState cs = CmiGetStateN(pe);
497   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
498 #if CMK_IMMEDIATE_MSG
499   if (CmiIsImmediate(msg)) {
500 /*
501 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
502     CmiHandleMessage(msg);
503 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
504 */
505     /**(CmiUInt2 *)msg = pe;*/
506     CMI_DEST_RANK(msg) = pe;
507     CmiPushImmediateMsg(msg);
508     return;
509   }
510 #endif
511
512 #if CMK_SMP
513   CmiLock(procState[pe].recvLock);
514 #endif
515   PCQueuePush(cs->recv,msg);
516 #if CMK_SMP
517   CmiUnlock(procState[pe].recvLock);
518 #endif
519   CmiIdleLock_addMessage(&cs->idle);
520   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
521 }
522
523 #if CMK_NODE_QUEUE_AVAILABLE
524 /*Add a message to this processor's receive queue */
525 static void CmiPushNode(void *msg)
526 {
527   MACHSTATE(3,"Pushing message into NodeRecv queue");
528 #if CMK_IMMEDIATE_MSG
529   if (CmiIsImmediate(msg)) {
530     CMI_DEST_RANK(msg) = 0;
531     CmiPushImmediateMsg(msg);
532     return;
533   }
534 #endif
535   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
536   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
537   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
538   {
539   CmiState cs=CmiGetStateN(0);
540   CmiIdleLock_addMessage(&cs->idle);
541   }
542 }
543 #endif
544
545 #ifndef CmiMyPe
546 int CmiMyPe(void)
547 {
548   return CmiGetState()->pe;
549 }
550 #endif
551
552 #ifndef CmiMyRank
553 int CmiMyRank(void)
554 {
555   return CmiGetState()->rank;
556 }
557 #endif
558
559 #ifndef CmiNodeFirst
560 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
561 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
562 #endif
563
564 #ifndef CmiNodeOf
565 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
566 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
567 #endif
568
569 static size_t CmiAllAsyncMsgsSent(void)
570 {
571    SMSG_LIST *msg_tmp = sent_msgs;
572    MPI_Status sts;
573    int done;
574
575    while(msg_tmp!=0) {
576     done = 0;
577     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
578       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
579     if(!done)
580       return 0;
581     msg_tmp = msg_tmp->next;
582 /*    MsgQueueLen--; ????? */
583    }
584    return 1;
585 }
586
587 int CmiAsyncMsgSent(CmiCommHandle c) {
588
589   SMSG_LIST *msg_tmp = sent_msgs;
590   int done;
591   MPI_Status sts;
592
593   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
594     msg_tmp = msg_tmp->next;
595   if(msg_tmp) {
596     done = 0;
597     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
598       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
599     return ((done)?1:0);
600   } else {
601     return 1;
602   }
603 }
604
605 void CmiReleaseCommHandle(CmiCommHandle c)
606 {
607   return;
608 }
609
610 #if CMK_BLUEGENEL
611 extern void MPID_Progress_test();
612 #endif
613
614 void CmiReleaseSentMessages(void)
615 {
616   SMSG_LIST *msg_tmp=sent_msgs;
617   SMSG_LIST *prev=0;
618   SMSG_LIST *temp;
619   int done;
620   MPI_Status sts;
621
622 #if CMK_BLUEGENEL
623   MPID_Progress_test();
624 #endif
625
626   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
627   while(msg_tmp!=0) {
628     done =0;
629 #if CMK_SMP_TRACE_COMMTHREAD
630     double startT = CmiWallTimer();
631 #endif
632     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
633       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
634     if(done) {
635       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
636       MsgQueueLen--;
637       /* Release the message */
638       temp = msg_tmp->next;
639       if(prev==0)  /* first message */
640         sent_msgs = temp;
641       else
642         prev->next = temp;
643       CmiFree(msg_tmp->msg);
644       CmiFree(msg_tmp);
645       msg_tmp = temp;
646     } else {
647       prev = msg_tmp;
648       msg_tmp = msg_tmp->next;
649     }
650 #if CMK_SMP_TRACE_COMMTHREAD
651     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
652 #endif
653   }
654   end_sent = prev;
655   MACHSTATE(2,"} CmiReleaseSentMessages end");
656 }
657
658 int PumpMsgs(void)
659 {
660   int nbytes, flg, res;
661   char *msg;
662   MPI_Status sts;
663   int recd=0;
664
665 #if CMI_EXERT_RECV_CAP
666   int recvCnt=0;
667 #endif
668         
669 #if CMK_BLUEGENEL
670   MPID_Progress_test();
671 #endif
672
673   MACHSTATE(2,"PumpMsgs begin {");
674
675         
676   while(1) {
677 #if CMI_EXERT_RECV_CAP
678         if(recvCnt==RECV_CAP) break;
679 #endif
680           
681     /* First check posted recvs then do  probe unmatched outstanding messages */
682 #if MPI_POST_RECV_COUNT > 0 
683     int completed_index=-1;
684     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
685         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
686     if(flg){
687         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
688             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
689
690         recd = 1;
691         msg = (char *) CmiAlloc(nbytes);
692         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
693         /* and repost the recv */
694
695         START_EVENT();
696
697         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
698             MPI_POST_RECV_SIZE,
699             MPI_BYTE,
700             MPI_ANY_SOURCE,
701             POST_RECV_TAG,
702             MPI_COMM_WORLD,
703             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
704                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
705
706         END_EVENT(50);
707
708         CpvAccess(Cmi_posted_recv_total)++;
709     }
710     else {
711         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
712         if(res != MPI_SUCCESS)
713         CmiAbort("MPI_Iprobe failed\n");
714         if(!flg) break;
715         recd = 1;
716         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
717         msg = (char *) CmiAlloc(nbytes);
718
719         START_EVENT();
720
721         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
722             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
723
724         END_EVENT(30);
725
726         CpvAccess(Cmi_unposted_recv_total)++;
727     }
728 #else
729     /* Original version */
730 #if CMK_SMP_TRACE_COMMTHREAD
731   double startT = CmiWallTimer(); 
732 #endif
733     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
734     if(res != MPI_SUCCESS)
735       CmiAbort("MPI_Iprobe failed\n");
736
737     if(!flg) break;
738 #if CMK_SMP_TRACE_COMMTHREAD
739     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
740 #endif
741
742     recd = 1;
743     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
744     msg = (char *) CmiAlloc(nbytes);
745     
746     START_EVENT();
747
748     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
749       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
750
751     /*END_EVENT(30);*/
752
753 #endif
754
755 #if CMK_SMP_TRACE_COMMTHREAD
756         traceBeginCommOp(msg);
757         traceChangeLastTimestamp(CpvAccess(projTraceStart));
758         traceEndCommOp(msg);
759         #if CMI_MPI_TRACE_MOREDETAILED
760         char tmp[32];
761         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
762         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
763         #endif
764 #endif  
765         
766     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
767     CMI_CHECK_CHECKSUM(msg, nbytes);
768     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
769       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
770       CmiFree(msg);
771       CmiAbort("Abort!\n");
772       continue;
773     }
774         
775 #if CMK_BROADCAST_SPANNING_TREE
776     if (CMI_BROADCAST_ROOT(msg))
777       SendSpanningChildren(nbytes, msg);
778 #elif CMK_BROADCAST_HYPERCUBE
779     if (CMI_BROADCAST_ROOT(msg))
780       SendHypercube(nbytes, msg);
781 #endif
782         
783         /* In SMP mode, this push operation needs to be executed
784      * after forwarding broadcast messages. If it is executed
785      * earlier, then during the bcast msg forwarding period,    
786          * the msg could be already freed on the worker thread.
787          * As a result, the forwarded message could be wrong! 
788          * --Chao Mei
789          */
790 #if CMK_NODE_QUEUE_AVAILABLE
791     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
792       CmiPushNode(msg);
793     else
794 #endif
795         CmiPushPE(CMI_DEST_RANK(msg), msg);     
796         
797 #if CMI_EXERT_RECV_CAP
798         recvCnt++;
799 #endif  
800   }
801
802   
803 #if CMK_IMMEDIATE_MSG && !CMK_SMP
804   CmiHandleImmediate();
805 #endif
806   
807   MACHSTATE(2,"} PumpMsgs end ");
808   return recd;
809 }
810
811 /* blocking version */
812 static void PumpMsgsBlocking(void)
813 {
814   static int maxbytes = 20000000;
815   static char *buf = NULL;
816   int nbytes, flg;
817   MPI_Status sts;
818   char *msg;
819   int recd=0;
820
821   if (!PCQueueEmpty(CmiGetState()->recv)) return;
822   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
823   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
824   if (sent_msgs)  return;
825
826 #if 0
827   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
828 #endif
829
830   if (buf == NULL) {
831     buf = (char *) CmiAlloc(maxbytes);
832     _MEMCHECK(buf);
833   }
834
835
836 #if MPI_POST_RECV_COUNT > 0
837 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
838 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
839 #endif
840
841   START_EVENT();
842
843   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
844       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
845
846   /*END_EVENT(30);*/
847     
848    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
849    msg = (char *) CmiAlloc(nbytes);
850    memcpy(msg, buf, nbytes);
851
852 #if CMK_SMP_TRACE_COMMTHREAD
853         traceBeginCommOp(msg);
854         traceChangeLastTimestamp(CpvAccess(projTraceStart));
855         traceEndCommOp(msg);
856         #if CMI_MPI_TRACE_MOREDETAILED
857         char tmp[32];
858         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
859         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
860         #endif
861 #endif
862
863 #if CMK_BROADCAST_SPANNING_TREE
864    if (CMI_BROADCAST_ROOT(msg))
865       SendSpanningChildren(nbytes, msg);
866 #elif CMK_BROADCAST_HYPERCUBE
867    if (CMI_BROADCAST_ROOT(msg))
868       SendHypercube(nbytes, msg);
869 #endif
870   
871         /* In SMP mode, this push operation needs to be executed
872      * after forwarding broadcast messages. If it is executed
873      * earlier, then during the bcast msg forwarding period,    
874          * the msg could be already freed on the worker thread.
875          * As a result, the forwarded message could be wrong! 
876          * --Chao Mei
877          */  
878 #if CMK_NODE_QUEUE_AVAILABLE
879    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
880       CmiPushNode(msg);
881    else
882 #endif
883       CmiPushPE(CMI_DEST_RANK(msg), msg);
884 }
885
886 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
887
888 #if CMK_SMP
889
890 static int inexit = 0;
891 static CmiNodeLock  exitLock = 0;
892
893 static int MsgQueueEmpty()
894 {
895   int i;
896 #if MULTI_SENDQUEUE
897   for (i=0; i<_Cmi_mynodesize; i++)
898     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
899 #else
900   return PCQueueEmpty(sendMsgBuf);
901 #endif
902   return 1;
903 }
904
905 static int SendMsgBuf();
906
907 /* test if all processors recv queues are empty */
908 static int RecvQueueEmpty()
909 {
910   int i;
911   for (i=0; i<_Cmi_mynodesize; i++) {
912     CmiState cs=CmiGetStateN(i);
913     if (!PCQueueEmpty(cs->recv)) return 0;
914   }
915   return 1;
916 }
917
918 /**
919 CommunicationServer calls MPI to send messages in the queues and probe message from network.
920 */
921
922 #define REPORT_COMM_METRICS 0
923 #if REPORT_COMM_METRICS
924 static double pumptime = 0.0;
925 static double releasetime = 0.0;
926 static double sendtime = 0.0;
927 #endif
928
929 static void CommunicationServer(int sleepTime)
930 {
931   int static count=0;
932 /*
933   count ++;
934   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
935 */
936 #if REPORT_COMM_METRICS
937   double t1, t2, t3, t4;
938   t1 = CmiWallTimer();
939 #endif
940   PumpMsgs();
941 #if REPORT_COMM_METRICS
942   t2 = CmiWallTimer();
943 #endif
944   CmiReleaseSentMessages();
945 #if REPORT_COMM_METRICS
946   t3 = CmiWallTimer();
947 #endif
948   SendMsgBuf();
949 #if REPORT_COMM_METRICS
950   t4 = CmiWallTimer();
951   pumptime += (t2-t1);
952   releasetime += (t3-t2);
953   sendtime += (t4-t3);
954 #endif
955 /*
956   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
957 */
958   if (inexit == CmiMyNodeSize()) {
959     MACHSTATE(2, "CommunicationServer exiting {");
960 #if 0
961     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
962 #endif
963     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
964       CmiReleaseSentMessages();
965       SendMsgBuf();
966       PumpMsgs();
967     }
968     MACHSTATE(2, "CommunicationServer barrier begin {");
969
970     START_EVENT();
971
972     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
973       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
974
975     END_EVENT(10);
976
977     MACHSTATE(2, "} CommunicationServer barrier end");
978 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
979     if (CmiMyNode() == 0){
980       CmiPrintf("End of program\n");
981     }
982 #endif
983     MACHSTATE(2, "} CommunicationServer EXIT");
984
985     ConverseCommonExit();   
986 #if REPORT_COMM_METRICS
987     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
988 #endif
989
990 #if ! CMK_AUTOBUILD
991     signal(SIGINT, signal_int);
992     MPI_Finalize();
993     #endif
994     exit(0);
995   }
996 }
997
998 #endif
999
1000 static void CommunicationServerThread(int sleepTime)
1001 {
1002 #if CMK_SMP
1003   CommunicationServer(sleepTime);
1004 #endif
1005 #if CMK_IMMEDIATE_MSG
1006   CmiHandleImmediate();
1007 #endif
1008 }
1009
1010 #if CMK_NODE_QUEUE_AVAILABLE
1011 char *CmiGetNonLocalNodeQ(void)
1012 {
1013   CmiState cs = CmiGetState();
1014   char *result = 0;
1015   CmiIdleLock_checkMessage(&cs->idle);
1016 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1017     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1018     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1019     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1020     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1021     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1022 /*  }  */
1023
1024   return result;
1025 }
1026 #endif
1027
1028 void *CmiGetNonLocal(void)
1029 {
1030   static int count=0;
1031   CmiState cs = CmiGetState();
1032   void *msg;
1033
1034 #if ! CMK_SMP
1035   if (CmiNumPes() == 1) return NULL;
1036 #endif
1037
1038   CmiIdleLock_checkMessage(&cs->idle);
1039   /* although it seems that lock is not needed, I found it crashes very often
1040      on mpi-smp without lock */
1041
1042 #if ! CMK_SMP
1043   CmiReleaseSentMessages();
1044   PumpMsgs();
1045 #endif
1046
1047   /* CmiLock(procState[cs->rank].recvLock); */
1048   msg =  PCQueuePop(cs->recv);
1049   /* CmiUnlock(procState[cs->rank].recvLock); */
1050
1051 /*
1052   if (msg) {
1053     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1054   else {
1055     count++;
1056     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1057   }
1058 */
1059 #if ! CMK_SMP
1060   if (no_outstanding_sends) {
1061     while (MsgQueueLen>0) {
1062       CmiReleaseSentMessages();
1063       PumpMsgs();
1064     }
1065   }
1066
1067   if(!msg) {
1068     CmiReleaseSentMessages();
1069     if (PumpMsgs())
1070       return  PCQueuePop(cs->recv);
1071     else
1072       return 0;
1073   }
1074 #endif
1075
1076   return msg;
1077 }
1078
1079 /* called in non-smp mode */
1080 void CmiNotifyIdle(void)
1081 {
1082   CmiReleaseSentMessages();
1083   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1084 }
1085
1086
1087 /********************************************************
1088     The call to probe immediate messages has been renamed to
1089     CmiMachineProgressImpl
1090 ******************************************************/
1091 /* user call to handle immediate message, only useful in non SMP version
1092    using polling method to schedule message.
1093 */
1094 /*
1095 #if CMK_IMMEDIATE_MSG
1096 void CmiProbeImmediateMsg()
1097 {
1098 #if !CMK_SMP
1099   PumpMsgs();
1100   CmiHandleImmediate();
1101 #endif
1102 }
1103 #endif
1104 */
1105
1106 /* Network progress function is used to poll the network when for
1107    messages. This flushes receive buffers on some  implementations*/
1108 #if CMK_MACHINE_PROGRESS_DEFINED
1109 void CmiMachineProgressImpl()
1110 {
1111 #if !CMK_SMP
1112     PumpMsgs();
1113 #if CMK_IMMEDIATE_MSG
1114     CmiHandleImmediate();
1115 #endif
1116 #else
1117     /*Not implemented yet. Communication server does not seem to be
1118       thread safe, so only communication thread call it */
1119     if (CmiMyRank() == CmiMyNodeSize())
1120         CommunicationServerThread(0);
1121 #endif
1122 }
1123 #endif
1124
1125 /********************* MESSAGE SEND FUNCTIONS ******************/
1126
1127 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1128
1129 static void CmiSendSelf(char *msg)
1130 {
1131 #if CMK_IMMEDIATE_MSG
1132     if (CmiIsImmediate(msg)) {
1133       /* CmiBecomeNonImmediate(msg); */
1134       CmiPushImmediateMsg(msg);
1135       CmiHandleImmediate();
1136       return;
1137     }
1138 #endif
1139     CQdCreate(CpvAccess(cQdState), 1);
1140     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1141 }
1142
1143 void CmiSyncSendFn(int destPE, int size, char *msg)
1144 {
1145   CmiState cs = CmiGetState();
1146   char *dupmsg = (char *) CmiAlloc(size);
1147   memcpy(dupmsg, msg, size);
1148
1149   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1150
1151   if (cs->pe==destPE) {
1152     CmiSendSelf(dupmsg);
1153   }
1154   else
1155     CmiAsyncSendFn_(destPE, size, dupmsg);
1156 }
1157
1158 #if CMK_SMP
1159
1160 /* called by communication thread in SMP */
1161 static int SendMsgBuf()
1162 {
1163   SMSG_LIST *msg_tmp;
1164   char *msg;
1165   int node, rank, size;
1166   int i;
1167   int sent = 0;
1168
1169 #if CMI_EXERT_SEND_CAP
1170         int sentCnt = 0;
1171 #endif  
1172         
1173   MACHSTATE(2,"SendMsgBuf begin {");
1174 #if MULTI_SENDQUEUE
1175   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1176   {
1177     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1178     {
1179       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1180 #else
1181     /* single message sending queue */
1182     /* CmiLock(sendMsgBufLock); */
1183     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1184     /* CmiUnlock(sendMsgBufLock); */
1185     while (NULL != msg_tmp)
1186     {
1187 #endif
1188       node = msg_tmp->destpe;
1189       size = msg_tmp->size;
1190       msg = msg_tmp->msg;
1191       msg_tmp->next = 0;
1192       while (MsgQueueLen > request_max) {
1193         CmiReleaseSentMessages();
1194         PumpMsgs();
1195       }
1196       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1197       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1198       CMI_SET_CHECKSUM(msg, size);
1199
1200 #if MPI_POST_RECV_COUNT > 0
1201         if(size <= MPI_POST_RECV_SIZE){
1202
1203           START_EVENT();
1204           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1205                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1206
1207           STOP_EVENT(40);
1208         }
1209         else {
1210             START_EVENT();
1211             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1212                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1213             STOP_EVENT(40);
1214         }
1215 #else
1216         START_EVENT();
1217         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1218             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1219         /*END_EVENT(40);*/
1220 #endif
1221         
1222 #if CMK_SMP_TRACE_COMMTHREAD
1223         traceBeginCommOp(msg);
1224         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1225         /* traceSendMsgComm must execute after traceBeginCommOp because
1226          * we pretend we execute an entry method, and inside this we
1227          * pretend we will send another message. Otherwise how could
1228          * a message creation just before an entry method invocation?
1229          * If such logic is broken, the projections will not trace
1230          * messages correctly! -Chao Mei
1231          */
1232         traceSendMsgComm(msg);
1233         traceEndCommOp(msg);
1234         #if CMI_MPI_TRACE_MOREDETAILED
1235         char tmp[64];
1236         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1237         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1238         #endif
1239 #endif
1240                 
1241                 
1242       MACHSTATE(3,"}MPI_send end");
1243       MsgQueueLen++;
1244       if(sent_msgs==0)
1245         sent_msgs = msg_tmp;
1246       else
1247         end_sent->next = msg_tmp;
1248       end_sent = msg_tmp;
1249       sent=1;
1250           
1251 #if CMI_EXERT_SEND_CAP    
1252           if(++sentCnt == SEND_CAP) break;
1253 #endif    
1254           
1255 #if ! MULTI_SENDQUEUE
1256       /* CmiLock(sendMsgBufLock); */
1257       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1258       /* CmiUnlock(sendMsgBufLock); */
1259 #endif
1260     }
1261 #if MULTI_SENDQUEUE
1262   }
1263 #endif
1264   MACHSTATE(2,"}SendMsgBuf end ");
1265   return sent;
1266 }
1267
1268 void EnqueueMsg(void *m, int size, int node)
1269 {
1270   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1271   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1272   msg_tmp->msg = m;
1273   msg_tmp->size = size;
1274   msg_tmp->destpe = node;
1275         
1276 #if CMK_SMP_TRACE_COMMTHREAD
1277         msg_tmp->srcpe = CmiMyPe();
1278 #endif  
1279
1280 #if MULTI_SENDQUEUE
1281   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1282 #else
1283   CmiLock(sendMsgBufLock);
1284   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1285   CmiUnlock(sendMsgBufLock);
1286 #endif
1287         
1288   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1289 }
1290
1291 #endif
1292
1293 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1294 {
1295   CmiState cs = CmiGetState();
1296   SMSG_LIST *msg_tmp;
1297   CmiUInt2  rank, node;
1298
1299   if(destPE == cs->pe) {
1300     char *dupmsg = (char *) CmiAlloc(size);
1301     memcpy(dupmsg, msg, size);
1302     CmiSendSelf(dupmsg);
1303     return 0;
1304   }
1305   CQdCreate(CpvAccess(cQdState), 1);
1306 #if CMK_SMP
1307   node = CmiNodeOf(destPE);
1308   rank = CmiRankOf(destPE);
1309   if (node == CmiMyNode())  {
1310     CmiPushPE(rank, msg);
1311     return 0;
1312   }
1313   CMI_DEST_RANK(msg) = rank;
1314   EnqueueMsg(msg, size, node);
1315   return 0;
1316 #else
1317   /* non smp */
1318   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1319   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1320   msg_tmp->msg = msg;
1321   msg_tmp->next = 0;
1322   while (MsgQueueLen > request_max) {
1323         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1324         CmiReleaseSentMessages();
1325         PumpMsgs();
1326   }
1327   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1328   CMI_SET_CHECKSUM(msg, size);
1329
1330 #if MPI_POST_RECV_COUNT > 0
1331         if(size <= MPI_POST_RECV_SIZE){
1332
1333           START_EVENT();
1334           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1335                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1336           END_EVENT(40);
1337         }
1338         else {
1339           START_EVENT();
1340           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1341                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1342           END_EVENT(40);
1343         }
1344 #else
1345   START_EVENT();
1346   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1347     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1348   END_EVENT(40);
1349 #endif
1350
1351   MsgQueueLen++;
1352   if(sent_msgs==0)
1353     sent_msgs = msg_tmp;
1354   else
1355     end_sent->next = msg_tmp;
1356   end_sent = msg_tmp;
1357   return (CmiCommHandle) &(msg_tmp->req);
1358 #endif              /* non-smp */
1359 }
1360
1361 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1362 {
1363   CMI_SET_BROADCAST_ROOT(msg, 0);
1364   CmiAsyncSendFn_(destPE, size, msg);
1365 }
1366
1367 void CmiFreeSendFn(int destPE, int size, char *msg)
1368 {
1369   CmiState cs = CmiGetState();
1370   CMI_SET_BROADCAST_ROOT(msg, 0);
1371
1372   if (cs->pe==destPE) {
1373     CmiSendSelf(msg);
1374   } else {
1375     CmiAsyncSendFn_(destPE, size, msg);
1376   }
1377 }
1378
1379 /*********************** BROADCAST FUNCTIONS **********************/
1380
1381 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1382 void CmiSyncSendFn1(int destPE, int size, char *msg)
1383 {
1384   CmiState cs = CmiGetState();
1385   char *dupmsg = (char *) CmiAlloc(size);
1386   memcpy(dupmsg, msg, size);
1387   if (cs->pe==destPE)
1388     CmiSendSelf(dupmsg);
1389   else
1390     CmiAsyncSendFn_(destPE, size, dupmsg);
1391 }
1392
1393 /* send msg to its spanning children in broadcast. G. Zheng */
1394 void SendSpanningChildren(int size, char *msg)
1395 {
1396   CmiState cs = CmiGetState();
1397   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1398   int startnode = CmiNodeOf(startpe);
1399   int i, exceptRank;
1400         
1401    /* first send msgs to other nodes */
1402   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1403   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1404     int nd = CmiMyNode()-startnode;
1405     if (nd<0) nd+=CmiNumNodes();
1406     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1407     if (nd > CmiNumNodes() - 1) break;
1408     nd += startnode;
1409     nd = nd%CmiNumNodes();
1410     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1411         #if CMK_SMP
1412         /* always send to the first rank of other nodes */
1413         char *newmsg = CmiCopyMsg(msg, size);
1414         CMI_DEST_RANK(newmsg) = 0;
1415     EnqueueMsg(newmsg, size, nd);
1416         #else
1417         CmiSyncSendFn1(nd, size, msg);
1418         #endif
1419   }
1420 #if CMK_SMP  
1421    /* second send msgs to my peers on this node */
1422   /* FIXME: now it's just a flat p2p send!! When node size is large,
1423    * it should also be sent in a tree
1424    */
1425    exceptRank = CMI_DEST_RANK(msg);
1426    for(i=0; i<exceptRank; i++){
1427            CmiPushPE(i, CmiCopyMsg(msg, size));
1428    }
1429    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1430            CmiPushPE(i, CmiCopyMsg(msg, size));
1431    }
1432 #endif
1433 }
1434
1435 #include <math.h>
1436
1437 /* send msg along the hypercube in broadcast. (Sameer) */
1438 void SendHypercube(int size, char *msg)
1439 {
1440   CmiState cs = CmiGetState();
1441   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1442   int startnode = CmiNodeOf(startpe);
1443   int i, exceptRank, cnt, tmp, relPE;
1444   int dims=0;
1445
1446   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1447   tmp = CmiNumNodes()-1;
1448   while(tmp>0){
1449           dims++;
1450           tmp = tmp >> 1;
1451   }
1452   if(CmiNumNodes()==1) dims=1;
1453   
1454    /* first send msgs to other nodes */  
1455   relPE = CmiMyNode()-startnode;
1456   if(relPE < 0) relPE += CmiNumNodes();
1457   cnt=0;
1458   tmp = relPE;
1459   /* count how many zeros (in binary format) relPE has */
1460   for(i=0; i<dims; i++, cnt++){
1461     if(tmp & 1 == 1) break;
1462     tmp = tmp >> 1;
1463   }
1464   
1465   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1466   for (i = cnt-1; i >= 0; i--) {
1467     int nd = relPE + (1 << i);
1468         if(nd >= CmiNumNodes()) continue;
1469         nd = (nd+startnode)%CmiNumNodes();
1470         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1471 #if CMK_SMP
1472     /* always send to the first rank of other nodes */
1473     char *newmsg = CmiCopyMsg(msg, size);
1474     CMI_DEST_RANK(newmsg) = 0;
1475     EnqueueMsg(newmsg, size, nd);
1476 #else
1477         CmiSyncSendFn1(nd, size, msg);
1478 #endif
1479   }
1480   
1481 #if CMK_SMP
1482    /* second send msgs to my peers on this node */
1483    /* FIXME: now it's just a flat p2p send!! When node size is large,
1484     * it should also be sent in a tree
1485     */
1486    exceptRank = CMI_DEST_RANK(msg);
1487    for(i=0; i<exceptRank; i++){
1488            CmiPushPE(i, CmiCopyMsg(msg, size));
1489    }
1490    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1491            CmiPushPE(i, CmiCopyMsg(msg, size));
1492    }
1493 #endif
1494 }
1495
1496 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1497 {
1498   CmiState cs = CmiGetState();
1499
1500 #if CMK_SMP     
1501   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1502   CMI_DEST_RANK(msg) = CmiMyRank();
1503 #endif
1504         
1505 #if CMK_BROADCAST_SPANNING_TREE
1506   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1507   SendSpanningChildren(size, msg);
1508
1509 #elif CMK_BROADCAST_HYPERCUBE
1510   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1511   SendHypercube(size, msg);
1512
1513 #else
1514   int i;
1515
1516   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1517     CmiSyncSendFn(i, size,msg) ;
1518   for ( i=0; i<cs->pe; i++ )
1519     CmiSyncSendFn(i, size,msg) ;
1520 #endif
1521
1522   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1523 }
1524
1525
1526 /*  FIXME: luckily async is never used  G. Zheng */
1527 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1528 {
1529   CmiState cs = CmiGetState();
1530   int i ;
1531
1532   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1533     CmiAsyncSendFn(i,size,msg) ;
1534   for ( i=0; i<cs->pe; i++ )
1535     CmiAsyncSendFn(i,size,msg) ;
1536
1537   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1538   CmiAbort("CmiAsyncBroadcastFn should never be called");
1539   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1540 }
1541
1542 void CmiFreeBroadcastFn(int size, char *msg)
1543 {
1544    CmiSyncBroadcastFn(size,msg);
1545    CmiFree(msg);
1546 }
1547
1548 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1549 {
1550
1551 #if CMK_SMP     
1552   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1553   CMI_DEST_RANK(msg) = CmiMyRank();
1554 #endif
1555
1556 #if CMK_BROADCAST_SPANNING_TREE
1557   CmiState cs = CmiGetState();
1558   CmiSyncSendFn(cs->pe, size,msg) ;
1559   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1560   SendSpanningChildren(size, msg);
1561
1562 #elif CMK_BROADCAST_HYPERCUBE
1563   CmiState cs = CmiGetState();
1564   CmiSyncSendFn(cs->pe, size,msg) ;
1565   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1566   SendHypercube(size, msg);
1567
1568 #else
1569     int i ;
1570
1571   for ( i=0; i<_Cmi_numpes; i++ )
1572     CmiSyncSendFn(i,size,msg) ;
1573 #endif
1574
1575   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1576 }
1577
1578 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1579 {
1580   int i ;
1581
1582   for ( i=1; i<_Cmi_numpes; i++ )
1583     CmiAsyncSendFn(i,size,msg) ;
1584
1585   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1586
1587   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1588 }
1589
1590 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1591 {
1592 #if CMK_SMP     
1593   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1594   CMI_DEST_RANK(msg) = CmiMyRank();
1595 #endif
1596
1597 #if CMK_BROADCAST_SPANNING_TREE
1598   CmiState cs = CmiGetState();
1599   CmiSyncSendFn(cs->pe, size,msg) ;
1600   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1601   SendSpanningChildren(size, msg);
1602
1603 #elif CMK_BROADCAST_HYPERCUBE
1604   CmiState cs = CmiGetState();
1605   CmiSyncSendFn(cs->pe, size,msg) ;
1606   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1607   SendHypercube(size, msg);
1608
1609 #else
1610   int i ;
1611
1612   for ( i=0; i<_Cmi_numpes; i++ )
1613     CmiSyncSendFn(i,size,msg) ;
1614 #endif
1615   CmiFree(msg) ;
1616   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1617 }
1618
1619 #if CMK_NODE_QUEUE_AVAILABLE
1620
1621 static void CmiSendNodeSelf(char *msg)
1622 {
1623 #if CMK_IMMEDIATE_MSG
1624 #if 0
1625     if (CmiIsImmediate(msg) && !_immRunning) {
1626       /*CmiHandleImmediateMessage(msg); */
1627       CmiPushImmediateMsg(msg);
1628       CmiHandleImmediate();
1629       return;
1630     }
1631 #endif
1632     if (CmiIsImmediate(msg))
1633     {
1634       CmiPushImmediateMsg(msg);
1635       if (!_immRunning) CmiHandleImmediate();
1636       return;
1637     }
1638 #endif
1639     CQdCreate(CpvAccess(cQdState), 1);
1640     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1641     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1642     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1643 }
1644
1645 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1646 {
1647   int i;
1648   SMSG_LIST *msg_tmp;
1649   char *dupmsg;
1650
1651   CMI_SET_BROADCAST_ROOT(msg, 0);
1652   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1653   switch (dstNode) {
1654   case NODE_BROADCAST_ALL:
1655     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1656   case NODE_BROADCAST_OTHERS:
1657     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1658     for (i=0; i<_Cmi_numnodes; i++)
1659       if (i!=_Cmi_mynode) {
1660         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1661       }
1662     break;
1663   default:
1664     dupmsg = (char *)CmiCopyMsg(msg,size);
1665     if(dstNode == _Cmi_mynode) {
1666       CmiSendNodeSelf(dupmsg);
1667     }
1668     else {
1669       CQdCreate(CpvAccess(cQdState), 1);
1670       EnqueueMsg(dupmsg, size, dstNode);
1671     }
1672   }
1673   return 0;
1674 }
1675
1676 void CmiSyncNodeSendFn(int p, int s, char *m)
1677 {
1678   CmiAsyncNodeSendFn(p, s, m);
1679 }
1680
1681 /* need */
1682 void CmiFreeNodeSendFn(int p, int s, char *m)
1683 {
1684   CmiAsyncNodeSendFn(p, s, m);
1685   CmiFree(m);
1686 }
1687
1688 /* need */
1689 void CmiSyncNodeBroadcastFn(int s, char *m)
1690 {
1691   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1692 }
1693
1694 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1695 {
1696 }
1697
1698 /* need */
1699 void CmiFreeNodeBroadcastFn(int s, char *m)
1700 {
1701   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1702   CmiFree(m);
1703 }
1704
1705 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1706 {
1707   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1708 }
1709
1710 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1711 {
1712   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1713 }
1714
1715 /* need */
1716 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1717 {
1718   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1719   CmiFree(m);
1720 }
1721 #endif
1722
1723 /************************** MAIN ***********************************/
1724 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1725
1726 void ConverseExit(void)
1727 {
1728 #if ! CMK_SMP
1729   while(!CmiAllAsyncMsgsSent()) {
1730     PumpMsgs();
1731     CmiReleaseSentMessages();
1732   }
1733   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1734     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1735
1736   ConverseCommonExit();
1737 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1738   if (CmiMyPe() == 0){
1739     CmiPrintf("End of program\n");
1740 #if MPI_POST_RECV_COUNT > 0
1741     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1742 #endif
1743 }
1744 #endif
1745 #if ! CMK_AUTOBUILD
1746   signal(SIGINT, signal_int);
1747   MPI_Finalize();
1748 #endif
1749   exit(0);
1750
1751 #else
1752     /* SMP version, communication thread will exit */
1753   ConverseCommonExit();
1754   /* atomic increment */
1755   CmiLock(exitLock);
1756   inexit++;
1757   CmiUnlock(exitLock);
1758   while (1) CmiYield();
1759 #endif
1760 }
1761
1762 static void registerMPITraceEvents() {
1763 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1764     traceRegisterUserEvent("MPI_Barrier", 10);
1765     traceRegisterUserEvent("MPI_Send", 20);
1766     traceRegisterUserEvent("MPI_Recv", 30);
1767     traceRegisterUserEvent("MPI_Isend", 40);
1768     traceRegisterUserEvent("MPI_Irecv", 50);
1769     traceRegisterUserEvent("MPI_Test", 60);
1770     traceRegisterUserEvent("MPI_Iprobe", 70);
1771 #endif
1772 }
1773
1774
1775 static char     **Cmi_argv;
1776 static char     **Cmi_argvcopy;
1777 static CmiStartFn Cmi_startfn;   /* The start function */
1778 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1779
1780 typedef struct {
1781   int sleepMs; /*Milliseconds to sleep while idle*/
1782   int nIdles; /*Number of times we've been idle in a row*/
1783   CmiState cs; /*Machine state*/
1784 } CmiIdleState;
1785
1786 static CmiIdleState *CmiNotifyGetState(void)
1787 {
1788   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1789   s->sleepMs=0;
1790   s->nIdles=0;
1791   s->cs=CmiGetState();
1792   return s;
1793 }
1794
1795 static void CmiNotifyBeginIdle(CmiIdleState *s)
1796 {
1797   s->sleepMs=0;
1798   s->nIdles=0;
1799 }
1800
1801 static void CmiNotifyStillIdle(CmiIdleState *s)
1802 {
1803 #if ! CMK_SMP
1804   CmiReleaseSentMessages();
1805   PumpMsgs();
1806 #else
1807 /*  CmiYield();  */
1808 #endif
1809
1810 #if 1
1811   {
1812   int nSpins=20; /*Number of times to spin before sleeping*/
1813   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1814   s->nIdles++;
1815   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1816     s->sleepMs+=2;
1817     if (s->sleepMs>10) s->sleepMs=10;
1818   }
1819   /*Comm. thread will listen on sockets-- just sleep*/
1820   if (s->sleepMs>0) {
1821     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1822     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1823     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1824   }
1825   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1826   }
1827 #endif
1828 }
1829
1830 #if MACHINE_DEBUG_LOG
1831 FILE *debugLog = NULL;
1832 #endif
1833
1834 static int machine_exit_idx;
1835 static void machine_exit(char *m) {
1836   EmergencyExit();
1837   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1838   fflush(stdout);
1839   CmiNodeBarrier();
1840   if (CmiMyRank() == 0) {
1841     MPI_Barrier(MPI_COMM_WORLD);
1842     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1843     MPI_Abort(MPI_COMM_WORLD, 1);
1844   } else {
1845     while (1) CmiYield();
1846   }
1847 }
1848
1849 static void KillOnAllSigs(int sigNo) {
1850   static int already_in_signal_handler = 0;
1851   char *m;
1852   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1853   already_in_signal_handler = 1;
1854 #if CMK_CCS_AVAILABLE
1855   if (CpvAccess(cmiArgDebugFlag)) {
1856     CpdNotify(CPD_SIGNAL, sigNo);
1857     CpdFreeze();
1858   }
1859 #endif
1860   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1861       "Signal: %d\n",CmiMyPe(),sigNo);
1862   CmiPrintStackTrace(1);
1863
1864   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1865   CmiSetHandler(m, machine_exit_idx);
1866   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1867   machine_exit(m);
1868 }
1869
1870 static void ConverseRunPE(int everReturn)
1871 {
1872   CmiIdleState *s=CmiNotifyGetState();
1873   CmiState cs;
1874   char** CmiMyArgv;
1875
1876   CmiNodeAllBarrier();
1877
1878   cs = CmiGetState();
1879   CpvInitialize(void *,CmiLocalQueue);
1880   CpvAccess(CmiLocalQueue) = cs->localqueue;
1881
1882   if (CmiMyRank())
1883     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1884   else
1885     CmiMyArgv=Cmi_argv;
1886
1887   CthInit(CmiMyArgv);
1888
1889   ConverseCommonInit(CmiMyArgv);
1890   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1891
1892 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1893   CpvInitialize(double, projTraceStart);
1894   /* only PE 0 needs to care about registration (to generate sts file). */
1895   if (CmiMyPe() == 0) {
1896     registerMachineUserEventsFunction(&registerMPITraceEvents);
1897   }
1898 #endif
1899
1900   /* initialize the network progress counter*/
1901   /* Network progress function is used to poll the network when for
1902      messages. This flushes receive buffers on some  implementations*/
1903   CpvInitialize(int , networkProgressCount);
1904   CpvAccess(networkProgressCount) = 0;
1905
1906 #if CMK_SMP
1907   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1908   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1909 #else
1910   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1911 #endif
1912
1913 #if MACHINE_DEBUG_LOG
1914   if (CmiMyRank() == 0) {
1915     char ln[200];
1916     sprintf(ln,"debugLog.%d",CmiMyNode());
1917     debugLog=fopen(ln,"w");
1918   }
1919 #endif
1920
1921   /* Converse initialization finishes, immediate messages can be processed.
1922      node barrier previously should take care of the node synchronization */
1923   _immediateReady = 1;
1924
1925   /* communication thread */
1926   if (CmiMyRank() == CmiMyNodeSize()) {
1927     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1928     while (1) CommunicationServerThread(5);
1929   }
1930   else {  /* worker thread */
1931   if (!everReturn) {
1932     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1933     if (Cmi_usrsched==0) CsdScheduler(-1);
1934     ConverseExit();
1935   }
1936   }
1937 }
1938
1939 static char *thread_level_tostring(int thread_level)
1940 {
1941 #if CMK_MPI_INIT_THREAD
1942   switch (thread_level) {
1943   case MPI_THREAD_SINGLE:
1944       return "MPI_THREAD_SINGLE";
1945   case MPI_THREAD_FUNNELED:
1946       return "MPI_THREAD_FUNNELED";
1947   case MPI_THREAD_SERIALIZED:
1948       return "MPI_THREAD_SERIALIZED";
1949   case MPI_THREAD_MULTIPLE :
1950       return "MPI_THREAD_MULTIPLE ";
1951   default: {
1952       char *str = (char*)malloc(5);
1953       sprintf(str,"%d", thread_level);
1954       return str;
1955       }
1956   }
1957   return  "unknown";
1958 #else
1959   char *str = (char*)malloc(5);
1960   sprintf(str,"%d", thread_level);
1961   return str;
1962 #endif
1963 }
1964
1965 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1966 {
1967   int n,i;
1968   int ver, subver;
1969   int provided;
1970   int thread_level;
1971
1972 #if MACHINE_DEBUG
1973   debugLog=NULL;
1974 #endif
1975 #if CMK_USE_HP_MAIN_FIX
1976 #if FOR_CPLUS
1977   _main(argc,argv);
1978 #endif
1979 #endif
1980
1981 #if CMK_MPI_INIT_THREAD
1982 #if CMK_SMP
1983   thread_level = MPI_THREAD_FUNNELED;
1984 #else
1985   thread_level = MPI_THREAD_SINGLE;
1986 #endif
1987   MPI_Init_thread(&argc, &argv, thread_level, &provided);
1988   _thread_provided = provided;
1989 #else
1990   MPI_Init(&argc, &argv);
1991   thread_level = 0;
1992   provided = -1;
1993 #endif
1994   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
1995   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
1996
1997   MPI_Get_version(&ver, &subver);
1998   if (_Cmi_mynode == 0) {
1999     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2000   }
2001
2002   /* processor per node */
2003   _Cmi_mynodesize = 1;
2004   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2005     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2006 #if ! CMK_SMP
2007   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2008     CmiAbort("+ppn cannot be used in non SMP version!\n");
2009 #endif
2010   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2011   if (idleblock && _Cmi_mynode == 0) {
2012     printf("Charm++: Running in idle blocking mode.\n");
2013   }
2014
2015   /* setup signal handlers */
2016   signal(SIGSEGV, KillOnAllSigs);
2017   signal(SIGFPE, KillOnAllSigs);
2018   signal(SIGILL, KillOnAllSigs);
2019   signal_int = signal(SIGINT, KillOnAllSigs);
2020   signal(SIGTERM, KillOnAllSigs);
2021   signal(SIGABRT, KillOnAllSigs);
2022 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2023   signal(SIGQUIT, KillOnAllSigs);
2024   signal(SIGBUS, KillOnAllSigs);
2025 /*#     if CMK_HANDLE_SIGUSR
2026   signal(SIGUSR1, HandleUserSignals);
2027   signal(SIGUSR2, HandleUserSignals);
2028 #     endif*/
2029 #   endif /*UNIX*/
2030   
2031 #if CMK_NO_OUTSTANDING_SENDS
2032   no_outstanding_sends=1;
2033 #endif
2034   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2035     no_outstanding_sends = 1;
2036     if (_Cmi_mynode == 0)
2037       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2038         no_outstanding_sends?"":" not");
2039   }
2040   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2041   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2042   Cmi_argvcopy = CmiCopyArgs(argv);
2043   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2044   /* find dim = log2(numpes), to pretend we are a hypercube */
2045   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2046     Cmi_dim++ ;
2047  /* CmiSpanTreeInit();*/
2048   request_max=MAX_QLEN;
2049   CmiGetArgInt(argv,"+requestmax",&request_max);
2050   /*printf("request max=%d\n", request_max);*/
2051
2052   /* checksum flag */
2053   if (CmiGetArgFlag(argv,"+checksum")) {
2054 #if !CMK_OPTIMIZE
2055     checksum_flag = 1;
2056     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2057 #else
2058     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2059 #endif
2060   }
2061
2062   {
2063   int debug = CmiGetArgFlag(argv,"++debug");
2064   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2065   if (debug || debug_no_pause)
2066   {   /*Pause so user has a chance to start and attach debugger*/
2067 #if CMK_HAS_GETPID
2068     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2069     fflush(stdout);
2070     if (!debug_no_pause)
2071       sleep(15);
2072 #else
2073     printf("++debug ignored.\n");
2074 #endif
2075   }
2076   }
2077
2078 #if MPI_POST_RECV_COUNT > 0
2079
2080   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2081   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2082   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2083   CpvInitialize(char*,CmiPostedRecvBuffers);
2084
2085     /* Post some extra recvs to help out with incoming messages */
2086     /* On some MPIs the messages are unexpected and thus slow */
2087
2088     /* An array of request handles for posted recvs */
2089     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2090
2091     /* An array of buffers for posted recvs */
2092     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2093
2094     /* Post Recvs */
2095     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2096         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2097                     MPI_POST_RECV_SIZE,
2098                     MPI_BYTE,
2099                     MPI_ANY_SOURCE,
2100                     POST_RECV_TAG,
2101                     MPI_COMM_WORLD,
2102                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2103           CmiAbort("MPI_Irecv failed\n");
2104     }
2105
2106 #endif
2107
2108
2109
2110   /* CmiTimerInit(); */
2111
2112 #if 0
2113   CthInit(argv);
2114   ConverseCommonInit(argv);
2115
2116   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2117   if (initret==0) {
2118     fn(CmiGetArgc(argv), argv);
2119     if (usched==0) CsdScheduler(-1);
2120     ConverseExit();
2121   }
2122 #endif
2123
2124   CsvInitialize(CmiNodeState, NodeState);
2125   CmiNodeStateInit(&CsvAccess(NodeState));
2126
2127   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2128
2129   for (i=0; i<_Cmi_mynodesize+1; i++) {
2130 #if MULTI_SENDQUEUE
2131     procState[i].sendMsgBuf = PCQueueCreate();
2132 #endif
2133     procState[i].recvLock = CmiCreateLock();
2134   }
2135 #if CMK_SMP
2136 #if !MULTI_SENDQUEUE
2137   sendMsgBuf = PCQueueCreate();
2138   sendMsgBufLock = CmiCreateLock();
2139 #endif
2140   exitLock = CmiCreateLock();            /* exit count lock */
2141 #endif
2142
2143   /* Network progress function is used to poll the network when for
2144      messages. This flushes receive buffers on some  implementations*/
2145   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2146   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2147
2148   CmiStartThreads(argv);
2149   ConverseRunPE(initret);
2150 }
2151
2152 /***********************************************************************
2153  *
2154  * Abort function:
2155  *
2156  ************************************************************************/
2157
2158 void CmiAbort(const char *message)
2159 {
2160   char *m;
2161   /* if CharmDebug is attached simply try to send a message to it */
2162 #if CMK_CCS_AVAILABLE
2163   if (CpvAccess(cmiArgDebugFlag)) {
2164     CpdNotify(CPD_ABORT, message);
2165     CpdFreeze();
2166   }
2167 #endif  
2168   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2169         "Reason: %s\n",CmiMyPe(),message);
2170  /*  CmiError(message); */
2171   CmiPrintStackTrace(0);
2172   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2173   CmiSetHandler(m, machine_exit_idx);
2174   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2175   machine_exit(m);
2176   /* Program never reaches here */
2177   MPI_Abort(MPI_COMM_WORLD, 1);
2178 }
2179
2180
2181 #if 0
2182
2183 /* ****************************************************************** */
2184 /*    The following internal functions implement recd msg queue       */
2185 /* ****************************************************************** */
2186
2187 static void ** AllocBlock(unsigned int len)
2188 {
2189   void ** blk;
2190
2191   blk=(void **)CmiAlloc(len*sizeof(void *));
2192   if(blk==(void **)0) {
2193     CmiError("Cannot Allocate Memory!\n");
2194     MPI_Abort(MPI_COMM_WORLD, 1);
2195   }
2196   return blk;
2197 }
2198
2199 static void
2200 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2201 {
2202   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2203   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2204 }
2205
2206 void recdQueueInit(void)
2207 {
2208   recdQueue_blk = AllocBlock(BLK_LEN);
2209   recdQueue_blk_len = BLK_LEN;
2210   recdQueue_first = 0;
2211   recdQueue_len = 0;
2212 }
2213
2214 void recdQueueAddToBack(void *element)
2215 {
2216 #if NODE_0_IS_CONVHOST
2217   inside_comm = 1;
2218 #endif
2219   if(recdQueue_len==recdQueue_blk_len) {
2220     void **blk;
2221     recdQueue_blk_len *= 3;
2222     blk = AllocBlock(recdQueue_blk_len);
2223     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2224     CmiFree(recdQueue_blk);
2225     recdQueue_blk = blk;
2226     recdQueue_first = 0;
2227   }
2228   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2229 #if NODE_0_IS_CONVHOST
2230   inside_comm = 0;
2231 #endif
2232 }
2233
2234
2235 void * recdQueueRemoveFromFront(void)
2236 {
2237   if(recdQueue_len) {
2238     void *element;
2239     element = recdQueue_blk[recdQueue_first++];
2240     recdQueue_first %= recdQueue_blk_len;
2241     recdQueue_len--;
2242     return element;
2243   }
2244   return 0;
2245 }
2246
2247 #endif
2248
2249 /*@}*/