The implementation of SendHypercube is corrected to follow the algorithm exactly...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #include "machine.h"
66
67 #include "pcqueue.h"
68
69 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
70
71 #if CMK_BLUEGENEL
72 #define MAX_QLEN 8
73 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
74 #else
75 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
76 #define MAX_QLEN 200
77 #endif
78
79 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && ! CMK_TRACE_IN_CHARM
80 CpvStaticDeclare(double, projTraceStart);
81 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
82 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
83 #else
84 # define  START_EVENT()
85 # define  END_EVENT(x)
86 #endif
87
88 /*
89     To reduce the buffer used in broadcast and distribute the load from
90   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
91   spanning tree broadcast algorithm.
92     This will use the fourth short in message as an indicator of spanning tree
93   root.
94 */
95 #define CMK_BROADCAST_SPANNING_TREE    1
96 #define CMK_BROADCAST_HYPERCUBE        0
97
98 #define BROADCAST_SPANNING_FACTOR      4
99 /* The number of children used when a msg is broadcast inside a node */
100 #define BROADCAST_SPANNING_INTRA_FACTOR      8
101
102 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
103 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
104
105 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
106 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
107
108 /* FIXME: need a random number that everyone agrees ! */
109 #define CHARM_MAGIC_NUMBER               126
110
111 #if !CMK_OPTIMIZE
112 static int checksum_flag = 0;
113 #define CMI_SET_CHECKSUM(msg, len)      \
114         if (checksum_flag)  {   \
115           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
116           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
117         }
118 #define CMI_CHECK_CHECKSUM(msg, len)    \
119         if (checksum_flag)      \
120           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
121             CmiAbort("Fatal error: checksum doesn't agree!\n");
122 #else
123 #define CMI_SET_CHECKSUM(msg, len)
124 #define CMI_CHECK_CHECKSUM(msg, len)
125 #endif
126
127 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
128 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
129 #endif
130
131
132 /** 
133     If MPI_POST_RECV is defined, we provide default values for size 
134     and number of posted recieves. If MPI_POST_RECV_COUNT is set
135     then a default value for MPI_POST_RECV_SIZE is used if not specified
136     by the user.
137 */
138 #ifdef MPI_POST_RECV
139 #define MPI_POST_RECV_COUNT 10
140 #undef MPI_POST_RECV
141 #endif
142 #if MPI_POST_RECV_COUNT > 0
143 #warning "Using MPI posted receives which have not yet been tested"
144 #ifndef MPI_POST_RECV_SIZE
145 #define MPI_POST_RECV_SIZE 200
146 #endif
147 /* #undef  MPI_POST_RECV_DEBUG  */
148 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
149 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
150 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
151 CpvDeclare(char*,CmiPostedRecvBuffers);
152 #endif
153
154 /*
155  to avoid MPI's in order delivery, changing MPI Tag all the time
156 */
157 #define TAG     1375
158
159 #if MPI_POST_RECV_COUNT > 0
160 #define POST_RECV_TAG TAG+1
161 #define BARRIER_ZERO_TAG TAG
162 #else
163 #define BARRIER_ZERO_TAG     1375
164 #endif
165
166 #include <signal.h>
167 void (*signal_int)(int);
168
169 /*
170 static int mpi_tag = TAG;
171 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
172 */
173
174 static int        _thread_provided = -1;
175 int               _Cmi_numpes;
176 int               _Cmi_mynode;    /* Which address space am I */
177 int               _Cmi_mynodesize;/* Number of processors in my address space */
178 int               _Cmi_numnodes;  /* Total number of address spaces */
179 int               _Cmi_numpes;    /* Total number of processors */
180 static int        Cmi_nodestart; /* First processor in this address space */
181 CpvDeclare(void*, CmiLocalQueue);
182
183 /*Network progress utility variables. Period controls the rate at
184   which the network poll is called */
185 CpvDeclare(unsigned , networkProgressCount);
186 int networkProgressPeriod;
187
188 int               idleblock = 0;
189
190 #define BLK_LEN  512
191
192 #if CMK_NODE_QUEUE_AVAILABLE
193 #define DGRAM_NODEMESSAGE   (0xFB)
194
195 #define NODE_BROADCAST_OTHERS (-1)
196 #define NODE_BROADCAST_ALL    (-2)
197 #endif
198
199 #if 0
200 static void **recdQueue_blk;
201 static unsigned int recdQueue_blk_len;
202 static unsigned int recdQueue_first;
203 static unsigned int recdQueue_len;
204 static void recdQueueInit(void);
205 static void recdQueueAddToBack(void *element);
206 static void *recdQueueRemoveFromFront(void);
207 #endif
208
209 static void ConverseRunPE(int everReturn);
210 static void CommunicationServer(int sleepTime);
211 static void CommunicationServerThread(int sleepTime);
212
213 typedef struct msg_list {
214      char *msg;
215      struct msg_list *next;
216      int size, destpe;
217
218 #if CMK_SMP_TRACE_COMMTHREAD
219         int srcpe;
220 #endif  
221         
222      MPI_Request req;
223 } SMSG_LIST;
224
225 int MsgQueueLen=0;
226 static int request_max;
227
228 static SMSG_LIST *sent_msgs=0;
229 static SMSG_LIST *end_sent=0;
230
231 static int Cmi_dim;
232
233 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
234
235 #if NODE_0_IS_CONVHOST
236 int inside_comm = 0;
237 #endif
238
239 void CmiAbort(const char *message);
240 static void PerrorExit(const char *msg);
241
242 void SendSpanningChildren(int size, char *msg);
243 void SendHypercube(int size, char *msg);
244
245 static void PerrorExit(const char *msg)
246 {
247   perror(msg);
248   exit(1);
249 }
250
251 extern unsigned char computeCheckSum(unsigned char *data, int len);
252
253 /**************************  TIMER FUNCTIONS **************************/
254
255 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
256
257 /* MPI calls are not threadsafe, even the timer on some machines */
258 static CmiNodeLock  timerLock = 0;
259 static double starttimer = 0;
260 static int _is_global = 0;
261
262 int CmiTimerIsSynchronized()
263 {
264   int  flag;
265   void *v;
266
267   /*  check if it using synchronized timer */
268   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
269     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
270   if (flag) {
271     _is_global = *(int*)v;
272     if (_is_global && CmiMyPe() == 0)
273       printf("Charm++> MPI timer is synchronized!\n");
274   }
275   return _is_global;
276 }
277
278 void CmiTimerInit()
279 {
280   _is_global = CmiTimerIsSynchronized();
281
282   if (_is_global) {
283     if (CmiMyRank() == 0) {
284       double minTimer;
285 #if CMK_TIMER_USE_XT3_DCLOCK
286       starttimer = dclock();
287 #else
288       starttimer = MPI_Wtime();
289 #endif
290
291       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
292                                   MPI_COMM_WORLD );
293       starttimer = minTimer;
294     }
295   }
296   else {  /* we don't have a synchronous timer, set our own start time */
297     CmiBarrier();
298     CmiBarrier();
299     CmiBarrier();
300 #if CMK_TIMER_USE_XT3_DCLOCK
301     starttimer = dclock();
302 #else
303     starttimer = MPI_Wtime();
304 #endif
305   }
306
307 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
308   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
309     timerLock = CmiCreateLock();
310 #endif
311   CmiNodeAllBarrier();          /* for smp */
312 }
313
314 /**
315  * Since the timerLock is never created, and is
316  * always NULL, then all the if-condition inside
317  * the timer functions could be disabled right
318  * now in the case of SMP. --Chao Mei
319  */
320 double CmiTimer(void)
321 {
322   double t;
323 #if 0 && CMK_SMP
324   if (timerLock) CmiLock(timerLock);
325 #endif
326 #if CMK_TIMER_USE_XT3_DCLOCK
327   t = dclock() - starttimer;
328 #else
329   t = MPI_Wtime() - starttimer;
330 #endif
331
332 #if 0 && CMK_SMP
333   if (timerLock) CmiUnlock(timerLock);
334 #endif
335
336   return t;
337 }
338
339 double CmiWallTimer(void)
340 {
341   double t;
342 #if 0 && CMK_SMP
343   if (timerLock) CmiLock(timerLock);
344 #endif
345 #if CMK_TIMER_USE_XT3_DCLOCK
346   t = dclock() - starttimer;
347 #else
348   t = MPI_Wtime() - starttimer;
349 #endif
350 #if 0 && CMK_SMP
351   if (timerLock) CmiUnlock(timerLock);
352 #endif
353   return t;
354 }
355
356 double CmiCpuTimer(void)
357 {
358   double t;
359 #if 0 && CMK_SMP
360   if (timerLock) CmiLock(timerLock);
361 #endif
362 #if CMK_TIMER_USE_XT3_DCLOCK
363   t = dclock() - starttimer;
364 #else
365   t = MPI_Wtime() - starttimer;
366 #endif
367 #if 0 && CMK_SMP
368   if (timerLock) CmiUnlock(timerLock);
369 #endif
370   return t;
371 }
372
373 #endif
374
375 /* must be called on all ranks including comm thread in SMP */
376 int CmiBarrier()
377 {
378 #if CMK_SMP
379     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
380   CmiNodeAllBarrier();
381   if (CmiMyRank() == CmiMyNodeSize()) 
382 #else
383   if (CmiMyRank() == 0) 
384 #endif
385   {
386 /**
387  *  The call of CmiBarrier is usually before the initialization
388  *  of trace module of Charm++, therefore, the START_EVENT
389  *  and END_EVENT are disabled here. -Chao Mei
390  */     
391     /*START_EVENT();*/
392
393     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
394         CmiAbort("Timernit: MPI_Barrier failed!\n");
395
396     /*END_EVENT(10);*/
397   }
398   CmiNodeAllBarrier();
399   return 0;
400 }
401
402 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
403 int CmiBarrierZero()
404 {
405   int i;
406 #if CMK_SMP
407   if (CmiMyRank() == CmiMyNodeSize()) 
408 #else
409   if (CmiMyRank() == 0) 
410 #endif
411   {
412     char msg[1];
413     MPI_Status sts;
414     if (CmiMyNode() == 0)  {
415       for (i=0; i<CmiNumNodes()-1; i++) {
416          START_EVENT();
417
418          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
419             CmiPrintf("MPI_Recv failed!\n");
420
421          END_EVENT(30);
422       }
423     }
424     else {
425       START_EVENT();
426
427       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
428          printf("MPI_Send failed!\n");
429
430       END_EVENT(20);
431     }
432   }
433   CmiNodeAllBarrier();
434   return 0;
435 }
436
437 typedef struct ProcState {
438 #if MULTI_SENDQUEUE
439 PCQueue      sendMsgBuf;       /* per processor message sending queue */
440 #endif
441 CmiNodeLock  recvLock;              /* for cs->recv */
442 } ProcState;
443
444 static ProcState  *procState;
445
446 #if CMK_SMP
447
448 #if !MULTI_SENDQUEUE
449 static PCQueue sendMsgBuf;
450 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
451 #endif
452
453 #endif
454
455 /************************************************************
456  *
457  * Processor state structure
458  *
459  ************************************************************/
460
461 /* fake Cmi_charmrun_fd */
462 static int Cmi_charmrun_fd = 0;
463 #include "machine-smp.c"
464
465 CsvDeclare(CmiNodeState, NodeState);
466
467 #include "immediate.c"
468
469 #if CMK_SHARED_VARS_UNAVAILABLE
470 /************ non SMP **************/
471 static struct CmiStateStruct Cmi_state;
472 int _Cmi_mype;
473 int _Cmi_myrank;
474
475 void CmiMemLock() {}
476 void CmiMemUnlock() {}
477
478 #define CmiGetState() (&Cmi_state)
479 #define CmiGetStateN(n) (&Cmi_state)
480
481 void CmiYield(void) { sleep(0); }
482
483 static void CmiStartThreads(char **argv)
484 {
485   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
486   _Cmi_mype = Cmi_nodestart;
487   _Cmi_myrank = 0;
488 }
489 #endif  /* non smp */
490
491 /*Add a message to this processor's receive queue, pe is a rank */
492 void CmiPushPE(int pe,void *msg)
493 {
494   CmiState cs = CmiGetStateN(pe);
495   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
496 #if CMK_IMMEDIATE_MSG
497   if (CmiIsImmediate(msg)) {
498 /*
499 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
500     CmiHandleMessage(msg);
501 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
502 */
503     /**(CmiUInt2 *)msg = pe;*/
504     CMI_DEST_RANK(msg) = pe;
505     CmiPushImmediateMsg(msg);
506     return;
507   }
508 #endif
509
510 #if CMK_SMP
511   CmiLock(procState[pe].recvLock);
512 #endif
513   PCQueuePush(cs->recv,msg);
514 #if CMK_SMP
515   CmiUnlock(procState[pe].recvLock);
516 #endif
517   CmiIdleLock_addMessage(&cs->idle);
518   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
519 }
520
521 #if CMK_NODE_QUEUE_AVAILABLE
522 /*Add a message to this processor's receive queue */
523 static void CmiPushNode(void *msg)
524 {
525   MACHSTATE(3,"Pushing message into NodeRecv queue");
526 #if CMK_IMMEDIATE_MSG
527   if (CmiIsImmediate(msg)) {
528     CMI_DEST_RANK(msg) = 0;
529     CmiPushImmediateMsg(msg);
530     return;
531   }
532 #endif
533   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
534   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
535   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
536   {
537   CmiState cs=CmiGetStateN(0);
538   CmiIdleLock_addMessage(&cs->idle);
539   }
540 }
541 #endif
542
543 #ifndef CmiMyPe
544 int CmiMyPe(void)
545 {
546   return CmiGetState()->pe;
547 }
548 #endif
549
550 #ifndef CmiMyRank
551 int CmiMyRank(void)
552 {
553   return CmiGetState()->rank;
554 }
555 #endif
556
557 #ifndef CmiNodeFirst
558 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
559 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
560 #endif
561
562 #ifndef CmiNodeOf
563 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
564 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
565 #endif
566
567 static size_t CmiAllAsyncMsgsSent(void)
568 {
569    SMSG_LIST *msg_tmp = sent_msgs;
570    MPI_Status sts;
571    int done;
572
573    while(msg_tmp!=0) {
574     done = 0;
575     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
576       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
577     if(!done)
578       return 0;
579     msg_tmp = msg_tmp->next;
580 /*    MsgQueueLen--; ????? */
581    }
582    return 1;
583 }
584
585 int CmiAsyncMsgSent(CmiCommHandle c) {
586
587   SMSG_LIST *msg_tmp = sent_msgs;
588   int done;
589   MPI_Status sts;
590
591   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
592     msg_tmp = msg_tmp->next;
593   if(msg_tmp) {
594     done = 0;
595     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
596       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
597     return ((done)?1:0);
598   } else {
599     return 1;
600   }
601 }
602
603 void CmiReleaseCommHandle(CmiCommHandle c)
604 {
605   return;
606 }
607
608 #if CMK_BLUEGENEL
609 extern void MPID_Progress_test();
610 #endif
611
612 void CmiReleaseSentMessages(void)
613 {
614   SMSG_LIST *msg_tmp=sent_msgs;
615   SMSG_LIST *prev=0;
616   SMSG_LIST *temp;
617   int done;
618   MPI_Status sts;
619
620 #if CMK_BLUEGENEL
621   MPID_Progress_test();
622 #endif
623
624   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
625   while(msg_tmp!=0) {
626     done =0;
627 #if CMK_SMP_TRACE_COMMTHREAD
628     double startT = CmiWallTimer();
629 #endif
630     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
631       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
632     if(done) {
633       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
634       MsgQueueLen--;
635       /* Release the message */
636       temp = msg_tmp->next;
637       if(prev==0)  /* first message */
638         sent_msgs = temp;
639       else
640         prev->next = temp;
641       CmiFree(msg_tmp->msg);
642       CmiFree(msg_tmp);
643       msg_tmp = temp;
644     } else {
645       prev = msg_tmp;
646       msg_tmp = msg_tmp->next;
647     }
648 #if CMK_SMP_TRACE_COMMTHREAD
649     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
650 #endif
651   }
652   end_sent = prev;
653   MACHSTATE(2,"} CmiReleaseSentMessages end");
654 }
655
656 int PumpMsgs(void)
657 {
658   int nbytes, flg, res;
659   char *msg;
660   MPI_Status sts;
661   int recd=0;
662
663 #if CMI_EXERT_RECV_CAP
664   int recvCnt=0;
665 #endif
666         
667 #if CMK_BLUEGENEL
668   MPID_Progress_test();
669 #endif
670
671   MACHSTATE(2,"PumpMsgs begin {");
672
673         
674   while(1) {
675 #if CMI_EXERT_RECV_CAP
676         if(recvCnt==RECV_CAP) break;
677 #endif
678           
679     /* First check posted recvs then do  probe unmatched outstanding messages */
680 #if MPI_POST_RECV_COUNT > 0 
681     int completed_index=-1;
682     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
683         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
684     if(flg){
685         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
686             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
687
688         recd = 1;
689         msg = (char *) CmiAlloc(nbytes);
690         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
691         /* and repost the recv */
692
693         START_EVENT();
694
695         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
696             MPI_POST_RECV_SIZE,
697             MPI_BYTE,
698             MPI_ANY_SOURCE,
699             POST_RECV_TAG,
700             MPI_COMM_WORLD,
701             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
702                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
703
704         END_EVENT(50);
705
706         CpvAccess(Cmi_posted_recv_total)++;
707     }
708     else {
709         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
710         if(res != MPI_SUCCESS)
711         CmiAbort("MPI_Iprobe failed\n");
712         if(!flg) break;
713         recd = 1;
714         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
715         msg = (char *) CmiAlloc(nbytes);
716
717         START_EVENT();
718
719         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
720             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
721
722         END_EVENT(30);
723
724         CpvAccess(Cmi_unposted_recv_total)++;
725     }
726 #else
727     /* Original version */
728 #if CMK_SMP_TRACE_COMMTHREAD
729   double startT = CmiWallTimer(); 
730 #endif
731     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
732     if(res != MPI_SUCCESS)
733       CmiAbort("MPI_Iprobe failed\n");
734
735     if(!flg) break;
736 #if CMK_SMP_TRACE_COMMTHREAD
737     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
738 #endif
739
740     recd = 1;
741     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
742     msg = (char *) CmiAlloc(nbytes);
743     
744     START_EVENT();
745
746     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
747       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
748
749     /*END_EVENT(30);*/
750
751 #endif
752
753 #if CMK_SMP_TRACE_COMMTHREAD
754         traceBeginCommOp(msg);
755         traceChangeLastTimestamp(CpvAccess(projTraceStart));
756         traceEndCommOp(msg);
757         #if CMI_MPI_TRACE_MOREDETAILED
758         char tmp[32];
759         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
760         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
761         #endif
762 #endif  
763         
764     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
765     CMI_CHECK_CHECKSUM(msg, nbytes);
766     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
767       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
768       CmiFree(msg);
769       CmiAbort("Abort!\n");
770       continue;
771     }
772         
773 #if CMK_BROADCAST_SPANNING_TREE
774     if (CMI_BROADCAST_ROOT(msg))
775       SendSpanningChildren(nbytes, msg);
776 #elif CMK_BROADCAST_HYPERCUBE
777     if (CMI_BROADCAST_ROOT(msg))
778       SendHypercube(nbytes, msg);
779 #endif
780         
781         /* In SMP mode, this push operation needs to be executed
782      * after forwarding broadcast messages. If it is executed
783      * earlier, then during the bcast msg forwarding period,    
784          * the msg could be already freed on the worker thread.
785          * As a result, the forwarded message could be wrong! 
786          * --Chao Mei
787          */
788 #if CMK_NODE_QUEUE_AVAILABLE
789     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
790       CmiPushNode(msg);
791     else
792 #endif
793         CmiPushPE(CMI_DEST_RANK(msg), msg);     
794         
795 #if CMI_EXERT_RECV_CAP
796         recvCnt++;
797 #endif  
798   }
799
800   
801 #if CMK_IMMEDIATE_MSG && !CMK_SMP
802   CmiHandleImmediate();
803 #endif
804   
805   MACHSTATE(2,"} PumpMsgs end ");
806   return recd;
807 }
808
809 /* blocking version */
810 static void PumpMsgsBlocking(void)
811 {
812   static int maxbytes = 20000000;
813   static char *buf = NULL;
814   int nbytes, flg;
815   MPI_Status sts;
816   char *msg;
817   int recd=0;
818
819   if (!PCQueueEmpty(CmiGetState()->recv)) return;
820   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
821   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
822   if (sent_msgs)  return;
823
824 #if 0
825   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
826 #endif
827
828   if (buf == NULL) {
829     buf = (char *) CmiAlloc(maxbytes);
830     _MEMCHECK(buf);
831   }
832
833
834 #if MPI_POST_RECV_COUNT > 0
835 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
836 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
837 #endif
838
839   START_EVENT();
840
841   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
842       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
843
844   /*END_EVENT(30);*/
845     
846    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
847    msg = (char *) CmiAlloc(nbytes);
848    memcpy(msg, buf, nbytes);
849
850 #if CMK_SMP_TRACE_COMMTHREAD
851         traceBeginCommOp(msg);
852         traceChangeLastTimestamp(CpvAccess(projTraceStart));
853         traceEndCommOp(msg);
854         #if CMI_MPI_TRACE_MOREDETAILED
855         char tmp[32];
856         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
857         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
858         #endif
859 #endif
860
861 #if CMK_BROADCAST_SPANNING_TREE
862    if (CMI_BROADCAST_ROOT(msg))
863       SendSpanningChildren(nbytes, msg);
864 #elif CMK_BROADCAST_HYPERCUBE
865    if (CMI_BROADCAST_ROOT(msg))
866       SendHypercube(nbytes, msg);
867 #endif
868   
869         /* In SMP mode, this push operation needs to be executed
870      * after forwarding broadcast messages. If it is executed
871      * earlier, then during the bcast msg forwarding period,    
872          * the msg could be already freed on the worker thread.
873          * As a result, the forwarded message could be wrong! 
874          * --Chao Mei
875          */  
876 #if CMK_NODE_QUEUE_AVAILABLE
877    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
878       CmiPushNode(msg);
879    else
880 #endif
881       CmiPushPE(CMI_DEST_RANK(msg), msg);
882 }
883
884 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
885
886 #if CMK_SMP
887
888 static int inexit = 0;
889 static CmiNodeLock  exitLock = 0;
890
891 static int MsgQueueEmpty()
892 {
893   int i;
894 #if MULTI_SENDQUEUE
895   for (i=0; i<_Cmi_mynodesize; i++)
896     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
897 #else
898   return PCQueueEmpty(sendMsgBuf);
899 #endif
900   return 1;
901 }
902
903 static int SendMsgBuf();
904
905 /* test if all processors recv queues are empty */
906 static int RecvQueueEmpty()
907 {
908   int i;
909   for (i=0; i<_Cmi_mynodesize; i++) {
910     CmiState cs=CmiGetStateN(i);
911     if (!PCQueueEmpty(cs->recv)) return 0;
912   }
913   return 1;
914 }
915
916 /**
917 CommunicationServer calls MPI to send messages in the queues and probe message from network.
918 */
919
920 #define REPORT_COMM_METRICS 0
921 #if REPORT_COMM_METRICS
922 static double pumptime = 0.0;
923 static double releasetime = 0.0;
924 static double sendtime = 0.0;
925 #endif
926
927 static void CommunicationServer(int sleepTime)
928 {
929   int static count=0;
930 /*
931   count ++;
932   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
933 */
934 #if REPORT_COMM_METRICS
935   double t1, t2, t3, t4;
936   t1 = CmiWallTimer();
937 #endif
938   PumpMsgs();
939 #if REPORT_COMM_METRICS
940   t2 = CmiWallTimer();
941 #endif
942   CmiReleaseSentMessages();
943 #if REPORT_COMM_METRICS
944   t3 = CmiWallTimer();
945 #endif
946   SendMsgBuf();
947 #if REPORT_COMM_METRICS
948   t4 = CmiWallTimer();
949   pumptime += (t2-t1);
950   releasetime += (t3-t2);
951   sendtime += (t4-t3);
952 #endif
953 /*
954   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
955 */
956   if (inexit == CmiMyNodeSize()) {
957     MACHSTATE(2, "CommunicationServer exiting {");
958 #if 0
959     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
960 #endif
961     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
962       CmiReleaseSentMessages();
963       SendMsgBuf();
964       PumpMsgs();
965     }
966     MACHSTATE(2, "CommunicationServer barrier begin {");
967
968     START_EVENT();
969
970     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
971       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
972
973     END_EVENT(10);
974
975     MACHSTATE(2, "} CommunicationServer barrier end");
976 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
977     if (CmiMyNode() == 0){
978       CmiPrintf("End of program\n");
979     }
980 #endif
981     MACHSTATE(2, "} CommunicationServer EXIT");
982
983     ConverseCommonExit();   
984 #if REPORT_COMM_METRICS
985     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
986 #endif
987
988 #if ! CMK_AUTOBUILD
989     signal(SIGINT, signal_int);
990     MPI_Finalize();
991     #endif
992     exit(0);
993   }
994 }
995
996 #endif
997
998 static void CommunicationServerThread(int sleepTime)
999 {
1000 #if CMK_SMP
1001   CommunicationServer(sleepTime);
1002 #endif
1003 #if CMK_IMMEDIATE_MSG
1004   CmiHandleImmediate();
1005 #endif
1006 }
1007
1008 #if CMK_NODE_QUEUE_AVAILABLE
1009 char *CmiGetNonLocalNodeQ(void)
1010 {
1011   CmiState cs = CmiGetState();
1012   char *result = 0;
1013   CmiIdleLock_checkMessage(&cs->idle);
1014 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1015     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1016     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1017     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1018     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1019     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1020 /*  }  */
1021
1022   return result;
1023 }
1024 #endif
1025
1026 void *CmiGetNonLocal(void)
1027 {
1028   static int count=0;
1029   CmiState cs = CmiGetState();
1030   void *msg;
1031
1032 #if ! CMK_SMP
1033   if (CmiNumPes() == 1) return NULL;
1034 #endif
1035
1036   CmiIdleLock_checkMessage(&cs->idle);
1037   /* although it seems that lock is not needed, I found it crashes very often
1038      on mpi-smp without lock */
1039
1040 #if ! CMK_SMP
1041   CmiReleaseSentMessages();
1042   PumpMsgs();
1043 #endif
1044
1045   /* CmiLock(procState[cs->rank].recvLock); */
1046   msg =  PCQueuePop(cs->recv);
1047   /* CmiUnlock(procState[cs->rank].recvLock); */
1048
1049 /*
1050   if (msg) {
1051     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1052   else {
1053     count++;
1054     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1055   }
1056 */
1057 #if ! CMK_SMP
1058   if (no_outstanding_sends) {
1059     while (MsgQueueLen>0) {
1060       CmiReleaseSentMessages();
1061       PumpMsgs();
1062     }
1063   }
1064
1065   if(!msg) {
1066     CmiReleaseSentMessages();
1067     if (PumpMsgs())
1068       return  PCQueuePop(cs->recv);
1069     else
1070       return 0;
1071   }
1072 #endif
1073
1074   return msg;
1075 }
1076
1077 /* called in non-smp mode */
1078 void CmiNotifyIdle(void)
1079 {
1080   CmiReleaseSentMessages();
1081   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1082 }
1083
1084
1085 /********************************************************
1086     The call to probe immediate messages has been renamed to
1087     CmiMachineProgressImpl
1088 ******************************************************/
1089 /* user call to handle immediate message, only useful in non SMP version
1090    using polling method to schedule message.
1091 */
1092 /*
1093 #if CMK_IMMEDIATE_MSG
1094 void CmiProbeImmediateMsg()
1095 {
1096 #if !CMK_SMP
1097   PumpMsgs();
1098   CmiHandleImmediate();
1099 #endif
1100 }
1101 #endif
1102 */
1103
1104 /* Network progress function is used to poll the network when for
1105    messages. This flushes receive buffers on some  implementations*/
1106 #if CMK_MACHINE_PROGRESS_DEFINED
1107 void CmiMachineProgressImpl()
1108 {
1109 #if !CMK_SMP
1110     PumpMsgs();
1111 #if CMK_IMMEDIATE_MSG
1112     CmiHandleImmediate();
1113 #endif
1114 #else
1115     /*Not implemented yet. Communication server does not seem to be
1116       thread safe, so only communication thread call it */
1117     if (CmiMyRank() == CmiMyNodeSize())
1118         CommunicationServerThread(0);
1119 #endif
1120 }
1121 #endif
1122
1123 /********************* MESSAGE SEND FUNCTIONS ******************/
1124
1125 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1126
1127 static void CmiSendSelf(char *msg)
1128 {
1129 #if CMK_IMMEDIATE_MSG
1130     if (CmiIsImmediate(msg)) {
1131       /* CmiBecomeNonImmediate(msg); */
1132       CmiPushImmediateMsg(msg);
1133       CmiHandleImmediate();
1134       return;
1135     }
1136 #endif
1137     CQdCreate(CpvAccess(cQdState), 1);
1138     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1139 }
1140
1141 void CmiSyncSendFn(int destPE, int size, char *msg)
1142 {
1143   CmiState cs = CmiGetState();
1144   char *dupmsg = (char *) CmiAlloc(size);
1145   memcpy(dupmsg, msg, size);
1146
1147   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1148
1149   if (cs->pe==destPE) {
1150     CmiSendSelf(dupmsg);
1151   }
1152   else
1153     CmiAsyncSendFn_(destPE, size, dupmsg);
1154 }
1155
1156 #if CMK_SMP
1157
1158 /* called by communication thread in SMP */
1159 static int SendMsgBuf()
1160 {
1161   SMSG_LIST *msg_tmp;
1162   char *msg;
1163   int node, rank, size;
1164   int i;
1165   int sent = 0;
1166
1167 #if CMI_EXERT_SEND_CAP
1168         int sentCnt = 0;
1169 #endif  
1170         
1171   MACHSTATE(2,"SendMsgBuf begin {");
1172 #if MULTI_SENDQUEUE
1173   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1174   {
1175     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1176     {
1177       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1178 #else
1179     /* single message sending queue */
1180     /* CmiLock(sendMsgBufLock); */
1181     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1182     /* CmiUnlock(sendMsgBufLock); */
1183     while (NULL != msg_tmp)
1184     {
1185 #endif
1186       node = msg_tmp->destpe;
1187       size = msg_tmp->size;
1188       msg = msg_tmp->msg;
1189       msg_tmp->next = 0;
1190       while (MsgQueueLen > request_max) {
1191         CmiReleaseSentMessages();
1192         PumpMsgs();
1193       }
1194       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1195       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1196       CMI_SET_CHECKSUM(msg, size);
1197
1198 #if MPI_POST_RECV_COUNT > 0
1199         if(size <= MPI_POST_RECV_SIZE){
1200
1201           START_EVENT();
1202           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1203                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1204
1205           STOP_EVENT(40);
1206         }
1207         else {
1208             START_EVENT();
1209             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1210                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1211             STOP_EVENT(40);
1212         }
1213 #else
1214         START_EVENT();
1215         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1216             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1217         /*END_EVENT(40);*/
1218 #endif
1219         
1220 #if CMK_SMP_TRACE_COMMTHREAD
1221         traceBeginCommOp(msg);
1222         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1223         /* traceSendMsgComm must execute after traceBeginCommOp because
1224          * we pretend we execute an entry method, and inside this we
1225          * pretend we will send another message. Otherwise how could
1226          * a message creation just before an entry method invocation?
1227          * If such logic is broken, the projections will not trace
1228          * messages correctly! -Chao Mei
1229          */
1230         traceSendMsgComm(msg);
1231         traceEndCommOp(msg);
1232         #if CMI_MPI_TRACE_MOREDETAILED
1233         char tmp[64];
1234         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1235         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1236         #endif
1237 #endif
1238                 
1239                 
1240       MACHSTATE(3,"}MPI_send end");
1241       MsgQueueLen++;
1242       if(sent_msgs==0)
1243         sent_msgs = msg_tmp;
1244       else
1245         end_sent->next = msg_tmp;
1246       end_sent = msg_tmp;
1247       sent=1;
1248           
1249 #if CMI_EXERT_SEND_CAP    
1250           if(++sentCnt == SEND_CAP) break;
1251 #endif    
1252           
1253 #if ! MULTI_SENDQUEUE
1254       /* CmiLock(sendMsgBufLock); */
1255       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1256       /* CmiUnlock(sendMsgBufLock); */
1257 #endif
1258     }
1259 #if MULTI_SENDQUEUE
1260   }
1261 #endif
1262   MACHSTATE(2,"}SendMsgBuf end ");
1263   return sent;
1264 }
1265
1266 void EnqueueMsg(void *m, int size, int node)
1267 {
1268   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1269   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1270   msg_tmp->msg = m;
1271   msg_tmp->size = size;
1272   msg_tmp->destpe = node;
1273         
1274 #if CMK_SMP_TRACE_COMMTHREAD
1275         msg_tmp->srcpe = CmiMyPe();
1276 #endif  
1277
1278 #if MULTI_SENDQUEUE
1279   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1280 #else
1281   CmiLock(sendMsgBufLock);
1282   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1283   CmiUnlock(sendMsgBufLock);
1284 #endif
1285         
1286   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1287 }
1288
1289 #endif
1290
1291 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1292 {
1293   CmiState cs = CmiGetState();
1294   SMSG_LIST *msg_tmp;
1295   CmiUInt2  rank, node;
1296
1297   if(destPE == cs->pe) {
1298     char *dupmsg = (char *) CmiAlloc(size);
1299     memcpy(dupmsg, msg, size);
1300     CmiSendSelf(dupmsg);
1301     return 0;
1302   }
1303   CQdCreate(CpvAccess(cQdState), 1);
1304 #if CMK_SMP
1305   node = CmiNodeOf(destPE);
1306   rank = CmiRankOf(destPE);
1307   if (node == CmiMyNode())  {
1308     CmiPushPE(rank, msg);
1309     return 0;
1310   }
1311   CMI_DEST_RANK(msg) = rank;
1312   EnqueueMsg(msg, size, node);
1313   return 0;
1314 #else
1315   /* non smp */
1316   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1317   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1318   msg_tmp->msg = msg;
1319   msg_tmp->next = 0;
1320   while (MsgQueueLen > request_max) {
1321         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1322         CmiReleaseSentMessages();
1323         PumpMsgs();
1324   }
1325   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1326   CMI_SET_CHECKSUM(msg, size);
1327
1328 #if MPI_POST_RECV_COUNT > 0
1329         if(size <= MPI_POST_RECV_SIZE){
1330
1331           START_EVENT();
1332           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1333                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1334           END_EVENT(40);
1335         }
1336         else {
1337           START_EVENT();
1338           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1339                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1340           END_EVENT(40);
1341         }
1342 #else
1343   START_EVENT();
1344   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1345     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1346   END_EVENT(40);
1347 #endif
1348
1349   MsgQueueLen++;
1350   if(sent_msgs==0)
1351     sent_msgs = msg_tmp;
1352   else
1353     end_sent->next = msg_tmp;
1354   end_sent = msg_tmp;
1355   return (CmiCommHandle) &(msg_tmp->req);
1356 #endif              /* non-smp */
1357 }
1358
1359 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1360 {
1361   CMI_SET_BROADCAST_ROOT(msg, 0);
1362   CmiAsyncSendFn_(destPE, size, msg);
1363 }
1364
1365 void CmiFreeSendFn(int destPE, int size, char *msg)
1366 {
1367   CmiState cs = CmiGetState();
1368   CMI_SET_BROADCAST_ROOT(msg, 0);
1369
1370   if (cs->pe==destPE) {
1371     CmiSendSelf(msg);
1372   } else {
1373     CmiAsyncSendFn_(destPE, size, msg);
1374   }
1375 }
1376
1377 /*********************** BROADCAST FUNCTIONS **********************/
1378
1379 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1380 void CmiSyncSendFn1(int destPE, int size, char *msg)
1381 {
1382   CmiState cs = CmiGetState();
1383   char *dupmsg = (char *) CmiAlloc(size);
1384   memcpy(dupmsg, msg, size);
1385   if (cs->pe==destPE)
1386     CmiSendSelf(dupmsg);
1387   else
1388     CmiAsyncSendFn_(destPE, size, dupmsg);
1389 }
1390
1391 /* send msg to its spanning children in broadcast. G. Zheng */
1392 void SendSpanningChildren(int size, char *msg)
1393 {
1394   CmiState cs = CmiGetState();
1395   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1396   int startnode;
1397   int i, exceptRank;
1398         
1399 #if CMK_SMP
1400    /* first send msgs to other nodes */  
1401   startnode = CmiNodeOf(startpe);  
1402   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1403   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1404     int nd = CmiMyNode()-startnode;
1405     if (nd<0) nd+=CmiNumNodes();
1406     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1407     if (nd > CmiNumNodes() - 1) break;
1408     nd += startnode;
1409     nd = nd%CmiNumNodes();
1410     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1411         /* always send to the first rank of other nodes */
1412         char *newmsg = CmiCopyMsg(msg, size);
1413         CMI_DEST_RANK(newmsg) = 0;
1414     EnqueueMsg(newmsg, size, nd);
1415   }     
1416    /* second send msgs to my peers on this node */
1417   /* FIXME: now it's just a flat p2p send!! When node size is large,
1418    * it should also be sent in a tree
1419    */
1420    exceptRank = CMI_DEST_RANK(msg);
1421    for(i=0; i<exceptRank; i++){
1422            CmiPushPE(i, CmiCopyMsg(msg, size));
1423    }
1424    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1425            CmiPushPE(i, CmiCopyMsg(msg, size));
1426    }
1427 #else
1428   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1429   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1430     int p = cs->pe-startpe;
1431     if (p<0) p+=_Cmi_numpes;
1432     p = BROADCAST_SPANNING_FACTOR*p + i;
1433     if (p > _Cmi_numpes - 1) break;
1434     p += startpe;
1435     p = p%_Cmi_numpes;
1436     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);      
1437     CmiSyncSendFn1(p, size, msg);
1438   }
1439 #endif
1440 }
1441
1442 #include <math.h>
1443
1444 /* send msg along the hypercube in broadcast. (Sameer) */
1445 void SendHypercube(int size, char *msg)
1446 {
1447   CmiState cs = CmiGetState();
1448   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1449   int startnode = CmiNodeOf(startpe);
1450   int i, exceptRank, cnt, tmp, relPE;
1451   int dims=0;
1452
1453   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1454   tmp = CmiNumNodes()-1;
1455   while(tmp>0){
1456           dims++;
1457           tmp = tmp >> 1;
1458   }
1459   if(CmiNumNodes()==1) dims=1;
1460   
1461    /* first send msgs to other nodes */  
1462   relPE = CmiMyNode()-startnode;
1463   if(relPE < 0) relPE += CmiNumNodes();
1464   cnt=0;
1465   tmp = relPE;
1466   /* count how many zeros (in binary format) relPE has */
1467   for(i=0; i<dims; i++, cnt++){
1468     if(tmp & 1 == 1) break;
1469     tmp = tmp >> 1;
1470   }
1471   
1472   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1473   for (i = cnt-1; i >= 0; i--) {
1474     int nd = relPE + (1 << i);
1475         if(nd >= CmiNumNodes()) continue;
1476         nd = (nd+startnode)%CmiNumNodes();
1477         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1478 #if CMK_SMP
1479     /* always send to the first rank of other nodes */
1480     char *newmsg = CmiCopyMsg(msg, size);
1481     CMI_DEST_RANK(newmsg) = 0;
1482     EnqueueMsg(newmsg, size, nd);
1483 #else
1484         CmiSyncSendFn1(nd, size, msg);
1485 #endif
1486   }
1487   
1488 #if CMK_SMP
1489    /* second send msgs to my peers on this node */
1490    /* FIXME: now it's just a flat p2p send!! When node size is large,
1491     * it should also be sent in a tree
1492     */
1493    exceptRank = CMI_DEST_RANK(msg);
1494    for(i=0; i<exceptRank; i++){
1495            CmiPushPE(i, CmiCopyMsg(msg, size));
1496    }
1497    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1498            CmiPushPE(i, CmiCopyMsg(msg, size));
1499    }
1500 #endif
1501 }
1502
1503 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1504 {
1505   CmiState cs = CmiGetState();
1506
1507 #if CMK_SMP     
1508   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1509   CMI_DEST_RANK(msg) = CmiMyRank();
1510 #endif
1511         
1512 #if CMK_BROADCAST_SPANNING_TREE
1513   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1514   SendSpanningChildren(size, msg);
1515
1516 #elif CMK_BROADCAST_HYPERCUBE
1517   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1518   SendHypercube(size, msg);
1519
1520 #else
1521   int i;
1522
1523   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1524     CmiSyncSendFn(i, size,msg) ;
1525   for ( i=0; i<cs->pe; i++ )
1526     CmiSyncSendFn(i, size,msg) ;
1527 #endif
1528
1529   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1530 }
1531
1532
1533 /*  FIXME: luckily async is never used  G. Zheng */
1534 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1535 {
1536   CmiState cs = CmiGetState();
1537   int i ;
1538
1539   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1540     CmiAsyncSendFn(i,size,msg) ;
1541   for ( i=0; i<cs->pe; i++ )
1542     CmiAsyncSendFn(i,size,msg) ;
1543
1544   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1545   CmiAbort("CmiAsyncBroadcastFn should never be called");
1546   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1547 }
1548
1549 void CmiFreeBroadcastFn(int size, char *msg)
1550 {
1551    CmiSyncBroadcastFn(size,msg);
1552    CmiFree(msg);
1553 }
1554
1555 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1556 {
1557
1558 #if CMK_SMP     
1559   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1560   CMI_DEST_RANK(msg) = CmiMyRank();
1561 #endif
1562
1563 #if CMK_BROADCAST_SPANNING_TREE
1564   CmiState cs = CmiGetState();
1565   CmiSyncSendFn(cs->pe, size,msg) ;
1566   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1567   SendSpanningChildren(size, msg);
1568
1569 #elif CMK_BROADCAST_HYPERCUBE
1570   CmiState cs = CmiGetState();
1571   CmiSyncSendFn(cs->pe, size,msg) ;
1572   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1573   SendHypercube(size, msg);
1574
1575 #else
1576     int i ;
1577
1578   for ( i=0; i<_Cmi_numpes; i++ )
1579     CmiSyncSendFn(i,size,msg) ;
1580 #endif
1581
1582   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1583 }
1584
1585 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1586 {
1587   int i ;
1588
1589   for ( i=1; i<_Cmi_numpes; i++ )
1590     CmiAsyncSendFn(i,size,msg) ;
1591
1592   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1593
1594   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1595 }
1596
1597 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1598 {
1599 #if CMK_SMP     
1600   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1601   CMI_DEST_RANK(msg) = CmiMyRank();
1602 #endif
1603
1604 #if CMK_BROADCAST_SPANNING_TREE
1605   CmiState cs = CmiGetState();
1606   CmiSyncSendFn(cs->pe, size,msg) ;
1607   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1608   SendSpanningChildren(size, msg);
1609
1610 #elif CMK_BROADCAST_HYPERCUBE
1611   CmiState cs = CmiGetState();
1612   CmiSyncSendFn(cs->pe, size,msg) ;
1613   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1614   SendHypercube(size, msg);
1615
1616 #else
1617   int i ;
1618
1619   for ( i=0; i<_Cmi_numpes; i++ )
1620     CmiSyncSendFn(i,size,msg) ;
1621 #endif
1622   CmiFree(msg) ;
1623   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1624 }
1625
1626 #if CMK_NODE_QUEUE_AVAILABLE
1627
1628 static void CmiSendNodeSelf(char *msg)
1629 {
1630 #if CMK_IMMEDIATE_MSG
1631 #if 0
1632     if (CmiIsImmediate(msg) && !_immRunning) {
1633       /*CmiHandleImmediateMessage(msg); */
1634       CmiPushImmediateMsg(msg);
1635       CmiHandleImmediate();
1636       return;
1637     }
1638 #endif
1639     if (CmiIsImmediate(msg))
1640     {
1641       CmiPushImmediateMsg(msg);
1642       if (!_immRunning) CmiHandleImmediate();
1643       return;
1644     }
1645 #endif
1646     CQdCreate(CpvAccess(cQdState), 1);
1647     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1648     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1649     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1650 }
1651
1652 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1653 {
1654   int i;
1655   SMSG_LIST *msg_tmp;
1656   char *dupmsg;
1657
1658   CMI_SET_BROADCAST_ROOT(msg, 0);
1659   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1660   switch (dstNode) {
1661   case NODE_BROADCAST_ALL:
1662     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1663   case NODE_BROADCAST_OTHERS:
1664     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1665     for (i=0; i<_Cmi_numnodes; i++)
1666       if (i!=_Cmi_mynode) {
1667         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1668       }
1669     break;
1670   default:
1671     dupmsg = (char *)CmiCopyMsg(msg,size);
1672     if(dstNode == _Cmi_mynode) {
1673       CmiSendNodeSelf(dupmsg);
1674     }
1675     else {
1676       CQdCreate(CpvAccess(cQdState), 1);
1677       EnqueueMsg(dupmsg, size, dstNode);
1678     }
1679   }
1680   return 0;
1681 }
1682
1683 void CmiSyncNodeSendFn(int p, int s, char *m)
1684 {
1685   CmiAsyncNodeSendFn(p, s, m);
1686 }
1687
1688 /* need */
1689 void CmiFreeNodeSendFn(int p, int s, char *m)
1690 {
1691   CmiAsyncNodeSendFn(p, s, m);
1692   CmiFree(m);
1693 }
1694
1695 /* need */
1696 void CmiSyncNodeBroadcastFn(int s, char *m)
1697 {
1698   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1699 }
1700
1701 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1702 {
1703 }
1704
1705 /* need */
1706 void CmiFreeNodeBroadcastFn(int s, char *m)
1707 {
1708   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1709   CmiFree(m);
1710 }
1711
1712 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1713 {
1714   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1715 }
1716
1717 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1718 {
1719   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1720 }
1721
1722 /* need */
1723 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1724 {
1725   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1726   CmiFree(m);
1727 }
1728 #endif
1729
1730 /************************** MAIN ***********************************/
1731 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1732
1733 void ConverseExit(void)
1734 {
1735 #if ! CMK_SMP
1736   while(!CmiAllAsyncMsgsSent()) {
1737     PumpMsgs();
1738     CmiReleaseSentMessages();
1739   }
1740   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1741     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1742
1743   ConverseCommonExit();
1744 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1745   if (CmiMyPe() == 0){
1746     CmiPrintf("End of program\n");
1747 #if MPI_POST_RECV_COUNT > 0
1748     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1749 #endif
1750 }
1751 #endif
1752 #if ! CMK_AUTOBUILD
1753   signal(SIGINT, signal_int);
1754   MPI_Finalize();
1755 #endif
1756   exit(0);
1757
1758 #else
1759     /* SMP version, communication thread will exit */
1760   ConverseCommonExit();
1761   /* atomic increment */
1762   CmiLock(exitLock);
1763   inexit++;
1764   CmiUnlock(exitLock);
1765   while (1) CmiYield();
1766 #endif
1767 }
1768
1769 static void registerMPITraceEvents() {
1770 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1771     traceRegisterUserEvent("MPI_Barrier", 10);
1772     traceRegisterUserEvent("MPI_Send", 20);
1773     traceRegisterUserEvent("MPI_Recv", 30);
1774     traceRegisterUserEvent("MPI_Isend", 40);
1775     traceRegisterUserEvent("MPI_Irecv", 50);
1776     traceRegisterUserEvent("MPI_Test", 60);
1777     traceRegisterUserEvent("MPI_Iprobe", 70);
1778 #endif
1779 }
1780
1781
1782 static char     **Cmi_argv;
1783 static char     **Cmi_argvcopy;
1784 static CmiStartFn Cmi_startfn;   /* The start function */
1785 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1786
1787 typedef struct {
1788   int sleepMs; /*Milliseconds to sleep while idle*/
1789   int nIdles; /*Number of times we've been idle in a row*/
1790   CmiState cs; /*Machine state*/
1791 } CmiIdleState;
1792
1793 static CmiIdleState *CmiNotifyGetState(void)
1794 {
1795   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1796   s->sleepMs=0;
1797   s->nIdles=0;
1798   s->cs=CmiGetState();
1799   return s;
1800 }
1801
1802 static void CmiNotifyBeginIdle(CmiIdleState *s)
1803 {
1804   s->sleepMs=0;
1805   s->nIdles=0;
1806 }
1807
1808 static void CmiNotifyStillIdle(CmiIdleState *s)
1809 {
1810 #if ! CMK_SMP
1811   CmiReleaseSentMessages();
1812   PumpMsgs();
1813 #else
1814 /*  CmiYield();  */
1815 #endif
1816
1817 #if 1
1818   {
1819   int nSpins=20; /*Number of times to spin before sleeping*/
1820   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1821   s->nIdles++;
1822   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1823     s->sleepMs+=2;
1824     if (s->sleepMs>10) s->sleepMs=10;
1825   }
1826   /*Comm. thread will listen on sockets-- just sleep*/
1827   if (s->sleepMs>0) {
1828     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1829     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1830     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1831   }
1832   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1833   }
1834 #endif
1835 }
1836
1837 #if MACHINE_DEBUG_LOG
1838 FILE *debugLog = NULL;
1839 #endif
1840
1841 static int machine_exit_idx;
1842 static void machine_exit(char *m) {
1843   EmergencyExit();
1844   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1845   fflush(stdout);
1846   CmiNodeBarrier();
1847   if (CmiMyRank() == 0) {
1848     MPI_Barrier(MPI_COMM_WORLD);
1849     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1850     MPI_Abort(MPI_COMM_WORLD, 1);
1851   } else {
1852     while (1) CmiYield();
1853   }
1854 }
1855
1856 static void KillOnAllSigs(int sigNo) {
1857   static int already_in_signal_handler = 0;
1858   char *m;
1859   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1860   already_in_signal_handler = 1;
1861 #if CMK_CCS_AVAILABLE
1862   if (CpvAccess(cmiArgDebugFlag)) {
1863     CpdNotify(CPD_SIGNAL, sigNo);
1864     CpdFreeze();
1865   }
1866 #endif
1867   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1868       "Signal: %d\n",CmiMyPe(),sigNo);
1869   CmiPrintStackTrace(1);
1870
1871   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1872   CmiSetHandler(m, machine_exit_idx);
1873   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1874   machine_exit(m);
1875 }
1876
1877 static void ConverseRunPE(int everReturn)
1878 {
1879   CmiIdleState *s=CmiNotifyGetState();
1880   CmiState cs;
1881   char** CmiMyArgv;
1882
1883   CmiNodeAllBarrier();
1884
1885   cs = CmiGetState();
1886   CpvInitialize(void *,CmiLocalQueue);
1887   CpvAccess(CmiLocalQueue) = cs->localqueue;
1888
1889   if (CmiMyRank())
1890     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1891   else
1892     CmiMyArgv=Cmi_argv;
1893
1894   CthInit(CmiMyArgv);
1895
1896   ConverseCommonInit(CmiMyArgv);
1897   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1898
1899 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1900   CpvInitialize(double, projTraceStart);
1901   /* only PE 0 needs to care about registration (to generate sts file). */
1902   if (CmiMyPe() == 0) {
1903     registerMachineUserEventsFunction(&registerMPITraceEvents);
1904   }
1905 #endif
1906
1907   /* initialize the network progress counter*/
1908   /* Network progress function is used to poll the network when for
1909      messages. This flushes receive buffers on some  implementations*/
1910   CpvInitialize(int , networkProgressCount);
1911   CpvAccess(networkProgressCount) = 0;
1912
1913 #if CMK_SMP
1914   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1915   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1916 #else
1917   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1918 #endif
1919
1920 #if MACHINE_DEBUG_LOG
1921   if (CmiMyRank() == 0) {
1922     char ln[200];
1923     sprintf(ln,"debugLog.%d",CmiMyNode());
1924     debugLog=fopen(ln,"w");
1925   }
1926 #endif
1927
1928   /* Converse initialization finishes, immediate messages can be processed.
1929      node barrier previously should take care of the node synchronization */
1930   _immediateReady = 1;
1931
1932   /* communication thread */
1933   if (CmiMyRank() == CmiMyNodeSize()) {
1934     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1935     while (1) CommunicationServerThread(5);
1936   }
1937   else {  /* worker thread */
1938   if (!everReturn) {
1939     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1940     if (Cmi_usrsched==0) CsdScheduler(-1);
1941     ConverseExit();
1942   }
1943   }
1944 }
1945
1946 static char *thread_level_tostring(int thread_level)
1947 {
1948 #if CMK_MPI_INIT_THREAD
1949   switch (thread_level) {
1950   case MPI_THREAD_SINGLE:
1951       return "MPI_THREAD_SINGLE";
1952   case MPI_THREAD_FUNNELED:
1953       return "MPI_THREAD_FUNNELED";
1954   case MPI_THREAD_SERIALIZED:
1955       return "MPI_THREAD_SERIALIZED";
1956   case MPI_THREAD_MULTIPLE :
1957       return "MPI_THREAD_MULTIPLE ";
1958   default: {
1959       char *str = (char*)malloc(5);
1960       sprintf(str,"%d", thread_level);
1961       return str;
1962       }
1963   }
1964   return  "unknown";
1965 #else
1966   char *str = (char*)malloc(5);
1967   sprintf(str,"%d", thread_level);
1968   return str;
1969 #endif
1970 }
1971
1972 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1973 {
1974   int n,i;
1975   int ver, subver;
1976   int provided;
1977   int thread_level;
1978
1979 #if MACHINE_DEBUG
1980   debugLog=NULL;
1981 #endif
1982 #if CMK_USE_HP_MAIN_FIX
1983 #if FOR_CPLUS
1984   _main(argc,argv);
1985 #endif
1986 #endif
1987
1988 #if CMK_MPI_INIT_THREAD
1989 #if CMK_SMP
1990   thread_level = MPI_THREAD_FUNNELED;
1991 #else
1992   thread_level = MPI_THREAD_SINGLE;
1993 #endif
1994   MPI_Init_thread(&argc, &argv, thread_level, &provided);
1995   _thread_provided = provided;
1996 #else
1997   MPI_Init(&argc, &argv);
1998   thread_level = 0;
1999   provided = -1;
2000 #endif
2001   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2002   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2003
2004   MPI_Get_version(&ver, &subver);
2005   if (_Cmi_mynode == 0) {
2006     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2007   }
2008
2009   /* processor per node */
2010   _Cmi_mynodesize = 1;
2011   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2012     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2013 #if ! CMK_SMP
2014   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2015     CmiAbort("+ppn cannot be used in non SMP version!\n");
2016 #endif
2017   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2018   if (idleblock && _Cmi_mynode == 0) {
2019     printf("Charm++: Running in idle blocking mode.\n");
2020   }
2021
2022   /* setup signal handlers */
2023   signal(SIGSEGV, KillOnAllSigs);
2024   signal(SIGFPE, KillOnAllSigs);
2025   signal(SIGILL, KillOnAllSigs);
2026   signal_int = signal(SIGINT, KillOnAllSigs);
2027   signal(SIGTERM, KillOnAllSigs);
2028   signal(SIGABRT, KillOnAllSigs);
2029 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2030   signal(SIGQUIT, KillOnAllSigs);
2031   signal(SIGBUS, KillOnAllSigs);
2032 /*#     if CMK_HANDLE_SIGUSR
2033   signal(SIGUSR1, HandleUserSignals);
2034   signal(SIGUSR2, HandleUserSignals);
2035 #     endif*/
2036 #   endif /*UNIX*/
2037   
2038 #if CMK_NO_OUTSTANDING_SENDS
2039   no_outstanding_sends=1;
2040 #endif
2041   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2042     no_outstanding_sends = 1;
2043     if (_Cmi_mynode == 0)
2044       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2045         no_outstanding_sends?"":" not");
2046   }
2047   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2048   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2049   Cmi_argvcopy = CmiCopyArgs(argv);
2050   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2051   /* find dim = log2(numpes), to pretend we are a hypercube */
2052   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2053     Cmi_dim++ ;
2054  /* CmiSpanTreeInit();*/
2055   request_max=MAX_QLEN;
2056   CmiGetArgInt(argv,"+requestmax",&request_max);
2057   /*printf("request max=%d\n", request_max);*/
2058
2059   /* checksum flag */
2060   if (CmiGetArgFlag(argv,"+checksum")) {
2061 #if !CMK_OPTIMIZE
2062     checksum_flag = 1;
2063     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2064 #else
2065     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2066 #endif
2067   }
2068
2069   {
2070   int debug = CmiGetArgFlag(argv,"++debug");
2071   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2072   if (debug || debug_no_pause)
2073   {   /*Pause so user has a chance to start and attach debugger*/
2074 #if CMK_HAS_GETPID
2075     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2076     fflush(stdout);
2077     if (!debug_no_pause)
2078       sleep(15);
2079 #else
2080     printf("++debug ignored.\n");
2081 #endif
2082   }
2083   }
2084
2085 #if MPI_POST_RECV_COUNT > 0
2086
2087   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2088   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2089   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2090   CpvInitialize(char*,CmiPostedRecvBuffers);
2091
2092     /* Post some extra recvs to help out with incoming messages */
2093     /* On some MPIs the messages are unexpected and thus slow */
2094
2095     /* An array of request handles for posted recvs */
2096     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2097
2098     /* An array of buffers for posted recvs */
2099     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2100
2101     /* Post Recvs */
2102     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2103         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2104                     MPI_POST_RECV_SIZE,
2105                     MPI_BYTE,
2106                     MPI_ANY_SOURCE,
2107                     POST_RECV_TAG,
2108                     MPI_COMM_WORLD,
2109                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2110           CmiAbort("MPI_Irecv failed\n");
2111     }
2112
2113 #endif
2114
2115
2116
2117   /* CmiTimerInit(); */
2118
2119 #if 0
2120   CthInit(argv);
2121   ConverseCommonInit(argv);
2122
2123   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2124   if (initret==0) {
2125     fn(CmiGetArgc(argv), argv);
2126     if (usched==0) CsdScheduler(-1);
2127     ConverseExit();
2128   }
2129 #endif
2130
2131   CsvInitialize(CmiNodeState, NodeState);
2132   CmiNodeStateInit(&CsvAccess(NodeState));
2133
2134   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2135
2136   for (i=0; i<_Cmi_mynodesize+1; i++) {
2137 #if MULTI_SENDQUEUE
2138     procState[i].sendMsgBuf = PCQueueCreate();
2139 #endif
2140     procState[i].recvLock = CmiCreateLock();
2141   }
2142 #if CMK_SMP
2143 #if !MULTI_SENDQUEUE
2144   sendMsgBuf = PCQueueCreate();
2145   sendMsgBufLock = CmiCreateLock();
2146 #endif
2147   exitLock = CmiCreateLock();            /* exit count lock */
2148 #endif
2149
2150   /* Network progress function is used to poll the network when for
2151      messages. This flushes receive buffers on some  implementations*/
2152   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2153   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2154
2155   CmiStartThreads(argv);
2156   ConverseRunPE(initret);
2157 }
2158
2159 /***********************************************************************
2160  *
2161  * Abort function:
2162  *
2163  ************************************************************************/
2164
2165 void CmiAbort(const char *message)
2166 {
2167   char *m;
2168   /* if CharmDebug is attached simply try to send a message to it */
2169 #if CMK_CCS_AVAILABLE
2170   if (CpvAccess(cmiArgDebugFlag)) {
2171     CpdNotify(CPD_ABORT, message);
2172     CpdFreeze();
2173   }
2174 #endif  
2175   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2176         "Reason: %s\n",CmiMyPe(),message);
2177  /*  CmiError(message); */
2178   CmiPrintStackTrace(0);
2179   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2180   CmiSetHandler(m, machine_exit_idx);
2181   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2182   machine_exit(m);
2183   /* Program never reaches here */
2184   MPI_Abort(MPI_COMM_WORLD, 1);
2185 }
2186
2187
2188 #if 0
2189
2190 /* ****************************************************************** */
2191 /*    The following internal functions implement recd msg queue       */
2192 /* ****************************************************************** */
2193
2194 static void ** AllocBlock(unsigned int len)
2195 {
2196   void ** blk;
2197
2198   blk=(void **)CmiAlloc(len*sizeof(void *));
2199   if(blk==(void **)0) {
2200     CmiError("Cannot Allocate Memory!\n");
2201     MPI_Abort(MPI_COMM_WORLD, 1);
2202   }
2203   return blk;
2204 }
2205
2206 static void
2207 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2208 {
2209   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2210   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2211 }
2212
2213 void recdQueueInit(void)
2214 {
2215   recdQueue_blk = AllocBlock(BLK_LEN);
2216   recdQueue_blk_len = BLK_LEN;
2217   recdQueue_first = 0;
2218   recdQueue_len = 0;
2219 }
2220
2221 void recdQueueAddToBack(void *element)
2222 {
2223 #if NODE_0_IS_CONVHOST
2224   inside_comm = 1;
2225 #endif
2226   if(recdQueue_len==recdQueue_blk_len) {
2227     void **blk;
2228     recdQueue_blk_len *= 3;
2229     blk = AllocBlock(recdQueue_blk_len);
2230     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2231     CmiFree(recdQueue_blk);
2232     recdQueue_blk = blk;
2233     recdQueue_first = 0;
2234   }
2235   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2236 #if NODE_0_IS_CONVHOST
2237   inside_comm = 0;
2238 #endif
2239 }
2240
2241
2242 void * recdQueueRemoveFromFront(void)
2243 {
2244   if(recdQueue_len) {
2245     void *element;
2246     element = recdQueue_blk[recdQueue_first++];
2247     recdQueue_first %= recdQueue_blk_len;
2248     recdQueue_len--;
2249     return element;
2250   }
2251   return 0;
2252 }
2253
2254 #endif
2255
2256 /*@}*/