Fixed the spanning tree implementation in SMP mode. Note it is a charm smp node
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #include "machine.h"
66
67 #include "pcqueue.h"
68
69 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
70
71 #if CMK_BLUEGENEL
72 #define MAX_QLEN 8
73 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
74 #else
75 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
76 #define MAX_QLEN 200
77 #endif
78
79 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && ! CMK_TRACE_IN_CHARM
80 CpvStaticDeclare(double, projTraceStart);
81 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
82 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
83 #else
84 # define  START_EVENT()
85 # define  END_EVENT(x)
86 #endif
87
88 /*
89     To reduce the buffer used in broadcast and distribute the load from
90   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
91   spanning tree broadcast algorithm.
92     This will use the fourth short in message as an indicator of spanning tree
93   root.
94 */
95 #define CMK_BROADCAST_SPANNING_TREE    1
96 #define CMK_BROADCAST_HYPERCUBE        0
97
98 #define BROADCAST_SPANNING_FACTOR      4
99 /* The number of children used when a msg is broadcast inside a node */
100 #define BROADCAST_SPANNING_INTRA_FACTOR      8
101
102 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
103 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
104
105 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
106 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
107
108 /* FIXME: need a random number that everyone agrees ! */
109 #define CHARM_MAGIC_NUMBER               126
110
111 #if !CMK_OPTIMIZE
112 static int checksum_flag = 0;
113 #define CMI_SET_CHECKSUM(msg, len)      \
114         if (checksum_flag)  {   \
115           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
116           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
117         }
118 #define CMI_CHECK_CHECKSUM(msg, len)    \
119         if (checksum_flag)      \
120           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
121             CmiAbort("Fatal error: checksum doesn't agree!\n");
122 #else
123 #define CMI_SET_CHECKSUM(msg, len)
124 #define CMI_CHECK_CHECKSUM(msg, len)
125 #endif
126
127 #if CMK_BROADCAST_SPANNING_TREE
128 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
129 #else
130 #  define CMI_SET_BROADCAST_ROOT(msg, root)
131 #endif
132
133 #if CMK_BROADCAST_HYPERCUBE
134 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
135 #else
136 #  define CMI_SET_CYCLE(msg, cycle)
137 #endif
138
139
140 /** 
141     If MPI_POST_RECV is defined, we provide default values for size 
142     and number of posted recieves. If MPI_POST_RECV_COUNT is set
143     then a default value for MPI_POST_RECV_SIZE is used if not specified
144     by the user.
145 */
146 #ifdef MPI_POST_RECV
147 #define MPI_POST_RECV_COUNT 10
148 #undef MPI_POST_RECV
149 #endif
150 #if MPI_POST_RECV_COUNT > 0
151 #warning "Using MPI posted receives which have not yet been tested"
152 #ifndef MPI_POST_RECV_SIZE
153 #define MPI_POST_RECV_SIZE 200
154 #endif
155 /* #undef  MPI_POST_RECV_DEBUG  */
156 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
157 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
158 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
159 CpvDeclare(char*,CmiPostedRecvBuffers);
160 #endif
161
162 /*
163  to avoid MPI's in order delivery, changing MPI Tag all the time
164 */
165 #define TAG     1375
166
167 #if MPI_POST_RECV_COUNT > 0
168 #define POST_RECV_TAG TAG+1
169 #define BARRIER_ZERO_TAG TAG
170 #else
171 #define BARRIER_ZERO_TAG     1375
172 #endif
173
174 #include <signal.h>
175 void (*signal_int)(int);
176
177 /*
178 static int mpi_tag = TAG;
179 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
180 */
181
182 static int        _thread_provided = -1;
183 int               _Cmi_numpes;
184 int               _Cmi_mynode;    /* Which address space am I */
185 int               _Cmi_mynodesize;/* Number of processors in my address space */
186 int               _Cmi_numnodes;  /* Total number of address spaces */
187 int               _Cmi_numpes;    /* Total number of processors */
188 static int        Cmi_nodestart; /* First processor in this address space */
189 CpvDeclare(void*, CmiLocalQueue);
190
191 /*Network progress utility variables. Period controls the rate at
192   which the network poll is called */
193 CpvDeclare(unsigned , networkProgressCount);
194 int networkProgressPeriod;
195
196 int               idleblock = 0;
197
198 #define BLK_LEN  512
199
200 #if CMK_NODE_QUEUE_AVAILABLE
201 #define DGRAM_NODEMESSAGE   (0xFB)
202
203 #define NODE_BROADCAST_OTHERS (-1)
204 #define NODE_BROADCAST_ALL    (-2)
205 #endif
206
207 #if 0
208 static void **recdQueue_blk;
209 static unsigned int recdQueue_blk_len;
210 static unsigned int recdQueue_first;
211 static unsigned int recdQueue_len;
212 static void recdQueueInit(void);
213 static void recdQueueAddToBack(void *element);
214 static void *recdQueueRemoveFromFront(void);
215 #endif
216
217 static void ConverseRunPE(int everReturn);
218 static void CommunicationServer(int sleepTime);
219 static void CommunicationServerThread(int sleepTime);
220
221 typedef struct msg_list {
222      char *msg;
223      struct msg_list *next;
224      int size, destpe;
225
226 #if CMK_SMP_TRACE_COMMTHREAD
227         int srcpe;
228 #endif  
229         
230      MPI_Request req;
231 } SMSG_LIST;
232
233 int MsgQueueLen=0;
234 static int request_max;
235
236 static SMSG_LIST *sent_msgs=0;
237 static SMSG_LIST *end_sent=0;
238
239 static int Cmi_dim;
240
241 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
242
243 #if NODE_0_IS_CONVHOST
244 int inside_comm = 0;
245 #endif
246
247 void CmiAbort(const char *message);
248 static void PerrorExit(const char *msg);
249
250 void SendSpanningChildren(int size, char *msg);
251 void SendHypercube(int size, char *msg);
252
253 static void PerrorExit(const char *msg)
254 {
255   perror(msg);
256   exit(1);
257 }
258
259 extern unsigned char computeCheckSum(unsigned char *data, int len);
260
261 /**************************  TIMER FUNCTIONS **************************/
262
263 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
264
265 /* MPI calls are not threadsafe, even the timer on some machines */
266 static CmiNodeLock  timerLock = 0;
267 static double starttimer = 0;
268 static int _is_global = 0;
269
270 int CmiTimerIsSynchronized()
271 {
272   int  flag;
273   void *v;
274
275   /*  check if it using synchronized timer */
276   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
277     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
278   if (flag) {
279     _is_global = *(int*)v;
280     if (_is_global && CmiMyPe() == 0)
281       printf("Charm++> MPI timer is synchronized!\n");
282   }
283   return _is_global;
284 }
285
286 void CmiTimerInit()
287 {
288   _is_global = CmiTimerIsSynchronized();
289
290   if (_is_global) {
291     if (CmiMyRank() == 0) {
292       double minTimer;
293 #if CMK_TIMER_USE_XT3_DCLOCK
294       starttimer = dclock();
295 #else
296       starttimer = MPI_Wtime();
297 #endif
298
299       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
300                                   MPI_COMM_WORLD );
301       starttimer = minTimer;
302     }
303   }
304   else {  /* we don't have a synchronous timer, set our own start time */
305     CmiBarrier();
306     CmiBarrier();
307     CmiBarrier();
308 #if CMK_TIMER_USE_XT3_DCLOCK
309     starttimer = dclock();
310 #else
311     starttimer = MPI_Wtime();
312 #endif
313   }
314
315 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
316   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
317     timerLock = CmiCreateLock();
318 #endif
319   CmiNodeAllBarrier();          /* for smp */
320 }
321
322 /**
323  * Since the timerLock is never created, and is
324  * always NULL, then all the if-condition inside
325  * the timer functions could be disabled right
326  * now in the case of SMP. --Chao Mei
327  */
328 double CmiTimer(void)
329 {
330   double t;
331 #if 0 && CMK_SMP
332   if (timerLock) CmiLock(timerLock);
333 #endif
334 #if CMK_TIMER_USE_XT3_DCLOCK
335   t = dclock() - starttimer;
336 #else
337   t = MPI_Wtime() - starttimer;
338 #endif
339
340 #if 0 && CMK_SMP
341   if (timerLock) CmiUnlock(timerLock);
342 #endif
343
344   return t;
345 }
346
347 double CmiWallTimer(void)
348 {
349   double t;
350 #if 0 && CMK_SMP
351   if (timerLock) CmiLock(timerLock);
352 #endif
353 #if CMK_TIMER_USE_XT3_DCLOCK
354   t = dclock() - starttimer;
355 #else
356   t = MPI_Wtime() - starttimer;
357 #endif
358 #if 0 && CMK_SMP
359   if (timerLock) CmiUnlock(timerLock);
360 #endif
361   return t;
362 }
363
364 double CmiCpuTimer(void)
365 {
366   double t;
367 #if 0 && CMK_SMP
368   if (timerLock) CmiLock(timerLock);
369 #endif
370 #if CMK_TIMER_USE_XT3_DCLOCK
371   t = dclock() - starttimer;
372 #else
373   t = MPI_Wtime() - starttimer;
374 #endif
375 #if 0 && CMK_SMP
376   if (timerLock) CmiUnlock(timerLock);
377 #endif
378   return t;
379 }
380
381 #endif
382
383 /* must be called on all ranks including comm thread in SMP */
384 int CmiBarrier()
385 {
386 #if CMK_SMP
387     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
388   CmiNodeAllBarrier();
389   if (CmiMyRank() == CmiMyNodeSize()) 
390 #else
391   if (CmiMyRank() == 0) 
392 #endif
393   {
394 /**
395  *  The call of CmiBarrier is usually before the initialization
396  *  of trace module of Charm++, therefore, the START_EVENT
397  *  and END_EVENT are disabled here. -Chao Mei
398  */     
399     /*START_EVENT();*/
400
401     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
402         CmiAbort("Timernit: MPI_Barrier failed!\n");
403
404     /*END_EVENT(10);*/
405   }
406   CmiNodeAllBarrier();
407   return 0;
408 }
409
410 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
411 int CmiBarrierZero()
412 {
413   int i;
414 #if CMK_SMP
415   if (CmiMyRank() == CmiMyNodeSize()) 
416 #else
417   if (CmiMyRank() == 0) 
418 #endif
419   {
420     char msg[1];
421     MPI_Status sts;
422     if (CmiMyNode() == 0)  {
423       for (i=0; i<CmiNumNodes()-1; i++) {
424          START_EVENT();
425
426          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
427             CmiPrintf("MPI_Recv failed!\n");
428
429          END_EVENT(30);
430       }
431     }
432     else {
433       START_EVENT();
434
435       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
436          printf("MPI_Send failed!\n");
437
438       END_EVENT(20);
439     }
440   }
441   CmiNodeAllBarrier();
442   return 0;
443 }
444
445 typedef struct ProcState {
446 #if MULTI_SENDQUEUE
447 PCQueue      sendMsgBuf;       /* per processor message sending queue */
448 #endif
449 CmiNodeLock  recvLock;              /* for cs->recv */
450 } ProcState;
451
452 static ProcState  *procState;
453
454 #if CMK_SMP
455
456 #if !MULTI_SENDQUEUE
457 static PCQueue sendMsgBuf;
458 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
459 #endif
460
461 #endif
462
463 /************************************************************
464  *
465  * Processor state structure
466  *
467  ************************************************************/
468
469 /* fake Cmi_charmrun_fd */
470 static int Cmi_charmrun_fd = 0;
471 #include "machine-smp.c"
472
473 CsvDeclare(CmiNodeState, NodeState);
474
475 #include "immediate.c"
476
477 #if CMK_SHARED_VARS_UNAVAILABLE
478 /************ non SMP **************/
479 static struct CmiStateStruct Cmi_state;
480 int _Cmi_mype;
481 int _Cmi_myrank;
482
483 void CmiMemLock() {}
484 void CmiMemUnlock() {}
485
486 #define CmiGetState() (&Cmi_state)
487 #define CmiGetStateN(n) (&Cmi_state)
488
489 void CmiYield(void) { sleep(0); }
490
491 static void CmiStartThreads(char **argv)
492 {
493   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
494   _Cmi_mype = Cmi_nodestart;
495   _Cmi_myrank = 0;
496 }
497 #endif  /* non smp */
498
499 /*Add a message to this processor's receive queue, pe is a rank */
500 void CmiPushPE(int pe,void *msg)
501 {
502   CmiState cs = CmiGetStateN(pe);
503   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
504 #if CMK_IMMEDIATE_MSG
505   if (CmiIsImmediate(msg)) {
506 /*
507 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
508     CmiHandleMessage(msg);
509 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
510 */
511     /**(CmiUInt2 *)msg = pe;*/
512     CMI_DEST_RANK(msg) = pe;
513     CmiPushImmediateMsg(msg);
514     return;
515   }
516 #endif
517
518 #if CMK_SMP
519   CmiLock(procState[pe].recvLock);
520 #endif
521   PCQueuePush(cs->recv,msg);
522 #if CMK_SMP
523   CmiUnlock(procState[pe].recvLock);
524 #endif
525   CmiIdleLock_addMessage(&cs->idle);
526   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
527 }
528
529 #if CMK_NODE_QUEUE_AVAILABLE
530 /*Add a message to this processor's receive queue */
531 static void CmiPushNode(void *msg)
532 {
533   MACHSTATE(3,"Pushing message into NodeRecv queue");
534 #if CMK_IMMEDIATE_MSG
535   if (CmiIsImmediate(msg)) {
536     CMI_DEST_RANK(msg) = 0;
537     CmiPushImmediateMsg(msg);
538     return;
539   }
540 #endif
541   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
542   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
543   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
544   {
545   CmiState cs=CmiGetStateN(0);
546   CmiIdleLock_addMessage(&cs->idle);
547   }
548 }
549 #endif
550
551 #ifndef CmiMyPe
552 int CmiMyPe(void)
553 {
554   return CmiGetState()->pe;
555 }
556 #endif
557
558 #ifndef CmiMyRank
559 int CmiMyRank(void)
560 {
561   return CmiGetState()->rank;
562 }
563 #endif
564
565 #ifndef CmiNodeFirst
566 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
567 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
568 #endif
569
570 #ifndef CmiNodeOf
571 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
572 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
573 #endif
574
575 static size_t CmiAllAsyncMsgsSent(void)
576 {
577    SMSG_LIST *msg_tmp = sent_msgs;
578    MPI_Status sts;
579    int done;
580
581    while(msg_tmp!=0) {
582     done = 0;
583     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
584       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
585     if(!done)
586       return 0;
587     msg_tmp = msg_tmp->next;
588 /*    MsgQueueLen--; ????? */
589    }
590    return 1;
591 }
592
593 int CmiAsyncMsgSent(CmiCommHandle c) {
594
595   SMSG_LIST *msg_tmp = sent_msgs;
596   int done;
597   MPI_Status sts;
598
599   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
600     msg_tmp = msg_tmp->next;
601   if(msg_tmp) {
602     done = 0;
603     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
604       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
605     return ((done)?1:0);
606   } else {
607     return 1;
608   }
609 }
610
611 void CmiReleaseCommHandle(CmiCommHandle c)
612 {
613   return;
614 }
615
616 #if CMK_BLUEGENEL
617 extern void MPID_Progress_test();
618 #endif
619
620 void CmiReleaseSentMessages(void)
621 {
622   SMSG_LIST *msg_tmp=sent_msgs;
623   SMSG_LIST *prev=0;
624   SMSG_LIST *temp;
625   int done;
626   MPI_Status sts;
627
628 #if CMK_BLUEGENEL
629   MPID_Progress_test();
630 #endif
631
632   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
633   while(msg_tmp!=0) {
634     done =0;
635 #if CMK_SMP_TRACE_COMMTHREAD
636     double startT = CmiWallTimer();
637 #endif
638     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
639       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
640     if(done) {
641       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
642       MsgQueueLen--;
643       /* Release the message */
644       temp = msg_tmp->next;
645       if(prev==0)  /* first message */
646         sent_msgs = temp;
647       else
648         prev->next = temp;
649       CmiFree(msg_tmp->msg);
650       CmiFree(msg_tmp);
651       msg_tmp = temp;
652     } else {
653       prev = msg_tmp;
654       msg_tmp = msg_tmp->next;
655     }
656 #if CMK_SMP_TRACE_COMMTHREAD
657     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
658 #endif
659   }
660   end_sent = prev;
661   MACHSTATE(2,"} CmiReleaseSentMessages end");
662 }
663
664 int PumpMsgs(void)
665 {
666   int nbytes, flg, res;
667   char *msg;
668   MPI_Status sts;
669   int recd=0;
670
671 #if CMI_EXERT_RECV_CAP
672   int recvCnt=0;
673 #endif
674         
675 #if CMK_BLUEGENEL
676   MPID_Progress_test();
677 #endif
678
679   MACHSTATE(2,"PumpMsgs begin {");
680
681         
682   while(1) {
683 #if CMI_EXERT_RECV_CAP
684         if(recvCnt==RECV_CAP) break;
685 #endif
686           
687     /* First check posted recvs then do  probe unmatched outstanding messages */
688 #if MPI_POST_RECV_COUNT > 0 
689     int completed_index=-1;
690     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
691         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
692     if(flg){
693         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
694             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
695
696         recd = 1;
697         msg = (char *) CmiAlloc(nbytes);
698         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
699         /* and repost the recv */
700
701         START_EVENT();
702
703         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
704             MPI_POST_RECV_SIZE,
705             MPI_BYTE,
706             MPI_ANY_SOURCE,
707             POST_RECV_TAG,
708             MPI_COMM_WORLD,
709             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
710                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
711
712         END_EVENT(50);
713
714         CpvAccess(Cmi_posted_recv_total)++;
715     }
716     else {
717         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
718         if(res != MPI_SUCCESS)
719         CmiAbort("MPI_Iprobe failed\n");
720         if(!flg) break;
721         recd = 1;
722         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
723         msg = (char *) CmiAlloc(nbytes);
724
725         START_EVENT();
726
727         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
728             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
729
730         END_EVENT(30);
731
732         CpvAccess(Cmi_unposted_recv_total)++;
733     }
734 #else
735     /* Original version */
736 #if CMK_SMP_TRACE_COMMTHREAD
737   double startT = CmiWallTimer(); 
738 #endif
739     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
740     if(res != MPI_SUCCESS)
741       CmiAbort("MPI_Iprobe failed\n");
742
743     if(!flg) break;
744 #if CMK_SMP_TRACE_COMMTHREAD
745     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
746 #endif
747
748     recd = 1;
749     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
750     msg = (char *) CmiAlloc(nbytes);
751     
752     START_EVENT();
753
754     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
755       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
756
757     /*END_EVENT(30);*/
758
759 #endif
760
761 #if CMK_SMP_TRACE_COMMTHREAD
762         traceBeginCommOp(msg);
763         traceChangeLastTimestamp(CpvAccess(projTraceStart));
764         traceEndCommOp(msg);
765         #if CMI_MPI_TRACE_MOREDETAILED
766         char tmp[32];
767         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
768         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
769         #endif
770 #endif  
771         
772     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
773     CMI_CHECK_CHECKSUM(msg, nbytes);
774     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
775       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
776       CmiFree(msg);
777       CmiAbort("Abort!\n");
778       continue;
779     }
780         
781 #if CMK_BROADCAST_SPANNING_TREE
782     if (CMI_BROADCAST_ROOT(msg))
783       SendSpanningChildren(nbytes, msg);
784 #elif CMK_BROADCAST_HYPERCUBE
785     if (CMI_GET_CYCLE(msg))
786       SendHypercube(nbytes, msg);
787 #endif
788         
789         /* In SMP mode, this push operation needs to be executed
790      * after forwarding broadcast messages. If it is executed
791      * earlier, then during the bcast msg forwarding period,    
792          * the msg could be already freed on the worker thread.
793          * As a result, the forwarded message could be wrong! 
794          * --Chao Mei
795          */
796 #if CMK_NODE_QUEUE_AVAILABLE
797     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
798       CmiPushNode(msg);
799     else
800 #endif
801         CmiPushPE(CMI_DEST_RANK(msg), msg);     
802         
803 #if CMI_EXERT_RECV_CAP
804         recvCnt++;
805 #endif  
806   }
807
808   
809 #if CMK_IMMEDIATE_MSG && !CMK_SMP
810   CmiHandleImmediate();
811 #endif
812   
813   MACHSTATE(2,"} PumpMsgs end ");
814   return recd;
815 }
816
817 /* blocking version */
818 static void PumpMsgsBlocking(void)
819 {
820   static int maxbytes = 20000000;
821   static char *buf = NULL;
822   int nbytes, flg;
823   MPI_Status sts;
824   char *msg;
825   int recd=0;
826
827   if (!PCQueueEmpty(CmiGetState()->recv)) return;
828   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
829   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
830   if (sent_msgs)  return;
831
832 #if 0
833   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
834 #endif
835
836   if (buf == NULL) {
837     buf = (char *) CmiAlloc(maxbytes);
838     _MEMCHECK(buf);
839   }
840
841
842 #if MPI_POST_RECV_COUNT > 0
843 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
844 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
845 #endif
846
847   START_EVENT();
848
849   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
850       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
851
852   /*END_EVENT(30);*/
853     
854    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
855    msg = (char *) CmiAlloc(nbytes);
856    memcpy(msg, buf, nbytes);
857
858 #if CMK_SMP_TRACE_COMMTHREAD
859         traceBeginCommOp(msg);
860         traceChangeLastTimestamp(CpvAccess(projTraceStart));
861         traceEndCommOp(msg);
862         #if CMI_MPI_TRACE_MOREDETAILED
863         char tmp[32];
864         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
865         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
866         #endif
867 #endif
868
869 #if CMK_BROADCAST_SPANNING_TREE
870    if (CMI_BROADCAST_ROOT(msg))
871       SendSpanningChildren(nbytes, msg);
872 #elif CMK_BROADCAST_HYPERCUBE
873    if (CMI_GET_CYCLE(msg))
874       SendHypercube(nbytes, msg);
875 #endif
876   
877         /* In SMP mode, this push operation needs to be executed
878      * after forwarding broadcast messages. If it is executed
879      * earlier, then during the bcast msg forwarding period,    
880          * the msg could be already freed on the worker thread.
881          * As a result, the forwarded message could be wrong! 
882          * --Chao Mei
883          */  
884 #if CMK_NODE_QUEUE_AVAILABLE
885    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
886       CmiPushNode(msg);
887    else
888 #endif
889       CmiPushPE(CMI_DEST_RANK(msg), msg);
890 }
891
892 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
893
894 #if CMK_SMP
895
896 static int inexit = 0;
897 static CmiNodeLock  exitLock = 0;
898
899 static int MsgQueueEmpty()
900 {
901   int i;
902 #if MULTI_SENDQUEUE
903   for (i=0; i<_Cmi_mynodesize; i++)
904     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
905 #else
906   return PCQueueEmpty(sendMsgBuf);
907 #endif
908   return 1;
909 }
910
911 static int SendMsgBuf();
912
913 /* test if all processors recv queues are empty */
914 static int RecvQueueEmpty()
915 {
916   int i;
917   for (i=0; i<_Cmi_mynodesize; i++) {
918     CmiState cs=CmiGetStateN(i);
919     if (!PCQueueEmpty(cs->recv)) return 0;
920   }
921   return 1;
922 }
923
924 /**
925 CommunicationServer calls MPI to send messages in the queues and probe message from network.
926 */
927
928 #define REPORT_COMM_METRICS 0
929 #if REPORT_COMM_METRICS
930 static double pumptime = 0.0;
931 static double releasetime = 0.0;
932 static double sendtime = 0.0;
933 #endif
934
935 static void CommunicationServer(int sleepTime)
936 {
937   int static count=0;
938 /*
939   count ++;
940   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
941 */
942 #if REPORT_COMM_METRICS
943   double t1, t2, t3, t4;
944   t1 = CmiWallTimer();
945 #endif
946   PumpMsgs();
947 #if REPORT_COMM_METRICS
948   t2 = CmiWallTimer();
949 #endif
950   CmiReleaseSentMessages();
951 #if REPORT_COMM_METRICS
952   t3 = CmiWallTimer();
953 #endif
954   SendMsgBuf();
955 #if REPORT_COMM_METRICS
956   t4 = CmiWallTimer();
957   pumptime += (t2-t1);
958   releasetime += (t3-t2);
959   sendtime += (t4-t3);
960 #endif
961 /*
962   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
963 */
964   if (inexit == CmiMyNodeSize()) {
965     MACHSTATE(2, "CommunicationServer exiting {");
966 #if 0
967     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
968 #endif
969     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
970       CmiReleaseSentMessages();
971       SendMsgBuf();
972       PumpMsgs();
973     }
974     MACHSTATE(2, "CommunicationServer barrier begin {");
975
976     START_EVENT();
977
978     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
979       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
980
981     END_EVENT(10);
982
983     MACHSTATE(2, "} CommunicationServer barrier end");
984 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
985     if (CmiMyNode() == 0){
986       CmiPrintf("End of program\n");
987     }
988 #endif
989     MACHSTATE(2, "} CommunicationServer EXIT");
990
991     ConverseCommonExit();   
992 #if REPORT_COMM_METRICS
993     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
994 #endif
995
996 #if ! CMK_AUTOBUILD
997     signal(SIGINT, signal_int);
998     MPI_Finalize();
999     #endif
1000     exit(0);
1001   }
1002 }
1003
1004 #endif
1005
1006 static void CommunicationServerThread(int sleepTime)
1007 {
1008 #if CMK_SMP
1009   CommunicationServer(sleepTime);
1010 #endif
1011 #if CMK_IMMEDIATE_MSG
1012   CmiHandleImmediate();
1013 #endif
1014 }
1015
1016 #if CMK_NODE_QUEUE_AVAILABLE
1017 char *CmiGetNonLocalNodeQ(void)
1018 {
1019   CmiState cs = CmiGetState();
1020   char *result = 0;
1021   CmiIdleLock_checkMessage(&cs->idle);
1022 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1023     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1024     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1025     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1026     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1027     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1028 /*  }  */
1029
1030   return result;
1031 }
1032 #endif
1033
1034 void *CmiGetNonLocal(void)
1035 {
1036   static int count=0;
1037   CmiState cs = CmiGetState();
1038   void *msg;
1039
1040 #if ! CMK_SMP
1041   if (CmiNumPes() == 1) return NULL;
1042 #endif
1043
1044   CmiIdleLock_checkMessage(&cs->idle);
1045   /* although it seems that lock is not needed, I found it crashes very often
1046      on mpi-smp without lock */
1047
1048 #if ! CMK_SMP
1049   CmiReleaseSentMessages();
1050   PumpMsgs();
1051 #endif
1052
1053   /* CmiLock(procState[cs->rank].recvLock); */
1054   msg =  PCQueuePop(cs->recv);
1055   /* CmiUnlock(procState[cs->rank].recvLock); */
1056
1057 /*
1058   if (msg) {
1059     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1060   else {
1061     count++;
1062     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1063   }
1064 */
1065 #if ! CMK_SMP
1066   if (no_outstanding_sends) {
1067     while (MsgQueueLen>0) {
1068       CmiReleaseSentMessages();
1069       PumpMsgs();
1070     }
1071   }
1072
1073   if(!msg) {
1074     CmiReleaseSentMessages();
1075     if (PumpMsgs())
1076       return  PCQueuePop(cs->recv);
1077     else
1078       return 0;
1079   }
1080 #endif
1081
1082   return msg;
1083 }
1084
1085 /* called in non-smp mode */
1086 void CmiNotifyIdle(void)
1087 {
1088   CmiReleaseSentMessages();
1089   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1090 }
1091
1092
1093 /********************************************************
1094     The call to probe immediate messages has been renamed to
1095     CmiMachineProgressImpl
1096 ******************************************************/
1097 /* user call to handle immediate message, only useful in non SMP version
1098    using polling method to schedule message.
1099 */
1100 /*
1101 #if CMK_IMMEDIATE_MSG
1102 void CmiProbeImmediateMsg()
1103 {
1104 #if !CMK_SMP
1105   PumpMsgs();
1106   CmiHandleImmediate();
1107 #endif
1108 }
1109 #endif
1110 */
1111
1112 /* Network progress function is used to poll the network when for
1113    messages. This flushes receive buffers on some  implementations*/
1114 #if CMK_MACHINE_PROGRESS_DEFINED
1115 void CmiMachineProgressImpl()
1116 {
1117 #if !CMK_SMP
1118     PumpMsgs();
1119 #if CMK_IMMEDIATE_MSG
1120     CmiHandleImmediate();
1121 #endif
1122 #else
1123     /*Not implemented yet. Communication server does not seem to be
1124       thread safe, so only communication thread call it */
1125     if (CmiMyRank() == CmiMyNodeSize())
1126         CommunicationServerThread(0);
1127 #endif
1128 }
1129 #endif
1130
1131 /********************* MESSAGE SEND FUNCTIONS ******************/
1132
1133 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1134
1135 static void CmiSendSelf(char *msg)
1136 {
1137 #if CMK_IMMEDIATE_MSG
1138     if (CmiIsImmediate(msg)) {
1139       /* CmiBecomeNonImmediate(msg); */
1140       CmiPushImmediateMsg(msg);
1141       CmiHandleImmediate();
1142       return;
1143     }
1144 #endif
1145     CQdCreate(CpvAccess(cQdState), 1);
1146     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1147 }
1148
1149 void CmiSyncSendFn(int destPE, int size, char *msg)
1150 {
1151   CmiState cs = CmiGetState();
1152   char *dupmsg = (char *) CmiAlloc(size);
1153   memcpy(dupmsg, msg, size);
1154
1155   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1156
1157   if (cs->pe==destPE) {
1158     CmiSendSelf(dupmsg);
1159   }
1160   else
1161     CmiAsyncSendFn_(destPE, size, dupmsg);
1162 }
1163
1164 #if CMK_SMP
1165
1166 /* called by communication thread in SMP */
1167 static int SendMsgBuf()
1168 {
1169   SMSG_LIST *msg_tmp;
1170   char *msg;
1171   int node, rank, size;
1172   int i;
1173   int sent = 0;
1174
1175 #if CMI_EXERT_SEND_CAP
1176         int sentCnt = 0;
1177 #endif  
1178         
1179   MACHSTATE(2,"SendMsgBuf begin {");
1180 #if MULTI_SENDQUEUE
1181   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1182   {
1183     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1184     {
1185       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1186 #else
1187     /* single message sending queue */
1188     /* CmiLock(sendMsgBufLock); */
1189     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1190     /* CmiUnlock(sendMsgBufLock); */
1191     while (NULL != msg_tmp)
1192     {
1193 #endif
1194       node = msg_tmp->destpe;
1195       size = msg_tmp->size;
1196       msg = msg_tmp->msg;
1197       msg_tmp->next = 0;
1198       while (MsgQueueLen > request_max) {
1199         CmiReleaseSentMessages();
1200         PumpMsgs();
1201       }
1202       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1203       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1204       CMI_SET_CHECKSUM(msg, size);
1205
1206 #if MPI_POST_RECV_COUNT > 0
1207         if(size <= MPI_POST_RECV_SIZE){
1208
1209           START_EVENT();
1210           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1211                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1212
1213           STOP_EVENT(40);
1214         }
1215         else {
1216             START_EVENT();
1217             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1218                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1219             STOP_EVENT(40);
1220         }
1221 #else
1222         START_EVENT();
1223         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1224             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1225         /*END_EVENT(40);*/
1226 #endif
1227         
1228 #if CMK_SMP_TRACE_COMMTHREAD
1229         traceBeginCommOp(msg);
1230         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1231         /* traceSendMsgComm must execute after traceBeginCommOp because
1232          * we pretend we execute an entry method, and inside this we
1233          * pretend we will send another message. Otherwise how could
1234          * a message creation just before an entry method invocation?
1235          * If such logic is broken, the projections will not trace
1236          * messages correctly! -Chao Mei
1237          */
1238         traceSendMsgComm(msg);
1239         traceEndCommOp(msg);
1240         #if CMI_MPI_TRACE_MOREDETAILED
1241         char tmp[64];
1242         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1243         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1244         #endif
1245 #endif
1246                 
1247                 
1248       MACHSTATE(3,"}MPI_send end");
1249       MsgQueueLen++;
1250       if(sent_msgs==0)
1251         sent_msgs = msg_tmp;
1252       else
1253         end_sent->next = msg_tmp;
1254       end_sent = msg_tmp;
1255       sent=1;
1256           
1257 #if CMI_EXERT_SEND_CAP    
1258           if(++sentCnt == SEND_CAP) break;
1259 #endif    
1260           
1261 #if ! MULTI_SENDQUEUE
1262       /* CmiLock(sendMsgBufLock); */
1263       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1264       /* CmiUnlock(sendMsgBufLock); */
1265 #endif
1266     }
1267 #if MULTI_SENDQUEUE
1268   }
1269 #endif
1270   MACHSTATE(2,"}SendMsgBuf end ");
1271   return sent;
1272 }
1273
1274 void EnqueueMsg(void *m, int size, int node)
1275 {
1276   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1277   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1278   msg_tmp->msg = m;
1279   msg_tmp->size = size;
1280   msg_tmp->destpe = node;
1281         
1282 #if CMK_SMP_TRACE_COMMTHREAD
1283         msg_tmp->srcpe = CmiMyPe();
1284 #endif  
1285
1286 #if MULTI_SENDQUEUE
1287   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1288 #else
1289   CmiLock(sendMsgBufLock);
1290   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1291   CmiUnlock(sendMsgBufLock);
1292 #endif
1293         
1294   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1295 }
1296
1297 #endif
1298
1299 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1300 {
1301   CmiState cs = CmiGetState();
1302   SMSG_LIST *msg_tmp;
1303   CmiUInt2  rank, node;
1304
1305   if(destPE == cs->pe) {
1306     char *dupmsg = (char *) CmiAlloc(size);
1307     memcpy(dupmsg, msg, size);
1308     CmiSendSelf(dupmsg);
1309     return 0;
1310   }
1311   CQdCreate(CpvAccess(cQdState), 1);
1312 #if CMK_SMP
1313   node = CmiNodeOf(destPE);
1314   rank = CmiRankOf(destPE);
1315   if (node == CmiMyNode())  {
1316     CmiPushPE(rank, msg);
1317     return 0;
1318   }
1319   CMI_DEST_RANK(msg) = rank;
1320   EnqueueMsg(msg, size, node);
1321   return 0;
1322 #else
1323   /* non smp */
1324   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1325   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1326   msg_tmp->msg = msg;
1327   msg_tmp->next = 0;
1328   while (MsgQueueLen > request_max) {
1329         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1330         CmiReleaseSentMessages();
1331         PumpMsgs();
1332   }
1333   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1334   CMI_SET_CHECKSUM(msg, size);
1335
1336 #if MPI_POST_RECV_COUNT > 0
1337         if(size <= MPI_POST_RECV_SIZE){
1338
1339           START_EVENT();
1340           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1341                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1342           END_EVENT(40);
1343         }
1344         else {
1345           START_EVENT();
1346           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1347                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1348           END_EVENT(40);
1349         }
1350 #else
1351   START_EVENT();
1352   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1353     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1354   END_EVENT(40);
1355 #endif
1356
1357   MsgQueueLen++;
1358   if(sent_msgs==0)
1359     sent_msgs = msg_tmp;
1360   else
1361     end_sent->next = msg_tmp;
1362   end_sent = msg_tmp;
1363   return (CmiCommHandle) &(msg_tmp->req);
1364 #endif              /* non-smp */
1365 }
1366
1367 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1368 {
1369   CMI_SET_BROADCAST_ROOT(msg, 0);
1370   CmiAsyncSendFn_(destPE, size, msg);
1371 }
1372
1373 void CmiFreeSendFn(int destPE, int size, char *msg)
1374 {
1375   CmiState cs = CmiGetState();
1376   CMI_SET_BROADCAST_ROOT(msg, 0);
1377
1378   if (cs->pe==destPE) {
1379     CmiSendSelf(msg);
1380   } else {
1381     CmiAsyncSendFn_(destPE, size, msg);
1382   }
1383 }
1384
1385 /*********************** BROADCAST FUNCTIONS **********************/
1386
1387 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1388 void CmiSyncSendFn1(int destPE, int size, char *msg)
1389 {
1390   CmiState cs = CmiGetState();
1391   char *dupmsg = (char *) CmiAlloc(size);
1392   memcpy(dupmsg, msg, size);
1393   if (cs->pe==destPE)
1394     CmiSendSelf(dupmsg);
1395   else
1396     CmiAsyncSendFn_(destPE, size, dupmsg);
1397 }
1398
1399 /* send msg to its spanning children in broadcast. G. Zheng */
1400 void SendSpanningChildren(int size, char *msg)
1401 {
1402   CmiState cs = CmiGetState();
1403   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1404   int startnode;
1405   int i, exceptRank;
1406         
1407 #if CMK_SMP
1408    /* first send msgs to other nodes */  
1409   startnode = CmiNodeOf(startpe);  
1410   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1411   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1412     int nd = CmiMyNode()-startnode;
1413     if (nd<0) nd+=CmiNumNodes();
1414     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1415     if (nd > CmiNumNodes() - 1) break;
1416     nd += startnode;
1417     nd = nd%CmiNumNodes();
1418     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1419         /* always send to the first rank of other nodes */
1420         char *newmsg = CmiCopyMsg(msg, size);
1421         CMI_DEST_RANK(newmsg) = 0;
1422     EnqueueMsg(newmsg, size, nd);
1423   }     
1424    /* second send msgs to my peers on this node */
1425   /* FIXME: now it's just a flat p2p send!! When node size is large,
1426    * it should also be sent in a tree
1427    */
1428    exceptRank = CMI_DEST_RANK(msg);
1429    for(i=0; i<exceptRank; i++){
1430            CmiPushPE(i, CmiCopyMsg(msg, size));
1431    }
1432    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1433            CmiPushPE(i, CmiCopyMsg(msg, size));
1434    }
1435 #else
1436   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1437   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1438     int p = cs->pe-startpe;
1439     if (p<0) p+=_Cmi_numpes;
1440     p = BROADCAST_SPANNING_FACTOR*p + i;
1441     if (p > _Cmi_numpes - 1) break;
1442     p += startpe;
1443     p = p%_Cmi_numpes;
1444     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);      
1445     CmiSyncSendFn1(p, size, msg);
1446   }
1447 #endif
1448 }
1449
1450 #include <math.h>
1451
1452 /* send msg along the hypercube in broadcast. (Sameer) */
1453 void SendHypercube(int size, char *msg)
1454 {
1455   CmiState cs = CmiGetState();
1456   int curcycle = CMI_GET_CYCLE(msg);
1457   int i, exceptRank;
1458
1459 #if CMK_SMP
1460    /* first send msgs to other nodes */  
1461   double logp = CmiNumNodes();
1462   logp = log(logp)/log(2.0);
1463   logp = ceil(logp);
1464
1465   for (i = curcycle; i < logp; i++) {
1466     int nd = CmiMyNode() ^ (1 << i);
1467
1468     if(nd < CmiNumNodes()) {
1469       CMI_SET_CYCLE(msg, i + 1);
1470           /* always send to the first rank of other nodes */
1471           char *newmsg = CmiCopyMsg(msg, size);
1472           CMI_DEST_RANK(newmsg) = 0;
1473       EnqueueMsg(newmsg, size, nd);       
1474     }
1475   }     
1476    /* second send msgs to my peers on this node */
1477    /* FIXME: now it's just a flat p2p send!! When node size is large,
1478     * it should also be sent in a tree
1479     */
1480    exceptRank = CMI_DEST_RANK(msg);
1481    for(i=0; i<exceptRank; i++){
1482            CmiPushPE(i, CmiCopyMsg(msg, size));
1483    }
1484    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1485            CmiPushPE(i, CmiCopyMsg(msg, size));
1486    }
1487 #else
1488   double logp = CmiNumPes();
1489   logp = log(logp)/log(2.0);
1490   logp = ceil(logp);
1491
1492   /*  CmiPrintf("In hypercube\n"); */
1493
1494   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1495
1496   for (i = curcycle; i < logp; i++) {
1497     int p = cs->pe ^ (1 << i);
1498
1499     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1500
1501     if(p < CmiNumPes()) {
1502       CMI_SET_CYCLE(msg, i + 1);
1503       CmiSyncSendFn1(p, size, msg);
1504     }
1505   }
1506 #endif  
1507 }
1508
1509 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1510 {
1511   CmiState cs = CmiGetState();
1512
1513 #if CMK_SMP     
1514   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1515   CMI_DEST_RANK(msg) = CmiMyRank();
1516 #endif
1517         
1518 #if CMK_BROADCAST_SPANNING_TREE
1519   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1520   SendSpanningChildren(size, msg);
1521
1522 #elif CMK_BROADCAST_HYPERCUBE
1523   CMI_SET_CYCLE(msg, 0);
1524   SendHypercube(size, msg);
1525
1526 #else
1527   int i;
1528
1529   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1530     CmiSyncSendFn(i, size,msg) ;
1531   for ( i=0; i<cs->pe; i++ )
1532     CmiSyncSendFn(i, size,msg) ;
1533 #endif
1534
1535   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1536 }
1537
1538
1539 /*  FIXME: luckily async is never used  G. Zheng */
1540 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1541 {
1542   CmiState cs = CmiGetState();
1543   int i ;
1544
1545   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1546     CmiAsyncSendFn(i,size,msg) ;
1547   for ( i=0; i<cs->pe; i++ )
1548     CmiAsyncSendFn(i,size,msg) ;
1549
1550   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1551   CmiAbort("CmiAsyncBroadcastFn should never be called");
1552   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1553 }
1554
1555 void CmiFreeBroadcastFn(int size, char *msg)
1556 {
1557    CmiSyncBroadcastFn(size,msg);
1558    CmiFree(msg);
1559 }
1560
1561 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1562 {
1563
1564 #if CMK_SMP     
1565   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1566   CMI_DEST_RANK(msg) = CmiMyRank();
1567 #endif
1568
1569 #if CMK_BROADCAST_SPANNING_TREE
1570   CmiState cs = CmiGetState();
1571   CmiSyncSendFn(cs->pe, size,msg) ;
1572   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1573   SendSpanningChildren(size, msg);
1574
1575 #elif CMK_BROADCAST_HYPERCUBE
1576   CmiState cs = CmiGetState();
1577   CmiSyncSendFn(cs->pe, size,msg) ;
1578   CMI_SET_CYCLE(msg, 0);
1579   SendHypercube(size, msg);
1580
1581 #else
1582     int i ;
1583
1584   for ( i=0; i<_Cmi_numpes; i++ )
1585     CmiSyncSendFn(i,size,msg) ;
1586 #endif
1587
1588   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1589 }
1590
1591 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1592 {
1593   int i ;
1594
1595   for ( i=1; i<_Cmi_numpes; i++ )
1596     CmiAsyncSendFn(i,size,msg) ;
1597
1598   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1599
1600   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1601 }
1602
1603 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1604 {
1605 #if CMK_SMP     
1606   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1607   CMI_DEST_RANK(msg) = CmiMyRank();
1608 #endif
1609
1610 #if CMK_BROADCAST_SPANNING_TREE
1611   CmiState cs = CmiGetState();
1612   CmiSyncSendFn(cs->pe, size,msg) ;
1613   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1614   SendSpanningChildren(size, msg);
1615
1616 #elif CMK_BROADCAST_HYPERCUBE
1617   CmiState cs = CmiGetState();
1618   CmiSyncSendFn(cs->pe, size,msg) ;
1619   CMI_SET_CYCLE(msg, 0);
1620   SendHypercube(size, msg);
1621
1622 #else
1623   int i ;
1624
1625   for ( i=0; i<_Cmi_numpes; i++ )
1626     CmiSyncSendFn(i,size,msg) ;
1627 #endif
1628   CmiFree(msg) ;
1629   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1630 }
1631
1632 #if CMK_NODE_QUEUE_AVAILABLE
1633
1634 static void CmiSendNodeSelf(char *msg)
1635 {
1636 #if CMK_IMMEDIATE_MSG
1637 #if 0
1638     if (CmiIsImmediate(msg) && !_immRunning) {
1639       /*CmiHandleImmediateMessage(msg); */
1640       CmiPushImmediateMsg(msg);
1641       CmiHandleImmediate();
1642       return;
1643     }
1644 #endif
1645     if (CmiIsImmediate(msg))
1646     {
1647       CmiPushImmediateMsg(msg);
1648       if (!_immRunning) CmiHandleImmediate();
1649       return;
1650     }
1651 #endif
1652     CQdCreate(CpvAccess(cQdState), 1);
1653     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1654     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1655     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1656 }
1657
1658 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1659 {
1660   int i;
1661   SMSG_LIST *msg_tmp;
1662   char *dupmsg;
1663
1664   CMI_SET_BROADCAST_ROOT(msg, 0);
1665   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1666   switch (dstNode) {
1667   case NODE_BROADCAST_ALL:
1668     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1669   case NODE_BROADCAST_OTHERS:
1670     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1671     for (i=0; i<_Cmi_numnodes; i++)
1672       if (i!=_Cmi_mynode) {
1673         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1674       }
1675     break;
1676   default:
1677     dupmsg = (char *)CmiCopyMsg(msg,size);
1678     if(dstNode == _Cmi_mynode) {
1679       CmiSendNodeSelf(dupmsg);
1680     }
1681     else {
1682       CQdCreate(CpvAccess(cQdState), 1);
1683       EnqueueMsg(dupmsg, size, dstNode);
1684     }
1685   }
1686   return 0;
1687 }
1688
1689 void CmiSyncNodeSendFn(int p, int s, char *m)
1690 {
1691   CmiAsyncNodeSendFn(p, s, m);
1692 }
1693
1694 /* need */
1695 void CmiFreeNodeSendFn(int p, int s, char *m)
1696 {
1697   CmiAsyncNodeSendFn(p, s, m);
1698   CmiFree(m);
1699 }
1700
1701 /* need */
1702 void CmiSyncNodeBroadcastFn(int s, char *m)
1703 {
1704   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1705 }
1706
1707 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1708 {
1709 }
1710
1711 /* need */
1712 void CmiFreeNodeBroadcastFn(int s, char *m)
1713 {
1714   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1715   CmiFree(m);
1716 }
1717
1718 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1719 {
1720   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1721 }
1722
1723 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1724 {
1725   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1726 }
1727
1728 /* need */
1729 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1730 {
1731   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1732   CmiFree(m);
1733 }
1734 #endif
1735
1736 /************************** MAIN ***********************************/
1737 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1738
1739 void ConverseExit(void)
1740 {
1741 #if ! CMK_SMP
1742   while(!CmiAllAsyncMsgsSent()) {
1743     PumpMsgs();
1744     CmiReleaseSentMessages();
1745   }
1746   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1747     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1748
1749   ConverseCommonExit();
1750 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1751   if (CmiMyPe() == 0){
1752     CmiPrintf("End of program\n");
1753 #if MPI_POST_RECV_COUNT > 0
1754     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1755 #endif
1756 }
1757 #endif
1758 #if ! CMK_AUTOBUILD
1759   signal(SIGINT, signal_int);
1760   MPI_Finalize();
1761 #endif
1762   exit(0);
1763
1764 #else
1765     /* SMP version, communication thread will exit */
1766   ConverseCommonExit();
1767   /* atomic increment */
1768   CmiLock(exitLock);
1769   inexit++;
1770   CmiUnlock(exitLock);
1771   while (1) CmiYield();
1772 #endif
1773 }
1774
1775 static void registerMPITraceEvents() {
1776 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1777     traceRegisterUserEvent("MPI_Barrier", 10);
1778     traceRegisterUserEvent("MPI_Send", 20);
1779     traceRegisterUserEvent("MPI_Recv", 30);
1780     traceRegisterUserEvent("MPI_Isend", 40);
1781     traceRegisterUserEvent("MPI_Irecv", 50);
1782     traceRegisterUserEvent("MPI_Test", 60);
1783     traceRegisterUserEvent("MPI_Iprobe", 70);
1784 #endif
1785 }
1786
1787
1788 static char     **Cmi_argv;
1789 static char     **Cmi_argvcopy;
1790 static CmiStartFn Cmi_startfn;   /* The start function */
1791 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1792
1793 typedef struct {
1794   int sleepMs; /*Milliseconds to sleep while idle*/
1795   int nIdles; /*Number of times we've been idle in a row*/
1796   CmiState cs; /*Machine state*/
1797 } CmiIdleState;
1798
1799 static CmiIdleState *CmiNotifyGetState(void)
1800 {
1801   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1802   s->sleepMs=0;
1803   s->nIdles=0;
1804   s->cs=CmiGetState();
1805   return s;
1806 }
1807
1808 static void CmiNotifyBeginIdle(CmiIdleState *s)
1809 {
1810   s->sleepMs=0;
1811   s->nIdles=0;
1812 }
1813
1814 static void CmiNotifyStillIdle(CmiIdleState *s)
1815 {
1816 #if ! CMK_SMP
1817   CmiReleaseSentMessages();
1818   PumpMsgs();
1819 #else
1820 /*  CmiYield();  */
1821 #endif
1822
1823 #if 1
1824   {
1825   int nSpins=20; /*Number of times to spin before sleeping*/
1826   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1827   s->nIdles++;
1828   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1829     s->sleepMs+=2;
1830     if (s->sleepMs>10) s->sleepMs=10;
1831   }
1832   /*Comm. thread will listen on sockets-- just sleep*/
1833   if (s->sleepMs>0) {
1834     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1835     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1836     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1837   }
1838   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1839   }
1840 #endif
1841 }
1842
1843 #if MACHINE_DEBUG_LOG
1844 FILE *debugLog = NULL;
1845 #endif
1846
1847 static int machine_exit_idx;
1848 static void machine_exit(char *m) {
1849   EmergencyExit();
1850   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1851   fflush(stdout);
1852   CmiNodeBarrier();
1853   if (CmiMyRank() == 0) {
1854     MPI_Barrier(MPI_COMM_WORLD);
1855     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1856     MPI_Abort(MPI_COMM_WORLD, 1);
1857   } else {
1858     while (1) CmiYield();
1859   }
1860 }
1861
1862 static void KillOnAllSigs(int sigNo) {
1863   static int already_in_signal_handler = 0;
1864   char *m;
1865   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1866   already_in_signal_handler = 1;
1867 #if CMK_CCS_AVAILABLE
1868   if (CpvAccess(cmiArgDebugFlag)) {
1869     CpdNotify(CPD_SIGNAL, sigNo);
1870     CpdFreeze();
1871   }
1872 #endif
1873   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1874       "Signal: %d\n",CmiMyPe(),sigNo);
1875   CmiPrintStackTrace(1);
1876
1877   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1878   CmiSetHandler(m, machine_exit_idx);
1879   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1880   machine_exit(m);
1881 }
1882
1883 static void ConverseRunPE(int everReturn)
1884 {
1885   CmiIdleState *s=CmiNotifyGetState();
1886   CmiState cs;
1887   char** CmiMyArgv;
1888
1889   CmiNodeAllBarrier();
1890
1891   cs = CmiGetState();
1892   CpvInitialize(void *,CmiLocalQueue);
1893   CpvAccess(CmiLocalQueue) = cs->localqueue;
1894
1895   if (CmiMyRank())
1896     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1897   else
1898     CmiMyArgv=Cmi_argv;
1899
1900   CthInit(CmiMyArgv);
1901
1902   ConverseCommonInit(CmiMyArgv);
1903   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1904
1905 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1906   CpvInitialize(double, projTraceStart);
1907   /* only PE 0 needs to care about registration (to generate sts file). */
1908   if (CmiMyPe() == 0) {
1909     registerMachineUserEventsFunction(&registerMPITraceEvents);
1910   }
1911 #endif
1912
1913   /* initialize the network progress counter*/
1914   /* Network progress function is used to poll the network when for
1915      messages. This flushes receive buffers on some  implementations*/
1916   CpvInitialize(int , networkProgressCount);
1917   CpvAccess(networkProgressCount) = 0;
1918
1919 #if CMK_SMP
1920   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1921   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1922 #else
1923   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1924 #endif
1925
1926 #if MACHINE_DEBUG_LOG
1927   if (CmiMyRank() == 0) {
1928     char ln[200];
1929     sprintf(ln,"debugLog.%d",CmiMyNode());
1930     debugLog=fopen(ln,"w");
1931   }
1932 #endif
1933
1934   /* Converse initialization finishes, immediate messages can be processed.
1935      node barrier previously should take care of the node synchronization */
1936   _immediateReady = 1;
1937
1938   /* communication thread */
1939   if (CmiMyRank() == CmiMyNodeSize()) {
1940     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1941     while (1) CommunicationServerThread(5);
1942   }
1943   else {  /* worker thread */
1944   if (!everReturn) {
1945     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1946     if (Cmi_usrsched==0) CsdScheduler(-1);
1947     ConverseExit();
1948   }
1949   }
1950 }
1951
1952 static char *thread_level_tostring(int thread_level)
1953 {
1954 #if CMK_MPI_INIT_THREAD
1955   switch (thread_level) {
1956   case MPI_THREAD_SINGLE:
1957       return "MPI_THREAD_SINGLE";
1958   case MPI_THREAD_FUNNELED:
1959       return "MPI_THREAD_FUNNELED";
1960   case MPI_THREAD_SERIALIZED:
1961       return "MPI_THREAD_SERIALIZED";
1962   case MPI_THREAD_MULTIPLE :
1963       return "MPI_THREAD_MULTIPLE ";
1964   default: {
1965       char *str = (char*)malloc(5);
1966       sprintf(str,"%d", thread_level);
1967       return str;
1968       }
1969   }
1970   return  "unknown";
1971 #else
1972   char *str = (char*)malloc(5);
1973   sprintf(str,"%d", thread_level);
1974   return str;
1975 #endif
1976 }
1977
1978 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1979 {
1980   int n,i;
1981   int ver, subver;
1982   int provided;
1983   int thread_level;
1984
1985 #if MACHINE_DEBUG
1986   debugLog=NULL;
1987 #endif
1988 #if CMK_USE_HP_MAIN_FIX
1989 #if FOR_CPLUS
1990   _main(argc,argv);
1991 #endif
1992 #endif
1993
1994 #if CMK_MPI_INIT_THREAD
1995 #if CMK_SMP
1996   thread_level = MPI_THREAD_FUNNELED;
1997 #else
1998   thread_level = MPI_THREAD_SINGLE;
1999 #endif
2000   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2001   _thread_provided = provided;
2002 #else
2003   MPI_Init(&argc, &argv);
2004   thread_level = 0;
2005   provided = -1;
2006 #endif
2007   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2008   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2009
2010   MPI_Get_version(&ver, &subver);
2011   if (_Cmi_mynode == 0) {
2012     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2013   }
2014
2015   /* processor per node */
2016   _Cmi_mynodesize = 1;
2017   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2018     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2019 #if ! CMK_SMP
2020   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2021     CmiAbort("+ppn cannot be used in non SMP version!\n");
2022 #endif
2023   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2024   if (idleblock && _Cmi_mynode == 0) {
2025     printf("Charm++: Running in idle blocking mode.\n");
2026   }
2027
2028   /* setup signal handlers */
2029   signal(SIGSEGV, KillOnAllSigs);
2030   signal(SIGFPE, KillOnAllSigs);
2031   signal(SIGILL, KillOnAllSigs);
2032   signal_int = signal(SIGINT, KillOnAllSigs);
2033   signal(SIGTERM, KillOnAllSigs);
2034   signal(SIGABRT, KillOnAllSigs);
2035 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2036   signal(SIGQUIT, KillOnAllSigs);
2037   signal(SIGBUS, KillOnAllSigs);
2038 /*#     if CMK_HANDLE_SIGUSR
2039   signal(SIGUSR1, HandleUserSignals);
2040   signal(SIGUSR2, HandleUserSignals);
2041 #     endif*/
2042 #   endif /*UNIX*/
2043   
2044 #if CMK_NO_OUTSTANDING_SENDS
2045   no_outstanding_sends=1;
2046 #endif
2047   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2048     no_outstanding_sends = 1;
2049     if (_Cmi_mynode == 0)
2050       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2051         no_outstanding_sends?"":" not");
2052   }
2053   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2054   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2055   Cmi_argvcopy = CmiCopyArgs(argv);
2056   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2057   /* find dim = log2(numpes), to pretend we are a hypercube */
2058   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2059     Cmi_dim++ ;
2060  /* CmiSpanTreeInit();*/
2061   request_max=MAX_QLEN;
2062   CmiGetArgInt(argv,"+requestmax",&request_max);
2063   /*printf("request max=%d\n", request_max);*/
2064
2065   /* checksum flag */
2066   if (CmiGetArgFlag(argv,"+checksum")) {
2067 #if !CMK_OPTIMIZE
2068     checksum_flag = 1;
2069     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2070 #else
2071     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2072 #endif
2073   }
2074
2075   {
2076   int debug = CmiGetArgFlag(argv,"++debug");
2077   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2078   if (debug || debug_no_pause)
2079   {   /*Pause so user has a chance to start and attach debugger*/
2080 #if CMK_HAS_GETPID
2081     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2082     fflush(stdout);
2083     if (!debug_no_pause)
2084       sleep(15);
2085 #else
2086     printf("++debug ignored.\n");
2087 #endif
2088   }
2089   }
2090
2091 #if MPI_POST_RECV_COUNT > 0
2092
2093   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2094   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2095   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2096   CpvInitialize(char*,CmiPostedRecvBuffers);
2097
2098     /* Post some extra recvs to help out with incoming messages */
2099     /* On some MPIs the messages are unexpected and thus slow */
2100
2101     /* An array of request handles for posted recvs */
2102     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2103
2104     /* An array of buffers for posted recvs */
2105     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2106
2107     /* Post Recvs */
2108     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2109         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2110                     MPI_POST_RECV_SIZE,
2111                     MPI_BYTE,
2112                     MPI_ANY_SOURCE,
2113                     POST_RECV_TAG,
2114                     MPI_COMM_WORLD,
2115                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2116           CmiAbort("MPI_Irecv failed\n");
2117     }
2118
2119 #endif
2120
2121
2122
2123   /* CmiTimerInit(); */
2124
2125 #if 0
2126   CthInit(argv);
2127   ConverseCommonInit(argv);
2128
2129   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2130   if (initret==0) {
2131     fn(CmiGetArgc(argv), argv);
2132     if (usched==0) CsdScheduler(-1);
2133     ConverseExit();
2134   }
2135 #endif
2136
2137   CsvInitialize(CmiNodeState, NodeState);
2138   CmiNodeStateInit(&CsvAccess(NodeState));
2139
2140   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2141
2142   for (i=0; i<_Cmi_mynodesize+1; i++) {
2143 #if MULTI_SENDQUEUE
2144     procState[i].sendMsgBuf = PCQueueCreate();
2145 #endif
2146     procState[i].recvLock = CmiCreateLock();
2147   }
2148 #if CMK_SMP
2149 #if !MULTI_SENDQUEUE
2150   sendMsgBuf = PCQueueCreate();
2151   sendMsgBufLock = CmiCreateLock();
2152 #endif
2153   exitLock = CmiCreateLock();            /* exit count lock */
2154 #endif
2155
2156   /* Network progress function is used to poll the network when for
2157      messages. This flushes receive buffers on some  implementations*/
2158   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2159   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2160
2161   CmiStartThreads(argv);
2162   ConverseRunPE(initret);
2163 }
2164
2165 /***********************************************************************
2166  *
2167  * Abort function:
2168  *
2169  ************************************************************************/
2170
2171 void CmiAbort(const char *message)
2172 {
2173   char *m;
2174   /* if CharmDebug is attached simply try to send a message to it */
2175 #if CMK_CCS_AVAILABLE
2176   if (CpvAccess(cmiArgDebugFlag)) {
2177     CpdNotify(CPD_ABORT, message);
2178     CpdFreeze();
2179   }
2180 #endif  
2181   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2182         "Reason: %s\n",CmiMyPe(),message);
2183  /*  CmiError(message); */
2184   CmiPrintStackTrace(0);
2185   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2186   CmiSetHandler(m, machine_exit_idx);
2187   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2188   machine_exit(m);
2189   /* Program never reaches here */
2190   MPI_Abort(MPI_COMM_WORLD, 1);
2191 }
2192
2193
2194 #if 0
2195
2196 /* ****************************************************************** */
2197 /*    The following internal functions implement recd msg queue       */
2198 /* ****************************************************************** */
2199
2200 static void ** AllocBlock(unsigned int len)
2201 {
2202   void ** blk;
2203
2204   blk=(void **)CmiAlloc(len*sizeof(void *));
2205   if(blk==(void **)0) {
2206     CmiError("Cannot Allocate Memory!\n");
2207     MPI_Abort(MPI_COMM_WORLD, 1);
2208   }
2209   return blk;
2210 }
2211
2212 static void
2213 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2214 {
2215   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2216   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2217 }
2218
2219 void recdQueueInit(void)
2220 {
2221   recdQueue_blk = AllocBlock(BLK_LEN);
2222   recdQueue_blk_len = BLK_LEN;
2223   recdQueue_first = 0;
2224   recdQueue_len = 0;
2225 }
2226
2227 void recdQueueAddToBack(void *element)
2228 {
2229 #if NODE_0_IS_CONVHOST
2230   inside_comm = 1;
2231 #endif
2232   if(recdQueue_len==recdQueue_blk_len) {
2233     void **blk;
2234     recdQueue_blk_len *= 3;
2235     blk = AllocBlock(recdQueue_blk_len);
2236     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2237     CmiFree(recdQueue_blk);
2238     recdQueue_blk = blk;
2239     recdQueue_first = 0;
2240   }
2241   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2242 #if NODE_0_IS_CONVHOST
2243   inside_comm = 0;
2244 #endif
2245 }
2246
2247
2248 void * recdQueueRemoveFromFront(void)
2249 {
2250   if(recdQueue_len) {
2251     void *element;
2252     element = recdQueue_blk[recdQueue_first++];
2253     recdQueue_first %= recdQueue_blk_len;
2254     recdQueue_len--;
2255     return element;
2256   }
2257   return 0;
2258 }
2259
2260 #endif
2261
2262 /*@}*/