a0b717ea0496566341e24dec626a6a3ecca6d691
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #if CMK_SMP
51 /* currently only considering the smp case */
52 #define CMI_DYNAMIC_EXERT_CAP 0
53 /* This macro defines the max number of msgs in the sender msg buffer 
54  * that is allowed for recving operation to continue
55  */
56 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 8
57 #define CMI_DYNAMIC_MAXCAPSIZE 1000
58 #define CMI_DYNAMIC_SEND_CAPSIZE 10
59 #define CMI_DYNAMIC_RECV_CAPSIZE 10
60 /* initial values, -1 indiates there's no cap */
61 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
62 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
63 #endif
64
65 #if CMI_EXERT_SEND_CAP
66 #define SEND_CAP 3
67 #endif
68
69 #if CMI_EXERT_RECV_CAP
70 #define RECV_CAP 2
71 #endif
72
73
74 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
75 #define CMI_MPI_TRACE_MOREDETAILED 0
76 #undef CMI_MPI_TRACE_USEREVENTS
77 #define CMI_MPI_TRACE_USEREVENTS 1
78 #else
79 #undef CMK_SMP_TRACE_COMMTHREAD
80 #define CMK_SMP_TRACE_COMMTHREAD 0
81 #endif
82
83 #define CMK_TRACE_COMMOVERHEAD 0
84 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
85 #undef CMI_MPI_TRACE_USEREVENTS
86 #define CMI_MPI_TRACE_USEREVENTS 1
87 #else
88 #undef CMK_TRACE_COMMOVERHEAD
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #endif
91
92 #include "machine.h"
93
94 #include "pcqueue.h"
95
96 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
97
98 #if CMK_BLUEGENEL
99 #define MAX_QLEN 8
100 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
101 #else
102 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
103 #define MAX_QLEN 200
104 #endif
105
106 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
107 CpvStaticDeclare(double, projTraceStart);
108 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
109 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
110 #else
111 # define  START_EVENT()
112 # define  END_EVENT(x)
113 #endif
114
115 /*
116     To reduce the buffer used in broadcast and distribute the load from
117   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
118   spanning tree broadcast algorithm.
119     This will use the fourth short in message as an indicator of spanning tree
120   root.
121 */
122 #define CMK_BROADCAST_SPANNING_TREE    1
123 #define CMK_BROADCAST_HYPERCUBE        0
124
125 #define BROADCAST_SPANNING_FACTOR      4
126 /* The number of children used when a msg is broadcast inside a node */
127 #define BROADCAST_SPANNING_INTRA_FACTOR      8
128
129 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
130 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
131
132 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
133 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
134
135 /* FIXME: need a random number that everyone agrees ! */
136 #define CHARM_MAGIC_NUMBER               126
137
138 #if CMK_ERROR_CHECKING
139 static int checksum_flag = 0;
140 #define CMI_SET_CHECKSUM(msg, len)      \
141         if (checksum_flag)  {   \
142           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
143           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
144         }
145 #define CMI_CHECK_CHECKSUM(msg, len)    \
146         if (checksum_flag)      \
147           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
148             CmiAbort("Fatal error: checksum doesn't agree!\n");
149 #else
150 #define CMI_SET_CHECKSUM(msg, len)
151 #define CMI_CHECK_CHECKSUM(msg, len)
152 #endif
153
154 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
155 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
156 #else
157 #define CMI_SET_BROADCAST_ROOT(msg, root) 
158 #endif
159
160
161 /** 
162     If MPI_POST_RECV is defined, we provide default values for size 
163     and number of posted recieves. If MPI_POST_RECV_COUNT is set
164     then a default value for MPI_POST_RECV_SIZE is used if not specified
165     by the user.
166 */
167 #ifdef MPI_POST_RECV
168 #define MPI_POST_RECV_COUNT 10
169 #undef MPI_POST_RECV
170 #endif
171 #if MPI_POST_RECV_COUNT > 0
172 #warning "Using MPI posted receives which have not yet been tested"
173 #ifndef MPI_POST_RECV_SIZE
174 #define MPI_POST_RECV_SIZE 200
175 #endif
176 /* #undef  MPI_POST_RECV_DEBUG  */
177 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
178 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
179 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
180 CpvDeclare(char*,CmiPostedRecvBuffers);
181 #endif
182
183 /*
184  to avoid MPI's in order delivery, changing MPI Tag all the time
185 */
186 #define TAG     1375
187
188 #if MPI_POST_RECV_COUNT > 0
189 #define POST_RECV_TAG TAG+1
190 #define BARRIER_ZERO_TAG TAG
191 #else
192 #define BARRIER_ZERO_TAG     1375
193 #endif
194
195 #include <signal.h>
196 void (*signal_int)(int);
197
198 /*
199 static int mpi_tag = TAG;
200 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
201 */
202
203 static int        _thread_provided = -1;
204 int               _Cmi_numpes;
205 int               _Cmi_mynode;    /* Which address space am I */
206 int               _Cmi_mynodesize;/* Number of processors in my address space */
207 int               _Cmi_numnodes;  /* Total number of address spaces */
208 int               _Cmi_numpes;    /* Total number of processors */
209 static int        Cmi_nodestart; /* First processor in this address space */
210 CpvDeclare(void*, CmiLocalQueue);
211
212 /*Network progress utility variables. Period controls the rate at
213   which the network poll is called */
214 CpvDeclare(unsigned , networkProgressCount);
215 int networkProgressPeriod;
216
217 int               idleblock = 0;
218
219 #define BLK_LEN  512
220
221 #if CMK_NODE_QUEUE_AVAILABLE
222 #define DGRAM_NODEMESSAGE   (0xFB)
223
224 #define NODE_BROADCAST_OTHERS (-1)
225 #define NODE_BROADCAST_ALL    (-2)
226 #endif
227
228 #if 0
229 static void **recdQueue_blk;
230 static unsigned int recdQueue_blk_len;
231 static unsigned int recdQueue_first;
232 static unsigned int recdQueue_len;
233 static void recdQueueInit(void);
234 static void recdQueueAddToBack(void *element);
235 static void *recdQueueRemoveFromFront(void);
236 #endif
237
238 static void ConverseRunPE(int everReturn);
239 static void CommunicationServer(int sleepTime);
240 static void CommunicationServerThread(int sleepTime);
241
242 typedef struct msg_list {
243      char *msg;
244      struct msg_list *next;
245      int size, destpe;
246
247 #if CMK_SMP_TRACE_COMMTHREAD
248         int srcpe;
249 #endif  
250         
251      MPI_Request req;
252 } SMSG_LIST;
253
254 int MsgQueueLen=0;
255 static int request_max;
256
257 static SMSG_LIST *sent_msgs=0;
258 static SMSG_LIST *end_sent=0;
259
260 static int Cmi_dim;
261
262 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
263
264 #if NODE_0_IS_CONVHOST
265 int inside_comm = 0;
266 #endif
267
268 void CmiAbort(const char *message);
269 static void PerrorExit(const char *msg);
270
271 void SendSpanningChildren(int size, char *msg);
272 void SendHypercube(int size, char *msg);
273
274 static void PerrorExit(const char *msg)
275 {
276   perror(msg);
277   exit(1);
278 }
279
280 extern unsigned char computeCheckSum(unsigned char *data, int len);
281
282 /**************************  TIMER FUNCTIONS **************************/
283
284 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
285
286 /* MPI calls are not threadsafe, even the timer on some machines */
287 static CmiNodeLock  timerLock = 0;
288 static int _absoluteTime = 0;
289 static double starttimer = 0;
290 static int _is_global = 0;
291
292 int CmiTimerIsSynchronized()
293 {
294   int  flag;
295   void *v;
296
297   /*  check if it using synchronized timer */
298   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
299     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
300   if (flag) {
301     _is_global = *(int*)v;
302     if (_is_global && CmiMyPe() == 0)
303       printf("Charm++> MPI timer is synchronized!\n");
304   }
305   return _is_global;
306 }
307
308 int CmiTimerAbsolute()
309 {       
310   return _absoluteTime;
311 }
312
313 double CmiStartTimer()
314 {
315   return 0.0;
316 }
317
318 double CmiInitTime()
319 {
320   return starttimer;
321 }
322
323 void CmiTimerInit(char **argv)
324 {
325   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
326
327   _is_global = CmiTimerIsSynchronized();
328
329   if (_is_global) {
330     if (CmiMyRank() == 0) {
331       double minTimer;
332 #if CMK_TIMER_USE_XT3_DCLOCK
333       starttimer = dclock();
334 #else
335       starttimer = MPI_Wtime();
336 #endif
337
338       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
339                                   MPI_COMM_WORLD );
340       starttimer = minTimer;
341     }
342   }
343   else {  /* we don't have a synchronous timer, set our own start time */
344     CmiBarrier();
345     CmiBarrier();
346     CmiBarrier();
347 #if CMK_TIMER_USE_XT3_DCLOCK
348     starttimer = dclock();
349 #else
350     starttimer = MPI_Wtime();
351 #endif
352   }
353
354 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
355   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
356     timerLock = CmiCreateLock();
357 #endif
358   CmiNodeAllBarrier();          /* for smp */
359 }
360
361 /**
362  * Since the timerLock is never created, and is
363  * always NULL, then all the if-condition inside
364  * the timer functions could be disabled right
365  * now in the case of SMP. --Chao Mei
366  */
367 double CmiTimer(void)
368 {
369   double t;
370 #if 0 && CMK_SMP
371   if (timerLock) CmiLock(timerLock);
372 #endif
373
374 #if CMK_TIMER_USE_XT3_DCLOCK
375   t = dclock();
376 #else
377   t = MPI_Wtime();
378 #endif
379
380 #if 0 && CMK_SMP
381   if (timerLock) CmiUnlock(timerLock);
382 #endif
383
384   return _absoluteTime?t: (t-starttimer);
385 }
386
387 double CmiWallTimer(void)
388 {
389   double t;
390 #if 0 && CMK_SMP
391   if (timerLock) CmiLock(timerLock);
392 #endif
393
394 #if CMK_TIMER_USE_XT3_DCLOCK
395   t = dclock();
396 #else
397   t = MPI_Wtime();
398 #endif
399
400 #if 0 && CMK_SMP
401   if (timerLock) CmiUnlock(timerLock);
402 #endif
403
404   return _absoluteTime? t: (t-starttimer);
405 }
406
407 double CmiCpuTimer(void)
408 {
409   double t;
410 #if 0 && CMK_SMP
411   if (timerLock) CmiLock(timerLock);
412 #endif
413 #if CMK_TIMER_USE_XT3_DCLOCK
414   t = dclock() - starttimer;
415 #else
416   t = MPI_Wtime() - starttimer;
417 #endif
418 #if 0 && CMK_SMP
419   if (timerLock) CmiUnlock(timerLock);
420 #endif
421   return t;
422 }
423
424 #endif
425
426 /* must be called on all ranks including comm thread in SMP */
427 int CmiBarrier()
428 {
429 #if CMK_SMP
430     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
431   CmiNodeAllBarrier();
432   if (CmiMyRank() == CmiMyNodeSize()) 
433 #else
434   if (CmiMyRank() == 0) 
435 #endif
436   {
437 /**
438  *  The call of CmiBarrier is usually before the initialization
439  *  of trace module of Charm++, therefore, the START_EVENT
440  *  and END_EVENT are disabled here. -Chao Mei
441  */     
442     /*START_EVENT();*/
443
444     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
445         CmiAbort("Timernit: MPI_Barrier failed!\n");
446
447     /*END_EVENT(10);*/
448   }
449   CmiNodeAllBarrier();
450   return 0;
451 }
452
453 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
454 int CmiBarrierZero()
455 {
456   int i;
457 #if CMK_SMP
458   if (CmiMyRank() == CmiMyNodeSize()) 
459 #else
460   if (CmiMyRank() == 0) 
461 #endif
462   {
463     char msg[1];
464     MPI_Status sts;
465     if (CmiMyNode() == 0)  {
466       for (i=0; i<CmiNumNodes()-1; i++) {
467          START_EVENT();
468
469          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
470             CmiPrintf("MPI_Recv failed!\n");
471
472          END_EVENT(30);
473       }
474     }
475     else {
476       START_EVENT();
477
478       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
479          printf("MPI_Send failed!\n");
480
481       END_EVENT(20);
482     }
483   }
484   CmiNodeAllBarrier();
485   return 0;
486 }
487
488 typedef struct ProcState {
489 #if MULTI_SENDQUEUE
490 PCQueue      sendMsgBuf;       /* per processor message sending queue */
491 #endif
492 CmiNodeLock  recvLock;              /* for cs->recv */
493 } ProcState;
494
495 static ProcState  *procState;
496
497 #if CMK_SMP
498
499 #if !MULTI_SENDQUEUE
500 static PCQueue sendMsgBuf;
501 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
502 #endif
503
504 #endif
505
506 /************************************************************
507  *
508  * Processor state structure
509  *
510  ************************************************************/
511
512 /* fake Cmi_charmrun_fd */
513 static int Cmi_charmrun_fd = 0;
514 #include "machine-smp.c"
515
516 CsvDeclare(CmiNodeState, NodeState);
517
518 #include "immediate.c"
519
520 #if CMK_SHARED_VARS_UNAVAILABLE
521 /************ non SMP **************/
522 static struct CmiStateStruct Cmi_state;
523 int _Cmi_mype;
524 int _Cmi_myrank;
525
526 void CmiMemLock() {}
527 void CmiMemUnlock() {}
528
529 #define CmiGetState() (&Cmi_state)
530 #define CmiGetStateN(n) (&Cmi_state)
531
532 void CmiYield(void) { sleep(0); }
533
534 static void CmiStartThreads(char **argv)
535 {
536   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
537   _Cmi_mype = Cmi_nodestart;
538   _Cmi_myrank = 0;
539 }
540 #endif  /* non smp */
541
542 /*Add a message to this processor's receive queue, pe is a rank */
543 void CmiPushPE(int pe,void *msg)
544 {
545   CmiState cs = CmiGetStateN(pe);
546   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
547 #if CMK_IMMEDIATE_MSG
548   if (CmiIsImmediate(msg)) {
549 /*
550 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
551     CmiHandleMessage(msg);
552 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
553 */
554     /**(CmiUInt2 *)msg = pe;*/
555     CMI_DEST_RANK(msg) = pe;
556     CmiPushImmediateMsg(msg);
557     return;
558   }
559 #endif
560
561 #if CMK_SMP
562   CmiLock(procState[pe].recvLock);
563 #endif
564   PCQueuePush(cs->recv,msg);
565 #if CMK_SMP
566   CmiUnlock(procState[pe].recvLock);
567 #endif
568   CmiIdleLock_addMessage(&cs->idle);
569   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
570 }
571
572 #if CMK_NODE_QUEUE_AVAILABLE
573 /*Add a message to this processor's receive queue */
574 static void CmiPushNode(void *msg)
575 {
576   MACHSTATE(3,"Pushing message into NodeRecv queue");
577 #if CMK_IMMEDIATE_MSG
578   if (CmiIsImmediate(msg)) {
579     CMI_DEST_RANK(msg) = 0;
580     CmiPushImmediateMsg(msg);
581     return;
582   }
583 #endif
584   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
585   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
586   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
587   {
588   CmiState cs=CmiGetStateN(0);
589   CmiIdleLock_addMessage(&cs->idle);
590   }
591 }
592 #endif
593
594 #ifndef CmiMyPe
595 int CmiMyPe(void)
596 {
597   return CmiGetState()->pe;
598 }
599 #endif
600
601 #ifndef CmiMyRank
602 int CmiMyRank(void)
603 {
604   return CmiGetState()->rank;
605 }
606 #endif
607
608 #ifndef CmiNodeFirst
609 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
610 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
611 #endif
612
613 #ifndef CmiNodeOf
614 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
615 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
616 #endif
617
618 static size_t CmiAllAsyncMsgsSent(void)
619 {
620    SMSG_LIST *msg_tmp = sent_msgs;
621    MPI_Status sts;
622    int done;
623
624    while(msg_tmp!=0) {
625     done = 0;
626     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
627       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
628     if(!done)
629       return 0;
630     msg_tmp = msg_tmp->next;
631 /*    MsgQueueLen--; ????? */
632    }
633    return 1;
634 }
635
636 int CmiAsyncMsgSent(CmiCommHandle c) {
637
638   SMSG_LIST *msg_tmp = sent_msgs;
639   int done;
640   MPI_Status sts;
641
642   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
643     msg_tmp = msg_tmp->next;
644   if(msg_tmp) {
645     done = 0;
646     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
647       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
648     return ((done)?1:0);
649   } else {
650     return 1;
651   }
652 }
653
654 void CmiReleaseCommHandle(CmiCommHandle c)
655 {
656   return;
657 }
658
659 #if CMK_BLUEGENEL
660 extern void MPID_Progress_test();
661 #endif
662
663 void CmiReleaseSentMessages(void)
664 {
665   SMSG_LIST *msg_tmp=sent_msgs;
666   SMSG_LIST *prev=0;
667   SMSG_LIST *temp;
668   int done;
669   MPI_Status sts;
670
671 #if CMK_BLUEGENEL
672   MPID_Progress_test();
673 #endif
674
675   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
676   while(msg_tmp!=0) {
677     done =0;
678 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
679     double startT = CmiWallTimer();
680 #endif
681     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
682       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
683     if(done) {
684       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
685       MsgQueueLen--;
686       /* Release the message */
687       temp = msg_tmp->next;
688       if(prev==0)  /* first message */
689         sent_msgs = temp;
690       else
691         prev->next = temp;
692       CmiFree(msg_tmp->msg);
693       CmiFree(msg_tmp);
694       msg_tmp = temp;
695     } else {
696       prev = msg_tmp;
697       msg_tmp = msg_tmp->next;
698     }
699 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
700     {
701     double endT = CmiWallTimer();
702     /* only record the event if it takes more than 1ms */
703     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
704     }
705 #endif
706   }
707   end_sent = prev;
708   MACHSTATE(2,"} CmiReleaseSentMessages end");
709 }
710
711 int PumpMsgs(void)
712 {
713   int nbytes, flg, res;
714   char *msg;
715   MPI_Status sts;
716   int recd=0;
717
718 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
719   int recvCnt=0;
720 #endif
721         
722 #if CMK_BLUEGENEL
723   MPID_Progress_test();
724 #endif
725
726   MACHSTATE(2,"PumpMsgs begin {");
727
728 #if CMI_DYNAMIC_EXERT_CAP
729   dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
730 #endif
731         
732   while(1) {
733 #if CMI_EXERT_RECV_CAP
734         if(recvCnt==RECV_CAP) break;
735 #elif CMI_DYNAMIC_EXERT_CAP
736         if(recvCnt >= dynamicRecvCap) break;
737 #endif
738           
739     /* First check posted recvs then do  probe unmatched outstanding messages */
740 #if MPI_POST_RECV_COUNT > 0 
741     int completed_index=-1;
742     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
743         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
744     if(flg){
745         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
746             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
747
748         recd = 1;
749         msg = (char *) CmiAlloc(nbytes);
750         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
751         /* and repost the recv */
752
753         START_EVENT();
754
755         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
756             MPI_POST_RECV_SIZE,
757             MPI_BYTE,
758             MPI_ANY_SOURCE,
759             POST_RECV_TAG,
760             MPI_COMM_WORLD,
761             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
762                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
763
764         END_EVENT(50);
765
766         CpvAccess(Cmi_posted_recv_total)++;
767     }
768     else {
769         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
770         if(res != MPI_SUCCESS)
771         CmiAbort("MPI_Iprobe failed\n");
772         if(!flg) break;
773         recd = 1;
774         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
775         msg = (char *) CmiAlloc(nbytes);
776
777         START_EVENT();
778
779         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
780             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
781
782         END_EVENT(30);
783
784         CpvAccess(Cmi_unposted_recv_total)++;
785     }
786 #else
787     /* Original version */
788 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
789   double startT = CmiWallTimer(); 
790 #endif
791     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
792     if(res != MPI_SUCCESS)
793       CmiAbort("MPI_Iprobe failed\n");
794
795     if(!flg) break;
796 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
797     {
798     double endT = CmiWallTimer();
799     /* only trace the probe that last longer than 1ms */
800     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
801     }
802 #endif
803
804     recd = 1;
805     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
806     msg = (char *) CmiAlloc(nbytes);
807     
808     START_EVENT();
809
810     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
811       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
812
813     /*END_EVENT(30);*/
814
815 #endif
816
817 #if CMK_SMP_TRACE_COMMTHREAD
818         traceBeginCommOp(msg);
819         traceChangeLastTimestamp(CpvAccess(projTraceStart));
820         traceEndCommOp(msg);
821         #if CMI_MPI_TRACE_MOREDETAILED
822         char tmp[32];
823         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
824         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
825         #endif
826 #elif CMK_TRACE_COMMOVERHEAD
827         char tmp[32];
828         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
829         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
830 #endif
831         
832         
833     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
834     CMI_CHECK_CHECKSUM(msg, nbytes);
835 #if CMK_ERROR_CHECKING
836     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
837       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
838       CmiFree(msg);
839       CmiAbort("Abort!\n");
840       continue;
841     }
842 #endif
843         
844 #if CMK_BROADCAST_SPANNING_TREE
845     if (CMI_BROADCAST_ROOT(msg))
846       SendSpanningChildren(nbytes, msg);
847 #elif CMK_BROADCAST_HYPERCUBE
848     if (CMI_BROADCAST_ROOT(msg))
849       SendHypercube(nbytes, msg);
850 #endif
851         
852         /* In SMP mode, this push operation needs to be executed
853      * after forwarding broadcast messages. If it is executed
854      * earlier, then during the bcast msg forwarding period,    
855          * the msg could be already freed on the worker thread.
856          * As a result, the forwarded message could be wrong! 
857          * --Chao Mei
858          */
859 #if CMK_NODE_QUEUE_AVAILABLE
860     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
861       CmiPushNode(msg);
862     else
863 #endif
864         CmiPushPE(CMI_DEST_RANK(msg), msg);     
865         
866 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
867         recvCnt++;
868         /* check sendMsgBuf  to get the  number of messages that have not been sent */
869         /* MsgQueueLen indicates the number of messages that have not been released by MPI */
870         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD){
871                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
872         }
873 #endif  
874         
875   }
876
877   
878 #if CMK_IMMEDIATE_MSG && !CMK_SMP
879   CmiHandleImmediate();
880 #endif
881   
882   MACHSTATE(2,"} PumpMsgs end ");
883   return recd;
884 }
885
886 /* blocking version */
887 static void PumpMsgsBlocking(void)
888 {
889   static int maxbytes = 20000000;
890   static char *buf = NULL;
891   int nbytes, flg;
892   MPI_Status sts;
893   char *msg;
894   int recd=0;
895
896   if (!PCQueueEmpty(CmiGetState()->recv)) return;
897   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
898   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
899   if (sent_msgs)  return;
900
901 #if 0
902   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
903 #endif
904
905   if (buf == NULL) {
906     buf = (char *) CmiAlloc(maxbytes);
907     _MEMCHECK(buf);
908   }
909
910
911 #if MPI_POST_RECV_COUNT > 0
912 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
913 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
914 #endif
915
916   START_EVENT();
917
918   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
919       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
920
921   /*END_EVENT(30);*/
922     
923    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
924    msg = (char *) CmiAlloc(nbytes);
925    memcpy(msg, buf, nbytes);
926
927 #if CMK_SMP_TRACE_COMMTHREAD
928         traceBeginCommOp(msg);
929         traceChangeLastTimestamp(CpvAccess(projTraceStart));
930         traceEndCommOp(msg);
931         #if CMI_MPI_TRACE_MOREDETAILED
932         char tmp[32];
933         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
934         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
935         #endif
936 #endif
937
938 #if CMK_BROADCAST_SPANNING_TREE
939    if (CMI_BROADCAST_ROOT(msg))
940       SendSpanningChildren(nbytes, msg);
941 #elif CMK_BROADCAST_HYPERCUBE
942    if (CMI_BROADCAST_ROOT(msg))
943       SendHypercube(nbytes, msg);
944 #endif
945   
946         /* In SMP mode, this push operation needs to be executed
947      * after forwarding broadcast messages. If it is executed
948      * earlier, then during the bcast msg forwarding period,    
949          * the msg could be already freed on the worker thread.
950          * As a result, the forwarded message could be wrong! 
951          * --Chao Mei
952          */  
953 #if CMK_NODE_QUEUE_AVAILABLE
954    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
955       CmiPushNode(msg);
956    else
957 #endif
958       CmiPushPE(CMI_DEST_RANK(msg), msg);
959 }
960
961 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
962
963 #if CMK_SMP
964
965 static int inexit = 0;
966 static CmiNodeLock  exitLock = 0;
967
968 static int MsgQueueEmpty()
969 {
970   int i;
971 #if MULTI_SENDQUEUE
972   for (i=0; i<_Cmi_mynodesize; i++)
973     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
974 #else
975   return PCQueueEmpty(sendMsgBuf);
976 #endif
977   return 1;
978 }
979
980 static int SendMsgBuf();
981
982 /* test if all processors recv queues are empty */
983 static int RecvQueueEmpty()
984 {
985   int i;
986   for (i=0; i<_Cmi_mynodesize; i++) {
987     CmiState cs=CmiGetStateN(i);
988     if (!PCQueueEmpty(cs->recv)) return 0;
989   }
990   return 1;
991 }
992
993 /**
994 CommunicationServer calls MPI to send messages in the queues and probe message from network.
995 */
996
997 #define REPORT_COMM_METRICS 0
998 #if REPORT_COMM_METRICS
999 static double pumptime = 0.0;
1000 static double releasetime = 0.0;
1001 static double sendtime = 0.0;
1002 #endif
1003
1004 static void CommunicationServer(int sleepTime)
1005 {
1006   int static count=0;
1007 /*
1008   count ++;
1009   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
1010 */
1011 #if REPORT_COMM_METRICS
1012   double t1, t2, t3, t4;
1013   t1 = CmiWallTimer();
1014 #endif
1015   PumpMsgs();
1016 #if REPORT_COMM_METRICS
1017   t2 = CmiWallTimer();
1018 #endif
1019   CmiReleaseSentMessages();
1020 #if REPORT_COMM_METRICS
1021   t3 = CmiWallTimer();
1022 #endif
1023   SendMsgBuf();
1024 #if REPORT_COMM_METRICS
1025   t4 = CmiWallTimer();
1026   pumptime += (t2-t1);
1027   releasetime += (t3-t2);
1028   sendtime += (t4-t3);
1029 #endif
1030 /*
1031   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1032 */
1033   if (inexit == CmiMyNodeSize()) {
1034     MACHSTATE(2, "CommunicationServer exiting {");
1035 #if 0
1036     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1037 #endif
1038     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1039       CmiReleaseSentMessages();
1040       SendMsgBuf();
1041       PumpMsgs();
1042     }
1043     MACHSTATE(2, "CommunicationServer barrier begin {");
1044
1045     START_EVENT();
1046
1047     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1048       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1049
1050     END_EVENT(10);
1051
1052     MACHSTATE(2, "} CommunicationServer barrier end");
1053 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1054     if (CmiMyNode() == 0){
1055       CmiPrintf("End of program\n");
1056     }
1057 #endif
1058     MACHSTATE(2, "} CommunicationServer EXIT");
1059
1060     ConverseCommonExit();   
1061 #if REPORT_COMM_METRICS
1062     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1063 #endif
1064
1065 #if ! CMK_AUTOBUILD
1066     signal(SIGINT, signal_int);
1067     MPI_Finalize();
1068     #endif
1069     exit(0);
1070   }
1071 }
1072
1073 #endif
1074
1075 static void CommunicationServerThread(int sleepTime)
1076 {
1077 #if CMK_SMP
1078   CommunicationServer(sleepTime);
1079 #endif
1080 #if CMK_IMMEDIATE_MSG
1081   CmiHandleImmediate();
1082 #endif
1083 }
1084
1085 #if CMK_NODE_QUEUE_AVAILABLE
1086 char *CmiGetNonLocalNodeQ(void)
1087 {
1088   CmiState cs = CmiGetState();
1089   char *result = 0;
1090   CmiIdleLock_checkMessage(&cs->idle);
1091 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1092     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1093     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1094     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1095     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1096     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1097 /*  }  */
1098
1099   return result;
1100 }
1101 #endif
1102
1103 void *CmiGetNonLocal(void)
1104 {
1105   static int count=0;
1106   CmiState cs = CmiGetState();
1107   void *msg;
1108
1109 #if ! CMK_SMP
1110   if (CmiNumPes() == 1) return NULL;
1111 #endif
1112
1113   CmiIdleLock_checkMessage(&cs->idle);
1114   /* although it seems that lock is not needed, I found it crashes very often
1115      on mpi-smp without lock */
1116
1117 #if ! CMK_SMP
1118   CmiReleaseSentMessages();
1119   PumpMsgs();
1120 #endif
1121
1122   /* CmiLock(procState[cs->rank].recvLock); */
1123   msg =  PCQueuePop(cs->recv);
1124   /* CmiUnlock(procState[cs->rank].recvLock); */
1125
1126 /*
1127   if (msg) {
1128     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1129   else {
1130     count++;
1131     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1132   }
1133 */
1134 #if ! CMK_SMP
1135   if (no_outstanding_sends) {
1136     while (MsgQueueLen>0) {
1137       CmiReleaseSentMessages();
1138       PumpMsgs();
1139     }
1140   }
1141
1142   if(!msg) {
1143     CmiReleaseSentMessages();
1144     if (PumpMsgs())
1145       return  PCQueuePop(cs->recv);
1146     else
1147       return 0;
1148   }
1149 #endif
1150
1151   return msg;
1152 }
1153
1154 /* called in non-smp mode */
1155 void CmiNotifyIdle(void)
1156 {
1157   CmiReleaseSentMessages();
1158   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1159 }
1160
1161
1162 /********************************************************
1163     The call to probe immediate messages has been renamed to
1164     CmiMachineProgressImpl
1165 ******************************************************/
1166 /* user call to handle immediate message, only useful in non SMP version
1167    using polling method to schedule message.
1168 */
1169 /*
1170 #if CMK_IMMEDIATE_MSG
1171 void CmiProbeImmediateMsg()
1172 {
1173 #if !CMK_SMP
1174   PumpMsgs();
1175   CmiHandleImmediate();
1176 #endif
1177 }
1178 #endif
1179 */
1180
1181 /* Network progress function is used to poll the network when for
1182    messages. This flushes receive buffers on some  implementations*/
1183 #if CMK_MACHINE_PROGRESS_DEFINED
1184 void CmiMachineProgressImpl()
1185 {
1186 #if !CMK_SMP
1187     PumpMsgs();
1188 #if CMK_IMMEDIATE_MSG
1189     CmiHandleImmediate();
1190 #endif
1191 #else
1192     /*Not implemented yet. Communication server does not seem to be
1193       thread safe, so only communication thread call it */
1194     if (CmiMyRank() == CmiMyNodeSize())
1195         CommunicationServerThread(0);
1196 #endif
1197 }
1198 #endif
1199
1200 /********************* MESSAGE SEND FUNCTIONS ******************/
1201
1202 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1203
1204 static void CmiSendSelf(char *msg)
1205 {
1206 #if CMK_IMMEDIATE_MSG
1207     if (CmiIsImmediate(msg)) {
1208       /* CmiBecomeNonImmediate(msg); */
1209       CmiPushImmediateMsg(msg);
1210       CmiHandleImmediate();
1211       return;
1212     }
1213 #endif
1214     CQdCreate(CpvAccess(cQdState), 1);
1215     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1216 }
1217
1218 void CmiSyncSendFn(int destPE, int size, char *msg)
1219 {
1220   CmiState cs = CmiGetState();
1221   char *dupmsg = (char *) CmiAlloc(size);
1222   memcpy(dupmsg, msg, size);
1223
1224   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1225
1226   if (cs->pe==destPE) {
1227     CmiSendSelf(dupmsg);
1228   }
1229   else
1230     CmiAsyncSendFn_(destPE, size, dupmsg);
1231 }
1232
1233 #if CMK_SMP
1234
1235 /* called by communication thread in SMP */
1236 static int SendMsgBuf()
1237 {
1238   SMSG_LIST *msg_tmp;
1239   char *msg;
1240   int node, rank, size;
1241   int i;
1242   int sent = 0;
1243
1244 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1245         int sentCnt = 0;
1246 #endif  
1247         
1248 #if CMI_DYNAMIC_EXERT_CAP
1249         dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
1250 #endif
1251         
1252   MACHSTATE(2,"SendMsgBuf begin {");
1253 #if MULTI_SENDQUEUE
1254   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1255   {
1256     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1257     {
1258       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1259 #else
1260     /* single message sending queue */
1261     /* CmiLock(sendMsgBufLock); */
1262     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1263     /* CmiUnlock(sendMsgBufLock); */
1264     while (NULL != msg_tmp)
1265     {
1266 #endif
1267       node = msg_tmp->destpe;
1268       size = msg_tmp->size;
1269       msg = msg_tmp->msg;
1270       msg_tmp->next = 0;
1271                 
1272 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1273       while (MsgQueueLen > request_max) {
1274                 CmiReleaseSentMessages();
1275                 PumpMsgs();
1276       }
1277 #endif
1278           
1279       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1280 #if CMK_ERROR_CHECKING
1281       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1282 #endif
1283       CMI_SET_CHECKSUM(msg, size);
1284
1285 #if MPI_POST_RECV_COUNT > 0
1286         if(size <= MPI_POST_RECV_SIZE){
1287
1288           START_EVENT();
1289           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1290                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1291
1292           STOP_EVENT(40);
1293         }
1294         else {
1295             START_EVENT();
1296             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1297                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1298             STOP_EVENT(40);
1299         }
1300 #else
1301         START_EVENT();
1302         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1303             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1304         /*END_EVENT(40);*/
1305 #endif
1306         
1307 #if CMK_SMP_TRACE_COMMTHREAD
1308         traceBeginCommOp(msg);
1309         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1310         /* traceSendMsgComm must execute after traceBeginCommOp because
1311          * we pretend we execute an entry method, and inside this we
1312          * pretend we will send another message. Otherwise how could
1313          * a message creation just before an entry method invocation?
1314          * If such logic is broken, the projections will not trace
1315          * messages correctly! -Chao Mei
1316          */
1317         traceSendMsgComm(msg);
1318         traceEndCommOp(msg);
1319         #if CMI_MPI_TRACE_MOREDETAILED
1320         char tmp[64];
1321         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1322         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1323         #endif
1324 #endif
1325                 
1326                 
1327       MACHSTATE(3,"}MPI_send end");
1328       MsgQueueLen++;
1329       if(sent_msgs==0)
1330         sent_msgs = msg_tmp;
1331       else
1332         end_sent->next = msg_tmp;
1333       end_sent = msg_tmp;
1334       sent=1;
1335           
1336 #if CMI_EXERT_SEND_CAP    
1337           if(++sentCnt == SEND_CAP) break;
1338 #elif CMI_DYNAMIC_EXERT_CAP
1339           if(++sentCnt >= dynamicSendCap) break;
1340           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1341                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1342 #endif    
1343           
1344 #if ! MULTI_SENDQUEUE
1345       /* CmiLock(sendMsgBufLock); */
1346       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1347       /* CmiUnlock(sendMsgBufLock); */
1348 #endif
1349     }
1350 #if MULTI_SENDQUEUE
1351   }
1352 #endif
1353   MACHSTATE(2,"}SendMsgBuf end ");
1354   return sent;
1355 }
1356
1357 void EnqueueMsg(void *m, int size, int node)
1358 {
1359   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1360   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1361   msg_tmp->msg = m;
1362   msg_tmp->size = size;
1363   msg_tmp->destpe = node;
1364         
1365 #if CMK_SMP_TRACE_COMMTHREAD
1366         msg_tmp->srcpe = CmiMyPe();
1367 #endif  
1368
1369 #if MULTI_SENDQUEUE
1370   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1371 #else
1372   CmiLock(sendMsgBufLock);
1373   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1374   CmiUnlock(sendMsgBufLock);
1375 #endif
1376         
1377   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1378 }
1379
1380 #endif
1381
1382 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1383 {
1384   CmiState cs = CmiGetState();
1385   SMSG_LIST *msg_tmp;
1386   CmiUInt2  rank, node;
1387
1388   if(destPE == cs->pe) {
1389     char *dupmsg = (char *) CmiAlloc(size);
1390     memcpy(dupmsg, msg, size);
1391     CmiSendSelf(dupmsg);
1392     return 0;
1393   }
1394   CQdCreate(CpvAccess(cQdState), 1);
1395 #if CMK_SMP
1396   node = CmiNodeOf(destPE);
1397   rank = CmiRankOf(destPE);
1398   if (node == CmiMyNode())  {
1399     CmiPushPE(rank, msg);
1400     return 0;
1401   }
1402   CMI_DEST_RANK(msg) = rank;
1403   EnqueueMsg(msg, size, node);
1404   return 0;
1405 #else
1406   /* non smp */
1407   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1408   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1409   msg_tmp->msg = msg;
1410   msg_tmp->next = 0;
1411   while (MsgQueueLen > request_max) {
1412         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1413         CmiReleaseSentMessages();
1414         PumpMsgs();
1415   }
1416 #if CMK_ERROR_CHECKING
1417   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1418 #endif
1419   CMI_SET_CHECKSUM(msg, size);
1420
1421 #if MPI_POST_RECV_COUNT > 0
1422         if(size <= MPI_POST_RECV_SIZE){
1423
1424           START_EVENT();
1425           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1426                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1427           END_EVENT(40);
1428         }
1429         else {
1430           START_EVENT();
1431           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1432                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1433           END_EVENT(40);
1434         }
1435 #else
1436   START_EVENT();
1437   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1438     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1439   /*END_EVENT(40);*/
1440   #if CMK_TRACE_COMMOVERHEAD
1441         char tmp[64];
1442         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1443         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1444   #endif
1445 #endif
1446
1447   MsgQueueLen++;
1448   if(sent_msgs==0)
1449     sent_msgs = msg_tmp;
1450   else
1451     end_sent->next = msg_tmp;
1452   end_sent = msg_tmp;
1453   return (CmiCommHandle) &(msg_tmp->req);
1454 #endif              /* non-smp */
1455 }
1456
1457 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1458 {
1459   CMI_SET_BROADCAST_ROOT(msg, 0);
1460   CmiAsyncSendFn_(destPE, size, msg);
1461 }
1462
1463 void CmiFreeSendFn(int destPE, int size, char *msg)
1464 {
1465   CmiState cs = CmiGetState();
1466   CMI_SET_BROADCAST_ROOT(msg, 0);
1467
1468   if (cs->pe==destPE) {
1469     CmiSendSelf(msg);
1470   } else {
1471     CmiAsyncSendFn_(destPE, size, msg);
1472   }
1473 }
1474
1475 /*********************** BROADCAST FUNCTIONS **********************/
1476
1477 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1478 void CmiSyncSendFn1(int destPE, int size, char *msg)
1479 {
1480   CmiState cs = CmiGetState();
1481   char *dupmsg = (char *) CmiAlloc(size);
1482   memcpy(dupmsg, msg, size);
1483   if (cs->pe==destPE)
1484     CmiSendSelf(dupmsg);
1485   else
1486     CmiAsyncSendFn_(destPE, size, dupmsg);
1487 }
1488
1489 /* send msg to its spanning children in broadcast. G. Zheng */
1490 void SendSpanningChildren(int size, char *msg)
1491 {
1492   CmiState cs = CmiGetState();
1493   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1494   int startnode = CmiNodeOf(startpe);
1495   int i, exceptRank;
1496         
1497    /* first send msgs to other nodes */
1498   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1499   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1500     int nd = CmiMyNode()-startnode;
1501     if (nd<0) nd+=CmiNumNodes();
1502     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1503     if (nd > CmiNumNodes() - 1) break;
1504     nd += startnode;
1505     nd = nd%CmiNumNodes();
1506     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1507         #if CMK_SMP
1508         /* always send to the first rank of other nodes */
1509         char *newmsg = CmiCopyMsg(msg, size);
1510         CMI_DEST_RANK(newmsg) = 0;
1511     EnqueueMsg(newmsg, size, nd);
1512         #else
1513         CmiSyncSendFn1(nd, size, msg);
1514         #endif
1515   }
1516 #if CMK_SMP  
1517    /* second send msgs to my peers on this node */
1518   /* FIXME: now it's just a flat p2p send!! When node size is large,
1519    * it should also be sent in a tree
1520    */
1521    exceptRank = CMI_DEST_RANK(msg);
1522    for(i=0; i<exceptRank; i++){
1523            CmiPushPE(i, CmiCopyMsg(msg, size));
1524    }
1525    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1526            CmiPushPE(i, CmiCopyMsg(msg, size));
1527    }
1528 #endif
1529 }
1530
1531 #include <math.h>
1532
1533 /* send msg along the hypercube in broadcast. (Sameer) */
1534 void SendHypercube(int size, char *msg)
1535 {
1536   CmiState cs = CmiGetState();
1537   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1538   int startnode = CmiNodeOf(startpe);
1539   int i, exceptRank, cnt, tmp, relPE;
1540   int dims=0;
1541
1542   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1543   tmp = CmiNumNodes()-1;
1544   while(tmp>0){
1545           dims++;
1546           tmp = tmp >> 1;
1547   }
1548   if(CmiNumNodes()==1) dims=1;
1549   
1550    /* first send msgs to other nodes */  
1551   relPE = CmiMyNode()-startnode;
1552   if(relPE < 0) relPE += CmiNumNodes();
1553   cnt=0;
1554   tmp = relPE;
1555   /* count how many zeros (in binary format) relPE has */
1556   for(i=0; i<dims; i++, cnt++){
1557     if(tmp & 1 == 1) break;
1558     tmp = tmp >> 1;
1559   }
1560   
1561   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1562   for (i = cnt-1; i >= 0; i--) {
1563     int nd = relPE + (1 << i);
1564         if(nd >= CmiNumNodes()) continue;
1565         nd = (nd+startnode)%CmiNumNodes();
1566         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1567 #if CMK_SMP
1568     /* always send to the first rank of other nodes */
1569     char *newmsg = CmiCopyMsg(msg, size);
1570     CMI_DEST_RANK(newmsg) = 0;
1571     EnqueueMsg(newmsg, size, nd);
1572 #else
1573         CmiSyncSendFn1(nd, size, msg);
1574 #endif
1575   }
1576   
1577 #if CMK_SMP
1578    /* second send msgs to my peers on this node */
1579    /* FIXME: now it's just a flat p2p send!! When node size is large,
1580     * it should also be sent in a tree
1581     */
1582    exceptRank = CMI_DEST_RANK(msg);
1583    for(i=0; i<exceptRank; i++){
1584            CmiPushPE(i, CmiCopyMsg(msg, size));
1585    }
1586    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1587            CmiPushPE(i, CmiCopyMsg(msg, size));
1588    }
1589 #endif
1590 }
1591
1592 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1593 {
1594   CmiState cs = CmiGetState();
1595
1596 #if CMK_SMP     
1597   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1598   CMI_DEST_RANK(msg) = CmiMyRank();
1599 #endif
1600         
1601 #if CMK_BROADCAST_SPANNING_TREE
1602   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1603   SendSpanningChildren(size, msg);
1604
1605 #elif CMK_BROADCAST_HYPERCUBE
1606   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1607   SendHypercube(size, msg);
1608
1609 #else
1610   int i;
1611
1612   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1613     CmiSyncSendFn(i, size,msg) ;
1614   for ( i=0; i<cs->pe; i++ )
1615     CmiSyncSendFn(i, size,msg) ;
1616 #endif
1617
1618   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1619 }
1620
1621
1622 /*  FIXME: luckily async is never used  G. Zheng */
1623 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1624 {
1625   CmiState cs = CmiGetState();
1626   int i ;
1627
1628   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1629     CmiAsyncSendFn(i,size,msg) ;
1630   for ( i=0; i<cs->pe; i++ )
1631     CmiAsyncSendFn(i,size,msg) ;
1632
1633   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1634   CmiAbort("CmiAsyncBroadcastFn should never be called");
1635   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1636 }
1637
1638 void CmiFreeBroadcastFn(int size, char *msg)
1639 {
1640    CmiSyncBroadcastFn(size,msg);
1641    CmiFree(msg);
1642 }
1643
1644 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1645 {
1646
1647 #if CMK_SMP     
1648   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1649   CMI_DEST_RANK(msg) = CmiMyRank();
1650 #endif
1651
1652 #if CMK_BROADCAST_SPANNING_TREE
1653   CmiState cs = CmiGetState();
1654   CmiSyncSendFn(cs->pe, size,msg) ;
1655   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1656   SendSpanningChildren(size, msg);
1657
1658 #elif CMK_BROADCAST_HYPERCUBE
1659   CmiState cs = CmiGetState();
1660   CmiSyncSendFn(cs->pe, size,msg) ;
1661   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1662   SendHypercube(size, msg);
1663
1664 #else
1665     int i ;
1666
1667   for ( i=0; i<_Cmi_numpes; i++ )
1668     CmiSyncSendFn(i,size,msg) ;
1669 #endif
1670
1671   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1672 }
1673
1674 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1675 {
1676   int i ;
1677
1678   for ( i=1; i<_Cmi_numpes; i++ )
1679     CmiAsyncSendFn(i,size,msg) ;
1680
1681   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1682
1683   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1684 }
1685
1686 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1687 {
1688 #if CMK_SMP     
1689   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1690   CMI_DEST_RANK(msg) = CmiMyRank();
1691 #endif
1692
1693 #if CMK_BROADCAST_SPANNING_TREE
1694   CmiState cs = CmiGetState();
1695   CmiSyncSendFn(cs->pe, size,msg) ;
1696   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1697   SendSpanningChildren(size, msg);
1698
1699 #elif CMK_BROADCAST_HYPERCUBE
1700   CmiState cs = CmiGetState();
1701   CmiSyncSendFn(cs->pe, size,msg) ;
1702   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1703   SendHypercube(size, msg);
1704
1705 #else
1706   int i ;
1707
1708   for ( i=0; i<_Cmi_numpes; i++ )
1709     CmiSyncSendFn(i,size,msg) ;
1710 #endif
1711   CmiFree(msg) ;
1712   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1713 }
1714
1715 #if CMK_NODE_QUEUE_AVAILABLE
1716
1717 static void CmiSendNodeSelf(char *msg)
1718 {
1719 #if CMK_IMMEDIATE_MSG
1720 #if 0
1721     if (CmiIsImmediate(msg) && !_immRunning) {
1722       /*CmiHandleImmediateMessage(msg); */
1723       CmiPushImmediateMsg(msg);
1724       CmiHandleImmediate();
1725       return;
1726     }
1727 #endif
1728     if (CmiIsImmediate(msg))
1729     {
1730       CmiPushImmediateMsg(msg);
1731       if (!_immRunning) CmiHandleImmediate();
1732       return;
1733     }
1734 #endif
1735     CQdCreate(CpvAccess(cQdState), 1);
1736     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1737     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1738     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1739 }
1740
1741 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1742 {
1743   int i;
1744   SMSG_LIST *msg_tmp;
1745   char *dupmsg;
1746
1747   CMI_SET_BROADCAST_ROOT(msg, 0);
1748   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1749   switch (dstNode) {
1750   case NODE_BROADCAST_ALL:
1751     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1752   case NODE_BROADCAST_OTHERS:
1753     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1754     for (i=0; i<_Cmi_numnodes; i++)
1755       if (i!=_Cmi_mynode) {
1756         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1757       }
1758     break;
1759   default:
1760     dupmsg = (char *)CmiCopyMsg(msg,size);
1761     if(dstNode == _Cmi_mynode) {
1762       CmiSendNodeSelf(dupmsg);
1763     }
1764     else {
1765       CQdCreate(CpvAccess(cQdState), 1);
1766       EnqueueMsg(dupmsg, size, dstNode);
1767     }
1768   }
1769   return 0;
1770 }
1771
1772 void CmiSyncNodeSendFn(int p, int s, char *m)
1773 {
1774   CmiAsyncNodeSendFn(p, s, m);
1775 }
1776
1777 /* need */
1778 void CmiFreeNodeSendFn(int p, int s, char *m)
1779 {
1780   CmiAsyncNodeSendFn(p, s, m);
1781   CmiFree(m);
1782 }
1783
1784 /* need */
1785 void CmiSyncNodeBroadcastFn(int s, char *m)
1786 {
1787   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1788 }
1789
1790 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1791 {
1792 }
1793
1794 /* need */
1795 void CmiFreeNodeBroadcastFn(int s, char *m)
1796 {
1797   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1798   CmiFree(m);
1799 }
1800
1801 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1802 {
1803   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1804 }
1805
1806 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1807 {
1808   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1809 }
1810
1811 /* need */
1812 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1813 {
1814   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1815   CmiFree(m);
1816 }
1817 #endif
1818
1819 /************************** MAIN ***********************************/
1820 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1821
1822 void ConverseExit(void)
1823 {
1824 #if ! CMK_SMP
1825   while(!CmiAllAsyncMsgsSent()) {
1826     PumpMsgs();
1827     CmiReleaseSentMessages();
1828   }
1829   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1830     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1831
1832   ConverseCommonExit();
1833 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1834   if (CmiMyPe() == 0){
1835     CmiPrintf("End of program\n");
1836 #if MPI_POST_RECV_COUNT > 0
1837     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1838 #endif
1839 }
1840 #endif
1841 #if ! CMK_AUTOBUILD
1842   signal(SIGINT, signal_int);
1843   MPI_Finalize();
1844 #endif
1845   exit(0);
1846
1847 #else
1848     /* SMP version, communication thread will exit */
1849   ConverseCommonExit();
1850   /* atomic increment */
1851   CmiLock(exitLock);
1852   inexit++;
1853   CmiUnlock(exitLock);
1854   while (1) CmiYield();
1855 #endif
1856 }
1857
1858 static void registerMPITraceEvents() {
1859 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1860     traceRegisterUserEvent("MPI_Barrier", 10);
1861     traceRegisterUserEvent("MPI_Send", 20);
1862     traceRegisterUserEvent("MPI_Recv", 30);
1863     traceRegisterUserEvent("MPI_Isend", 40);
1864     traceRegisterUserEvent("MPI_Irecv", 50);
1865     traceRegisterUserEvent("MPI_Test", 60);
1866     traceRegisterUserEvent("MPI_Iprobe", 70);
1867 #endif
1868 }
1869
1870
1871 static char     **Cmi_argv;
1872 static char     **Cmi_argvcopy;
1873 static CmiStartFn Cmi_startfn;   /* The start function */
1874 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1875
1876 typedef struct {
1877   int sleepMs; /*Milliseconds to sleep while idle*/
1878   int nIdles; /*Number of times we've been idle in a row*/
1879   CmiState cs; /*Machine state*/
1880 } CmiIdleState;
1881
1882 static CmiIdleState *CmiNotifyGetState(void)
1883 {
1884   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1885   s->sleepMs=0;
1886   s->nIdles=0;
1887   s->cs=CmiGetState();
1888   return s;
1889 }
1890
1891 static void CmiNotifyBeginIdle(CmiIdleState *s)
1892 {
1893   s->sleepMs=0;
1894   s->nIdles=0;
1895 }
1896
1897 static void CmiNotifyStillIdle(CmiIdleState *s)
1898 {
1899 #if ! CMK_SMP
1900   CmiReleaseSentMessages();
1901   PumpMsgs();
1902 #else
1903 /*  CmiYield();  */
1904 #endif
1905
1906 #if 1
1907   {
1908   int nSpins=20; /*Number of times to spin before sleeping*/
1909   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1910   s->nIdles++;
1911   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1912     s->sleepMs+=2;
1913     if (s->sleepMs>10) s->sleepMs=10;
1914   }
1915   /*Comm. thread will listen on sockets-- just sleep*/
1916   if (s->sleepMs>0) {
1917     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1918     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1919     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1920   }
1921   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1922   }
1923 #endif
1924 }
1925
1926 #if MACHINE_DEBUG_LOG
1927 FILE *debugLog = NULL;
1928 #endif
1929
1930 static int machine_exit_idx;
1931 static void machine_exit(char *m) {
1932   EmergencyExit();
1933   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1934   fflush(stdout);
1935   CmiNodeBarrier();
1936   if (CmiMyRank() == 0) {
1937     MPI_Barrier(MPI_COMM_WORLD);
1938     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1939     MPI_Abort(MPI_COMM_WORLD, 1);
1940   } else {
1941     while (1) CmiYield();
1942   }
1943 }
1944
1945 static void KillOnAllSigs(int sigNo) {
1946   static int already_in_signal_handler = 0;
1947   char *m;
1948   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1949   already_in_signal_handler = 1;
1950 #if CMK_CCS_AVAILABLE
1951   if (CpvAccess(cmiArgDebugFlag)) {
1952     CpdNotify(CPD_SIGNAL, sigNo);
1953     CpdFreeze();
1954   }
1955 #endif
1956   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1957       "Signal: %d\n",CmiMyPe(),sigNo);
1958   CmiPrintStackTrace(1);
1959
1960   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1961   CmiSetHandler(m, machine_exit_idx);
1962   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1963   machine_exit(m);
1964 }
1965
1966 static void ConverseRunPE(int everReturn)
1967 {
1968   CmiIdleState *s=CmiNotifyGetState();
1969   CmiState cs;
1970   char** CmiMyArgv;
1971
1972   CmiNodeAllBarrier();
1973
1974   cs = CmiGetState();
1975   CpvInitialize(void *,CmiLocalQueue);
1976   CpvAccess(CmiLocalQueue) = cs->localqueue;
1977
1978   if (CmiMyRank())
1979     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1980   else
1981     CmiMyArgv=Cmi_argv;
1982
1983   CthInit(CmiMyArgv);
1984
1985   ConverseCommonInit(CmiMyArgv);
1986   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1987
1988 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1989   CpvInitialize(double, projTraceStart);
1990   /* only PE 0 needs to care about registration (to generate sts file). */
1991   if (CmiMyPe() == 0) {
1992     registerMachineUserEventsFunction(&registerMPITraceEvents);
1993   }
1994 #endif
1995
1996   /* initialize the network progress counter*/
1997   /* Network progress function is used to poll the network when for
1998      messages. This flushes receive buffers on some  implementations*/
1999   CpvInitialize(int , networkProgressCount);
2000   CpvAccess(networkProgressCount) = 0;
2001
2002 #if CMK_SMP
2003   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2004   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2005 #else
2006   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
2007 #endif
2008
2009 #if MACHINE_DEBUG_LOG
2010   if (CmiMyRank() == 0) {
2011     char ln[200];
2012     sprintf(ln,"debugLog.%d",CmiMyNode());
2013     debugLog=fopen(ln,"w");
2014   }
2015 #endif
2016
2017   /* Converse initialization finishes, immediate messages can be processed.
2018      node barrier previously should take care of the node synchronization */
2019   _immediateReady = 1;
2020
2021   /* communication thread */
2022   if (CmiMyRank() == CmiMyNodeSize()) {
2023     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2024     while (1) CommunicationServerThread(5);
2025   }
2026   else {  /* worker thread */
2027   if (!everReturn) {
2028     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2029     if (Cmi_usrsched==0) CsdScheduler(-1);
2030     ConverseExit();
2031   }
2032   }
2033 }
2034
2035 static char *thread_level_tostring(int thread_level)
2036 {
2037 #if CMK_MPI_INIT_THREAD
2038   switch (thread_level) {
2039   case MPI_THREAD_SINGLE:
2040       return "MPI_THREAD_SINGLE";
2041   case MPI_THREAD_FUNNELED:
2042       return "MPI_THREAD_FUNNELED";
2043   case MPI_THREAD_SERIALIZED:
2044       return "MPI_THREAD_SERIALIZED";
2045   case MPI_THREAD_MULTIPLE :
2046       return "MPI_THREAD_MULTIPLE ";
2047   default: {
2048       char *str = (char*)malloc(5);
2049       sprintf(str,"%d", thread_level);
2050       return str;
2051       }
2052   }
2053   return  "unknown";
2054 #else
2055   char *str = (char*)malloc(5);
2056   sprintf(str,"%d", thread_level);
2057   return str;
2058 #endif
2059 }
2060
2061 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2062 {
2063   int n,i;
2064   int ver, subver;
2065   int provided;
2066   int thread_level;
2067
2068 #if MACHINE_DEBUG
2069   debugLog=NULL;
2070 #endif
2071 #if CMK_USE_HP_MAIN_FIX
2072 #if FOR_CPLUS
2073   _main(argc,argv);
2074 #endif
2075 #endif
2076
2077 #if CMK_MPI_INIT_THREAD
2078 #if CMK_SMP
2079   thread_level = MPI_THREAD_FUNNELED;
2080 #else
2081   thread_level = MPI_THREAD_SINGLE;
2082 #endif
2083   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2084   _thread_provided = provided;
2085 #else
2086   MPI_Init(&argc, &argv);
2087   thread_level = 0;
2088   provided = -1;
2089 #endif
2090   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2091   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2092
2093   MPI_Get_version(&ver, &subver);
2094   if (_Cmi_mynode == 0) {
2095     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2096   }
2097
2098   /* processor per node */
2099   _Cmi_mynodesize = 1;
2100   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2101     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2102 #if ! CMK_SMP
2103   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2104     CmiAbort("+ppn cannot be used in non SMP version!\n");
2105 #endif
2106   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2107   if (idleblock && _Cmi_mynode == 0) {
2108     printf("Charm++: Running in idle blocking mode.\n");
2109   }
2110
2111   /* setup signal handlers */
2112   signal(SIGSEGV, KillOnAllSigs);
2113   signal(SIGFPE, KillOnAllSigs);
2114   signal(SIGILL, KillOnAllSigs);
2115   signal_int = signal(SIGINT, KillOnAllSigs);
2116   signal(SIGTERM, KillOnAllSigs);
2117   signal(SIGABRT, KillOnAllSigs);
2118 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2119   signal(SIGQUIT, KillOnAllSigs);
2120   signal(SIGBUS, KillOnAllSigs);
2121 /*#     if CMK_HANDLE_SIGUSR
2122   signal(SIGUSR1, HandleUserSignals);
2123   signal(SIGUSR2, HandleUserSignals);
2124 #     endif*/
2125 #   endif /*UNIX*/
2126   
2127 #if CMK_NO_OUTSTANDING_SENDS
2128   no_outstanding_sends=1;
2129 #endif
2130   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2131     no_outstanding_sends = 1;
2132     if (_Cmi_mynode == 0)
2133       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2134         no_outstanding_sends?"":" not");
2135   }
2136   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2137   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2138   Cmi_argvcopy = CmiCopyArgs(argv);
2139   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2140   /* find dim = log2(numpes), to pretend we are a hypercube */
2141   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2142     Cmi_dim++ ;
2143  /* CmiSpanTreeInit();*/
2144   request_max=MAX_QLEN;
2145   CmiGetArgInt(argv,"+requestmax",&request_max);
2146   /*printf("request max=%d\n", request_max);*/
2147
2148   /* checksum flag */
2149   if (CmiGetArgFlag(argv,"+checksum")) {
2150 #if !CMK_OPTIMIZE
2151     checksum_flag = 1;
2152     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2153 #else
2154     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2155 #endif
2156   }
2157
2158   {
2159   int debug = CmiGetArgFlag(argv,"++debug");
2160   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2161   if (debug || debug_no_pause)
2162   {   /*Pause so user has a chance to start and attach debugger*/
2163 #if CMK_HAS_GETPID
2164     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2165     fflush(stdout);
2166     if (!debug_no_pause)
2167       sleep(15);
2168 #else
2169     printf("++debug ignored.\n");
2170 #endif
2171   }
2172   }
2173
2174 #if MPI_POST_RECV_COUNT > 0
2175
2176   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2177   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2178   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2179   CpvInitialize(char*,CmiPostedRecvBuffers);
2180
2181     /* Post some extra recvs to help out with incoming messages */
2182     /* On some MPIs the messages are unexpected and thus slow */
2183
2184     /* An array of request handles for posted recvs */
2185     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2186
2187     /* An array of buffers for posted recvs */
2188     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2189
2190     /* Post Recvs */
2191     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2192         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2193                     MPI_POST_RECV_SIZE,
2194                     MPI_BYTE,
2195                     MPI_ANY_SOURCE,
2196                     POST_RECV_TAG,
2197                     MPI_COMM_WORLD,
2198                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2199           CmiAbort("MPI_Irecv failed\n");
2200     }
2201
2202 #endif
2203
2204
2205
2206   /* CmiTimerInit(); */
2207
2208 #if 0
2209   CthInit(argv);
2210   ConverseCommonInit(argv);
2211
2212   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2213   if (initret==0) {
2214     fn(CmiGetArgc(argv), argv);
2215     if (usched==0) CsdScheduler(-1);
2216     ConverseExit();
2217   }
2218 #endif
2219
2220   CsvInitialize(CmiNodeState, NodeState);
2221   CmiNodeStateInit(&CsvAccess(NodeState));
2222
2223   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2224
2225   for (i=0; i<_Cmi_mynodesize+1; i++) {
2226 #if MULTI_SENDQUEUE
2227     procState[i].sendMsgBuf = PCQueueCreate();
2228 #endif
2229     procState[i].recvLock = CmiCreateLock();
2230   }
2231 #if CMK_SMP
2232 #if !MULTI_SENDQUEUE
2233   sendMsgBuf = PCQueueCreate();
2234   sendMsgBufLock = CmiCreateLock();
2235 #endif
2236   exitLock = CmiCreateLock();            /* exit count lock */
2237 #endif
2238
2239   /* Network progress function is used to poll the network when for
2240      messages. This flushes receive buffers on some  implementations*/
2241   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2242   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2243
2244   CmiStartThreads(argv);
2245   ConverseRunPE(initret);
2246 }
2247
2248 /***********************************************************************
2249  *
2250  * Abort function:
2251  *
2252  ************************************************************************/
2253
2254 void CmiAbort(const char *message)
2255 {
2256   char *m;
2257   /* if CharmDebug is attached simply try to send a message to it */
2258 #if CMK_CCS_AVAILABLE
2259   if (CpvAccess(cmiArgDebugFlag)) {
2260     CpdNotify(CPD_ABORT, message);
2261     CpdFreeze();
2262   }
2263 #endif  
2264   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2265         "Reason: %s\n",CmiMyPe(),message);
2266  /*  CmiError(message); */
2267   CmiPrintStackTrace(0);
2268   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2269   CmiSetHandler(m, machine_exit_idx);
2270   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2271   machine_exit(m);
2272   /* Program never reaches here */
2273   MPI_Abort(MPI_COMM_WORLD, 1);
2274 }
2275
2276
2277 #if 0
2278
2279 /* ****************************************************************** */
2280 /*    The following internal functions implement recd msg queue       */
2281 /* ****************************************************************** */
2282
2283 static void ** AllocBlock(unsigned int len)
2284 {
2285   void ** blk;
2286
2287   blk=(void **)CmiAlloc(len*sizeof(void *));
2288   if(blk==(void **)0) {
2289     CmiError("Cannot Allocate Memory!\n");
2290     MPI_Abort(MPI_COMM_WORLD, 1);
2291   }
2292   return blk;
2293 }
2294
2295 static void
2296 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2297 {
2298   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2299   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2300 }
2301
2302 void recdQueueInit(void)
2303 {
2304   recdQueue_blk = AllocBlock(BLK_LEN);
2305   recdQueue_blk_len = BLK_LEN;
2306   recdQueue_first = 0;
2307   recdQueue_len = 0;
2308 }
2309
2310 void recdQueueAddToBack(void *element)
2311 {
2312 #if NODE_0_IS_CONVHOST
2313   inside_comm = 1;
2314 #endif
2315   if(recdQueue_len==recdQueue_blk_len) {
2316     void **blk;
2317     recdQueue_blk_len *= 3;
2318     blk = AllocBlock(recdQueue_blk_len);
2319     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2320     CmiFree(recdQueue_blk);
2321     recdQueue_blk = blk;
2322     recdQueue_first = 0;
2323   }
2324   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2325 #if NODE_0_IS_CONVHOST
2326   inside_comm = 0;
2327 #endif
2328 }
2329
2330
2331 void * recdQueueRemoveFromFront(void)
2332 {
2333   if(recdQueue_len) {
2334     void *element;
2335     element = recdQueue_blk[recdQueue_first++];
2336     recdQueue_first %= recdQueue_blk_len;
2337     recdQueue_len--;
2338     return element;
2339   }
2340   return 0;
2341 }
2342
2343 #endif
2344
2345 /*@}*/