a new option +useAbsoluteTime time to converse timing module to force it to use...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #define CMK_TRACE_COMMOVERHEAD 0
66 #if CMK_TRACE_COMMOVERHEAD
67 #undef CMI_MPI_TRACE_USEREVENTS
68 #define CMI_MPI_TRACE_USEREVENTS 1
69 #endif
70
71 #include "machine.h"
72
73 #include "pcqueue.h"
74
75 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
76
77 #if CMK_BLUEGENEL
78 #define MAX_QLEN 8
79 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
80 #else
81 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
82 #define MAX_QLEN 200
83 #endif
84
85 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
86 CpvStaticDeclare(double, projTraceStart);
87 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
88 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
89 #else
90 # define  START_EVENT()
91 # define  END_EVENT(x)
92 #endif
93
94 /*
95     To reduce the buffer used in broadcast and distribute the load from
96   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
97   spanning tree broadcast algorithm.
98     This will use the fourth short in message as an indicator of spanning tree
99   root.
100 */
101 #define CMK_BROADCAST_SPANNING_TREE    1
102 #define CMK_BROADCAST_HYPERCUBE        0
103
104 #define BROADCAST_SPANNING_FACTOR      4
105 /* The number of children used when a msg is broadcast inside a node */
106 #define BROADCAST_SPANNING_INTRA_FACTOR      8
107
108 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
109 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
110
111 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
112 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
113
114 /* FIXME: need a random number that everyone agrees ! */
115 #define CHARM_MAGIC_NUMBER               126
116
117 #if CMK_ERROR_CHECKING
118 static int checksum_flag = 0;
119 #define CMI_SET_CHECKSUM(msg, len)      \
120         if (checksum_flag)  {   \
121           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
122           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
123         }
124 #define CMI_CHECK_CHECKSUM(msg, len)    \
125         if (checksum_flag)      \
126           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
127             CmiAbort("Fatal error: checksum doesn't agree!\n");
128 #else
129 #define CMI_SET_CHECKSUM(msg, len)
130 #define CMI_CHECK_CHECKSUM(msg, len)
131 #endif
132
133 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
134 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
135 #else
136 #define CMI_SET_BROADCAST_ROOT(msg, root) 
137 #endif
138
139
140 /** 
141     If MPI_POST_RECV is defined, we provide default values for size 
142     and number of posted recieves. If MPI_POST_RECV_COUNT is set
143     then a default value for MPI_POST_RECV_SIZE is used if not specified
144     by the user.
145 */
146 #ifdef MPI_POST_RECV
147 #define MPI_POST_RECV_COUNT 10
148 #undef MPI_POST_RECV
149 #endif
150 #if MPI_POST_RECV_COUNT > 0
151 #warning "Using MPI posted receives which have not yet been tested"
152 #ifndef MPI_POST_RECV_SIZE
153 #define MPI_POST_RECV_SIZE 200
154 #endif
155 /* #undef  MPI_POST_RECV_DEBUG  */
156 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
157 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
158 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
159 CpvDeclare(char*,CmiPostedRecvBuffers);
160 #endif
161
162 /*
163  to avoid MPI's in order delivery, changing MPI Tag all the time
164 */
165 #define TAG     1375
166
167 #if MPI_POST_RECV_COUNT > 0
168 #define POST_RECV_TAG TAG+1
169 #define BARRIER_ZERO_TAG TAG
170 #else
171 #define BARRIER_ZERO_TAG     1375
172 #endif
173
174 #include <signal.h>
175 void (*signal_int)(int);
176
177 /*
178 static int mpi_tag = TAG;
179 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
180 */
181
182 static int        _thread_provided = -1;
183 int               _Cmi_numpes;
184 int               _Cmi_mynode;    /* Which address space am I */
185 int               _Cmi_mynodesize;/* Number of processors in my address space */
186 int               _Cmi_numnodes;  /* Total number of address spaces */
187 int               _Cmi_numpes;    /* Total number of processors */
188 static int        Cmi_nodestart; /* First processor in this address space */
189 CpvDeclare(void*, CmiLocalQueue);
190
191 /*Network progress utility variables. Period controls the rate at
192   which the network poll is called */
193 CpvDeclare(unsigned , networkProgressCount);
194 int networkProgressPeriod;
195
196 int               idleblock = 0;
197
198 #define BLK_LEN  512
199
200 #if CMK_NODE_QUEUE_AVAILABLE
201 #define DGRAM_NODEMESSAGE   (0xFB)
202
203 #define NODE_BROADCAST_OTHERS (-1)
204 #define NODE_BROADCAST_ALL    (-2)
205 #endif
206
207 #if 0
208 static void **recdQueue_blk;
209 static unsigned int recdQueue_blk_len;
210 static unsigned int recdQueue_first;
211 static unsigned int recdQueue_len;
212 static void recdQueueInit(void);
213 static void recdQueueAddToBack(void *element);
214 static void *recdQueueRemoveFromFront(void);
215 #endif
216
217 static void ConverseRunPE(int everReturn);
218 static void CommunicationServer(int sleepTime);
219 static void CommunicationServerThread(int sleepTime);
220
221 typedef struct msg_list {
222      char *msg;
223      struct msg_list *next;
224      int size, destpe;
225
226 #if CMK_SMP_TRACE_COMMTHREAD
227         int srcpe;
228 #endif  
229         
230      MPI_Request req;
231 } SMSG_LIST;
232
233 int MsgQueueLen=0;
234 static int request_max;
235
236 static SMSG_LIST *sent_msgs=0;
237 static SMSG_LIST *end_sent=0;
238
239 static int Cmi_dim;
240
241 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
242
243 #if NODE_0_IS_CONVHOST
244 int inside_comm = 0;
245 #endif
246
247 void CmiAbort(const char *message);
248 static void PerrorExit(const char *msg);
249
250 void SendSpanningChildren(int size, char *msg);
251 void SendHypercube(int size, char *msg);
252
253 static void PerrorExit(const char *msg)
254 {
255   perror(msg);
256   exit(1);
257 }
258
259 extern unsigned char computeCheckSum(unsigned char *data, int len);
260
261 /**************************  TIMER FUNCTIONS **************************/
262
263 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
264
265 /* MPI calls are not threadsafe, even the timer on some machines */
266 static CmiNodeLock  timerLock = 0;
267 static int _absoluteTime = 0;
268 static double starttimer = 0;
269 static int _is_global = 0;
270
271 int CmiTimerIsSynchronized()
272 {
273   int  flag;
274   void *v;
275
276   /*  check if it using synchronized timer */
277   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
278     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
279   if (flag) {
280     _is_global = *(int*)v;
281     if (_is_global && CmiMyPe() == 0)
282       printf("Charm++> MPI timer is synchronized!\n");
283   }
284   return _is_global;
285 }
286
287 int CmiTimerAbsolute()
288 {       
289   return _absoluteTime;
290 }
291
292 double CmiStartTimer()
293 {
294   return 0.0;
295 }
296
297 double CmiInitTime()
298 {
299   return starttimer;
300 }
301
302 void CmiTimerInit(char **argv)
303 {
304   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
305
306   _is_global = CmiTimerIsSynchronized();
307
308   if (_is_global) {
309     if (CmiMyRank() == 0) {
310       double minTimer;
311 #if CMK_TIMER_USE_XT3_DCLOCK
312       starttimer = dclock();
313 #else
314       starttimer = MPI_Wtime();
315 #endif
316
317       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
318                                   MPI_COMM_WORLD );
319       starttimer = minTimer;
320     }
321   }
322   else {  /* we don't have a synchronous timer, set our own start time */
323     CmiBarrier();
324     CmiBarrier();
325     CmiBarrier();
326 #if CMK_TIMER_USE_XT3_DCLOCK
327     starttimer = dclock();
328 #else
329     starttimer = MPI_Wtime();
330 #endif
331   }
332
333 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
334   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
335     timerLock = CmiCreateLock();
336 #endif
337   CmiNodeAllBarrier();          /* for smp */
338 }
339
340 /**
341  * Since the timerLock is never created, and is
342  * always NULL, then all the if-condition inside
343  * the timer functions could be disabled right
344  * now in the case of SMP. --Chao Mei
345  */
346 double CmiTimer(void)
347 {
348   double t;
349 #if 0 && CMK_SMP
350   if (timerLock) CmiLock(timerLock);
351 #endif
352
353 #if CMK_TIMER_USE_XT3_DCLOCK
354   t = dclock();
355 #else
356   t = MPI_Wtime();
357 #endif
358
359 #if 0 && CMK_SMP
360   if (timerLock) CmiUnlock(timerLock);
361 #endif
362
363   return _absoluteTime?t: (t-starttimer);
364 }
365
366 double CmiWallTimer(void)
367 {
368   double t;
369 #if 0 && CMK_SMP
370   if (timerLock) CmiLock(timerLock);
371 #endif
372
373 #if CMK_TIMER_USE_XT3_DCLOCK
374   t = dclock();
375 #else
376   t = MPI_Wtime();
377 #endif
378
379 #if 0 && CMK_SMP
380   if (timerLock) CmiUnlock(timerLock);
381 #endif
382
383   return _absoluteTime? t: (t-starttimer);
384 }
385
386 double CmiCpuTimer(void)
387 {
388   double t;
389 #if 0 && CMK_SMP
390   if (timerLock) CmiLock(timerLock);
391 #endif
392 #if CMK_TIMER_USE_XT3_DCLOCK
393   t = dclock() - starttimer;
394 #else
395   t = MPI_Wtime() - starttimer;
396 #endif
397 #if 0 && CMK_SMP
398   if (timerLock) CmiUnlock(timerLock);
399 #endif
400   return t;
401 }
402
403 #endif
404
405 /* must be called on all ranks including comm thread in SMP */
406 int CmiBarrier()
407 {
408 #if CMK_SMP
409     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
410   CmiNodeAllBarrier();
411   if (CmiMyRank() == CmiMyNodeSize()) 
412 #else
413   if (CmiMyRank() == 0) 
414 #endif
415   {
416 /**
417  *  The call of CmiBarrier is usually before the initialization
418  *  of trace module of Charm++, therefore, the START_EVENT
419  *  and END_EVENT are disabled here. -Chao Mei
420  */     
421     /*START_EVENT();*/
422
423     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
424         CmiAbort("Timernit: MPI_Barrier failed!\n");
425
426     /*END_EVENT(10);*/
427   }
428   CmiNodeAllBarrier();
429   return 0;
430 }
431
432 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
433 int CmiBarrierZero()
434 {
435   int i;
436 #if CMK_SMP
437   if (CmiMyRank() == CmiMyNodeSize()) 
438 #else
439   if (CmiMyRank() == 0) 
440 #endif
441   {
442     char msg[1];
443     MPI_Status sts;
444     if (CmiMyNode() == 0)  {
445       for (i=0; i<CmiNumNodes()-1; i++) {
446          START_EVENT();
447
448          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
449             CmiPrintf("MPI_Recv failed!\n");
450
451          END_EVENT(30);
452       }
453     }
454     else {
455       START_EVENT();
456
457       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
458          printf("MPI_Send failed!\n");
459
460       END_EVENT(20);
461     }
462   }
463   CmiNodeAllBarrier();
464   return 0;
465 }
466
467 typedef struct ProcState {
468 #if MULTI_SENDQUEUE
469 PCQueue      sendMsgBuf;       /* per processor message sending queue */
470 #endif
471 CmiNodeLock  recvLock;              /* for cs->recv */
472 } ProcState;
473
474 static ProcState  *procState;
475
476 #if CMK_SMP
477
478 #if !MULTI_SENDQUEUE
479 static PCQueue sendMsgBuf;
480 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
481 #endif
482
483 #endif
484
485 /************************************************************
486  *
487  * Processor state structure
488  *
489  ************************************************************/
490
491 /* fake Cmi_charmrun_fd */
492 static int Cmi_charmrun_fd = 0;
493 #include "machine-smp.c"
494
495 CsvDeclare(CmiNodeState, NodeState);
496
497 #include "immediate.c"
498
499 #if CMK_SHARED_VARS_UNAVAILABLE
500 /************ non SMP **************/
501 static struct CmiStateStruct Cmi_state;
502 int _Cmi_mype;
503 int _Cmi_myrank;
504
505 void CmiMemLock() {}
506 void CmiMemUnlock() {}
507
508 #define CmiGetState() (&Cmi_state)
509 #define CmiGetStateN(n) (&Cmi_state)
510
511 void CmiYield(void) { sleep(0); }
512
513 static void CmiStartThreads(char **argv)
514 {
515   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
516   _Cmi_mype = Cmi_nodestart;
517   _Cmi_myrank = 0;
518 }
519 #endif  /* non smp */
520
521 /*Add a message to this processor's receive queue, pe is a rank */
522 void CmiPushPE(int pe,void *msg)
523 {
524   CmiState cs = CmiGetStateN(pe);
525   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
526 #if CMK_IMMEDIATE_MSG
527   if (CmiIsImmediate(msg)) {
528 /*
529 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
530     CmiHandleMessage(msg);
531 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
532 */
533     /**(CmiUInt2 *)msg = pe;*/
534     CMI_DEST_RANK(msg) = pe;
535     CmiPushImmediateMsg(msg);
536     return;
537   }
538 #endif
539
540 #if CMK_SMP
541   CmiLock(procState[pe].recvLock);
542 #endif
543   PCQueuePush(cs->recv,msg);
544 #if CMK_SMP
545   CmiUnlock(procState[pe].recvLock);
546 #endif
547   CmiIdleLock_addMessage(&cs->idle);
548   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
549 }
550
551 #if CMK_NODE_QUEUE_AVAILABLE
552 /*Add a message to this processor's receive queue */
553 static void CmiPushNode(void *msg)
554 {
555   MACHSTATE(3,"Pushing message into NodeRecv queue");
556 #if CMK_IMMEDIATE_MSG
557   if (CmiIsImmediate(msg)) {
558     CMI_DEST_RANK(msg) = 0;
559     CmiPushImmediateMsg(msg);
560     return;
561   }
562 #endif
563   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
564   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
565   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
566   {
567   CmiState cs=CmiGetStateN(0);
568   CmiIdleLock_addMessage(&cs->idle);
569   }
570 }
571 #endif
572
573 #ifndef CmiMyPe
574 int CmiMyPe(void)
575 {
576   return CmiGetState()->pe;
577 }
578 #endif
579
580 #ifndef CmiMyRank
581 int CmiMyRank(void)
582 {
583   return CmiGetState()->rank;
584 }
585 #endif
586
587 #ifndef CmiNodeFirst
588 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
589 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
590 #endif
591
592 #ifndef CmiNodeOf
593 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
594 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
595 #endif
596
597 static size_t CmiAllAsyncMsgsSent(void)
598 {
599    SMSG_LIST *msg_tmp = sent_msgs;
600    MPI_Status sts;
601    int done;
602
603    while(msg_tmp!=0) {
604     done = 0;
605     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
606       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
607     if(!done)
608       return 0;
609     msg_tmp = msg_tmp->next;
610 /*    MsgQueueLen--; ????? */
611    }
612    return 1;
613 }
614
615 int CmiAsyncMsgSent(CmiCommHandle c) {
616
617   SMSG_LIST *msg_tmp = sent_msgs;
618   int done;
619   MPI_Status sts;
620
621   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
622     msg_tmp = msg_tmp->next;
623   if(msg_tmp) {
624     done = 0;
625     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
626       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
627     return ((done)?1:0);
628   } else {
629     return 1;
630   }
631 }
632
633 void CmiReleaseCommHandle(CmiCommHandle c)
634 {
635   return;
636 }
637
638 #if CMK_BLUEGENEL
639 extern void MPID_Progress_test();
640 #endif
641
642 void CmiReleaseSentMessages(void)
643 {
644   SMSG_LIST *msg_tmp=sent_msgs;
645   SMSG_LIST *prev=0;
646   SMSG_LIST *temp;
647   int done;
648   MPI_Status sts;
649
650 #if CMK_BLUEGENEL
651   MPID_Progress_test();
652 #endif
653
654   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
655   while(msg_tmp!=0) {
656     done =0;
657 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
658     double startT = CmiWallTimer();
659 #endif
660     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
661       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
662     if(done) {
663       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
664       MsgQueueLen--;
665       /* Release the message */
666       temp = msg_tmp->next;
667       if(prev==0)  /* first message */
668         sent_msgs = temp;
669       else
670         prev->next = temp;
671       CmiFree(msg_tmp->msg);
672       CmiFree(msg_tmp);
673       msg_tmp = temp;
674     } else {
675       prev = msg_tmp;
676       msg_tmp = msg_tmp->next;
677     }
678 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
679     {
680     double endT = CmiWallTimer();
681     /* only record the event if it takes more than 1ms */
682     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
683     }
684 #endif
685   }
686   end_sent = prev;
687   MACHSTATE(2,"} CmiReleaseSentMessages end");
688 }
689
690 int PumpMsgs(void)
691 {
692   int nbytes, flg, res;
693   char *msg;
694   MPI_Status sts;
695   int recd=0;
696
697 #if CMI_EXERT_RECV_CAP
698   int recvCnt=0;
699 #endif
700         
701 #if CMK_BLUEGENEL
702   MPID_Progress_test();
703 #endif
704
705   MACHSTATE(2,"PumpMsgs begin {");
706
707         
708   while(1) {
709 #if CMI_EXERT_RECV_CAP
710         if(recvCnt==RECV_CAP) break;
711 #endif
712           
713     /* First check posted recvs then do  probe unmatched outstanding messages */
714 #if MPI_POST_RECV_COUNT > 0 
715     int completed_index=-1;
716     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
717         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
718     if(flg){
719         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
720             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
721
722         recd = 1;
723         msg = (char *) CmiAlloc(nbytes);
724         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
725         /* and repost the recv */
726
727         START_EVENT();
728
729         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
730             MPI_POST_RECV_SIZE,
731             MPI_BYTE,
732             MPI_ANY_SOURCE,
733             POST_RECV_TAG,
734             MPI_COMM_WORLD,
735             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
736                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
737
738         END_EVENT(50);
739
740         CpvAccess(Cmi_posted_recv_total)++;
741     }
742     else {
743         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
744         if(res != MPI_SUCCESS)
745         CmiAbort("MPI_Iprobe failed\n");
746         if(!flg) break;
747         recd = 1;
748         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
749         msg = (char *) CmiAlloc(nbytes);
750
751         START_EVENT();
752
753         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
754             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
755
756         END_EVENT(30);
757
758         CpvAccess(Cmi_unposted_recv_total)++;
759     }
760 #else
761     /* Original version */
762 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
763   double startT = CmiWallTimer(); 
764 #endif
765     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
766     if(res != MPI_SUCCESS)
767       CmiAbort("MPI_Iprobe failed\n");
768
769     if(!flg) break;
770 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
771     {
772     double endT = CmiWallTimer();
773     /* only trace the probe that last longer than 1ms */
774     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
775     }
776 #endif
777
778     recd = 1;
779     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
780     msg = (char *) CmiAlloc(nbytes);
781     
782     START_EVENT();
783
784     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
785       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
786
787     /*END_EVENT(30);*/
788
789 #endif
790
791 #if CMK_SMP_TRACE_COMMTHREAD
792         traceBeginCommOp(msg);
793         traceChangeLastTimestamp(CpvAccess(projTraceStart));
794         traceEndCommOp(msg);
795         #if CMI_MPI_TRACE_MOREDETAILED
796         char tmp[32];
797         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
798         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
799         #endif
800 #elif CMK_TRACE_COMMOVERHEAD
801         char tmp[32];
802         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
803         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
804 #endif
805         
806         
807     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
808     CMI_CHECK_CHECKSUM(msg, nbytes);
809 #if CMK_ERROR_CHECKING
810     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
811       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
812       CmiFree(msg);
813       CmiAbort("Abort!\n");
814       continue;
815     }
816 #endif
817         
818 #if CMK_BROADCAST_SPANNING_TREE
819     if (CMI_BROADCAST_ROOT(msg))
820       SendSpanningChildren(nbytes, msg);
821 #elif CMK_BROADCAST_HYPERCUBE
822     if (CMI_BROADCAST_ROOT(msg))
823       SendHypercube(nbytes, msg);
824 #endif
825         
826         /* In SMP mode, this push operation needs to be executed
827      * after forwarding broadcast messages. If it is executed
828      * earlier, then during the bcast msg forwarding period,    
829          * the msg could be already freed on the worker thread.
830          * As a result, the forwarded message could be wrong! 
831          * --Chao Mei
832          */
833 #if CMK_NODE_QUEUE_AVAILABLE
834     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
835       CmiPushNode(msg);
836     else
837 #endif
838         CmiPushPE(CMI_DEST_RANK(msg), msg);     
839         
840 #if CMI_EXERT_RECV_CAP
841         recvCnt++;
842 #endif  
843   }
844
845   
846 #if CMK_IMMEDIATE_MSG && !CMK_SMP
847   CmiHandleImmediate();
848 #endif
849   
850   MACHSTATE(2,"} PumpMsgs end ");
851   return recd;
852 }
853
854 /* blocking version */
855 static void PumpMsgsBlocking(void)
856 {
857   static int maxbytes = 20000000;
858   static char *buf = NULL;
859   int nbytes, flg;
860   MPI_Status sts;
861   char *msg;
862   int recd=0;
863
864   if (!PCQueueEmpty(CmiGetState()->recv)) return;
865   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
866   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
867   if (sent_msgs)  return;
868
869 #if 0
870   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
871 #endif
872
873   if (buf == NULL) {
874     buf = (char *) CmiAlloc(maxbytes);
875     _MEMCHECK(buf);
876   }
877
878
879 #if MPI_POST_RECV_COUNT > 0
880 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
881 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
882 #endif
883
884   START_EVENT();
885
886   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
887       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
888
889   /*END_EVENT(30);*/
890     
891    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
892    msg = (char *) CmiAlloc(nbytes);
893    memcpy(msg, buf, nbytes);
894
895 #if CMK_SMP_TRACE_COMMTHREAD
896         traceBeginCommOp(msg);
897         traceChangeLastTimestamp(CpvAccess(projTraceStart));
898         traceEndCommOp(msg);
899         #if CMI_MPI_TRACE_MOREDETAILED
900         char tmp[32];
901         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
902         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
903         #endif
904 #endif
905
906 #if CMK_BROADCAST_SPANNING_TREE
907    if (CMI_BROADCAST_ROOT(msg))
908       SendSpanningChildren(nbytes, msg);
909 #elif CMK_BROADCAST_HYPERCUBE
910    if (CMI_BROADCAST_ROOT(msg))
911       SendHypercube(nbytes, msg);
912 #endif
913   
914         /* In SMP mode, this push operation needs to be executed
915      * after forwarding broadcast messages. If it is executed
916      * earlier, then during the bcast msg forwarding period,    
917          * the msg could be already freed on the worker thread.
918          * As a result, the forwarded message could be wrong! 
919          * --Chao Mei
920          */  
921 #if CMK_NODE_QUEUE_AVAILABLE
922    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
923       CmiPushNode(msg);
924    else
925 #endif
926       CmiPushPE(CMI_DEST_RANK(msg), msg);
927 }
928
929 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
930
931 #if CMK_SMP
932
933 static int inexit = 0;
934 static CmiNodeLock  exitLock = 0;
935
936 static int MsgQueueEmpty()
937 {
938   int i;
939 #if MULTI_SENDQUEUE
940   for (i=0; i<_Cmi_mynodesize; i++)
941     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
942 #else
943   return PCQueueEmpty(sendMsgBuf);
944 #endif
945   return 1;
946 }
947
948 static int SendMsgBuf();
949
950 /* test if all processors recv queues are empty */
951 static int RecvQueueEmpty()
952 {
953   int i;
954   for (i=0; i<_Cmi_mynodesize; i++) {
955     CmiState cs=CmiGetStateN(i);
956     if (!PCQueueEmpty(cs->recv)) return 0;
957   }
958   return 1;
959 }
960
961 /**
962 CommunicationServer calls MPI to send messages in the queues and probe message from network.
963 */
964
965 #define REPORT_COMM_METRICS 0
966 #if REPORT_COMM_METRICS
967 static double pumptime = 0.0;
968 static double releasetime = 0.0;
969 static double sendtime = 0.0;
970 #endif
971
972 static void CommunicationServer(int sleepTime)
973 {
974   int static count=0;
975 /*
976   count ++;
977   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
978 */
979 #if REPORT_COMM_METRICS
980   double t1, t2, t3, t4;
981   t1 = CmiWallTimer();
982 #endif
983   PumpMsgs();
984 #if REPORT_COMM_METRICS
985   t2 = CmiWallTimer();
986 #endif
987   CmiReleaseSentMessages();
988 #if REPORT_COMM_METRICS
989   t3 = CmiWallTimer();
990 #endif
991   SendMsgBuf();
992 #if REPORT_COMM_METRICS
993   t4 = CmiWallTimer();
994   pumptime += (t2-t1);
995   releasetime += (t3-t2);
996   sendtime += (t4-t3);
997 #endif
998 /*
999   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1000 */
1001   if (inexit == CmiMyNodeSize()) {
1002     MACHSTATE(2, "CommunicationServer exiting {");
1003 #if 0
1004     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1005 #endif
1006     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1007       CmiReleaseSentMessages();
1008       SendMsgBuf();
1009       PumpMsgs();
1010     }
1011     MACHSTATE(2, "CommunicationServer barrier begin {");
1012
1013     START_EVENT();
1014
1015     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1016       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1017
1018     END_EVENT(10);
1019
1020     MACHSTATE(2, "} CommunicationServer barrier end");
1021 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1022     if (CmiMyNode() == 0){
1023       CmiPrintf("End of program\n");
1024     }
1025 #endif
1026     MACHSTATE(2, "} CommunicationServer EXIT");
1027
1028     ConverseCommonExit();   
1029 #if REPORT_COMM_METRICS
1030     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1031 #endif
1032
1033 #if ! CMK_AUTOBUILD
1034     signal(SIGINT, signal_int);
1035     MPI_Finalize();
1036     #endif
1037     exit(0);
1038   }
1039 }
1040
1041 #endif
1042
1043 static void CommunicationServerThread(int sleepTime)
1044 {
1045 #if CMK_SMP
1046   CommunicationServer(sleepTime);
1047 #endif
1048 #if CMK_IMMEDIATE_MSG
1049   CmiHandleImmediate();
1050 #endif
1051 }
1052
1053 #if CMK_NODE_QUEUE_AVAILABLE
1054 char *CmiGetNonLocalNodeQ(void)
1055 {
1056   CmiState cs = CmiGetState();
1057   char *result = 0;
1058   CmiIdleLock_checkMessage(&cs->idle);
1059 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1060     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1061     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1062     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1063     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1064     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1065 /*  }  */
1066
1067   return result;
1068 }
1069 #endif
1070
1071 void *CmiGetNonLocal(void)
1072 {
1073   static int count=0;
1074   CmiState cs = CmiGetState();
1075   void *msg;
1076
1077 #if ! CMK_SMP
1078   if (CmiNumPes() == 1) return NULL;
1079 #endif
1080
1081   CmiIdleLock_checkMessage(&cs->idle);
1082   /* although it seems that lock is not needed, I found it crashes very often
1083      on mpi-smp without lock */
1084
1085 #if ! CMK_SMP
1086   CmiReleaseSentMessages();
1087   PumpMsgs();
1088 #endif
1089
1090   /* CmiLock(procState[cs->rank].recvLock); */
1091   msg =  PCQueuePop(cs->recv);
1092   /* CmiUnlock(procState[cs->rank].recvLock); */
1093
1094 /*
1095   if (msg) {
1096     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1097   else {
1098     count++;
1099     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1100   }
1101 */
1102 #if ! CMK_SMP
1103   if (no_outstanding_sends) {
1104     while (MsgQueueLen>0) {
1105       CmiReleaseSentMessages();
1106       PumpMsgs();
1107     }
1108   }
1109
1110   if(!msg) {
1111     CmiReleaseSentMessages();
1112     if (PumpMsgs())
1113       return  PCQueuePop(cs->recv);
1114     else
1115       return 0;
1116   }
1117 #endif
1118
1119   return msg;
1120 }
1121
1122 /* called in non-smp mode */
1123 void CmiNotifyIdle(void)
1124 {
1125   CmiReleaseSentMessages();
1126   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1127 }
1128
1129
1130 /********************************************************
1131     The call to probe immediate messages has been renamed to
1132     CmiMachineProgressImpl
1133 ******************************************************/
1134 /* user call to handle immediate message, only useful in non SMP version
1135    using polling method to schedule message.
1136 */
1137 /*
1138 #if CMK_IMMEDIATE_MSG
1139 void CmiProbeImmediateMsg()
1140 {
1141 #if !CMK_SMP
1142   PumpMsgs();
1143   CmiHandleImmediate();
1144 #endif
1145 }
1146 #endif
1147 */
1148
1149 /* Network progress function is used to poll the network when for
1150    messages. This flushes receive buffers on some  implementations*/
1151 #if CMK_MACHINE_PROGRESS_DEFINED
1152 void CmiMachineProgressImpl()
1153 {
1154 #if !CMK_SMP
1155     PumpMsgs();
1156 #if CMK_IMMEDIATE_MSG
1157     CmiHandleImmediate();
1158 #endif
1159 #else
1160     /*Not implemented yet. Communication server does not seem to be
1161       thread safe, so only communication thread call it */
1162     if (CmiMyRank() == CmiMyNodeSize())
1163         CommunicationServerThread(0);
1164 #endif
1165 }
1166 #endif
1167
1168 /********************* MESSAGE SEND FUNCTIONS ******************/
1169
1170 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1171
1172 static void CmiSendSelf(char *msg)
1173 {
1174 #if CMK_IMMEDIATE_MSG
1175     if (CmiIsImmediate(msg)) {
1176       /* CmiBecomeNonImmediate(msg); */
1177       CmiPushImmediateMsg(msg);
1178       CmiHandleImmediate();
1179       return;
1180     }
1181 #endif
1182     CQdCreate(CpvAccess(cQdState), 1);
1183     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1184 }
1185
1186 void CmiSyncSendFn(int destPE, int size, char *msg)
1187 {
1188   CmiState cs = CmiGetState();
1189   char *dupmsg = (char *) CmiAlloc(size);
1190   memcpy(dupmsg, msg, size);
1191
1192   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1193
1194   if (cs->pe==destPE) {
1195     CmiSendSelf(dupmsg);
1196   }
1197   else
1198     CmiAsyncSendFn_(destPE, size, dupmsg);
1199 }
1200
1201 #if CMK_SMP
1202
1203 /* called by communication thread in SMP */
1204 static int SendMsgBuf()
1205 {
1206   SMSG_LIST *msg_tmp;
1207   char *msg;
1208   int node, rank, size;
1209   int i;
1210   int sent = 0;
1211
1212 #if CMI_EXERT_SEND_CAP
1213         int sentCnt = 0;
1214 #endif  
1215         
1216   MACHSTATE(2,"SendMsgBuf begin {");
1217 #if MULTI_SENDQUEUE
1218   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1219   {
1220     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1221     {
1222       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1223 #else
1224     /* single message sending queue */
1225     /* CmiLock(sendMsgBufLock); */
1226     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1227     /* CmiUnlock(sendMsgBufLock); */
1228     while (NULL != msg_tmp)
1229     {
1230 #endif
1231       node = msg_tmp->destpe;
1232       size = msg_tmp->size;
1233       msg = msg_tmp->msg;
1234       msg_tmp->next = 0;
1235       while (MsgQueueLen > request_max) {
1236         CmiReleaseSentMessages();
1237         PumpMsgs();
1238       }
1239       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1240 #if CMK_ERROR_CHECKING
1241       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1242 #endif
1243       CMI_SET_CHECKSUM(msg, size);
1244
1245 #if MPI_POST_RECV_COUNT > 0
1246         if(size <= MPI_POST_RECV_SIZE){
1247
1248           START_EVENT();
1249           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1250                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1251
1252           STOP_EVENT(40);
1253         }
1254         else {
1255             START_EVENT();
1256             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1257                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1258             STOP_EVENT(40);
1259         }
1260 #else
1261         START_EVENT();
1262         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1263             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1264         /*END_EVENT(40);*/
1265 #endif
1266         
1267 #if CMK_SMP_TRACE_COMMTHREAD
1268         traceBeginCommOp(msg);
1269         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1270         /* traceSendMsgComm must execute after traceBeginCommOp because
1271          * we pretend we execute an entry method, and inside this we
1272          * pretend we will send another message. Otherwise how could
1273          * a message creation just before an entry method invocation?
1274          * If such logic is broken, the projections will not trace
1275          * messages correctly! -Chao Mei
1276          */
1277         traceSendMsgComm(msg);
1278         traceEndCommOp(msg);
1279         #if CMI_MPI_TRACE_MOREDETAILED
1280         char tmp[64];
1281         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1282         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1283         #endif
1284 #endif
1285                 
1286                 
1287       MACHSTATE(3,"}MPI_send end");
1288       MsgQueueLen++;
1289       if(sent_msgs==0)
1290         sent_msgs = msg_tmp;
1291       else
1292         end_sent->next = msg_tmp;
1293       end_sent = msg_tmp;
1294       sent=1;
1295           
1296 #if CMI_EXERT_SEND_CAP    
1297           if(++sentCnt == SEND_CAP) break;
1298 #endif    
1299           
1300 #if ! MULTI_SENDQUEUE
1301       /* CmiLock(sendMsgBufLock); */
1302       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1303       /* CmiUnlock(sendMsgBufLock); */
1304 #endif
1305     }
1306 #if MULTI_SENDQUEUE
1307   }
1308 #endif
1309   MACHSTATE(2,"}SendMsgBuf end ");
1310   return sent;
1311 }
1312
1313 void EnqueueMsg(void *m, int size, int node)
1314 {
1315   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1316   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1317   msg_tmp->msg = m;
1318   msg_tmp->size = size;
1319   msg_tmp->destpe = node;
1320         
1321 #if CMK_SMP_TRACE_COMMTHREAD
1322         msg_tmp->srcpe = CmiMyPe();
1323 #endif  
1324
1325 #if MULTI_SENDQUEUE
1326   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1327 #else
1328   CmiLock(sendMsgBufLock);
1329   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1330   CmiUnlock(sendMsgBufLock);
1331 #endif
1332         
1333   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1334 }
1335
1336 #endif
1337
1338 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1339 {
1340   CmiState cs = CmiGetState();
1341   SMSG_LIST *msg_tmp;
1342   CmiUInt2  rank, node;
1343
1344   if(destPE == cs->pe) {
1345     char *dupmsg = (char *) CmiAlloc(size);
1346     memcpy(dupmsg, msg, size);
1347     CmiSendSelf(dupmsg);
1348     return 0;
1349   }
1350   CQdCreate(CpvAccess(cQdState), 1);
1351 #if CMK_SMP
1352   node = CmiNodeOf(destPE);
1353   rank = CmiRankOf(destPE);
1354   if (node == CmiMyNode())  {
1355     CmiPushPE(rank, msg);
1356     return 0;
1357   }
1358   CMI_DEST_RANK(msg) = rank;
1359   EnqueueMsg(msg, size, node);
1360   return 0;
1361 #else
1362   /* non smp */
1363   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1364   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1365   msg_tmp->msg = msg;
1366   msg_tmp->next = 0;
1367   while (MsgQueueLen > request_max) {
1368         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1369         CmiReleaseSentMessages();
1370         PumpMsgs();
1371   }
1372 #if CMK_ERROR_CHECKING
1373   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1374 #endif
1375   CMI_SET_CHECKSUM(msg, size);
1376
1377 #if MPI_POST_RECV_COUNT > 0
1378         if(size <= MPI_POST_RECV_SIZE){
1379
1380           START_EVENT();
1381           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1382                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1383           END_EVENT(40);
1384         }
1385         else {
1386           START_EVENT();
1387           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1388                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1389           END_EVENT(40);
1390         }
1391 #else
1392   START_EVENT();
1393   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1394     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1395   /*END_EVENT(40);*/
1396   #if CMK_TRACE_COMMOVERHEAD
1397         char tmp[64];
1398         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1399         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1400   #endif
1401 #endif
1402
1403   MsgQueueLen++;
1404   if(sent_msgs==0)
1405     sent_msgs = msg_tmp;
1406   else
1407     end_sent->next = msg_tmp;
1408   end_sent = msg_tmp;
1409   return (CmiCommHandle) &(msg_tmp->req);
1410 #endif              /* non-smp */
1411 }
1412
1413 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1414 {
1415   CMI_SET_BROADCAST_ROOT(msg, 0);
1416   CmiAsyncSendFn_(destPE, size, msg);
1417 }
1418
1419 void CmiFreeSendFn(int destPE, int size, char *msg)
1420 {
1421   CmiState cs = CmiGetState();
1422   CMI_SET_BROADCAST_ROOT(msg, 0);
1423
1424   if (cs->pe==destPE) {
1425     CmiSendSelf(msg);
1426   } else {
1427     CmiAsyncSendFn_(destPE, size, msg);
1428   }
1429 }
1430
1431 /*********************** BROADCAST FUNCTIONS **********************/
1432
1433 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1434 void CmiSyncSendFn1(int destPE, int size, char *msg)
1435 {
1436   CmiState cs = CmiGetState();
1437   char *dupmsg = (char *) CmiAlloc(size);
1438   memcpy(dupmsg, msg, size);
1439   if (cs->pe==destPE)
1440     CmiSendSelf(dupmsg);
1441   else
1442     CmiAsyncSendFn_(destPE, size, dupmsg);
1443 }
1444
1445 /* send msg to its spanning children in broadcast. G. Zheng */
1446 void SendSpanningChildren(int size, char *msg)
1447 {
1448   CmiState cs = CmiGetState();
1449   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1450   int startnode = CmiNodeOf(startpe);
1451   int i, exceptRank;
1452         
1453    /* first send msgs to other nodes */
1454   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1455   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1456     int nd = CmiMyNode()-startnode;
1457     if (nd<0) nd+=CmiNumNodes();
1458     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1459     if (nd > CmiNumNodes() - 1) break;
1460     nd += startnode;
1461     nd = nd%CmiNumNodes();
1462     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1463         #if CMK_SMP
1464         /* always send to the first rank of other nodes */
1465         char *newmsg = CmiCopyMsg(msg, size);
1466         CMI_DEST_RANK(newmsg) = 0;
1467     EnqueueMsg(newmsg, size, nd);
1468         #else
1469         CmiSyncSendFn1(nd, size, msg);
1470         #endif
1471   }
1472 #if CMK_SMP  
1473    /* second send msgs to my peers on this node */
1474   /* FIXME: now it's just a flat p2p send!! When node size is large,
1475    * it should also be sent in a tree
1476    */
1477    exceptRank = CMI_DEST_RANK(msg);
1478    for(i=0; i<exceptRank; i++){
1479            CmiPushPE(i, CmiCopyMsg(msg, size));
1480    }
1481    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1482            CmiPushPE(i, CmiCopyMsg(msg, size));
1483    }
1484 #endif
1485 }
1486
1487 #include <math.h>
1488
1489 /* send msg along the hypercube in broadcast. (Sameer) */
1490 void SendHypercube(int size, char *msg)
1491 {
1492   CmiState cs = CmiGetState();
1493   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1494   int startnode = CmiNodeOf(startpe);
1495   int i, exceptRank, cnt, tmp, relPE;
1496   int dims=0;
1497
1498   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1499   tmp = CmiNumNodes()-1;
1500   while(tmp>0){
1501           dims++;
1502           tmp = tmp >> 1;
1503   }
1504   if(CmiNumNodes()==1) dims=1;
1505   
1506    /* first send msgs to other nodes */  
1507   relPE = CmiMyNode()-startnode;
1508   if(relPE < 0) relPE += CmiNumNodes();
1509   cnt=0;
1510   tmp = relPE;
1511   /* count how many zeros (in binary format) relPE has */
1512   for(i=0; i<dims; i++, cnt++){
1513     if(tmp & 1 == 1) break;
1514     tmp = tmp >> 1;
1515   }
1516   
1517   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1518   for (i = cnt-1; i >= 0; i--) {
1519     int nd = relPE + (1 << i);
1520         if(nd >= CmiNumNodes()) continue;
1521         nd = (nd+startnode)%CmiNumNodes();
1522         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1523 #if CMK_SMP
1524     /* always send to the first rank of other nodes */
1525     char *newmsg = CmiCopyMsg(msg, size);
1526     CMI_DEST_RANK(newmsg) = 0;
1527     EnqueueMsg(newmsg, size, nd);
1528 #else
1529         CmiSyncSendFn1(nd, size, msg);
1530 #endif
1531   }
1532   
1533 #if CMK_SMP
1534    /* second send msgs to my peers on this node */
1535    /* FIXME: now it's just a flat p2p send!! When node size is large,
1536     * it should also be sent in a tree
1537     */
1538    exceptRank = CMI_DEST_RANK(msg);
1539    for(i=0; i<exceptRank; i++){
1540            CmiPushPE(i, CmiCopyMsg(msg, size));
1541    }
1542    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1543            CmiPushPE(i, CmiCopyMsg(msg, size));
1544    }
1545 #endif
1546 }
1547
1548 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1549 {
1550   CmiState cs = CmiGetState();
1551
1552 #if CMK_SMP     
1553   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1554   CMI_DEST_RANK(msg) = CmiMyRank();
1555 #endif
1556         
1557 #if CMK_BROADCAST_SPANNING_TREE
1558   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1559   SendSpanningChildren(size, msg);
1560
1561 #elif CMK_BROADCAST_HYPERCUBE
1562   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1563   SendHypercube(size, msg);
1564
1565 #else
1566   int i;
1567
1568   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1569     CmiSyncSendFn(i, size,msg) ;
1570   for ( i=0; i<cs->pe; i++ )
1571     CmiSyncSendFn(i, size,msg) ;
1572 #endif
1573
1574   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1575 }
1576
1577
1578 /*  FIXME: luckily async is never used  G. Zheng */
1579 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1580 {
1581   CmiState cs = CmiGetState();
1582   int i ;
1583
1584   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1585     CmiAsyncSendFn(i,size,msg) ;
1586   for ( i=0; i<cs->pe; i++ )
1587     CmiAsyncSendFn(i,size,msg) ;
1588
1589   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1590   CmiAbort("CmiAsyncBroadcastFn should never be called");
1591   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1592 }
1593
1594 void CmiFreeBroadcastFn(int size, char *msg)
1595 {
1596    CmiSyncBroadcastFn(size,msg);
1597    CmiFree(msg);
1598 }
1599
1600 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1601 {
1602
1603 #if CMK_SMP     
1604   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1605   CMI_DEST_RANK(msg) = CmiMyRank();
1606 #endif
1607
1608 #if CMK_BROADCAST_SPANNING_TREE
1609   CmiState cs = CmiGetState();
1610   CmiSyncSendFn(cs->pe, size,msg) ;
1611   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1612   SendSpanningChildren(size, msg);
1613
1614 #elif CMK_BROADCAST_HYPERCUBE
1615   CmiState cs = CmiGetState();
1616   CmiSyncSendFn(cs->pe, size,msg) ;
1617   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1618   SendHypercube(size, msg);
1619
1620 #else
1621     int i ;
1622
1623   for ( i=0; i<_Cmi_numpes; i++ )
1624     CmiSyncSendFn(i,size,msg) ;
1625 #endif
1626
1627   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1628 }
1629
1630 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1631 {
1632   int i ;
1633
1634   for ( i=1; i<_Cmi_numpes; i++ )
1635     CmiAsyncSendFn(i,size,msg) ;
1636
1637   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1638
1639   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1640 }
1641
1642 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1643 {
1644 #if CMK_SMP     
1645   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1646   CMI_DEST_RANK(msg) = CmiMyRank();
1647 #endif
1648
1649 #if CMK_BROADCAST_SPANNING_TREE
1650   CmiState cs = CmiGetState();
1651   CmiSyncSendFn(cs->pe, size,msg) ;
1652   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1653   SendSpanningChildren(size, msg);
1654
1655 #elif CMK_BROADCAST_HYPERCUBE
1656   CmiState cs = CmiGetState();
1657   CmiSyncSendFn(cs->pe, size,msg) ;
1658   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1659   SendHypercube(size, msg);
1660
1661 #else
1662   int i ;
1663
1664   for ( i=0; i<_Cmi_numpes; i++ )
1665     CmiSyncSendFn(i,size,msg) ;
1666 #endif
1667   CmiFree(msg) ;
1668   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1669 }
1670
1671 #if CMK_NODE_QUEUE_AVAILABLE
1672
1673 static void CmiSendNodeSelf(char *msg)
1674 {
1675 #if CMK_IMMEDIATE_MSG
1676 #if 0
1677     if (CmiIsImmediate(msg) && !_immRunning) {
1678       /*CmiHandleImmediateMessage(msg); */
1679       CmiPushImmediateMsg(msg);
1680       CmiHandleImmediate();
1681       return;
1682     }
1683 #endif
1684     if (CmiIsImmediate(msg))
1685     {
1686       CmiPushImmediateMsg(msg);
1687       if (!_immRunning) CmiHandleImmediate();
1688       return;
1689     }
1690 #endif
1691     CQdCreate(CpvAccess(cQdState), 1);
1692     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1693     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1694     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1695 }
1696
1697 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1698 {
1699   int i;
1700   SMSG_LIST *msg_tmp;
1701   char *dupmsg;
1702
1703   CMI_SET_BROADCAST_ROOT(msg, 0);
1704   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1705   switch (dstNode) {
1706   case NODE_BROADCAST_ALL:
1707     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1708   case NODE_BROADCAST_OTHERS:
1709     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1710     for (i=0; i<_Cmi_numnodes; i++)
1711       if (i!=_Cmi_mynode) {
1712         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1713       }
1714     break;
1715   default:
1716     dupmsg = (char *)CmiCopyMsg(msg,size);
1717     if(dstNode == _Cmi_mynode) {
1718       CmiSendNodeSelf(dupmsg);
1719     }
1720     else {
1721       CQdCreate(CpvAccess(cQdState), 1);
1722       EnqueueMsg(dupmsg, size, dstNode);
1723     }
1724   }
1725   return 0;
1726 }
1727
1728 void CmiSyncNodeSendFn(int p, int s, char *m)
1729 {
1730   CmiAsyncNodeSendFn(p, s, m);
1731 }
1732
1733 /* need */
1734 void CmiFreeNodeSendFn(int p, int s, char *m)
1735 {
1736   CmiAsyncNodeSendFn(p, s, m);
1737   CmiFree(m);
1738 }
1739
1740 /* need */
1741 void CmiSyncNodeBroadcastFn(int s, char *m)
1742 {
1743   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1744 }
1745
1746 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1747 {
1748 }
1749
1750 /* need */
1751 void CmiFreeNodeBroadcastFn(int s, char *m)
1752 {
1753   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1754   CmiFree(m);
1755 }
1756
1757 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1758 {
1759   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1760 }
1761
1762 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1763 {
1764   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1765 }
1766
1767 /* need */
1768 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1769 {
1770   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1771   CmiFree(m);
1772 }
1773 #endif
1774
1775 /************************** MAIN ***********************************/
1776 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1777
1778 void ConverseExit(void)
1779 {
1780 #if ! CMK_SMP
1781   while(!CmiAllAsyncMsgsSent()) {
1782     PumpMsgs();
1783     CmiReleaseSentMessages();
1784   }
1785   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1786     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1787
1788   ConverseCommonExit();
1789 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1790   if (CmiMyPe() == 0){
1791     CmiPrintf("End of program\n");
1792 #if MPI_POST_RECV_COUNT > 0
1793     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1794 #endif
1795 }
1796 #endif
1797 #if ! CMK_AUTOBUILD
1798   signal(SIGINT, signal_int);
1799   MPI_Finalize();
1800 #endif
1801   exit(0);
1802
1803 #else
1804     /* SMP version, communication thread will exit */
1805   ConverseCommonExit();
1806   /* atomic increment */
1807   CmiLock(exitLock);
1808   inexit++;
1809   CmiUnlock(exitLock);
1810   while (1) CmiYield();
1811 #endif
1812 }
1813
1814 static void registerMPITraceEvents() {
1815 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1816     traceRegisterUserEvent("MPI_Barrier", 10);
1817     traceRegisterUserEvent("MPI_Send", 20);
1818     traceRegisterUserEvent("MPI_Recv", 30);
1819     traceRegisterUserEvent("MPI_Isend", 40);
1820     traceRegisterUserEvent("MPI_Irecv", 50);
1821     traceRegisterUserEvent("MPI_Test", 60);
1822     traceRegisterUserEvent("MPI_Iprobe", 70);
1823 #endif
1824 }
1825
1826
1827 static char     **Cmi_argv;
1828 static char     **Cmi_argvcopy;
1829 static CmiStartFn Cmi_startfn;   /* The start function */
1830 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1831
1832 typedef struct {
1833   int sleepMs; /*Milliseconds to sleep while idle*/
1834   int nIdles; /*Number of times we've been idle in a row*/
1835   CmiState cs; /*Machine state*/
1836 } CmiIdleState;
1837
1838 static CmiIdleState *CmiNotifyGetState(void)
1839 {
1840   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1841   s->sleepMs=0;
1842   s->nIdles=0;
1843   s->cs=CmiGetState();
1844   return s;
1845 }
1846
1847 static void CmiNotifyBeginIdle(CmiIdleState *s)
1848 {
1849   s->sleepMs=0;
1850   s->nIdles=0;
1851 }
1852
1853 static void CmiNotifyStillIdle(CmiIdleState *s)
1854 {
1855 #if ! CMK_SMP
1856   CmiReleaseSentMessages();
1857   PumpMsgs();
1858 #else
1859 /*  CmiYield();  */
1860 #endif
1861
1862 #if 1
1863   {
1864   int nSpins=20; /*Number of times to spin before sleeping*/
1865   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1866   s->nIdles++;
1867   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1868     s->sleepMs+=2;
1869     if (s->sleepMs>10) s->sleepMs=10;
1870   }
1871   /*Comm. thread will listen on sockets-- just sleep*/
1872   if (s->sleepMs>0) {
1873     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1874     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1875     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1876   }
1877   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1878   }
1879 #endif
1880 }
1881
1882 #if MACHINE_DEBUG_LOG
1883 FILE *debugLog = NULL;
1884 #endif
1885
1886 static int machine_exit_idx;
1887 static void machine_exit(char *m) {
1888   EmergencyExit();
1889   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1890   fflush(stdout);
1891   CmiNodeBarrier();
1892   if (CmiMyRank() == 0) {
1893     MPI_Barrier(MPI_COMM_WORLD);
1894     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1895     MPI_Abort(MPI_COMM_WORLD, 1);
1896   } else {
1897     while (1) CmiYield();
1898   }
1899 }
1900
1901 static void KillOnAllSigs(int sigNo) {
1902   static int already_in_signal_handler = 0;
1903   char *m;
1904   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1905   already_in_signal_handler = 1;
1906 #if CMK_CCS_AVAILABLE
1907   if (CpvAccess(cmiArgDebugFlag)) {
1908     CpdNotify(CPD_SIGNAL, sigNo);
1909     CpdFreeze();
1910   }
1911 #endif
1912   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1913       "Signal: %d\n",CmiMyPe(),sigNo);
1914   CmiPrintStackTrace(1);
1915
1916   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1917   CmiSetHandler(m, machine_exit_idx);
1918   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1919   machine_exit(m);
1920 }
1921
1922 static void ConverseRunPE(int everReturn)
1923 {
1924   CmiIdleState *s=CmiNotifyGetState();
1925   CmiState cs;
1926   char** CmiMyArgv;
1927
1928   CmiNodeAllBarrier();
1929
1930   cs = CmiGetState();
1931   CpvInitialize(void *,CmiLocalQueue);
1932   CpvAccess(CmiLocalQueue) = cs->localqueue;
1933
1934   if (CmiMyRank())
1935     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1936   else
1937     CmiMyArgv=Cmi_argv;
1938
1939   CthInit(CmiMyArgv);
1940
1941   ConverseCommonInit(CmiMyArgv);
1942   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1943
1944 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1945   CpvInitialize(double, projTraceStart);
1946   /* only PE 0 needs to care about registration (to generate sts file). */
1947   if (CmiMyPe() == 0) {
1948     registerMachineUserEventsFunction(&registerMPITraceEvents);
1949   }
1950 #endif
1951
1952   /* initialize the network progress counter*/
1953   /* Network progress function is used to poll the network when for
1954      messages. This flushes receive buffers on some  implementations*/
1955   CpvInitialize(int , networkProgressCount);
1956   CpvAccess(networkProgressCount) = 0;
1957
1958 #if CMK_SMP
1959   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1960   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1961 #else
1962   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1963 #endif
1964
1965 #if MACHINE_DEBUG_LOG
1966   if (CmiMyRank() == 0) {
1967     char ln[200];
1968     sprintf(ln,"debugLog.%d",CmiMyNode());
1969     debugLog=fopen(ln,"w");
1970   }
1971 #endif
1972
1973   /* Converse initialization finishes, immediate messages can be processed.
1974      node barrier previously should take care of the node synchronization */
1975   _immediateReady = 1;
1976
1977   /* communication thread */
1978   if (CmiMyRank() == CmiMyNodeSize()) {
1979     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1980     while (1) CommunicationServerThread(5);
1981   }
1982   else {  /* worker thread */
1983   if (!everReturn) {
1984     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1985     if (Cmi_usrsched==0) CsdScheduler(-1);
1986     ConverseExit();
1987   }
1988   }
1989 }
1990
1991 static char *thread_level_tostring(int thread_level)
1992 {
1993 #if CMK_MPI_INIT_THREAD
1994   switch (thread_level) {
1995   case MPI_THREAD_SINGLE:
1996       return "MPI_THREAD_SINGLE";
1997   case MPI_THREAD_FUNNELED:
1998       return "MPI_THREAD_FUNNELED";
1999   case MPI_THREAD_SERIALIZED:
2000       return "MPI_THREAD_SERIALIZED";
2001   case MPI_THREAD_MULTIPLE :
2002       return "MPI_THREAD_MULTIPLE ";
2003   default: {
2004       char *str = (char*)malloc(5);
2005       sprintf(str,"%d", thread_level);
2006       return str;
2007       }
2008   }
2009   return  "unknown";
2010 #else
2011   char *str = (char*)malloc(5);
2012   sprintf(str,"%d", thread_level);
2013   return str;
2014 #endif
2015 }
2016
2017 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2018 {
2019   int n,i;
2020   int ver, subver;
2021   int provided;
2022   int thread_level;
2023
2024 #if MACHINE_DEBUG
2025   debugLog=NULL;
2026 #endif
2027 #if CMK_USE_HP_MAIN_FIX
2028 #if FOR_CPLUS
2029   _main(argc,argv);
2030 #endif
2031 #endif
2032
2033 #if CMK_MPI_INIT_THREAD
2034 #if CMK_SMP
2035   thread_level = MPI_THREAD_FUNNELED;
2036 #else
2037   thread_level = MPI_THREAD_SINGLE;
2038 #endif
2039   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2040   _thread_provided = provided;
2041 #else
2042   MPI_Init(&argc, &argv);
2043   thread_level = 0;
2044   provided = -1;
2045 #endif
2046   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2047   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2048
2049   MPI_Get_version(&ver, &subver);
2050   if (_Cmi_mynode == 0) {
2051     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2052   }
2053
2054   /* processor per node */
2055   _Cmi_mynodesize = 1;
2056   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2057     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2058 #if ! CMK_SMP
2059   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2060     CmiAbort("+ppn cannot be used in non SMP version!\n");
2061 #endif
2062   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2063   if (idleblock && _Cmi_mynode == 0) {
2064     printf("Charm++: Running in idle blocking mode.\n");
2065   }
2066
2067   /* setup signal handlers */
2068   signal(SIGSEGV, KillOnAllSigs);
2069   signal(SIGFPE, KillOnAllSigs);
2070   signal(SIGILL, KillOnAllSigs);
2071   signal_int = signal(SIGINT, KillOnAllSigs);
2072   signal(SIGTERM, KillOnAllSigs);
2073   signal(SIGABRT, KillOnAllSigs);
2074 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2075   signal(SIGQUIT, KillOnAllSigs);
2076   signal(SIGBUS, KillOnAllSigs);
2077 /*#     if CMK_HANDLE_SIGUSR
2078   signal(SIGUSR1, HandleUserSignals);
2079   signal(SIGUSR2, HandleUserSignals);
2080 #     endif*/
2081 #   endif /*UNIX*/
2082   
2083 #if CMK_NO_OUTSTANDING_SENDS
2084   no_outstanding_sends=1;
2085 #endif
2086   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2087     no_outstanding_sends = 1;
2088     if (_Cmi_mynode == 0)
2089       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2090         no_outstanding_sends?"":" not");
2091   }
2092   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2093   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2094   Cmi_argvcopy = CmiCopyArgs(argv);
2095   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2096   /* find dim = log2(numpes), to pretend we are a hypercube */
2097   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2098     Cmi_dim++ ;
2099  /* CmiSpanTreeInit();*/
2100   request_max=MAX_QLEN;
2101   CmiGetArgInt(argv,"+requestmax",&request_max);
2102   /*printf("request max=%d\n", request_max);*/
2103
2104   /* checksum flag */
2105   if (CmiGetArgFlag(argv,"+checksum")) {
2106 #if !CMK_OPTIMIZE
2107     checksum_flag = 1;
2108     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2109 #else
2110     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2111 #endif
2112   }
2113
2114   {
2115   int debug = CmiGetArgFlag(argv,"++debug");
2116   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2117   if (debug || debug_no_pause)
2118   {   /*Pause so user has a chance to start and attach debugger*/
2119 #if CMK_HAS_GETPID
2120     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2121     fflush(stdout);
2122     if (!debug_no_pause)
2123       sleep(15);
2124 #else
2125     printf("++debug ignored.\n");
2126 #endif
2127   }
2128   }
2129
2130 #if MPI_POST_RECV_COUNT > 0
2131
2132   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2133   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2134   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2135   CpvInitialize(char*,CmiPostedRecvBuffers);
2136
2137     /* Post some extra recvs to help out with incoming messages */
2138     /* On some MPIs the messages are unexpected and thus slow */
2139
2140     /* An array of request handles for posted recvs */
2141     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2142
2143     /* An array of buffers for posted recvs */
2144     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2145
2146     /* Post Recvs */
2147     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2148         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2149                     MPI_POST_RECV_SIZE,
2150                     MPI_BYTE,
2151                     MPI_ANY_SOURCE,
2152                     POST_RECV_TAG,
2153                     MPI_COMM_WORLD,
2154                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2155           CmiAbort("MPI_Irecv failed\n");
2156     }
2157
2158 #endif
2159
2160
2161
2162   /* CmiTimerInit(); */
2163
2164 #if 0
2165   CthInit(argv);
2166   ConverseCommonInit(argv);
2167
2168   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2169   if (initret==0) {
2170     fn(CmiGetArgc(argv), argv);
2171     if (usched==0) CsdScheduler(-1);
2172     ConverseExit();
2173   }
2174 #endif
2175
2176   CsvInitialize(CmiNodeState, NodeState);
2177   CmiNodeStateInit(&CsvAccess(NodeState));
2178
2179   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2180
2181   for (i=0; i<_Cmi_mynodesize+1; i++) {
2182 #if MULTI_SENDQUEUE
2183     procState[i].sendMsgBuf = PCQueueCreate();
2184 #endif
2185     procState[i].recvLock = CmiCreateLock();
2186   }
2187 #if CMK_SMP
2188 #if !MULTI_SENDQUEUE
2189   sendMsgBuf = PCQueueCreate();
2190   sendMsgBufLock = CmiCreateLock();
2191 #endif
2192   exitLock = CmiCreateLock();            /* exit count lock */
2193 #endif
2194
2195   /* Network progress function is used to poll the network when for
2196      messages. This flushes receive buffers on some  implementations*/
2197   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2198   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2199
2200   CmiStartThreads(argv);
2201   ConverseRunPE(initret);
2202 }
2203
2204 /***********************************************************************
2205  *
2206  * Abort function:
2207  *
2208  ************************************************************************/
2209
2210 void CmiAbort(const char *message)
2211 {
2212   char *m;
2213   /* if CharmDebug is attached simply try to send a message to it */
2214 #if CMK_CCS_AVAILABLE
2215   if (CpvAccess(cmiArgDebugFlag)) {
2216     CpdNotify(CPD_ABORT, message);
2217     CpdFreeze();
2218   }
2219 #endif  
2220   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2221         "Reason: %s\n",CmiMyPe(),message);
2222  /*  CmiError(message); */
2223   CmiPrintStackTrace(0);
2224   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2225   CmiSetHandler(m, machine_exit_idx);
2226   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2227   machine_exit(m);
2228   /* Program never reaches here */
2229   MPI_Abort(MPI_COMM_WORLD, 1);
2230 }
2231
2232
2233 #if 0
2234
2235 /* ****************************************************************** */
2236 /*    The following internal functions implement recd msg queue       */
2237 /* ****************************************************************** */
2238
2239 static void ** AllocBlock(unsigned int len)
2240 {
2241   void ** blk;
2242
2243   blk=(void **)CmiAlloc(len*sizeof(void *));
2244   if(blk==(void **)0) {
2245     CmiError("Cannot Allocate Memory!\n");
2246     MPI_Abort(MPI_COMM_WORLD, 1);
2247   }
2248   return blk;
2249 }
2250
2251 static void
2252 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2253 {
2254   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2255   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2256 }
2257
2258 void recdQueueInit(void)
2259 {
2260   recdQueue_blk = AllocBlock(BLK_LEN);
2261   recdQueue_blk_len = BLK_LEN;
2262   recdQueue_first = 0;
2263   recdQueue_len = 0;
2264 }
2265
2266 void recdQueueAddToBack(void *element)
2267 {
2268 #if NODE_0_IS_CONVHOST
2269   inside_comm = 1;
2270 #endif
2271   if(recdQueue_len==recdQueue_blk_len) {
2272     void **blk;
2273     recdQueue_blk_len *= 3;
2274     blk = AllocBlock(recdQueue_blk_len);
2275     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2276     CmiFree(recdQueue_blk);
2277     recdQueue_blk = blk;
2278     recdQueue_first = 0;
2279   }
2280   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2281 #if NODE_0_IS_CONVHOST
2282   inside_comm = 0;
2283 #endif
2284 }
2285
2286
2287 void * recdQueueRemoveFromFront(void)
2288 {
2289   if(recdQueue_len) {
2290     void *element;
2291     element = recdQueue_blk[recdQueue_first++];
2292     recdQueue_first %= recdQueue_blk_len;
2293     recdQueue_len--;
2294     return element;
2295   }
2296   return 0;
2297 }
2298
2299 #endif
2300
2301 /*@}*/