Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #define CMK_TRACE_COMMOVERHEAD 0
66 #if CMK_TRACE_COMMOVERHEAD
67 #undef CMI_MPI_TRACE_USEREVENTS
68 #define CMI_MPI_TRACE_USEREVENTS 1
69 #endif
70
71 #include "machine.h"
72
73 #include "pcqueue.h"
74
75 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
76
77 #if CMK_BLUEGENEL
78 #define MAX_QLEN 8
79 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
80 #else
81 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
82 #define MAX_QLEN 200
83 #endif
84
85 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
86 CpvStaticDeclare(double, projTraceStart);
87 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
88 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
89 #else
90 # define  START_EVENT()
91 # define  END_EVENT(x)
92 #endif
93
94 /*
95     To reduce the buffer used in broadcast and distribute the load from
96   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
97   spanning tree broadcast algorithm.
98     This will use the fourth short in message as an indicator of spanning tree
99   root.
100 */
101 #define CMK_BROADCAST_SPANNING_TREE    1
102 #define CMK_BROADCAST_HYPERCUBE        0
103
104 #define BROADCAST_SPANNING_FACTOR      4
105 /* The number of children used when a msg is broadcast inside a node */
106 #define BROADCAST_SPANNING_INTRA_FACTOR      8
107
108 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
109 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
110
111 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
112 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
113
114 /* FIXME: need a random number that everyone agrees ! */
115 #define CHARM_MAGIC_NUMBER               126
116
117 #if CMK_ERROR_CHECKING
118 static int checksum_flag = 0;
119 #define CMI_SET_CHECKSUM(msg, len)      \
120         if (checksum_flag)  {   \
121           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
122           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
123         }
124 #define CMI_CHECK_CHECKSUM(msg, len)    \
125         if (checksum_flag)      \
126           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
127             CmiAbort("Fatal error: checksum doesn't agree!\n");
128 #else
129 #define CMI_SET_CHECKSUM(msg, len)
130 #define CMI_CHECK_CHECKSUM(msg, len)
131 #endif
132
133 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
134 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
135 #else
136 #define CMI_SET_BROADCAST_ROOT(msg, root) 
137 #endif
138
139
140 /** 
141     If MPI_POST_RECV is defined, we provide default values for size 
142     and number of posted recieves. If MPI_POST_RECV_COUNT is set
143     then a default value for MPI_POST_RECV_SIZE is used if not specified
144     by the user.
145 */
146 #ifdef MPI_POST_RECV
147 #define MPI_POST_RECV_COUNT 10
148 #undef MPI_POST_RECV
149 #endif
150 #if MPI_POST_RECV_COUNT > 0
151 #warning "Using MPI posted receives which have not yet been tested"
152 #ifndef MPI_POST_RECV_SIZE
153 #define MPI_POST_RECV_SIZE 200
154 #endif
155 /* #undef  MPI_POST_RECV_DEBUG  */
156 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
157 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
158 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
159 CpvDeclare(char*,CmiPostedRecvBuffers);
160 #endif
161
162 /*
163  to avoid MPI's in order delivery, changing MPI Tag all the time
164 */
165 #define TAG     1375
166
167 #if MPI_POST_RECV_COUNT > 0
168 #define POST_RECV_TAG TAG+1
169 #define BARRIER_ZERO_TAG TAG
170 #else
171 #define BARRIER_ZERO_TAG     1375
172 #endif
173
174 #include <signal.h>
175 void (*signal_int)(int);
176
177 /*
178 static int mpi_tag = TAG;
179 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
180 */
181
182 static int        _thread_provided = -1;
183 int               _Cmi_numpes;
184 int               _Cmi_mynode;    /* Which address space am I */
185 int               _Cmi_mynodesize;/* Number of processors in my address space */
186 int               _Cmi_numnodes;  /* Total number of address spaces */
187 int               _Cmi_numpes;    /* Total number of processors */
188 static int        Cmi_nodestart; /* First processor in this address space */
189 CpvDeclare(void*, CmiLocalQueue);
190
191 /*Network progress utility variables. Period controls the rate at
192   which the network poll is called */
193 CpvDeclare(unsigned , networkProgressCount);
194 int networkProgressPeriod;
195
196 int               idleblock = 0;
197
198 #define BLK_LEN  512
199
200 #if CMK_NODE_QUEUE_AVAILABLE
201 #define DGRAM_NODEMESSAGE   (0xFB)
202
203 #define NODE_BROADCAST_OTHERS (-1)
204 #define NODE_BROADCAST_ALL    (-2)
205 #endif
206
207 #if 0
208 static void **recdQueue_blk;
209 static unsigned int recdQueue_blk_len;
210 static unsigned int recdQueue_first;
211 static unsigned int recdQueue_len;
212 static void recdQueueInit(void);
213 static void recdQueueAddToBack(void *element);
214 static void *recdQueueRemoveFromFront(void);
215 #endif
216
217 static void ConverseRunPE(int everReturn);
218 static void CommunicationServer(int sleepTime);
219 static void CommunicationServerThread(int sleepTime);
220
221 typedef struct msg_list {
222      char *msg;
223      struct msg_list *next;
224      int size, destpe;
225
226 #if CMK_SMP_TRACE_COMMTHREAD
227         int srcpe;
228 #endif  
229         
230      MPI_Request req;
231 } SMSG_LIST;
232
233 int MsgQueueLen=0;
234 static int request_max;
235
236 static SMSG_LIST *sent_msgs=0;
237 static SMSG_LIST *end_sent=0;
238
239 static int Cmi_dim;
240
241 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
242
243 #if NODE_0_IS_CONVHOST
244 int inside_comm = 0;
245 #endif
246
247 void CmiAbort(const char *message);
248 static void PerrorExit(const char *msg);
249
250 void SendSpanningChildren(int size, char *msg);
251 void SendHypercube(int size, char *msg);
252
253 static void PerrorExit(const char *msg)
254 {
255   perror(msg);
256   exit(1);
257 }
258
259 extern unsigned char computeCheckSum(unsigned char *data, int len);
260
261 /**************************  TIMER FUNCTIONS **************************/
262
263 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
264
265 /* MPI calls are not threadsafe, even the timer on some machines */
266 static CmiNodeLock  timerLock = 0;
267 static double starttimer = 0;
268 static int _is_global = 0;
269
270 int CmiTimerIsSynchronized()
271 {
272   int  flag;
273   void *v;
274
275   /*  check if it using synchronized timer */
276   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
277     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
278   if (flag) {
279     _is_global = *(int*)v;
280     if (_is_global && CmiMyPe() == 0)
281       printf("Charm++> MPI timer is synchronized!\n");
282   }
283   return _is_global;
284 }
285
286 void CmiTimerInit()
287 {
288   _is_global = CmiTimerIsSynchronized();
289
290   if (_is_global) {
291     if (CmiMyRank() == 0) {
292       double minTimer;
293 #if CMK_TIMER_USE_XT3_DCLOCK
294       starttimer = dclock();
295 #else
296       starttimer = MPI_Wtime();
297 #endif
298
299       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
300                                   MPI_COMM_WORLD );
301       starttimer = minTimer;
302     }
303   }
304   else {  /* we don't have a synchronous timer, set our own start time */
305     CmiBarrier();
306     CmiBarrier();
307     CmiBarrier();
308 #if CMK_TIMER_USE_XT3_DCLOCK
309     starttimer = dclock();
310 #else
311     starttimer = MPI_Wtime();
312 #endif
313   }
314
315 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
316   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
317     timerLock = CmiCreateLock();
318 #endif
319   CmiNodeAllBarrier();          /* for smp */
320 }
321
322 /**
323  * Since the timerLock is never created, and is
324  * always NULL, then all the if-condition inside
325  * the timer functions could be disabled right
326  * now in the case of SMP. --Chao Mei
327  */
328 double CmiTimer(void)
329 {
330   double t;
331 #if 0 && CMK_SMP
332   if (timerLock) CmiLock(timerLock);
333 #endif
334 #if CMK_TIMER_USE_XT3_DCLOCK
335   t = dclock() - starttimer;
336 #else
337   t = MPI_Wtime() - starttimer;
338 #endif
339
340 #if 0 && CMK_SMP
341   if (timerLock) CmiUnlock(timerLock);
342 #endif
343
344   return t;
345 }
346
347 double CmiWallTimer(void)
348 {
349   double t;
350 #if 0 && CMK_SMP
351   if (timerLock) CmiLock(timerLock);
352 #endif
353 #if CMK_TIMER_USE_XT3_DCLOCK
354   t = dclock() - starttimer;
355 #else
356   t = MPI_Wtime() - starttimer;
357 #endif
358 #if 0 && CMK_SMP
359   if (timerLock) CmiUnlock(timerLock);
360 #endif
361   return t;
362 }
363
364 double CmiCpuTimer(void)
365 {
366   double t;
367 #if 0 && CMK_SMP
368   if (timerLock) CmiLock(timerLock);
369 #endif
370 #if CMK_TIMER_USE_XT3_DCLOCK
371   t = dclock() - starttimer;
372 #else
373   t = MPI_Wtime() - starttimer;
374 #endif
375 #if 0 && CMK_SMP
376   if (timerLock) CmiUnlock(timerLock);
377 #endif
378   return t;
379 }
380
381 #endif
382
383 /* must be called on all ranks including comm thread in SMP */
384 int CmiBarrier()
385 {
386 #if CMK_SMP
387     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
388   CmiNodeAllBarrier();
389   if (CmiMyRank() == CmiMyNodeSize()) 
390 #else
391   if (CmiMyRank() == 0) 
392 #endif
393   {
394 /**
395  *  The call of CmiBarrier is usually before the initialization
396  *  of trace module of Charm++, therefore, the START_EVENT
397  *  and END_EVENT are disabled here. -Chao Mei
398  */     
399     /*START_EVENT();*/
400
401     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
402         CmiAbort("Timernit: MPI_Barrier failed!\n");
403
404     /*END_EVENT(10);*/
405   }
406   CmiNodeAllBarrier();
407   return 0;
408 }
409
410 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
411 int CmiBarrierZero()
412 {
413   int i;
414 #if CMK_SMP
415   if (CmiMyRank() == CmiMyNodeSize()) 
416 #else
417   if (CmiMyRank() == 0) 
418 #endif
419   {
420     char msg[1];
421     MPI_Status sts;
422     if (CmiMyNode() == 0)  {
423       for (i=0; i<CmiNumNodes()-1; i++) {
424          START_EVENT();
425
426          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
427             CmiPrintf("MPI_Recv failed!\n");
428
429          END_EVENT(30);
430       }
431     }
432     else {
433       START_EVENT();
434
435       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
436          printf("MPI_Send failed!\n");
437
438       END_EVENT(20);
439     }
440   }
441   CmiNodeAllBarrier();
442   return 0;
443 }
444
445 typedef struct ProcState {
446 #if MULTI_SENDQUEUE
447 PCQueue      sendMsgBuf;       /* per processor message sending queue */
448 #endif
449 CmiNodeLock  recvLock;              /* for cs->recv */
450 } ProcState;
451
452 static ProcState  *procState;
453
454 #if CMK_SMP
455
456 #if !MULTI_SENDQUEUE
457 static PCQueue sendMsgBuf;
458 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
459 #endif
460
461 #endif
462
463 /************************************************************
464  *
465  * Processor state structure
466  *
467  ************************************************************/
468
469 /* fake Cmi_charmrun_fd */
470 static int Cmi_charmrun_fd = 0;
471 #include "machine-smp.c"
472
473 CsvDeclare(CmiNodeState, NodeState);
474
475 #include "immediate.c"
476
477 #if CMK_SHARED_VARS_UNAVAILABLE
478 /************ non SMP **************/
479 static struct CmiStateStruct Cmi_state;
480 int _Cmi_mype;
481 int _Cmi_myrank;
482
483 void CmiMemLock() {}
484 void CmiMemUnlock() {}
485
486 #define CmiGetState() (&Cmi_state)
487 #define CmiGetStateN(n) (&Cmi_state)
488
489 void CmiYield(void) { sleep(0); }
490
491 static void CmiStartThreads(char **argv)
492 {
493   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
494   _Cmi_mype = Cmi_nodestart;
495   _Cmi_myrank = 0;
496 }
497 #endif  /* non smp */
498
499 /*Add a message to this processor's receive queue, pe is a rank */
500 void CmiPushPE(int pe,void *msg)
501 {
502   CmiState cs = CmiGetStateN(pe);
503   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
504 #if CMK_IMMEDIATE_MSG
505   if (CmiIsImmediate(msg)) {
506 /*
507 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
508     CmiHandleMessage(msg);
509 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
510 */
511     /**(CmiUInt2 *)msg = pe;*/
512     CMI_DEST_RANK(msg) = pe;
513     CmiPushImmediateMsg(msg);
514     return;
515   }
516 #endif
517
518 #if CMK_SMP
519   CmiLock(procState[pe].recvLock);
520 #endif
521   PCQueuePush(cs->recv,msg);
522 #if CMK_SMP
523   CmiUnlock(procState[pe].recvLock);
524 #endif
525   CmiIdleLock_addMessage(&cs->idle);
526   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
527 }
528
529 #if CMK_NODE_QUEUE_AVAILABLE
530 /*Add a message to this processor's receive queue */
531 static void CmiPushNode(void *msg)
532 {
533   MACHSTATE(3,"Pushing message into NodeRecv queue");
534 #if CMK_IMMEDIATE_MSG
535   if (CmiIsImmediate(msg)) {
536     CMI_DEST_RANK(msg) = 0;
537     CmiPushImmediateMsg(msg);
538     return;
539   }
540 #endif
541   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
542   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
543   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
544   {
545   CmiState cs=CmiGetStateN(0);
546   CmiIdleLock_addMessage(&cs->idle);
547   }
548 }
549 #endif
550
551 #ifndef CmiMyPe
552 int CmiMyPe(void)
553 {
554   return CmiGetState()->pe;
555 }
556 #endif
557
558 #ifndef CmiMyRank
559 int CmiMyRank(void)
560 {
561   return CmiGetState()->rank;
562 }
563 #endif
564
565 #ifndef CmiNodeFirst
566 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
567 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
568 #endif
569
570 #ifndef CmiNodeOf
571 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
572 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
573 #endif
574
575 static size_t CmiAllAsyncMsgsSent(void)
576 {
577    SMSG_LIST *msg_tmp = sent_msgs;
578    MPI_Status sts;
579    int done;
580
581    while(msg_tmp!=0) {
582     done = 0;
583     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
584       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
585     if(!done)
586       return 0;
587     msg_tmp = msg_tmp->next;
588 /*    MsgQueueLen--; ????? */
589    }
590    return 1;
591 }
592
593 int CmiAsyncMsgSent(CmiCommHandle c) {
594
595   SMSG_LIST *msg_tmp = sent_msgs;
596   int done;
597   MPI_Status sts;
598
599   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
600     msg_tmp = msg_tmp->next;
601   if(msg_tmp) {
602     done = 0;
603     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
604       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
605     return ((done)?1:0);
606   } else {
607     return 1;
608   }
609 }
610
611 void CmiReleaseCommHandle(CmiCommHandle c)
612 {
613   return;
614 }
615
616 #if CMK_BLUEGENEL
617 extern void MPID_Progress_test();
618 #endif
619
620 void CmiReleaseSentMessages(void)
621 {
622   SMSG_LIST *msg_tmp=sent_msgs;
623   SMSG_LIST *prev=0;
624   SMSG_LIST *temp;
625   int done;
626   MPI_Status sts;
627
628 #if CMK_BLUEGENEL
629   MPID_Progress_test();
630 #endif
631
632   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
633   while(msg_tmp!=0) {
634     done =0;
635 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
636     double startT = CmiWallTimer();
637 #endif
638     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
639       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
640     if(done) {
641       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
642       MsgQueueLen--;
643       /* Release the message */
644       temp = msg_tmp->next;
645       if(prev==0)  /* first message */
646         sent_msgs = temp;
647       else
648         prev->next = temp;
649       CmiFree(msg_tmp->msg);
650       CmiFree(msg_tmp);
651       msg_tmp = temp;
652     } else {
653       prev = msg_tmp;
654       msg_tmp = msg_tmp->next;
655     }
656 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
657     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
658 #endif
659   }
660   end_sent = prev;
661   MACHSTATE(2,"} CmiReleaseSentMessages end");
662 }
663
664 int PumpMsgs(void)
665 {
666   int nbytes, flg, res;
667   char *msg;
668   MPI_Status sts;
669   int recd=0;
670
671 #if CMI_EXERT_RECV_CAP
672   int recvCnt=0;
673 #endif
674         
675 #if CMK_BLUEGENEL
676   MPID_Progress_test();
677 #endif
678
679   MACHSTATE(2,"PumpMsgs begin {");
680
681         
682   while(1) {
683 #if CMI_EXERT_RECV_CAP
684         if(recvCnt==RECV_CAP) break;
685 #endif
686           
687     /* First check posted recvs then do  probe unmatched outstanding messages */
688 #if MPI_POST_RECV_COUNT > 0 
689     int completed_index=-1;
690     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
691         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
692     if(flg){
693         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
694             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
695
696         recd = 1;
697         msg = (char *) CmiAlloc(nbytes);
698         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
699         /* and repost the recv */
700
701         START_EVENT();
702
703         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
704             MPI_POST_RECV_SIZE,
705             MPI_BYTE,
706             MPI_ANY_SOURCE,
707             POST_RECV_TAG,
708             MPI_COMM_WORLD,
709             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
710                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
711
712         END_EVENT(50);
713
714         CpvAccess(Cmi_posted_recv_total)++;
715     }
716     else {
717         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
718         if(res != MPI_SUCCESS)
719         CmiAbort("MPI_Iprobe failed\n");
720         if(!flg) break;
721         recd = 1;
722         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
723         msg = (char *) CmiAlloc(nbytes);
724
725         START_EVENT();
726
727         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
728             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
729
730         END_EVENT(30);
731
732         CpvAccess(Cmi_unposted_recv_total)++;
733     }
734 #else
735     /* Original version */
736 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
737   double startT = CmiWallTimer(); 
738 #endif
739     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
740     if(res != MPI_SUCCESS)
741       CmiAbort("MPI_Iprobe failed\n");
742
743     if(!flg) break;
744 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
745     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
746 #endif
747
748     recd = 1;
749     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
750     msg = (char *) CmiAlloc(nbytes);
751     
752     START_EVENT();
753
754     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
755       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
756
757     /*END_EVENT(30);*/
758
759 #endif
760
761 #if CMK_SMP_TRACE_COMMTHREAD
762         traceBeginCommOp(msg);
763         traceChangeLastTimestamp(CpvAccess(projTraceStart));
764         traceEndCommOp(msg);
765         #if CMI_MPI_TRACE_MOREDETAILED
766         char tmp[32];
767         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
768         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
769         #endif
770 #elif CMK_TRACE_COMMOVERHEAD
771         char tmp[32];
772         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
773         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
774 #endif
775         
776         
777     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
778     CMI_CHECK_CHECKSUM(msg, nbytes);
779 #if CMK_ERROR_CHECKING
780     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
781       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
782       CmiFree(msg);
783       CmiAbort("Abort!\n");
784       continue;
785     }
786 #endif
787         
788 #if CMK_BROADCAST_SPANNING_TREE
789     if (CMI_BROADCAST_ROOT(msg))
790       SendSpanningChildren(nbytes, msg);
791 #elif CMK_BROADCAST_HYPERCUBE
792     if (CMI_BROADCAST_ROOT(msg))
793       SendHypercube(nbytes, msg);
794 #endif
795         
796         /* In SMP mode, this push operation needs to be executed
797      * after forwarding broadcast messages. If it is executed
798      * earlier, then during the bcast msg forwarding period,    
799          * the msg could be already freed on the worker thread.
800          * As a result, the forwarded message could be wrong! 
801          * --Chao Mei
802          */
803 #if CMK_NODE_QUEUE_AVAILABLE
804     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
805       CmiPushNode(msg);
806     else
807 #endif
808         CmiPushPE(CMI_DEST_RANK(msg), msg);     
809         
810 #if CMI_EXERT_RECV_CAP
811         recvCnt++;
812 #endif  
813   }
814
815   
816 #if CMK_IMMEDIATE_MSG && !CMK_SMP
817   CmiHandleImmediate();
818 #endif
819   
820   MACHSTATE(2,"} PumpMsgs end ");
821   return recd;
822 }
823
824 /* blocking version */
825 static void PumpMsgsBlocking(void)
826 {
827   static int maxbytes = 20000000;
828   static char *buf = NULL;
829   int nbytes, flg;
830   MPI_Status sts;
831   char *msg;
832   int recd=0;
833
834   if (!PCQueueEmpty(CmiGetState()->recv)) return;
835   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
836   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
837   if (sent_msgs)  return;
838
839 #if 0
840   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
841 #endif
842
843   if (buf == NULL) {
844     buf = (char *) CmiAlloc(maxbytes);
845     _MEMCHECK(buf);
846   }
847
848
849 #if MPI_POST_RECV_COUNT > 0
850 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
851 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
852 #endif
853
854   START_EVENT();
855
856   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
857       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
858
859   /*END_EVENT(30);*/
860     
861    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
862    msg = (char *) CmiAlloc(nbytes);
863    memcpy(msg, buf, nbytes);
864
865 #if CMK_SMP_TRACE_COMMTHREAD
866         traceBeginCommOp(msg);
867         traceChangeLastTimestamp(CpvAccess(projTraceStart));
868         traceEndCommOp(msg);
869         #if CMI_MPI_TRACE_MOREDETAILED
870         char tmp[32];
871         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
872         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
873         #endif
874 #endif
875
876 #if CMK_BROADCAST_SPANNING_TREE
877    if (CMI_BROADCAST_ROOT(msg))
878       SendSpanningChildren(nbytes, msg);
879 #elif CMK_BROADCAST_HYPERCUBE
880    if (CMI_BROADCAST_ROOT(msg))
881       SendHypercube(nbytes, msg);
882 #endif
883   
884         /* In SMP mode, this push operation needs to be executed
885      * after forwarding broadcast messages. If it is executed
886      * earlier, then during the bcast msg forwarding period,    
887          * the msg could be already freed on the worker thread.
888          * As a result, the forwarded message could be wrong! 
889          * --Chao Mei
890          */  
891 #if CMK_NODE_QUEUE_AVAILABLE
892    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
893       CmiPushNode(msg);
894    else
895 #endif
896       CmiPushPE(CMI_DEST_RANK(msg), msg);
897 }
898
899 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
900
901 #if CMK_SMP
902
903 static int inexit = 0;
904 static CmiNodeLock  exitLock = 0;
905
906 static int MsgQueueEmpty()
907 {
908   int i;
909 #if MULTI_SENDQUEUE
910   for (i=0; i<_Cmi_mynodesize; i++)
911     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
912 #else
913   return PCQueueEmpty(sendMsgBuf);
914 #endif
915   return 1;
916 }
917
918 static int SendMsgBuf();
919
920 /* test if all processors recv queues are empty */
921 static int RecvQueueEmpty()
922 {
923   int i;
924   for (i=0; i<_Cmi_mynodesize; i++) {
925     CmiState cs=CmiGetStateN(i);
926     if (!PCQueueEmpty(cs->recv)) return 0;
927   }
928   return 1;
929 }
930
931 /**
932 CommunicationServer calls MPI to send messages in the queues and probe message from network.
933 */
934
935 #define REPORT_COMM_METRICS 0
936 #if REPORT_COMM_METRICS
937 static double pumptime = 0.0;
938 static double releasetime = 0.0;
939 static double sendtime = 0.0;
940 #endif
941
942 static void CommunicationServer(int sleepTime)
943 {
944   int static count=0;
945 /*
946   count ++;
947   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
948 */
949 #if REPORT_COMM_METRICS
950   double t1, t2, t3, t4;
951   t1 = CmiWallTimer();
952 #endif
953   PumpMsgs();
954 #if REPORT_COMM_METRICS
955   t2 = CmiWallTimer();
956 #endif
957   CmiReleaseSentMessages();
958 #if REPORT_COMM_METRICS
959   t3 = CmiWallTimer();
960 #endif
961   SendMsgBuf();
962 #if REPORT_COMM_METRICS
963   t4 = CmiWallTimer();
964   pumptime += (t2-t1);
965   releasetime += (t3-t2);
966   sendtime += (t4-t3);
967 #endif
968 /*
969   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
970 */
971   if (inexit == CmiMyNodeSize()) {
972     MACHSTATE(2, "CommunicationServer exiting {");
973 #if 0
974     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
975 #endif
976     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
977       CmiReleaseSentMessages();
978       SendMsgBuf();
979       PumpMsgs();
980     }
981     MACHSTATE(2, "CommunicationServer barrier begin {");
982
983     START_EVENT();
984
985     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
986       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
987
988     END_EVENT(10);
989
990     MACHSTATE(2, "} CommunicationServer barrier end");
991 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
992     if (CmiMyNode() == 0){
993       CmiPrintf("End of program\n");
994     }
995 #endif
996     MACHSTATE(2, "} CommunicationServer EXIT");
997
998     ConverseCommonExit();   
999 #if REPORT_COMM_METRICS
1000     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1001 #endif
1002
1003 #if ! CMK_AUTOBUILD
1004     signal(SIGINT, signal_int);
1005     MPI_Finalize();
1006     #endif
1007     exit(0);
1008   }
1009 }
1010
1011 #endif
1012
1013 static void CommunicationServerThread(int sleepTime)
1014 {
1015 #if CMK_SMP
1016   CommunicationServer(sleepTime);
1017 #endif
1018 #if CMK_IMMEDIATE_MSG
1019   CmiHandleImmediate();
1020 #endif
1021 }
1022
1023 #if CMK_NODE_QUEUE_AVAILABLE
1024 char *CmiGetNonLocalNodeQ(void)
1025 {
1026   CmiState cs = CmiGetState();
1027   char *result = 0;
1028   CmiIdleLock_checkMessage(&cs->idle);
1029 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1030     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1031     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1032     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1033     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1034     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1035 /*  }  */
1036
1037   return result;
1038 }
1039 #endif
1040
1041 void *CmiGetNonLocal(void)
1042 {
1043   static int count=0;
1044   CmiState cs = CmiGetState();
1045   void *msg;
1046
1047 #if ! CMK_SMP
1048   if (CmiNumPes() == 1) return NULL;
1049 #endif
1050
1051   CmiIdleLock_checkMessage(&cs->idle);
1052   /* although it seems that lock is not needed, I found it crashes very often
1053      on mpi-smp without lock */
1054
1055 #if ! CMK_SMP
1056   CmiReleaseSentMessages();
1057   PumpMsgs();
1058 #endif
1059
1060   /* CmiLock(procState[cs->rank].recvLock); */
1061   msg =  PCQueuePop(cs->recv);
1062   /* CmiUnlock(procState[cs->rank].recvLock); */
1063
1064 /*
1065   if (msg) {
1066     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1067   else {
1068     count++;
1069     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1070   }
1071 */
1072 #if ! CMK_SMP
1073   if (no_outstanding_sends) {
1074     while (MsgQueueLen>0) {
1075       CmiReleaseSentMessages();
1076       PumpMsgs();
1077     }
1078   }
1079
1080   if(!msg) {
1081     CmiReleaseSentMessages();
1082     if (PumpMsgs())
1083       return  PCQueuePop(cs->recv);
1084     else
1085       return 0;
1086   }
1087 #endif
1088
1089   return msg;
1090 }
1091
1092 /* called in non-smp mode */
1093 void CmiNotifyIdle(void)
1094 {
1095   CmiReleaseSentMessages();
1096   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1097 }
1098
1099
1100 /********************************************************
1101     The call to probe immediate messages has been renamed to
1102     CmiMachineProgressImpl
1103 ******************************************************/
1104 /* user call to handle immediate message, only useful in non SMP version
1105    using polling method to schedule message.
1106 */
1107 /*
1108 #if CMK_IMMEDIATE_MSG
1109 void CmiProbeImmediateMsg()
1110 {
1111 #if !CMK_SMP
1112   PumpMsgs();
1113   CmiHandleImmediate();
1114 #endif
1115 }
1116 #endif
1117 */
1118
1119 /* Network progress function is used to poll the network when for
1120    messages. This flushes receive buffers on some  implementations*/
1121 #if CMK_MACHINE_PROGRESS_DEFINED
1122 void CmiMachineProgressImpl()
1123 {
1124 #if !CMK_SMP
1125     PumpMsgs();
1126 #if CMK_IMMEDIATE_MSG
1127     CmiHandleImmediate();
1128 #endif
1129 #else
1130     /*Not implemented yet. Communication server does not seem to be
1131       thread safe, so only communication thread call it */
1132     if (CmiMyRank() == CmiMyNodeSize())
1133         CommunicationServerThread(0);
1134 #endif
1135 }
1136 #endif
1137
1138 /********************* MESSAGE SEND FUNCTIONS ******************/
1139
1140 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1141
1142 static void CmiSendSelf(char *msg)
1143 {
1144 #if CMK_IMMEDIATE_MSG
1145     if (CmiIsImmediate(msg)) {
1146       /* CmiBecomeNonImmediate(msg); */
1147       CmiPushImmediateMsg(msg);
1148       CmiHandleImmediate();
1149       return;
1150     }
1151 #endif
1152     CQdCreate(CpvAccess(cQdState), 1);
1153     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1154 }
1155
1156 void CmiSyncSendFn(int destPE, int size, char *msg)
1157 {
1158   CmiState cs = CmiGetState();
1159   char *dupmsg = (char *) CmiAlloc(size);
1160   memcpy(dupmsg, msg, size);
1161
1162   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1163
1164   if (cs->pe==destPE) {
1165     CmiSendSelf(dupmsg);
1166   }
1167   else
1168     CmiAsyncSendFn_(destPE, size, dupmsg);
1169 }
1170
1171 #if CMK_SMP
1172
1173 /* called by communication thread in SMP */
1174 static int SendMsgBuf()
1175 {
1176   SMSG_LIST *msg_tmp;
1177   char *msg;
1178   int node, rank, size;
1179   int i;
1180   int sent = 0;
1181
1182 #if CMI_EXERT_SEND_CAP
1183         int sentCnt = 0;
1184 #endif  
1185         
1186   MACHSTATE(2,"SendMsgBuf begin {");
1187 #if MULTI_SENDQUEUE
1188   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1189   {
1190     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1191     {
1192       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1193 #else
1194     /* single message sending queue */
1195     /* CmiLock(sendMsgBufLock); */
1196     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1197     /* CmiUnlock(sendMsgBufLock); */
1198     while (NULL != msg_tmp)
1199     {
1200 #endif
1201       node = msg_tmp->destpe;
1202       size = msg_tmp->size;
1203       msg = msg_tmp->msg;
1204       msg_tmp->next = 0;
1205       while (MsgQueueLen > request_max) {
1206         CmiReleaseSentMessages();
1207         PumpMsgs();
1208       }
1209       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1210 #if CMK_ERROR_CHECKING
1211       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1212 #endif
1213       CMI_SET_CHECKSUM(msg, size);
1214
1215 #if MPI_POST_RECV_COUNT > 0
1216         if(size <= MPI_POST_RECV_SIZE){
1217
1218           START_EVENT();
1219           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1220                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1221
1222           STOP_EVENT(40);
1223         }
1224         else {
1225             START_EVENT();
1226             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1227                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1228             STOP_EVENT(40);
1229         }
1230 #else
1231         START_EVENT();
1232         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1233             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1234         /*END_EVENT(40);*/
1235 #endif
1236         
1237 #if CMK_SMP_TRACE_COMMTHREAD
1238         traceBeginCommOp(msg);
1239         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1240         /* traceSendMsgComm must execute after traceBeginCommOp because
1241          * we pretend we execute an entry method, and inside this we
1242          * pretend we will send another message. Otherwise how could
1243          * a message creation just before an entry method invocation?
1244          * If such logic is broken, the projections will not trace
1245          * messages correctly! -Chao Mei
1246          */
1247         traceSendMsgComm(msg);
1248         traceEndCommOp(msg);
1249         #if CMI_MPI_TRACE_MOREDETAILED
1250         char tmp[64];
1251         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1252         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1253         #endif
1254 #endif
1255                 
1256                 
1257       MACHSTATE(3,"}MPI_send end");
1258       MsgQueueLen++;
1259       if(sent_msgs==0)
1260         sent_msgs = msg_tmp;
1261       else
1262         end_sent->next = msg_tmp;
1263       end_sent = msg_tmp;
1264       sent=1;
1265           
1266 #if CMI_EXERT_SEND_CAP    
1267           if(++sentCnt == SEND_CAP) break;
1268 #endif    
1269           
1270 #if ! MULTI_SENDQUEUE
1271       /* CmiLock(sendMsgBufLock); */
1272       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1273       /* CmiUnlock(sendMsgBufLock); */
1274 #endif
1275     }
1276 #if MULTI_SENDQUEUE
1277   }
1278 #endif
1279   MACHSTATE(2,"}SendMsgBuf end ");
1280   return sent;
1281 }
1282
1283 void EnqueueMsg(void *m, int size, int node)
1284 {
1285   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1286   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1287   msg_tmp->msg = m;
1288   msg_tmp->size = size;
1289   msg_tmp->destpe = node;
1290         
1291 #if CMK_SMP_TRACE_COMMTHREAD
1292         msg_tmp->srcpe = CmiMyPe();
1293 #endif  
1294
1295 #if MULTI_SENDQUEUE
1296   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1297 #else
1298   CmiLock(sendMsgBufLock);
1299   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1300   CmiUnlock(sendMsgBufLock);
1301 #endif
1302         
1303   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1304 }
1305
1306 #endif
1307
1308 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1309 {
1310   CmiState cs = CmiGetState();
1311   SMSG_LIST *msg_tmp;
1312   CmiUInt2  rank, node;
1313
1314   if(destPE == cs->pe) {
1315     char *dupmsg = (char *) CmiAlloc(size);
1316     memcpy(dupmsg, msg, size);
1317     CmiSendSelf(dupmsg);
1318     return 0;
1319   }
1320   CQdCreate(CpvAccess(cQdState), 1);
1321 #if CMK_SMP
1322   node = CmiNodeOf(destPE);
1323   rank = CmiRankOf(destPE);
1324   if (node == CmiMyNode())  {
1325     CmiPushPE(rank, msg);
1326     return 0;
1327   }
1328   CMI_DEST_RANK(msg) = rank;
1329   EnqueueMsg(msg, size, node);
1330   return 0;
1331 #else
1332   /* non smp */
1333   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1334   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1335   msg_tmp->msg = msg;
1336   msg_tmp->next = 0;
1337   while (MsgQueueLen > request_max) {
1338         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1339         CmiReleaseSentMessages();
1340         PumpMsgs();
1341   }
1342 #if CMK_ERROR_CHECKING
1343   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1344 #endif
1345   CMI_SET_CHECKSUM(msg, size);
1346
1347 #if MPI_POST_RECV_COUNT > 0
1348         if(size <= MPI_POST_RECV_SIZE){
1349
1350           START_EVENT();
1351           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1352                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1353           END_EVENT(40);
1354         }
1355         else {
1356           START_EVENT();
1357           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1358                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1359           END_EVENT(40);
1360         }
1361 #else
1362   START_EVENT();
1363   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1364     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1365   /*END_EVENT(40);*/
1366   #if CMK_TRACE_COMMOVERHEAD
1367         char tmp[64];
1368         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1369         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1370   #endif
1371 #endif
1372
1373   MsgQueueLen++;
1374   if(sent_msgs==0)
1375     sent_msgs = msg_tmp;
1376   else
1377     end_sent->next = msg_tmp;
1378   end_sent = msg_tmp;
1379   return (CmiCommHandle) &(msg_tmp->req);
1380 #endif              /* non-smp */
1381 }
1382
1383 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1384 {
1385   CMI_SET_BROADCAST_ROOT(msg, 0);
1386   CmiAsyncSendFn_(destPE, size, msg);
1387 }
1388
1389 void CmiFreeSendFn(int destPE, int size, char *msg)
1390 {
1391   CmiState cs = CmiGetState();
1392   CMI_SET_BROADCAST_ROOT(msg, 0);
1393
1394   if (cs->pe==destPE) {
1395     CmiSendSelf(msg);
1396   } else {
1397     CmiAsyncSendFn_(destPE, size, msg);
1398   }
1399 }
1400
1401 /*********************** BROADCAST FUNCTIONS **********************/
1402
1403 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1404 void CmiSyncSendFn1(int destPE, int size, char *msg)
1405 {
1406   CmiState cs = CmiGetState();
1407   char *dupmsg = (char *) CmiAlloc(size);
1408   memcpy(dupmsg, msg, size);
1409   if (cs->pe==destPE)
1410     CmiSendSelf(dupmsg);
1411   else
1412     CmiAsyncSendFn_(destPE, size, dupmsg);
1413 }
1414
1415 /* send msg to its spanning children in broadcast. G. Zheng */
1416 void SendSpanningChildren(int size, char *msg)
1417 {
1418   CmiState cs = CmiGetState();
1419   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1420   int startnode = CmiNodeOf(startpe);
1421   int i, exceptRank;
1422         
1423    /* first send msgs to other nodes */
1424   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1425   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1426     int nd = CmiMyNode()-startnode;
1427     if (nd<0) nd+=CmiNumNodes();
1428     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1429     if (nd > CmiNumNodes() - 1) break;
1430     nd += startnode;
1431     nd = nd%CmiNumNodes();
1432     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1433         #if CMK_SMP
1434         /* always send to the first rank of other nodes */
1435         char *newmsg = CmiCopyMsg(msg, size);
1436         CMI_DEST_RANK(newmsg) = 0;
1437     EnqueueMsg(newmsg, size, nd);
1438         #else
1439         CmiSyncSendFn1(nd, size, msg);
1440         #endif
1441   }
1442 #if CMK_SMP  
1443    /* second send msgs to my peers on this node */
1444   /* FIXME: now it's just a flat p2p send!! When node size is large,
1445    * it should also be sent in a tree
1446    */
1447    exceptRank = CMI_DEST_RANK(msg);
1448    for(i=0; i<exceptRank; i++){
1449            CmiPushPE(i, CmiCopyMsg(msg, size));
1450    }
1451    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1452            CmiPushPE(i, CmiCopyMsg(msg, size));
1453    }
1454 #endif
1455 }
1456
1457 #include <math.h>
1458
1459 /* send msg along the hypercube in broadcast. (Sameer) */
1460 void SendHypercube(int size, char *msg)
1461 {
1462   CmiState cs = CmiGetState();
1463   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1464   int startnode = CmiNodeOf(startpe);
1465   int i, exceptRank, cnt, tmp, relPE;
1466   int dims=0;
1467
1468   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1469   tmp = CmiNumNodes()-1;
1470   while(tmp>0){
1471           dims++;
1472           tmp = tmp >> 1;
1473   }
1474   if(CmiNumNodes()==1) dims=1;
1475   
1476    /* first send msgs to other nodes */  
1477   relPE = CmiMyNode()-startnode;
1478   if(relPE < 0) relPE += CmiNumNodes();
1479   cnt=0;
1480   tmp = relPE;
1481   /* count how many zeros (in binary format) relPE has */
1482   for(i=0; i<dims; i++, cnt++){
1483     if(tmp & 1 == 1) break;
1484     tmp = tmp >> 1;
1485   }
1486   
1487   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1488   for (i = cnt-1; i >= 0; i--) {
1489     int nd = relPE + (1 << i);
1490         if(nd >= CmiNumNodes()) continue;
1491         nd = (nd+startnode)%CmiNumNodes();
1492         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1493 #if CMK_SMP
1494     /* always send to the first rank of other nodes */
1495     char *newmsg = CmiCopyMsg(msg, size);
1496     CMI_DEST_RANK(newmsg) = 0;
1497     EnqueueMsg(newmsg, size, nd);
1498 #else
1499         CmiSyncSendFn1(nd, size, msg);
1500 #endif
1501   }
1502   
1503 #if CMK_SMP
1504    /* second send msgs to my peers on this node */
1505    /* FIXME: now it's just a flat p2p send!! When node size is large,
1506     * it should also be sent in a tree
1507     */
1508    exceptRank = CMI_DEST_RANK(msg);
1509    for(i=0; i<exceptRank; i++){
1510            CmiPushPE(i, CmiCopyMsg(msg, size));
1511    }
1512    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1513            CmiPushPE(i, CmiCopyMsg(msg, size));
1514    }
1515 #endif
1516 }
1517
1518 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1519 {
1520   CmiState cs = CmiGetState();
1521
1522 #if CMK_SMP     
1523   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1524   CMI_DEST_RANK(msg) = CmiMyRank();
1525 #endif
1526         
1527 #if CMK_BROADCAST_SPANNING_TREE
1528   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1529   SendSpanningChildren(size, msg);
1530
1531 #elif CMK_BROADCAST_HYPERCUBE
1532   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1533   SendHypercube(size, msg);
1534
1535 #else
1536   int i;
1537
1538   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1539     CmiSyncSendFn(i, size,msg) ;
1540   for ( i=0; i<cs->pe; i++ )
1541     CmiSyncSendFn(i, size,msg) ;
1542 #endif
1543
1544   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1545 }
1546
1547
1548 /*  FIXME: luckily async is never used  G. Zheng */
1549 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1550 {
1551   CmiState cs = CmiGetState();
1552   int i ;
1553
1554   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1555     CmiAsyncSendFn(i,size,msg) ;
1556   for ( i=0; i<cs->pe; i++ )
1557     CmiAsyncSendFn(i,size,msg) ;
1558
1559   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1560   CmiAbort("CmiAsyncBroadcastFn should never be called");
1561   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1562 }
1563
1564 void CmiFreeBroadcastFn(int size, char *msg)
1565 {
1566    CmiSyncBroadcastFn(size,msg);
1567    CmiFree(msg);
1568 }
1569
1570 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1571 {
1572
1573 #if CMK_SMP     
1574   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1575   CMI_DEST_RANK(msg) = CmiMyRank();
1576 #endif
1577
1578 #if CMK_BROADCAST_SPANNING_TREE
1579   CmiState cs = CmiGetState();
1580   CmiSyncSendFn(cs->pe, size,msg) ;
1581   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1582   SendSpanningChildren(size, msg);
1583
1584 #elif CMK_BROADCAST_HYPERCUBE
1585   CmiState cs = CmiGetState();
1586   CmiSyncSendFn(cs->pe, size,msg) ;
1587   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1588   SendHypercube(size, msg);
1589
1590 #else
1591     int i ;
1592
1593   for ( i=0; i<_Cmi_numpes; i++ )
1594     CmiSyncSendFn(i,size,msg) ;
1595 #endif
1596
1597   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1598 }
1599
1600 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1601 {
1602   int i ;
1603
1604   for ( i=1; i<_Cmi_numpes; i++ )
1605     CmiAsyncSendFn(i,size,msg) ;
1606
1607   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1608
1609   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1610 }
1611
1612 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1613 {
1614 #if CMK_SMP     
1615   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1616   CMI_DEST_RANK(msg) = CmiMyRank();
1617 #endif
1618
1619 #if CMK_BROADCAST_SPANNING_TREE
1620   CmiState cs = CmiGetState();
1621   CmiSyncSendFn(cs->pe, size,msg) ;
1622   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1623   SendSpanningChildren(size, msg);
1624
1625 #elif CMK_BROADCAST_HYPERCUBE
1626   CmiState cs = CmiGetState();
1627   CmiSyncSendFn(cs->pe, size,msg) ;
1628   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1629   SendHypercube(size, msg);
1630
1631 #else
1632   int i ;
1633
1634   for ( i=0; i<_Cmi_numpes; i++ )
1635     CmiSyncSendFn(i,size,msg) ;
1636 #endif
1637   CmiFree(msg) ;
1638   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1639 }
1640
1641 #if CMK_NODE_QUEUE_AVAILABLE
1642
1643 static void CmiSendNodeSelf(char *msg)
1644 {
1645 #if CMK_IMMEDIATE_MSG
1646 #if 0
1647     if (CmiIsImmediate(msg) && !_immRunning) {
1648       /*CmiHandleImmediateMessage(msg); */
1649       CmiPushImmediateMsg(msg);
1650       CmiHandleImmediate();
1651       return;
1652     }
1653 #endif
1654     if (CmiIsImmediate(msg))
1655     {
1656       CmiPushImmediateMsg(msg);
1657       if (!_immRunning) CmiHandleImmediate();
1658       return;
1659     }
1660 #endif
1661     CQdCreate(CpvAccess(cQdState), 1);
1662     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1663     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1664     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1665 }
1666
1667 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1668 {
1669   int i;
1670   SMSG_LIST *msg_tmp;
1671   char *dupmsg;
1672
1673   CMI_SET_BROADCAST_ROOT(msg, 0);
1674   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1675   switch (dstNode) {
1676   case NODE_BROADCAST_ALL:
1677     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1678   case NODE_BROADCAST_OTHERS:
1679     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1680     for (i=0; i<_Cmi_numnodes; i++)
1681       if (i!=_Cmi_mynode) {
1682         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1683       }
1684     break;
1685   default:
1686     dupmsg = (char *)CmiCopyMsg(msg,size);
1687     if(dstNode == _Cmi_mynode) {
1688       CmiSendNodeSelf(dupmsg);
1689     }
1690     else {
1691       CQdCreate(CpvAccess(cQdState), 1);
1692       EnqueueMsg(dupmsg, size, dstNode);
1693     }
1694   }
1695   return 0;
1696 }
1697
1698 void CmiSyncNodeSendFn(int p, int s, char *m)
1699 {
1700   CmiAsyncNodeSendFn(p, s, m);
1701 }
1702
1703 /* need */
1704 void CmiFreeNodeSendFn(int p, int s, char *m)
1705 {
1706   CmiAsyncNodeSendFn(p, s, m);
1707   CmiFree(m);
1708 }
1709
1710 /* need */
1711 void CmiSyncNodeBroadcastFn(int s, char *m)
1712 {
1713   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1714 }
1715
1716 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1717 {
1718 }
1719
1720 /* need */
1721 void CmiFreeNodeBroadcastFn(int s, char *m)
1722 {
1723   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1724   CmiFree(m);
1725 }
1726
1727 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1728 {
1729   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1730 }
1731
1732 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1733 {
1734   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1735 }
1736
1737 /* need */
1738 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1739 {
1740   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1741   CmiFree(m);
1742 }
1743 #endif
1744
1745 /************************** MAIN ***********************************/
1746 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1747
1748 void ConverseExit(void)
1749 {
1750 #if ! CMK_SMP
1751   while(!CmiAllAsyncMsgsSent()) {
1752     PumpMsgs();
1753     CmiReleaseSentMessages();
1754   }
1755   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1756     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1757
1758   ConverseCommonExit();
1759 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1760   if (CmiMyPe() == 0){
1761     CmiPrintf("End of program\n");
1762 #if MPI_POST_RECV_COUNT > 0
1763     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1764 #endif
1765 }
1766 #endif
1767 #if ! CMK_AUTOBUILD
1768   signal(SIGINT, signal_int);
1769   MPI_Finalize();
1770 #endif
1771   exit(0);
1772
1773 #else
1774     /* SMP version, communication thread will exit */
1775   ConverseCommonExit();
1776   /* atomic increment */
1777   CmiLock(exitLock);
1778   inexit++;
1779   CmiUnlock(exitLock);
1780   while (1) CmiYield();
1781 #endif
1782 }
1783
1784 static void registerMPITraceEvents() {
1785 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1786     traceRegisterUserEvent("MPI_Barrier", 10);
1787     traceRegisterUserEvent("MPI_Send", 20);
1788     traceRegisterUserEvent("MPI_Recv", 30);
1789     traceRegisterUserEvent("MPI_Isend", 40);
1790     traceRegisterUserEvent("MPI_Irecv", 50);
1791     traceRegisterUserEvent("MPI_Test", 60);
1792     traceRegisterUserEvent("MPI_Iprobe", 70);
1793 #endif
1794 }
1795
1796
1797 static char     **Cmi_argv;
1798 static char     **Cmi_argvcopy;
1799 static CmiStartFn Cmi_startfn;   /* The start function */
1800 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1801
1802 typedef struct {
1803   int sleepMs; /*Milliseconds to sleep while idle*/
1804   int nIdles; /*Number of times we've been idle in a row*/
1805   CmiState cs; /*Machine state*/
1806 } CmiIdleState;
1807
1808 static CmiIdleState *CmiNotifyGetState(void)
1809 {
1810   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1811   s->sleepMs=0;
1812   s->nIdles=0;
1813   s->cs=CmiGetState();
1814   return s;
1815 }
1816
1817 static void CmiNotifyBeginIdle(CmiIdleState *s)
1818 {
1819   s->sleepMs=0;
1820   s->nIdles=0;
1821 }
1822
1823 static void CmiNotifyStillIdle(CmiIdleState *s)
1824 {
1825 #if ! CMK_SMP
1826   CmiReleaseSentMessages();
1827   PumpMsgs();
1828 #else
1829 /*  CmiYield();  */
1830 #endif
1831
1832 #if 1
1833   {
1834   int nSpins=20; /*Number of times to spin before sleeping*/
1835   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1836   s->nIdles++;
1837   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1838     s->sleepMs+=2;
1839     if (s->sleepMs>10) s->sleepMs=10;
1840   }
1841   /*Comm. thread will listen on sockets-- just sleep*/
1842   if (s->sleepMs>0) {
1843     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1844     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1845     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1846   }
1847   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1848   }
1849 #endif
1850 }
1851
1852 #if MACHINE_DEBUG_LOG
1853 FILE *debugLog = NULL;
1854 #endif
1855
1856 static int machine_exit_idx;
1857 static void machine_exit(char *m) {
1858   EmergencyExit();
1859   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1860   fflush(stdout);
1861   CmiNodeBarrier();
1862   if (CmiMyRank() == 0) {
1863     MPI_Barrier(MPI_COMM_WORLD);
1864     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1865     MPI_Abort(MPI_COMM_WORLD, 1);
1866   } else {
1867     while (1) CmiYield();
1868   }
1869 }
1870
1871 static void KillOnAllSigs(int sigNo) {
1872   static int already_in_signal_handler = 0;
1873   char *m;
1874   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1875   already_in_signal_handler = 1;
1876 #if CMK_CCS_AVAILABLE
1877   if (CpvAccess(cmiArgDebugFlag)) {
1878     CpdNotify(CPD_SIGNAL, sigNo);
1879     CpdFreeze();
1880   }
1881 #endif
1882   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1883       "Signal: %d\n",CmiMyPe(),sigNo);
1884   CmiPrintStackTrace(1);
1885
1886   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1887   CmiSetHandler(m, machine_exit_idx);
1888   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1889   machine_exit(m);
1890 }
1891
1892 static void ConverseRunPE(int everReturn)
1893 {
1894   CmiIdleState *s=CmiNotifyGetState();
1895   CmiState cs;
1896   char** CmiMyArgv;
1897
1898   CmiNodeAllBarrier();
1899
1900   cs = CmiGetState();
1901   CpvInitialize(void *,CmiLocalQueue);
1902   CpvAccess(CmiLocalQueue) = cs->localqueue;
1903
1904   if (CmiMyRank())
1905     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1906   else
1907     CmiMyArgv=Cmi_argv;
1908
1909   CthInit(CmiMyArgv);
1910
1911   ConverseCommonInit(CmiMyArgv);
1912   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1913
1914 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1915   CpvInitialize(double, projTraceStart);
1916   /* only PE 0 needs to care about registration (to generate sts file). */
1917   if (CmiMyPe() == 0) {
1918     registerMachineUserEventsFunction(&registerMPITraceEvents);
1919   }
1920 #endif
1921
1922   /* initialize the network progress counter*/
1923   /* Network progress function is used to poll the network when for
1924      messages. This flushes receive buffers on some  implementations*/
1925   CpvInitialize(int , networkProgressCount);
1926   CpvAccess(networkProgressCount) = 0;
1927
1928 #if CMK_SMP
1929   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1930   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1931 #else
1932   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1933 #endif
1934
1935 #if MACHINE_DEBUG_LOG
1936   if (CmiMyRank() == 0) {
1937     char ln[200];
1938     sprintf(ln,"debugLog.%d",CmiMyNode());
1939     debugLog=fopen(ln,"w");
1940   }
1941 #endif
1942
1943   /* Converse initialization finishes, immediate messages can be processed.
1944      node barrier previously should take care of the node synchronization */
1945   _immediateReady = 1;
1946
1947   /* communication thread */
1948   if (CmiMyRank() == CmiMyNodeSize()) {
1949     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1950     while (1) CommunicationServerThread(5);
1951   }
1952   else {  /* worker thread */
1953   if (!everReturn) {
1954     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1955     if (Cmi_usrsched==0) CsdScheduler(-1);
1956     ConverseExit();
1957   }
1958   }
1959 }
1960
1961 static char *thread_level_tostring(int thread_level)
1962 {
1963 #if CMK_MPI_INIT_THREAD
1964   switch (thread_level) {
1965   case MPI_THREAD_SINGLE:
1966       return "MPI_THREAD_SINGLE";
1967   case MPI_THREAD_FUNNELED:
1968       return "MPI_THREAD_FUNNELED";
1969   case MPI_THREAD_SERIALIZED:
1970       return "MPI_THREAD_SERIALIZED";
1971   case MPI_THREAD_MULTIPLE :
1972       return "MPI_THREAD_MULTIPLE ";
1973   default: {
1974       char *str = (char*)malloc(5);
1975       sprintf(str,"%d", thread_level);
1976       return str;
1977       }
1978   }
1979   return  "unknown";
1980 #else
1981   char *str = (char*)malloc(5);
1982   sprintf(str,"%d", thread_level);
1983   return str;
1984 #endif
1985 }
1986
1987 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1988 {
1989   int n,i;
1990   int ver, subver;
1991   int provided;
1992   int thread_level;
1993
1994 #if MACHINE_DEBUG
1995   debugLog=NULL;
1996 #endif
1997 #if CMK_USE_HP_MAIN_FIX
1998 #if FOR_CPLUS
1999   _main(argc,argv);
2000 #endif
2001 #endif
2002
2003 #if CMK_MPI_INIT_THREAD
2004 #if CMK_SMP
2005   thread_level = MPI_THREAD_FUNNELED;
2006 #else
2007   thread_level = MPI_THREAD_SINGLE;
2008 #endif
2009   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2010   _thread_provided = provided;
2011 #else
2012   MPI_Init(&argc, &argv);
2013   thread_level = 0;
2014   provided = -1;
2015 #endif
2016   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2017   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2018
2019   MPI_Get_version(&ver, &subver);
2020   if (_Cmi_mynode == 0) {
2021     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2022   }
2023
2024   /* processor per node */
2025   _Cmi_mynodesize = 1;
2026   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2027     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2028 #if ! CMK_SMP
2029   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2030     CmiAbort("+ppn cannot be used in non SMP version!\n");
2031 #endif
2032   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2033   if (idleblock && _Cmi_mynode == 0) {
2034     printf("Charm++: Running in idle blocking mode.\n");
2035   }
2036
2037   /* setup signal handlers */
2038   signal(SIGSEGV, KillOnAllSigs);
2039   signal(SIGFPE, KillOnAllSigs);
2040   signal(SIGILL, KillOnAllSigs);
2041   signal_int = signal(SIGINT, KillOnAllSigs);
2042   signal(SIGTERM, KillOnAllSigs);
2043   signal(SIGABRT, KillOnAllSigs);
2044 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2045   signal(SIGQUIT, KillOnAllSigs);
2046   signal(SIGBUS, KillOnAllSigs);
2047 /*#     if CMK_HANDLE_SIGUSR
2048   signal(SIGUSR1, HandleUserSignals);
2049   signal(SIGUSR2, HandleUserSignals);
2050 #     endif*/
2051 #   endif /*UNIX*/
2052   
2053 #if CMK_NO_OUTSTANDING_SENDS
2054   no_outstanding_sends=1;
2055 #endif
2056   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2057     no_outstanding_sends = 1;
2058     if (_Cmi_mynode == 0)
2059       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2060         no_outstanding_sends?"":" not");
2061   }
2062   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2063   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2064   Cmi_argvcopy = CmiCopyArgs(argv);
2065   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2066   /* find dim = log2(numpes), to pretend we are a hypercube */
2067   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2068     Cmi_dim++ ;
2069  /* CmiSpanTreeInit();*/
2070   request_max=MAX_QLEN;
2071   CmiGetArgInt(argv,"+requestmax",&request_max);
2072   /*printf("request max=%d\n", request_max);*/
2073
2074   /* checksum flag */
2075   if (CmiGetArgFlag(argv,"+checksum")) {
2076 #if !CMK_OPTIMIZE
2077     checksum_flag = 1;
2078     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2079 #else
2080     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2081 #endif
2082   }
2083
2084   {
2085   int debug = CmiGetArgFlag(argv,"++debug");
2086   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2087   if (debug || debug_no_pause)
2088   {   /*Pause so user has a chance to start and attach debugger*/
2089 #if CMK_HAS_GETPID
2090     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2091     fflush(stdout);
2092     if (!debug_no_pause)
2093       sleep(15);
2094 #else
2095     printf("++debug ignored.\n");
2096 #endif
2097   }
2098   }
2099
2100 #if MPI_POST_RECV_COUNT > 0
2101
2102   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2103   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2104   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2105   CpvInitialize(char*,CmiPostedRecvBuffers);
2106
2107     /* Post some extra recvs to help out with incoming messages */
2108     /* On some MPIs the messages are unexpected and thus slow */
2109
2110     /* An array of request handles for posted recvs */
2111     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2112
2113     /* An array of buffers for posted recvs */
2114     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2115
2116     /* Post Recvs */
2117     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2118         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2119                     MPI_POST_RECV_SIZE,
2120                     MPI_BYTE,
2121                     MPI_ANY_SOURCE,
2122                     POST_RECV_TAG,
2123                     MPI_COMM_WORLD,
2124                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2125           CmiAbort("MPI_Irecv failed\n");
2126     }
2127
2128 #endif
2129
2130
2131
2132   /* CmiTimerInit(); */
2133
2134 #if 0
2135   CthInit(argv);
2136   ConverseCommonInit(argv);
2137
2138   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2139   if (initret==0) {
2140     fn(CmiGetArgc(argv), argv);
2141     if (usched==0) CsdScheduler(-1);
2142     ConverseExit();
2143   }
2144 #endif
2145
2146   CsvInitialize(CmiNodeState, NodeState);
2147   CmiNodeStateInit(&CsvAccess(NodeState));
2148
2149   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2150
2151   for (i=0; i<_Cmi_mynodesize+1; i++) {
2152 #if MULTI_SENDQUEUE
2153     procState[i].sendMsgBuf = PCQueueCreate();
2154 #endif
2155     procState[i].recvLock = CmiCreateLock();
2156   }
2157 #if CMK_SMP
2158 #if !MULTI_SENDQUEUE
2159   sendMsgBuf = PCQueueCreate();
2160   sendMsgBufLock = CmiCreateLock();
2161 #endif
2162   exitLock = CmiCreateLock();            /* exit count lock */
2163 #endif
2164
2165   /* Network progress function is used to poll the network when for
2166      messages. This flushes receive buffers on some  implementations*/
2167   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2168   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2169
2170   CmiStartThreads(argv);
2171   ConverseRunPE(initret);
2172 }
2173
2174 /***********************************************************************
2175  *
2176  * Abort function:
2177  *
2178  ************************************************************************/
2179
2180 void CmiAbort(const char *message)
2181 {
2182   char *m;
2183   /* if CharmDebug is attached simply try to send a message to it */
2184 #if CMK_CCS_AVAILABLE
2185   if (CpvAccess(cmiArgDebugFlag)) {
2186     CpdNotify(CPD_ABORT, message);
2187     CpdFreeze();
2188   }
2189 #endif  
2190   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2191         "Reason: %s\n",CmiMyPe(),message);
2192  /*  CmiError(message); */
2193   CmiPrintStackTrace(0);
2194   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2195   CmiSetHandler(m, machine_exit_idx);
2196   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2197   machine_exit(m);
2198   /* Program never reaches here */
2199   MPI_Abort(MPI_COMM_WORLD, 1);
2200 }
2201
2202
2203 #if 0
2204
2205 /* ****************************************************************** */
2206 /*    The following internal functions implement recd msg queue       */
2207 /* ****************************************************************** */
2208
2209 static void ** AllocBlock(unsigned int len)
2210 {
2211   void ** blk;
2212
2213   blk=(void **)CmiAlloc(len*sizeof(void *));
2214   if(blk==(void **)0) {
2215     CmiError("Cannot Allocate Memory!\n");
2216     MPI_Abort(MPI_COMM_WORLD, 1);
2217   }
2218   return blk;
2219 }
2220
2221 static void
2222 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2223 {
2224   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2225   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2226 }
2227
2228 void recdQueueInit(void)
2229 {
2230   recdQueue_blk = AllocBlock(BLK_LEN);
2231   recdQueue_blk_len = BLK_LEN;
2232   recdQueue_first = 0;
2233   recdQueue_len = 0;
2234 }
2235
2236 void recdQueueAddToBack(void *element)
2237 {
2238 #if NODE_0_IS_CONVHOST
2239   inside_comm = 1;
2240 #endif
2241   if(recdQueue_len==recdQueue_blk_len) {
2242     void **blk;
2243     recdQueue_blk_len *= 3;
2244     blk = AllocBlock(recdQueue_blk_len);
2245     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2246     CmiFree(recdQueue_blk);
2247     recdQueue_blk = blk;
2248     recdQueue_first = 0;
2249   }
2250   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2251 #if NODE_0_IS_CONVHOST
2252   inside_comm = 0;
2253 #endif
2254 }
2255
2256
2257 void * recdQueueRemoveFromFront(void)
2258 {
2259   if(recdQueue_len) {
2260     void *element;
2261     element = recdQueue_blk[recdQueue_first++];
2262     recdQueue_first %= recdQueue_blk_len;
2263     recdQueue_len--;
2264     return element;
2265   }
2266   return 0;
2267 }
2268
2269 #endif
2270
2271 /*@}*/