Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #include "machine.h"
66
67 #include "pcqueue.h"
68
69 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
70
71 #if CMK_BLUEGENEL
72 #define MAX_QLEN 8
73 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
74 #else
75 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
76 #define MAX_QLEN 200
77 #endif
78
79 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && ! CMK_TRACE_IN_CHARM
80 CpvStaticDeclare(double, projTraceStart);
81 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
82 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
83 #else
84 # define  START_EVENT()
85 # define  END_EVENT(x)
86 #endif
87
88 /*
89     To reduce the buffer used in broadcast and distribute the load from
90   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
91   spanning tree broadcast algorithm.
92     This will use the fourth short in message as an indicator of spanning tree
93   root.
94 */
95 #define CMK_BROADCAST_SPANNING_TREE    1
96 #define CMK_BROADCAST_HYPERCUBE        0
97
98 #define BROADCAST_SPANNING_FACTOR      4
99 /* The number of children used when a msg is broadcast inside a node */
100 #define BROADCAST_SPANNING_INTRA_FACTOR      8
101
102 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
103 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
104
105 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
106 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
107
108 /* FIXME: need a random number that everyone agrees ! */
109 #define CHARM_MAGIC_NUMBER               126
110
111 #if CMK_ERROR_CHECKING
112 static int checksum_flag = 0;
113 #define CMI_SET_CHECKSUM(msg, len)      \
114         if (checksum_flag)  {   \
115           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
116           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
117         }
118 #define CMI_CHECK_CHECKSUM(msg, len)    \
119         if (checksum_flag)      \
120           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
121             CmiAbort("Fatal error: checksum doesn't agree!\n");
122 #else
123 #define CMI_SET_CHECKSUM(msg, len)
124 #define CMI_CHECK_CHECKSUM(msg, len)
125 #endif
126
127 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
128 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
129 #else
130 #define CMI_SET_BROADCAST_ROOT(msg, root) 
131 #endif
132
133
134 /** 
135     If MPI_POST_RECV is defined, we provide default values for size 
136     and number of posted recieves. If MPI_POST_RECV_COUNT is set
137     then a default value for MPI_POST_RECV_SIZE is used if not specified
138     by the user.
139 */
140 #ifdef MPI_POST_RECV
141 #define MPI_POST_RECV_COUNT 10
142 #undef MPI_POST_RECV
143 #endif
144 #if MPI_POST_RECV_COUNT > 0
145 #warning "Using MPI posted receives which have not yet been tested"
146 #ifndef MPI_POST_RECV_SIZE
147 #define MPI_POST_RECV_SIZE 200
148 #endif
149 /* #undef  MPI_POST_RECV_DEBUG  */
150 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
151 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
152 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
153 CpvDeclare(char*,CmiPostedRecvBuffers);
154 #endif
155
156 /*
157  to avoid MPI's in order delivery, changing MPI Tag all the time
158 */
159 #define TAG     1375
160
161 #if MPI_POST_RECV_COUNT > 0
162 #define POST_RECV_TAG TAG+1
163 #define BARRIER_ZERO_TAG TAG
164 #else
165 #define BARRIER_ZERO_TAG     1375
166 #endif
167
168 #include <signal.h>
169 void (*signal_int)(int);
170
171 /*
172 static int mpi_tag = TAG;
173 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
174 */
175
176 static int        _thread_provided = -1;
177 int               _Cmi_numpes;
178 int               _Cmi_mynode;    /* Which address space am I */
179 int               _Cmi_mynodesize;/* Number of processors in my address space */
180 int               _Cmi_numnodes;  /* Total number of address spaces */
181 int               _Cmi_numpes;    /* Total number of processors */
182 static int        Cmi_nodestart; /* First processor in this address space */
183 CpvDeclare(void*, CmiLocalQueue);
184
185 /*Network progress utility variables. Period controls the rate at
186   which the network poll is called */
187 CpvDeclare(unsigned , networkProgressCount);
188 int networkProgressPeriod;
189
190 int               idleblock = 0;
191
192 #define BLK_LEN  512
193
194 #if CMK_NODE_QUEUE_AVAILABLE
195 #define DGRAM_NODEMESSAGE   (0xFB)
196
197 #define NODE_BROADCAST_OTHERS (-1)
198 #define NODE_BROADCAST_ALL    (-2)
199 #endif
200
201 #if 0
202 static void **recdQueue_blk;
203 static unsigned int recdQueue_blk_len;
204 static unsigned int recdQueue_first;
205 static unsigned int recdQueue_len;
206 static void recdQueueInit(void);
207 static void recdQueueAddToBack(void *element);
208 static void *recdQueueRemoveFromFront(void);
209 #endif
210
211 static void ConverseRunPE(int everReturn);
212 static void CommunicationServer(int sleepTime);
213 static void CommunicationServerThread(int sleepTime);
214
215 typedef struct msg_list {
216      char *msg;
217      struct msg_list *next;
218      int size, destpe;
219
220 #if CMK_SMP_TRACE_COMMTHREAD
221         int srcpe;
222 #endif  
223         
224      MPI_Request req;
225 } SMSG_LIST;
226
227 int MsgQueueLen=0;
228 static int request_max;
229
230 static SMSG_LIST *sent_msgs=0;
231 static SMSG_LIST *end_sent=0;
232
233 static int Cmi_dim;
234
235 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
236
237 #if NODE_0_IS_CONVHOST
238 int inside_comm = 0;
239 #endif
240
241 void CmiAbort(const char *message);
242 static void PerrorExit(const char *msg);
243
244 void SendSpanningChildren(int size, char *msg);
245 void SendHypercube(int size, char *msg);
246
247 static void PerrorExit(const char *msg)
248 {
249   perror(msg);
250   exit(1);
251 }
252
253 extern unsigned char computeCheckSum(unsigned char *data, int len);
254
255 /**************************  TIMER FUNCTIONS **************************/
256
257 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
258
259 /* MPI calls are not threadsafe, even the timer on some machines */
260 static CmiNodeLock  timerLock = 0;
261 static double starttimer = 0;
262 static int _is_global = 0;
263
264 int CmiTimerIsSynchronized()
265 {
266   int  flag;
267   void *v;
268
269   /*  check if it using synchronized timer */
270   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
271     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
272   if (flag) {
273     _is_global = *(int*)v;
274     if (_is_global && CmiMyPe() == 0)
275       printf("Charm++> MPI timer is synchronized!\n");
276   }
277   return _is_global;
278 }
279
280 void CmiTimerInit()
281 {
282   _is_global = CmiTimerIsSynchronized();
283
284   if (_is_global) {
285     if (CmiMyRank() == 0) {
286       double minTimer;
287 #if CMK_TIMER_USE_XT3_DCLOCK
288       starttimer = dclock();
289 #else
290       starttimer = MPI_Wtime();
291 #endif
292
293       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
294                                   MPI_COMM_WORLD );
295       starttimer = minTimer;
296     }
297   }
298   else {  /* we don't have a synchronous timer, set our own start time */
299     CmiBarrier();
300     CmiBarrier();
301     CmiBarrier();
302 #if CMK_TIMER_USE_XT3_DCLOCK
303     starttimer = dclock();
304 #else
305     starttimer = MPI_Wtime();
306 #endif
307   }
308
309 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
310   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
311     timerLock = CmiCreateLock();
312 #endif
313   CmiNodeAllBarrier();          /* for smp */
314 }
315
316 /**
317  * Since the timerLock is never created, and is
318  * always NULL, then all the if-condition inside
319  * the timer functions could be disabled right
320  * now in the case of SMP. --Chao Mei
321  */
322 double CmiTimer(void)
323 {
324   double t;
325 #if 0 && CMK_SMP
326   if (timerLock) CmiLock(timerLock);
327 #endif
328 #if CMK_TIMER_USE_XT3_DCLOCK
329   t = dclock() - starttimer;
330 #else
331   t = MPI_Wtime() - starttimer;
332 #endif
333
334 #if 0 && CMK_SMP
335   if (timerLock) CmiUnlock(timerLock);
336 #endif
337
338   return t;
339 }
340
341 double CmiWallTimer(void)
342 {
343   double t;
344 #if 0 && CMK_SMP
345   if (timerLock) CmiLock(timerLock);
346 #endif
347 #if CMK_TIMER_USE_XT3_DCLOCK
348   t = dclock() - starttimer;
349 #else
350   t = MPI_Wtime() - starttimer;
351 #endif
352 #if 0 && CMK_SMP
353   if (timerLock) CmiUnlock(timerLock);
354 #endif
355   return t;
356 }
357
358 double CmiCpuTimer(void)
359 {
360   double t;
361 #if 0 && CMK_SMP
362   if (timerLock) CmiLock(timerLock);
363 #endif
364 #if CMK_TIMER_USE_XT3_DCLOCK
365   t = dclock() - starttimer;
366 #else
367   t = MPI_Wtime() - starttimer;
368 #endif
369 #if 0 && CMK_SMP
370   if (timerLock) CmiUnlock(timerLock);
371 #endif
372   return t;
373 }
374
375 #endif
376
377 /* must be called on all ranks including comm thread in SMP */
378 int CmiBarrier()
379 {
380 #if CMK_SMP
381     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
382   CmiNodeAllBarrier();
383   if (CmiMyRank() == CmiMyNodeSize()) 
384 #else
385   if (CmiMyRank() == 0) 
386 #endif
387   {
388 /**
389  *  The call of CmiBarrier is usually before the initialization
390  *  of trace module of Charm++, therefore, the START_EVENT
391  *  and END_EVENT are disabled here. -Chao Mei
392  */     
393     /*START_EVENT();*/
394
395     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
396         CmiAbort("Timernit: MPI_Barrier failed!\n");
397
398     /*END_EVENT(10);*/
399   }
400   CmiNodeAllBarrier();
401   return 0;
402 }
403
404 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
405 int CmiBarrierZero()
406 {
407   int i;
408 #if CMK_SMP
409   if (CmiMyRank() == CmiMyNodeSize()) 
410 #else
411   if (CmiMyRank() == 0) 
412 #endif
413   {
414     char msg[1];
415     MPI_Status sts;
416     if (CmiMyNode() == 0)  {
417       for (i=0; i<CmiNumNodes()-1; i++) {
418          START_EVENT();
419
420          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
421             CmiPrintf("MPI_Recv failed!\n");
422
423          END_EVENT(30);
424       }
425     }
426     else {
427       START_EVENT();
428
429       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
430          printf("MPI_Send failed!\n");
431
432       END_EVENT(20);
433     }
434   }
435   CmiNodeAllBarrier();
436   return 0;
437 }
438
439 typedef struct ProcState {
440 #if MULTI_SENDQUEUE
441 PCQueue      sendMsgBuf;       /* per processor message sending queue */
442 #endif
443 CmiNodeLock  recvLock;              /* for cs->recv */
444 } ProcState;
445
446 static ProcState  *procState;
447
448 #if CMK_SMP
449
450 #if !MULTI_SENDQUEUE
451 static PCQueue sendMsgBuf;
452 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
453 #endif
454
455 #endif
456
457 /************************************************************
458  *
459  * Processor state structure
460  *
461  ************************************************************/
462
463 /* fake Cmi_charmrun_fd */
464 static int Cmi_charmrun_fd = 0;
465 #include "machine-smp.c"
466
467 CsvDeclare(CmiNodeState, NodeState);
468
469 #include "immediate.c"
470
471 #if CMK_SHARED_VARS_UNAVAILABLE
472 /************ non SMP **************/
473 static struct CmiStateStruct Cmi_state;
474 int _Cmi_mype;
475 int _Cmi_myrank;
476
477 void CmiMemLock() {}
478 void CmiMemUnlock() {}
479
480 #define CmiGetState() (&Cmi_state)
481 #define CmiGetStateN(n) (&Cmi_state)
482
483 void CmiYield(void) { sleep(0); }
484
485 static void CmiStartThreads(char **argv)
486 {
487   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
488   _Cmi_mype = Cmi_nodestart;
489   _Cmi_myrank = 0;
490 }
491 #endif  /* non smp */
492
493 /*Add a message to this processor's receive queue, pe is a rank */
494 void CmiPushPE(int pe,void *msg)
495 {
496   CmiState cs = CmiGetStateN(pe);
497   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
498 #if CMK_IMMEDIATE_MSG
499   if (CmiIsImmediate(msg)) {
500 /*
501 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
502     CmiHandleMessage(msg);
503 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
504 */
505     /**(CmiUInt2 *)msg = pe;*/
506     CMI_DEST_RANK(msg) = pe;
507     CmiPushImmediateMsg(msg);
508     return;
509   }
510 #endif
511
512 #if CMK_SMP
513   CmiLock(procState[pe].recvLock);
514 #endif
515   PCQueuePush(cs->recv,msg);
516 #if CMK_SMP
517   CmiUnlock(procState[pe].recvLock);
518 #endif
519   CmiIdleLock_addMessage(&cs->idle);
520   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
521 }
522
523 #if CMK_NODE_QUEUE_AVAILABLE
524 /*Add a message to this processor's receive queue */
525 static void CmiPushNode(void *msg)
526 {
527   MACHSTATE(3,"Pushing message into NodeRecv queue");
528 #if CMK_IMMEDIATE_MSG
529   if (CmiIsImmediate(msg)) {
530     CMI_DEST_RANK(msg) = 0;
531     CmiPushImmediateMsg(msg);
532     return;
533   }
534 #endif
535   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
536   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
537   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
538   {
539   CmiState cs=CmiGetStateN(0);
540   CmiIdleLock_addMessage(&cs->idle);
541   }
542 }
543 #endif
544
545 #ifndef CmiMyPe
546 int CmiMyPe(void)
547 {
548   return CmiGetState()->pe;
549 }
550 #endif
551
552 #ifndef CmiMyRank
553 int CmiMyRank(void)
554 {
555   return CmiGetState()->rank;
556 }
557 #endif
558
559 #ifndef CmiNodeFirst
560 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
561 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
562 #endif
563
564 #ifndef CmiNodeOf
565 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
566 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
567 #endif
568
569 static size_t CmiAllAsyncMsgsSent(void)
570 {
571    SMSG_LIST *msg_tmp = sent_msgs;
572    MPI_Status sts;
573    int done;
574
575    while(msg_tmp!=0) {
576     done = 0;
577     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
578       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
579     if(!done)
580       return 0;
581     msg_tmp = msg_tmp->next;
582 /*    MsgQueueLen--; ????? */
583    }
584    return 1;
585 }
586
587 int CmiAsyncMsgSent(CmiCommHandle c) {
588
589   SMSG_LIST *msg_tmp = sent_msgs;
590   int done;
591   MPI_Status sts;
592
593   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
594     msg_tmp = msg_tmp->next;
595   if(msg_tmp) {
596     done = 0;
597     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
598       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
599     return ((done)?1:0);
600   } else {
601     return 1;
602   }
603 }
604
605 void CmiReleaseCommHandle(CmiCommHandle c)
606 {
607   return;
608 }
609
610 #if CMK_BLUEGENEL
611 extern void MPID_Progress_test();
612 #endif
613
614 void CmiReleaseSentMessages(void)
615 {
616   SMSG_LIST *msg_tmp=sent_msgs;
617   SMSG_LIST *prev=0;
618   SMSG_LIST *temp;
619   int done;
620   MPI_Status sts;
621
622 #if CMK_BLUEGENEL
623   MPID_Progress_test();
624 #endif
625
626   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
627   while(msg_tmp!=0) {
628     done =0;
629 #if CMK_SMP_TRACE_COMMTHREAD
630     double startT = CmiWallTimer();
631 #endif
632     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
633       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
634     if(done) {
635       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
636       MsgQueueLen--;
637       /* Release the message */
638       temp = msg_tmp->next;
639       if(prev==0)  /* first message */
640         sent_msgs = temp;
641       else
642         prev->next = temp;
643       CmiFree(msg_tmp->msg);
644       CmiFree(msg_tmp);
645       msg_tmp = temp;
646     } else {
647       prev = msg_tmp;
648       msg_tmp = msg_tmp->next;
649     }
650 #if CMK_SMP_TRACE_COMMTHREAD
651     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
652 #endif
653   }
654   end_sent = prev;
655   MACHSTATE(2,"} CmiReleaseSentMessages end");
656 }
657
658 int PumpMsgs(void)
659 {
660   int nbytes, flg, res;
661   char *msg;
662   MPI_Status sts;
663   int recd=0;
664
665 #if CMI_EXERT_RECV_CAP
666   int recvCnt=0;
667 #endif
668         
669 #if CMK_BLUEGENEL
670   MPID_Progress_test();
671 #endif
672
673   MACHSTATE(2,"PumpMsgs begin {");
674
675         
676   while(1) {
677 #if CMI_EXERT_RECV_CAP
678         if(recvCnt==RECV_CAP) break;
679 #endif
680           
681     /* First check posted recvs then do  probe unmatched outstanding messages */
682 #if MPI_POST_RECV_COUNT > 0 
683     int completed_index=-1;
684     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
685         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
686     if(flg){
687         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
688             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
689
690         recd = 1;
691         msg = (char *) CmiAlloc(nbytes);
692         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
693         /* and repost the recv */
694
695         START_EVENT();
696
697         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
698             MPI_POST_RECV_SIZE,
699             MPI_BYTE,
700             MPI_ANY_SOURCE,
701             POST_RECV_TAG,
702             MPI_COMM_WORLD,
703             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
704                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
705
706         END_EVENT(50);
707
708         CpvAccess(Cmi_posted_recv_total)++;
709     }
710     else {
711         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
712         if(res != MPI_SUCCESS)
713         CmiAbort("MPI_Iprobe failed\n");
714         if(!flg) break;
715         recd = 1;
716         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
717         msg = (char *) CmiAlloc(nbytes);
718
719         START_EVENT();
720
721         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
722             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
723
724         END_EVENT(30);
725
726         CpvAccess(Cmi_unposted_recv_total)++;
727     }
728 #else
729     /* Original version */
730 #if CMK_SMP_TRACE_COMMTHREAD
731   double startT = CmiWallTimer(); 
732 #endif
733     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
734     if(res != MPI_SUCCESS)
735       CmiAbort("MPI_Iprobe failed\n");
736
737     if(!flg) break;
738 #if CMK_SMP_TRACE_COMMTHREAD
739     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
740 #endif
741
742     recd = 1;
743     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
744     msg = (char *) CmiAlloc(nbytes);
745     
746     START_EVENT();
747
748     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
749       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
750
751     /*END_EVENT(30);*/
752
753 #endif
754
755 #if CMK_SMP_TRACE_COMMTHREAD
756         traceBeginCommOp(msg);
757         traceChangeLastTimestamp(CpvAccess(projTraceStart));
758         traceEndCommOp(msg);
759         #if CMI_MPI_TRACE_MOREDETAILED
760         char tmp[32];
761         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
762         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
763         #endif
764 #endif  
765         
766     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
767     CMI_CHECK_CHECKSUM(msg, nbytes);
768 #if CMK_ERROR_CHECKING
769     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
770       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
771       CmiFree(msg);
772       CmiAbort("Abort!\n");
773       continue;
774     }
775 #endif
776         
777 #if CMK_BROADCAST_SPANNING_TREE
778     if (CMI_BROADCAST_ROOT(msg))
779       SendSpanningChildren(nbytes, msg);
780 #elif CMK_BROADCAST_HYPERCUBE
781     if (CMI_BROADCAST_ROOT(msg))
782       SendHypercube(nbytes, msg);
783 #endif
784         
785         /* In SMP mode, this push operation needs to be executed
786      * after forwarding broadcast messages. If it is executed
787      * earlier, then during the bcast msg forwarding period,    
788          * the msg could be already freed on the worker thread.
789          * As a result, the forwarded message could be wrong! 
790          * --Chao Mei
791          */
792 #if CMK_NODE_QUEUE_AVAILABLE
793     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
794       CmiPushNode(msg);
795     else
796 #endif
797         CmiPushPE(CMI_DEST_RANK(msg), msg);     
798         
799 #if CMI_EXERT_RECV_CAP
800         recvCnt++;
801 #endif  
802   }
803
804   
805 #if CMK_IMMEDIATE_MSG && !CMK_SMP
806   CmiHandleImmediate();
807 #endif
808   
809   MACHSTATE(2,"} PumpMsgs end ");
810   return recd;
811 }
812
813 /* blocking version */
814 static void PumpMsgsBlocking(void)
815 {
816   static int maxbytes = 20000000;
817   static char *buf = NULL;
818   int nbytes, flg;
819   MPI_Status sts;
820   char *msg;
821   int recd=0;
822
823   if (!PCQueueEmpty(CmiGetState()->recv)) return;
824   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
825   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
826   if (sent_msgs)  return;
827
828 #if 0
829   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
830 #endif
831
832   if (buf == NULL) {
833     buf = (char *) CmiAlloc(maxbytes);
834     _MEMCHECK(buf);
835   }
836
837
838 #if MPI_POST_RECV_COUNT > 0
839 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
840 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
841 #endif
842
843   START_EVENT();
844
845   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
846       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
847
848   /*END_EVENT(30);*/
849     
850    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
851    msg = (char *) CmiAlloc(nbytes);
852    memcpy(msg, buf, nbytes);
853
854 #if CMK_SMP_TRACE_COMMTHREAD
855         traceBeginCommOp(msg);
856         traceChangeLastTimestamp(CpvAccess(projTraceStart));
857         traceEndCommOp(msg);
858         #if CMI_MPI_TRACE_MOREDETAILED
859         char tmp[32];
860         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
861         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
862         #endif
863 #endif
864
865 #if CMK_BROADCAST_SPANNING_TREE
866    if (CMI_BROADCAST_ROOT(msg))
867       SendSpanningChildren(nbytes, msg);
868 #elif CMK_BROADCAST_HYPERCUBE
869    if (CMI_BROADCAST_ROOT(msg))
870       SendHypercube(nbytes, msg);
871 #endif
872   
873         /* In SMP mode, this push operation needs to be executed
874      * after forwarding broadcast messages. If it is executed
875      * earlier, then during the bcast msg forwarding period,    
876          * the msg could be already freed on the worker thread.
877          * As a result, the forwarded message could be wrong! 
878          * --Chao Mei
879          */  
880 #if CMK_NODE_QUEUE_AVAILABLE
881    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
882       CmiPushNode(msg);
883    else
884 #endif
885       CmiPushPE(CMI_DEST_RANK(msg), msg);
886 }
887
888 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
889
890 #if CMK_SMP
891
892 static int inexit = 0;
893 static CmiNodeLock  exitLock = 0;
894
895 static int MsgQueueEmpty()
896 {
897   int i;
898 #if MULTI_SENDQUEUE
899   for (i=0; i<_Cmi_mynodesize; i++)
900     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
901 #else
902   return PCQueueEmpty(sendMsgBuf);
903 #endif
904   return 1;
905 }
906
907 static int SendMsgBuf();
908
909 /* test if all processors recv queues are empty */
910 static int RecvQueueEmpty()
911 {
912   int i;
913   for (i=0; i<_Cmi_mynodesize; i++) {
914     CmiState cs=CmiGetStateN(i);
915     if (!PCQueueEmpty(cs->recv)) return 0;
916   }
917   return 1;
918 }
919
920 /**
921 CommunicationServer calls MPI to send messages in the queues and probe message from network.
922 */
923
924 #define REPORT_COMM_METRICS 0
925 #if REPORT_COMM_METRICS
926 static double pumptime = 0.0;
927 static double releasetime = 0.0;
928 static double sendtime = 0.0;
929 #endif
930
931 static void CommunicationServer(int sleepTime)
932 {
933   int static count=0;
934 /*
935   count ++;
936   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
937 */
938 #if REPORT_COMM_METRICS
939   double t1, t2, t3, t4;
940   t1 = CmiWallTimer();
941 #endif
942   PumpMsgs();
943 #if REPORT_COMM_METRICS
944   t2 = CmiWallTimer();
945 #endif
946   CmiReleaseSentMessages();
947 #if REPORT_COMM_METRICS
948   t3 = CmiWallTimer();
949 #endif
950   SendMsgBuf();
951 #if REPORT_COMM_METRICS
952   t4 = CmiWallTimer();
953   pumptime += (t2-t1);
954   releasetime += (t3-t2);
955   sendtime += (t4-t3);
956 #endif
957 /*
958   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
959 */
960   if (inexit == CmiMyNodeSize()) {
961     MACHSTATE(2, "CommunicationServer exiting {");
962 #if 0
963     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
964 #endif
965     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
966       CmiReleaseSentMessages();
967       SendMsgBuf();
968       PumpMsgs();
969     }
970     MACHSTATE(2, "CommunicationServer barrier begin {");
971
972     START_EVENT();
973
974     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
975       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
976
977     END_EVENT(10);
978
979     MACHSTATE(2, "} CommunicationServer barrier end");
980 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
981     if (CmiMyNode() == 0){
982       CmiPrintf("End of program\n");
983     }
984 #endif
985     MACHSTATE(2, "} CommunicationServer EXIT");
986
987     ConverseCommonExit();   
988 #if REPORT_COMM_METRICS
989     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
990 #endif
991
992 #if ! CMK_AUTOBUILD
993     signal(SIGINT, signal_int);
994     MPI_Finalize();
995     #endif
996     exit(0);
997   }
998 }
999
1000 #endif
1001
1002 static void CommunicationServerThread(int sleepTime)
1003 {
1004 #if CMK_SMP
1005   CommunicationServer(sleepTime);
1006 #endif
1007 #if CMK_IMMEDIATE_MSG
1008   CmiHandleImmediate();
1009 #endif
1010 }
1011
1012 #if CMK_NODE_QUEUE_AVAILABLE
1013 char *CmiGetNonLocalNodeQ(void)
1014 {
1015   CmiState cs = CmiGetState();
1016   char *result = 0;
1017   CmiIdleLock_checkMessage(&cs->idle);
1018 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1019     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1020     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1021     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1022     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1023     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1024 /*  }  */
1025
1026   return result;
1027 }
1028 #endif
1029
1030 void *CmiGetNonLocal(void)
1031 {
1032   static int count=0;
1033   CmiState cs = CmiGetState();
1034   void *msg;
1035
1036 #if ! CMK_SMP
1037   if (CmiNumPes() == 1) return NULL;
1038 #endif
1039
1040   CmiIdleLock_checkMessage(&cs->idle);
1041   /* although it seems that lock is not needed, I found it crashes very often
1042      on mpi-smp without lock */
1043
1044 #if ! CMK_SMP
1045   CmiReleaseSentMessages();
1046   PumpMsgs();
1047 #endif
1048
1049   /* CmiLock(procState[cs->rank].recvLock); */
1050   msg =  PCQueuePop(cs->recv);
1051   /* CmiUnlock(procState[cs->rank].recvLock); */
1052
1053 /*
1054   if (msg) {
1055     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1056   else {
1057     count++;
1058     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1059   }
1060 */
1061 #if ! CMK_SMP
1062   if (no_outstanding_sends) {
1063     while (MsgQueueLen>0) {
1064       CmiReleaseSentMessages();
1065       PumpMsgs();
1066     }
1067   }
1068
1069   if(!msg) {
1070     CmiReleaseSentMessages();
1071     if (PumpMsgs())
1072       return  PCQueuePop(cs->recv);
1073     else
1074       return 0;
1075   }
1076 #endif
1077
1078   return msg;
1079 }
1080
1081 /* called in non-smp mode */
1082 void CmiNotifyIdle(void)
1083 {
1084   CmiReleaseSentMessages();
1085   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1086 }
1087
1088
1089 /********************************************************
1090     The call to probe immediate messages has been renamed to
1091     CmiMachineProgressImpl
1092 ******************************************************/
1093 /* user call to handle immediate message, only useful in non SMP version
1094    using polling method to schedule message.
1095 */
1096 /*
1097 #if CMK_IMMEDIATE_MSG
1098 void CmiProbeImmediateMsg()
1099 {
1100 #if !CMK_SMP
1101   PumpMsgs();
1102   CmiHandleImmediate();
1103 #endif
1104 }
1105 #endif
1106 */
1107
1108 /* Network progress function is used to poll the network when for
1109    messages. This flushes receive buffers on some  implementations*/
1110 #if CMK_MACHINE_PROGRESS_DEFINED
1111 void CmiMachineProgressImpl()
1112 {
1113 #if !CMK_SMP
1114     PumpMsgs();
1115 #if CMK_IMMEDIATE_MSG
1116     CmiHandleImmediate();
1117 #endif
1118 #else
1119     /*Not implemented yet. Communication server does not seem to be
1120       thread safe, so only communication thread call it */
1121     if (CmiMyRank() == CmiMyNodeSize())
1122         CommunicationServerThread(0);
1123 #endif
1124 }
1125 #endif
1126
1127 /********************* MESSAGE SEND FUNCTIONS ******************/
1128
1129 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1130
1131 static void CmiSendSelf(char *msg)
1132 {
1133 #if CMK_IMMEDIATE_MSG
1134     if (CmiIsImmediate(msg)) {
1135       /* CmiBecomeNonImmediate(msg); */
1136       CmiPushImmediateMsg(msg);
1137       CmiHandleImmediate();
1138       return;
1139     }
1140 #endif
1141     CQdCreate(CpvAccess(cQdState), 1);
1142     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1143 }
1144
1145 void CmiSyncSendFn(int destPE, int size, char *msg)
1146 {
1147   CmiState cs = CmiGetState();
1148   char *dupmsg = (char *) CmiAlloc(size);
1149   memcpy(dupmsg, msg, size);
1150
1151   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1152
1153   if (cs->pe==destPE) {
1154     CmiSendSelf(dupmsg);
1155   }
1156   else
1157     CmiAsyncSendFn_(destPE, size, dupmsg);
1158 }
1159
1160 #if CMK_SMP
1161
1162 /* called by communication thread in SMP */
1163 static int SendMsgBuf()
1164 {
1165   SMSG_LIST *msg_tmp;
1166   char *msg;
1167   int node, rank, size;
1168   int i;
1169   int sent = 0;
1170
1171 #if CMI_EXERT_SEND_CAP
1172         int sentCnt = 0;
1173 #endif  
1174         
1175   MACHSTATE(2,"SendMsgBuf begin {");
1176 #if MULTI_SENDQUEUE
1177   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1178   {
1179     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1180     {
1181       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1182 #else
1183     /* single message sending queue */
1184     /* CmiLock(sendMsgBufLock); */
1185     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1186     /* CmiUnlock(sendMsgBufLock); */
1187     while (NULL != msg_tmp)
1188     {
1189 #endif
1190       node = msg_tmp->destpe;
1191       size = msg_tmp->size;
1192       msg = msg_tmp->msg;
1193       msg_tmp->next = 0;
1194       while (MsgQueueLen > request_max) {
1195         CmiReleaseSentMessages();
1196         PumpMsgs();
1197       }
1198       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1199 #if CMK_ERROR_CHECKING
1200       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1201 #endif
1202       CMI_SET_CHECKSUM(msg, size);
1203
1204 #if MPI_POST_RECV_COUNT > 0
1205         if(size <= MPI_POST_RECV_SIZE){
1206
1207           START_EVENT();
1208           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1209                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1210
1211           STOP_EVENT(40);
1212         }
1213         else {
1214             START_EVENT();
1215             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1216                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1217             STOP_EVENT(40);
1218         }
1219 #else
1220         START_EVENT();
1221         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1222             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1223         /*END_EVENT(40);*/
1224 #endif
1225         
1226 #if CMK_SMP_TRACE_COMMTHREAD
1227         traceBeginCommOp(msg);
1228         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1229         /* traceSendMsgComm must execute after traceBeginCommOp because
1230          * we pretend we execute an entry method, and inside this we
1231          * pretend we will send another message. Otherwise how could
1232          * a message creation just before an entry method invocation?
1233          * If such logic is broken, the projections will not trace
1234          * messages correctly! -Chao Mei
1235          */
1236         traceSendMsgComm(msg);
1237         traceEndCommOp(msg);
1238         #if CMI_MPI_TRACE_MOREDETAILED
1239         char tmp[64];
1240         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1241         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1242         #endif
1243 #endif
1244                 
1245                 
1246       MACHSTATE(3,"}MPI_send end");
1247       MsgQueueLen++;
1248       if(sent_msgs==0)
1249         sent_msgs = msg_tmp;
1250       else
1251         end_sent->next = msg_tmp;
1252       end_sent = msg_tmp;
1253       sent=1;
1254           
1255 #if CMI_EXERT_SEND_CAP    
1256           if(++sentCnt == SEND_CAP) break;
1257 #endif    
1258           
1259 #if ! MULTI_SENDQUEUE
1260       /* CmiLock(sendMsgBufLock); */
1261       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1262       /* CmiUnlock(sendMsgBufLock); */
1263 #endif
1264     }
1265 #if MULTI_SENDQUEUE
1266   }
1267 #endif
1268   MACHSTATE(2,"}SendMsgBuf end ");
1269   return sent;
1270 }
1271
1272 void EnqueueMsg(void *m, int size, int node)
1273 {
1274   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1275   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1276   msg_tmp->msg = m;
1277   msg_tmp->size = size;
1278   msg_tmp->destpe = node;
1279         
1280 #if CMK_SMP_TRACE_COMMTHREAD
1281         msg_tmp->srcpe = CmiMyPe();
1282 #endif  
1283
1284 #if MULTI_SENDQUEUE
1285   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1286 #else
1287   CmiLock(sendMsgBufLock);
1288   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1289   CmiUnlock(sendMsgBufLock);
1290 #endif
1291         
1292   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1293 }
1294
1295 #endif
1296
1297 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1298 {
1299   CmiState cs = CmiGetState();
1300   SMSG_LIST *msg_tmp;
1301   CmiUInt2  rank, node;
1302
1303   if(destPE == cs->pe) {
1304     char *dupmsg = (char *) CmiAlloc(size);
1305     memcpy(dupmsg, msg, size);
1306     CmiSendSelf(dupmsg);
1307     return 0;
1308   }
1309   CQdCreate(CpvAccess(cQdState), 1);
1310 #if CMK_SMP
1311   node = CmiNodeOf(destPE);
1312   rank = CmiRankOf(destPE);
1313   if (node == CmiMyNode())  {
1314     CmiPushPE(rank, msg);
1315     return 0;
1316   }
1317   CMI_DEST_RANK(msg) = rank;
1318   EnqueueMsg(msg, size, node);
1319   return 0;
1320 #else
1321   /* non smp */
1322   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1323   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1324   msg_tmp->msg = msg;
1325   msg_tmp->next = 0;
1326   while (MsgQueueLen > request_max) {
1327         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1328         CmiReleaseSentMessages();
1329         PumpMsgs();
1330   }
1331 #if CMK_ERROR_CHECKING
1332   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1333 #endif
1334   CMI_SET_CHECKSUM(msg, size);
1335
1336 #if MPI_POST_RECV_COUNT > 0
1337         if(size <= MPI_POST_RECV_SIZE){
1338
1339           START_EVENT();
1340           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1341                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1342           END_EVENT(40);
1343         }
1344         else {
1345           START_EVENT();
1346           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1347                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1348           END_EVENT(40);
1349         }
1350 #else
1351   START_EVENT();
1352   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1353     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1354   END_EVENT(40);
1355 #endif
1356
1357   MsgQueueLen++;
1358   if(sent_msgs==0)
1359     sent_msgs = msg_tmp;
1360   else
1361     end_sent->next = msg_tmp;
1362   end_sent = msg_tmp;
1363   return (CmiCommHandle) &(msg_tmp->req);
1364 #endif              /* non-smp */
1365 }
1366
1367 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1368 {
1369   CMI_SET_BROADCAST_ROOT(msg, 0);
1370   CmiAsyncSendFn_(destPE, size, msg);
1371 }
1372
1373 void CmiFreeSendFn(int destPE, int size, char *msg)
1374 {
1375   CmiState cs = CmiGetState();
1376   CMI_SET_BROADCAST_ROOT(msg, 0);
1377
1378   if (cs->pe==destPE) {
1379     CmiSendSelf(msg);
1380   } else {
1381     CmiAsyncSendFn_(destPE, size, msg);
1382   }
1383 }
1384
1385 /*********************** BROADCAST FUNCTIONS **********************/
1386
1387 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1388 void CmiSyncSendFn1(int destPE, int size, char *msg)
1389 {
1390   CmiState cs = CmiGetState();
1391   char *dupmsg = (char *) CmiAlloc(size);
1392   memcpy(dupmsg, msg, size);
1393   if (cs->pe==destPE)
1394     CmiSendSelf(dupmsg);
1395   else
1396     CmiAsyncSendFn_(destPE, size, dupmsg);
1397 }
1398
1399 /* send msg to its spanning children in broadcast. G. Zheng */
1400 void SendSpanningChildren(int size, char *msg)
1401 {
1402   CmiState cs = CmiGetState();
1403   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1404   int startnode = CmiNodeOf(startpe);
1405   int i, exceptRank;
1406         
1407    /* first send msgs to other nodes */
1408   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1409   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1410     int nd = CmiMyNode()-startnode;
1411     if (nd<0) nd+=CmiNumNodes();
1412     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1413     if (nd > CmiNumNodes() - 1) break;
1414     nd += startnode;
1415     nd = nd%CmiNumNodes();
1416     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1417         #if CMK_SMP
1418         /* always send to the first rank of other nodes */
1419         char *newmsg = CmiCopyMsg(msg, size);
1420         CMI_DEST_RANK(newmsg) = 0;
1421     EnqueueMsg(newmsg, size, nd);
1422         #else
1423         CmiSyncSendFn1(nd, size, msg);
1424         #endif
1425   }
1426 #if CMK_SMP  
1427    /* second send msgs to my peers on this node */
1428   /* FIXME: now it's just a flat p2p send!! When node size is large,
1429    * it should also be sent in a tree
1430    */
1431    exceptRank = CMI_DEST_RANK(msg);
1432    for(i=0; i<exceptRank; i++){
1433            CmiPushPE(i, CmiCopyMsg(msg, size));
1434    }
1435    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1436            CmiPushPE(i, CmiCopyMsg(msg, size));
1437    }
1438 #endif
1439 }
1440
1441 #include <math.h>
1442
1443 /* send msg along the hypercube in broadcast. (Sameer) */
1444 void SendHypercube(int size, char *msg)
1445 {
1446   CmiState cs = CmiGetState();
1447   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1448   int startnode = CmiNodeOf(startpe);
1449   int i, exceptRank, cnt, tmp, relPE;
1450   int dims=0;
1451
1452   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1453   tmp = CmiNumNodes()-1;
1454   while(tmp>0){
1455           dims++;
1456           tmp = tmp >> 1;
1457   }
1458   if(CmiNumNodes()==1) dims=1;
1459   
1460    /* first send msgs to other nodes */  
1461   relPE = CmiMyNode()-startnode;
1462   if(relPE < 0) relPE += CmiNumNodes();
1463   cnt=0;
1464   tmp = relPE;
1465   /* count how many zeros (in binary format) relPE has */
1466   for(i=0; i<dims; i++, cnt++){
1467     if(tmp & 1 == 1) break;
1468     tmp = tmp >> 1;
1469   }
1470   
1471   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1472   for (i = cnt-1; i >= 0; i--) {
1473     int nd = relPE + (1 << i);
1474         if(nd >= CmiNumNodes()) continue;
1475         nd = (nd+startnode)%CmiNumNodes();
1476         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1477 #if CMK_SMP
1478     /* always send to the first rank of other nodes */
1479     char *newmsg = CmiCopyMsg(msg, size);
1480     CMI_DEST_RANK(newmsg) = 0;
1481     EnqueueMsg(newmsg, size, nd);
1482 #else
1483         CmiSyncSendFn1(nd, size, msg);
1484 #endif
1485   }
1486   
1487 #if CMK_SMP
1488    /* second send msgs to my peers on this node */
1489    /* FIXME: now it's just a flat p2p send!! When node size is large,
1490     * it should also be sent in a tree
1491     */
1492    exceptRank = CMI_DEST_RANK(msg);
1493    for(i=0; i<exceptRank; i++){
1494            CmiPushPE(i, CmiCopyMsg(msg, size));
1495    }
1496    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1497            CmiPushPE(i, CmiCopyMsg(msg, size));
1498    }
1499 #endif
1500 }
1501
1502 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1503 {
1504   CmiState cs = CmiGetState();
1505
1506 #if CMK_SMP     
1507   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1508   CMI_DEST_RANK(msg) = CmiMyRank();
1509 #endif
1510         
1511 #if CMK_BROADCAST_SPANNING_TREE
1512   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1513   SendSpanningChildren(size, msg);
1514
1515 #elif CMK_BROADCAST_HYPERCUBE
1516   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1517   SendHypercube(size, msg);
1518
1519 #else
1520   int i;
1521
1522   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1523     CmiSyncSendFn(i, size,msg) ;
1524   for ( i=0; i<cs->pe; i++ )
1525     CmiSyncSendFn(i, size,msg) ;
1526 #endif
1527
1528   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1529 }
1530
1531
1532 /*  FIXME: luckily async is never used  G. Zheng */
1533 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1534 {
1535   CmiState cs = CmiGetState();
1536   int i ;
1537
1538   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1539     CmiAsyncSendFn(i,size,msg) ;
1540   for ( i=0; i<cs->pe; i++ )
1541     CmiAsyncSendFn(i,size,msg) ;
1542
1543   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1544   CmiAbort("CmiAsyncBroadcastFn should never be called");
1545   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1546 }
1547
1548 void CmiFreeBroadcastFn(int size, char *msg)
1549 {
1550    CmiSyncBroadcastFn(size,msg);
1551    CmiFree(msg);
1552 }
1553
1554 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1555 {
1556
1557 #if CMK_SMP     
1558   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1559   CMI_DEST_RANK(msg) = CmiMyRank();
1560 #endif
1561
1562 #if CMK_BROADCAST_SPANNING_TREE
1563   CmiState cs = CmiGetState();
1564   CmiSyncSendFn(cs->pe, size,msg) ;
1565   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1566   SendSpanningChildren(size, msg);
1567
1568 #elif CMK_BROADCAST_HYPERCUBE
1569   CmiState cs = CmiGetState();
1570   CmiSyncSendFn(cs->pe, size,msg) ;
1571   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1572   SendHypercube(size, msg);
1573
1574 #else
1575     int i ;
1576
1577   for ( i=0; i<_Cmi_numpes; i++ )
1578     CmiSyncSendFn(i,size,msg) ;
1579 #endif
1580
1581   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1582 }
1583
1584 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1585 {
1586   int i ;
1587
1588   for ( i=1; i<_Cmi_numpes; i++ )
1589     CmiAsyncSendFn(i,size,msg) ;
1590
1591   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1592
1593   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1594 }
1595
1596 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1597 {
1598 #if CMK_SMP     
1599   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1600   CMI_DEST_RANK(msg) = CmiMyRank();
1601 #endif
1602
1603 #if CMK_BROADCAST_SPANNING_TREE
1604   CmiState cs = CmiGetState();
1605   CmiSyncSendFn(cs->pe, size,msg) ;
1606   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1607   SendSpanningChildren(size, msg);
1608
1609 #elif CMK_BROADCAST_HYPERCUBE
1610   CmiState cs = CmiGetState();
1611   CmiSyncSendFn(cs->pe, size,msg) ;
1612   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1613   SendHypercube(size, msg);
1614
1615 #else
1616   int i ;
1617
1618   for ( i=0; i<_Cmi_numpes; i++ )
1619     CmiSyncSendFn(i,size,msg) ;
1620 #endif
1621   CmiFree(msg) ;
1622   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1623 }
1624
1625 #if CMK_NODE_QUEUE_AVAILABLE
1626
1627 static void CmiSendNodeSelf(char *msg)
1628 {
1629 #if CMK_IMMEDIATE_MSG
1630 #if 0
1631     if (CmiIsImmediate(msg) && !_immRunning) {
1632       /*CmiHandleImmediateMessage(msg); */
1633       CmiPushImmediateMsg(msg);
1634       CmiHandleImmediate();
1635       return;
1636     }
1637 #endif
1638     if (CmiIsImmediate(msg))
1639     {
1640       CmiPushImmediateMsg(msg);
1641       if (!_immRunning) CmiHandleImmediate();
1642       return;
1643     }
1644 #endif
1645     CQdCreate(CpvAccess(cQdState), 1);
1646     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1647     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1648     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1649 }
1650
1651 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1652 {
1653   int i;
1654   SMSG_LIST *msg_tmp;
1655   char *dupmsg;
1656
1657   CMI_SET_BROADCAST_ROOT(msg, 0);
1658   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1659   switch (dstNode) {
1660   case NODE_BROADCAST_ALL:
1661     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1662   case NODE_BROADCAST_OTHERS:
1663     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1664     for (i=0; i<_Cmi_numnodes; i++)
1665       if (i!=_Cmi_mynode) {
1666         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1667       }
1668     break;
1669   default:
1670     dupmsg = (char *)CmiCopyMsg(msg,size);
1671     if(dstNode == _Cmi_mynode) {
1672       CmiSendNodeSelf(dupmsg);
1673     }
1674     else {
1675       CQdCreate(CpvAccess(cQdState), 1);
1676       EnqueueMsg(dupmsg, size, dstNode);
1677     }
1678   }
1679   return 0;
1680 }
1681
1682 void CmiSyncNodeSendFn(int p, int s, char *m)
1683 {
1684   CmiAsyncNodeSendFn(p, s, m);
1685 }
1686
1687 /* need */
1688 void CmiFreeNodeSendFn(int p, int s, char *m)
1689 {
1690   CmiAsyncNodeSendFn(p, s, m);
1691   CmiFree(m);
1692 }
1693
1694 /* need */
1695 void CmiSyncNodeBroadcastFn(int s, char *m)
1696 {
1697   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1698 }
1699
1700 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1701 {
1702 }
1703
1704 /* need */
1705 void CmiFreeNodeBroadcastFn(int s, char *m)
1706 {
1707   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1708   CmiFree(m);
1709 }
1710
1711 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1712 {
1713   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1714 }
1715
1716 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1717 {
1718   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1719 }
1720
1721 /* need */
1722 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1723 {
1724   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1725   CmiFree(m);
1726 }
1727 #endif
1728
1729 /************************** MAIN ***********************************/
1730 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1731
1732 void ConverseExit(void)
1733 {
1734 #if ! CMK_SMP
1735   while(!CmiAllAsyncMsgsSent()) {
1736     PumpMsgs();
1737     CmiReleaseSentMessages();
1738   }
1739   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1740     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1741
1742   ConverseCommonExit();
1743 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1744   if (CmiMyPe() == 0){
1745     CmiPrintf("End of program\n");
1746 #if MPI_POST_RECV_COUNT > 0
1747     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1748 #endif
1749 }
1750 #endif
1751 #if ! CMK_AUTOBUILD
1752   signal(SIGINT, signal_int);
1753   MPI_Finalize();
1754 #endif
1755   exit(0);
1756
1757 #else
1758     /* SMP version, communication thread will exit */
1759   ConverseCommonExit();
1760   /* atomic increment */
1761   CmiLock(exitLock);
1762   inexit++;
1763   CmiUnlock(exitLock);
1764   while (1) CmiYield();
1765 #endif
1766 }
1767
1768 static void registerMPITraceEvents() {
1769 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1770     traceRegisterUserEvent("MPI_Barrier", 10);
1771     traceRegisterUserEvent("MPI_Send", 20);
1772     traceRegisterUserEvent("MPI_Recv", 30);
1773     traceRegisterUserEvent("MPI_Isend", 40);
1774     traceRegisterUserEvent("MPI_Irecv", 50);
1775     traceRegisterUserEvent("MPI_Test", 60);
1776     traceRegisterUserEvent("MPI_Iprobe", 70);
1777 #endif
1778 }
1779
1780
1781 static char     **Cmi_argv;
1782 static char     **Cmi_argvcopy;
1783 static CmiStartFn Cmi_startfn;   /* The start function */
1784 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1785
1786 typedef struct {
1787   int sleepMs; /*Milliseconds to sleep while idle*/
1788   int nIdles; /*Number of times we've been idle in a row*/
1789   CmiState cs; /*Machine state*/
1790 } CmiIdleState;
1791
1792 static CmiIdleState *CmiNotifyGetState(void)
1793 {
1794   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1795   s->sleepMs=0;
1796   s->nIdles=0;
1797   s->cs=CmiGetState();
1798   return s;
1799 }
1800
1801 static void CmiNotifyBeginIdle(CmiIdleState *s)
1802 {
1803   s->sleepMs=0;
1804   s->nIdles=0;
1805 }
1806
1807 static void CmiNotifyStillIdle(CmiIdleState *s)
1808 {
1809 #if ! CMK_SMP
1810   CmiReleaseSentMessages();
1811   PumpMsgs();
1812 #else
1813 /*  CmiYield();  */
1814 #endif
1815
1816 #if 1
1817   {
1818   int nSpins=20; /*Number of times to spin before sleeping*/
1819   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1820   s->nIdles++;
1821   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1822     s->sleepMs+=2;
1823     if (s->sleepMs>10) s->sleepMs=10;
1824   }
1825   /*Comm. thread will listen on sockets-- just sleep*/
1826   if (s->sleepMs>0) {
1827     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1828     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1829     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1830   }
1831   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1832   }
1833 #endif
1834 }
1835
1836 #if MACHINE_DEBUG_LOG
1837 FILE *debugLog = NULL;
1838 #endif
1839
1840 static int machine_exit_idx;
1841 static void machine_exit(char *m) {
1842   EmergencyExit();
1843   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1844   fflush(stdout);
1845   CmiNodeBarrier();
1846   if (CmiMyRank() == 0) {
1847     MPI_Barrier(MPI_COMM_WORLD);
1848     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1849     MPI_Abort(MPI_COMM_WORLD, 1);
1850   } else {
1851     while (1) CmiYield();
1852   }
1853 }
1854
1855 static void KillOnAllSigs(int sigNo) {
1856   static int already_in_signal_handler = 0;
1857   char *m;
1858   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1859   already_in_signal_handler = 1;
1860 #if CMK_CCS_AVAILABLE
1861   if (CpvAccess(cmiArgDebugFlag)) {
1862     CpdNotify(CPD_SIGNAL, sigNo);
1863     CpdFreeze();
1864   }
1865 #endif
1866   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1867       "Signal: %d\n",CmiMyPe(),sigNo);
1868   CmiPrintStackTrace(1);
1869
1870   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1871   CmiSetHandler(m, machine_exit_idx);
1872   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1873   machine_exit(m);
1874 }
1875
1876 static void ConverseRunPE(int everReturn)
1877 {
1878   CmiIdleState *s=CmiNotifyGetState();
1879   CmiState cs;
1880   char** CmiMyArgv;
1881
1882   CmiNodeAllBarrier();
1883
1884   cs = CmiGetState();
1885   CpvInitialize(void *,CmiLocalQueue);
1886   CpvAccess(CmiLocalQueue) = cs->localqueue;
1887
1888   if (CmiMyRank())
1889     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1890   else
1891     CmiMyArgv=Cmi_argv;
1892
1893   CthInit(CmiMyArgv);
1894
1895   ConverseCommonInit(CmiMyArgv);
1896   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1897
1898 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1899   CpvInitialize(double, projTraceStart);
1900   /* only PE 0 needs to care about registration (to generate sts file). */
1901   if (CmiMyPe() == 0) {
1902     registerMachineUserEventsFunction(&registerMPITraceEvents);
1903   }
1904 #endif
1905
1906   /* initialize the network progress counter*/
1907   /* Network progress function is used to poll the network when for
1908      messages. This flushes receive buffers on some  implementations*/
1909   CpvInitialize(int , networkProgressCount);
1910   CpvAccess(networkProgressCount) = 0;
1911
1912 #if CMK_SMP
1913   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1914   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1915 #else
1916   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1917 #endif
1918
1919 #if MACHINE_DEBUG_LOG
1920   if (CmiMyRank() == 0) {
1921     char ln[200];
1922     sprintf(ln,"debugLog.%d",CmiMyNode());
1923     debugLog=fopen(ln,"w");
1924   }
1925 #endif
1926
1927   /* Converse initialization finishes, immediate messages can be processed.
1928      node barrier previously should take care of the node synchronization */
1929   _immediateReady = 1;
1930
1931   /* communication thread */
1932   if (CmiMyRank() == CmiMyNodeSize()) {
1933     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1934     while (1) CommunicationServerThread(5);
1935   }
1936   else {  /* worker thread */
1937   if (!everReturn) {
1938     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1939     if (Cmi_usrsched==0) CsdScheduler(-1);
1940     ConverseExit();
1941   }
1942   }
1943 }
1944
1945 static char *thread_level_tostring(int thread_level)
1946 {
1947 #if CMK_MPI_INIT_THREAD
1948   switch (thread_level) {
1949   case MPI_THREAD_SINGLE:
1950       return "MPI_THREAD_SINGLE";
1951   case MPI_THREAD_FUNNELED:
1952       return "MPI_THREAD_FUNNELED";
1953   case MPI_THREAD_SERIALIZED:
1954       return "MPI_THREAD_SERIALIZED";
1955   case MPI_THREAD_MULTIPLE :
1956       return "MPI_THREAD_MULTIPLE ";
1957   default: {
1958       char *str = (char*)malloc(5);
1959       sprintf(str,"%d", thread_level);
1960       return str;
1961       }
1962   }
1963   return  "unknown";
1964 #else
1965   char *str = (char*)malloc(5);
1966   sprintf(str,"%d", thread_level);
1967   return str;
1968 #endif
1969 }
1970
1971 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1972 {
1973   int n,i;
1974   int ver, subver;
1975   int provided;
1976   int thread_level;
1977
1978 #if MACHINE_DEBUG
1979   debugLog=NULL;
1980 #endif
1981 #if CMK_USE_HP_MAIN_FIX
1982 #if FOR_CPLUS
1983   _main(argc,argv);
1984 #endif
1985 #endif
1986
1987 #if CMK_MPI_INIT_THREAD
1988 #if CMK_SMP
1989   thread_level = MPI_THREAD_FUNNELED;
1990 #else
1991   thread_level = MPI_THREAD_SINGLE;
1992 #endif
1993   MPI_Init_thread(&argc, &argv, thread_level, &provided);
1994   _thread_provided = provided;
1995 #else
1996   MPI_Init(&argc, &argv);
1997   thread_level = 0;
1998   provided = -1;
1999 #endif
2000   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2001   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2002
2003   MPI_Get_version(&ver, &subver);
2004   if (_Cmi_mynode == 0) {
2005     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2006   }
2007
2008   /* processor per node */
2009   _Cmi_mynodesize = 1;
2010   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2011     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2012 #if ! CMK_SMP
2013   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2014     CmiAbort("+ppn cannot be used in non SMP version!\n");
2015 #endif
2016   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2017   if (idleblock && _Cmi_mynode == 0) {
2018     printf("Charm++: Running in idle blocking mode.\n");
2019   }
2020
2021   /* setup signal handlers */
2022   signal(SIGSEGV, KillOnAllSigs);
2023   signal(SIGFPE, KillOnAllSigs);
2024   signal(SIGILL, KillOnAllSigs);
2025   signal_int = signal(SIGINT, KillOnAllSigs);
2026   signal(SIGTERM, KillOnAllSigs);
2027   signal(SIGABRT, KillOnAllSigs);
2028 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2029   signal(SIGQUIT, KillOnAllSigs);
2030   signal(SIGBUS, KillOnAllSigs);
2031 /*#     if CMK_HANDLE_SIGUSR
2032   signal(SIGUSR1, HandleUserSignals);
2033   signal(SIGUSR2, HandleUserSignals);
2034 #     endif*/
2035 #   endif /*UNIX*/
2036   
2037 #if CMK_NO_OUTSTANDING_SENDS
2038   no_outstanding_sends=1;
2039 #endif
2040   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2041     no_outstanding_sends = 1;
2042     if (_Cmi_mynode == 0)
2043       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2044         no_outstanding_sends?"":" not");
2045   }
2046   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2047   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2048   Cmi_argvcopy = CmiCopyArgs(argv);
2049   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2050   /* find dim = log2(numpes), to pretend we are a hypercube */
2051   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2052     Cmi_dim++ ;
2053  /* CmiSpanTreeInit();*/
2054   request_max=MAX_QLEN;
2055   CmiGetArgInt(argv,"+requestmax",&request_max);
2056   /*printf("request max=%d\n", request_max);*/
2057
2058   /* checksum flag */
2059   if (CmiGetArgFlag(argv,"+checksum")) {
2060 #if !CMK_OPTIMIZE
2061     checksum_flag = 1;
2062     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2063 #else
2064     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2065 #endif
2066   }
2067
2068   {
2069   int debug = CmiGetArgFlag(argv,"++debug");
2070   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2071   if (debug || debug_no_pause)
2072   {   /*Pause so user has a chance to start and attach debugger*/
2073 #if CMK_HAS_GETPID
2074     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2075     fflush(stdout);
2076     if (!debug_no_pause)
2077       sleep(15);
2078 #else
2079     printf("++debug ignored.\n");
2080 #endif
2081   }
2082   }
2083
2084 #if MPI_POST_RECV_COUNT > 0
2085
2086   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2087   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2088   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2089   CpvInitialize(char*,CmiPostedRecvBuffers);
2090
2091     /* Post some extra recvs to help out with incoming messages */
2092     /* On some MPIs the messages are unexpected and thus slow */
2093
2094     /* An array of request handles for posted recvs */
2095     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2096
2097     /* An array of buffers for posted recvs */
2098     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2099
2100     /* Post Recvs */
2101     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2102         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2103                     MPI_POST_RECV_SIZE,
2104                     MPI_BYTE,
2105                     MPI_ANY_SOURCE,
2106                     POST_RECV_TAG,
2107                     MPI_COMM_WORLD,
2108                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2109           CmiAbort("MPI_Irecv failed\n");
2110     }
2111
2112 #endif
2113
2114
2115
2116   /* CmiTimerInit(); */
2117
2118 #if 0
2119   CthInit(argv);
2120   ConverseCommonInit(argv);
2121
2122   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2123   if (initret==0) {
2124     fn(CmiGetArgc(argv), argv);
2125     if (usched==0) CsdScheduler(-1);
2126     ConverseExit();
2127   }
2128 #endif
2129
2130   CsvInitialize(CmiNodeState, NodeState);
2131   CmiNodeStateInit(&CsvAccess(NodeState));
2132
2133   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2134
2135   for (i=0; i<_Cmi_mynodesize+1; i++) {
2136 #if MULTI_SENDQUEUE
2137     procState[i].sendMsgBuf = PCQueueCreate();
2138 #endif
2139     procState[i].recvLock = CmiCreateLock();
2140   }
2141 #if CMK_SMP
2142 #if !MULTI_SENDQUEUE
2143   sendMsgBuf = PCQueueCreate();
2144   sendMsgBufLock = CmiCreateLock();
2145 #endif
2146   exitLock = CmiCreateLock();            /* exit count lock */
2147 #endif
2148
2149   /* Network progress function is used to poll the network when for
2150      messages. This flushes receive buffers on some  implementations*/
2151   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2152   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2153
2154   CmiStartThreads(argv);
2155   ConverseRunPE(initret);
2156 }
2157
2158 /***********************************************************************
2159  *
2160  * Abort function:
2161  *
2162  ************************************************************************/
2163
2164 void CmiAbort(const char *message)
2165 {
2166   char *m;
2167   /* if CharmDebug is attached simply try to send a message to it */
2168 #if CMK_CCS_AVAILABLE
2169   if (CpvAccess(cmiArgDebugFlag)) {
2170     CpdNotify(CPD_ABORT, message);
2171     CpdFreeze();
2172   }
2173 #endif  
2174   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2175         "Reason: %s\n",CmiMyPe(),message);
2176  /*  CmiError(message); */
2177   CmiPrintStackTrace(0);
2178   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2179   CmiSetHandler(m, machine_exit_idx);
2180   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2181   machine_exit(m);
2182   /* Program never reaches here */
2183   MPI_Abort(MPI_COMM_WORLD, 1);
2184 }
2185
2186
2187 #if 0
2188
2189 /* ****************************************************************** */
2190 /*    The following internal functions implement recd msg queue       */
2191 /* ****************************************************************** */
2192
2193 static void ** AllocBlock(unsigned int len)
2194 {
2195   void ** blk;
2196
2197   blk=(void **)CmiAlloc(len*sizeof(void *));
2198   if(blk==(void **)0) {
2199     CmiError("Cannot Allocate Memory!\n");
2200     MPI_Abort(MPI_COMM_WORLD, 1);
2201   }
2202   return blk;
2203 }
2204
2205 static void
2206 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2207 {
2208   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2209   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2210 }
2211
2212 void recdQueueInit(void)
2213 {
2214   recdQueue_blk = AllocBlock(BLK_LEN);
2215   recdQueue_blk_len = BLK_LEN;
2216   recdQueue_first = 0;
2217   recdQueue_len = 0;
2218 }
2219
2220 void recdQueueAddToBack(void *element)
2221 {
2222 #if NODE_0_IS_CONVHOST
2223   inside_comm = 1;
2224 #endif
2225   if(recdQueue_len==recdQueue_blk_len) {
2226     void **blk;
2227     recdQueue_blk_len *= 3;
2228     blk = AllocBlock(recdQueue_blk_len);
2229     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2230     CmiFree(recdQueue_blk);
2231     recdQueue_blk = blk;
2232     recdQueue_first = 0;
2233   }
2234   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2235 #if NODE_0_IS_CONVHOST
2236   inside_comm = 0;
2237 #endif
2238 }
2239
2240
2241 void * recdQueueRemoveFromFront(void)
2242 {
2243   if(recdQueue_len) {
2244     void *element;
2245     element = recdQueue_blk[recdQueue_first++];
2246     recdQueue_first %= recdQueue_blk_len;
2247     recdQueue_len--;
2248     return element;
2249   }
2250   return 0;
2251 }
2252
2253 #endif
2254
2255 /*@}*/