added semi-dynamic cap control for send/recv for smp mode. What semi-dynamic means...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #if CMK_SMP
51 /* currently only considering the smp case */
52 #define CMI_DYNAMIC_EXERT_CAP 1
53 /* This macro defines the max number of msgs in the sender msg buffer 
54  * that is allowed for recving operation to continue
55  */
56 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 10
57 #define CMI_DYNAMIC_SEND_CAPSIZE 10
58 #define CMI_DYNAMIC_RECV_CAPSIZE 10
59 /* initial values, -1 indiates there's no cap */
60 static int dynamicSendCap = -1;
61 static int dynamicRecvCap = -1;
62 #endif
63
64 #if CMI_EXERT_SEND_CAP
65 #define SEND_CAP 3
66 #endif
67
68 #if CMI_EXERT_RECV_CAP
69 #define RECV_CAP 2
70 #endif
71
72
73 #if CMK_SMP_TRACE_COMMTHREAD
74 #define CMI_MPI_TRACE_MOREDETAILED 0
75 #undef CMI_MPI_TRACE_USEREVENTS
76 #define CMI_MPI_TRACE_USEREVENTS 1
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #endif
84
85 #include "machine.h"
86
87 #include "pcqueue.h"
88
89 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
90
91 #if CMK_BLUEGENEL
92 #define MAX_QLEN 8
93 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
94 #else
95 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
96 #define MAX_QLEN 200
97 #endif
98
99 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
100 CpvStaticDeclare(double, projTraceStart);
101 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
102 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
103 #else
104 # define  START_EVENT()
105 # define  END_EVENT(x)
106 #endif
107
108 /*
109     To reduce the buffer used in broadcast and distribute the load from
110   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
111   spanning tree broadcast algorithm.
112     This will use the fourth short in message as an indicator of spanning tree
113   root.
114 */
115 #define CMK_BROADCAST_SPANNING_TREE    1
116 #define CMK_BROADCAST_HYPERCUBE        0
117
118 #define BROADCAST_SPANNING_FACTOR      4
119 /* The number of children used when a msg is broadcast inside a node */
120 #define BROADCAST_SPANNING_INTRA_FACTOR      8
121
122 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
123 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
124
125 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
126 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
127
128 /* FIXME: need a random number that everyone agrees ! */
129 #define CHARM_MAGIC_NUMBER               126
130
131 #if CMK_ERROR_CHECKING
132 static int checksum_flag = 0;
133 #define CMI_SET_CHECKSUM(msg, len)      \
134         if (checksum_flag)  {   \
135           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
136           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
137         }
138 #define CMI_CHECK_CHECKSUM(msg, len)    \
139         if (checksum_flag)      \
140           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
141             CmiAbort("Fatal error: checksum doesn't agree!\n");
142 #else
143 #define CMI_SET_CHECKSUM(msg, len)
144 #define CMI_CHECK_CHECKSUM(msg, len)
145 #endif
146
147 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
148 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
149 #else
150 #define CMI_SET_BROADCAST_ROOT(msg, root) 
151 #endif
152
153
154 /** 
155     If MPI_POST_RECV is defined, we provide default values for size 
156     and number of posted recieves. If MPI_POST_RECV_COUNT is set
157     then a default value for MPI_POST_RECV_SIZE is used if not specified
158     by the user.
159 */
160 #ifdef MPI_POST_RECV
161 #define MPI_POST_RECV_COUNT 10
162 #undef MPI_POST_RECV
163 #endif
164 #if MPI_POST_RECV_COUNT > 0
165 #warning "Using MPI posted receives which have not yet been tested"
166 #ifndef MPI_POST_RECV_SIZE
167 #define MPI_POST_RECV_SIZE 200
168 #endif
169 /* #undef  MPI_POST_RECV_DEBUG  */
170 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
171 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
172 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
173 CpvDeclare(char*,CmiPostedRecvBuffers);
174 #endif
175
176 /*
177  to avoid MPI's in order delivery, changing MPI Tag all the time
178 */
179 #define TAG     1375
180
181 #if MPI_POST_RECV_COUNT > 0
182 #define POST_RECV_TAG TAG+1
183 #define BARRIER_ZERO_TAG TAG
184 #else
185 #define BARRIER_ZERO_TAG     1375
186 #endif
187
188 #include <signal.h>
189 void (*signal_int)(int);
190
191 /*
192 static int mpi_tag = TAG;
193 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
194 */
195
196 static int        _thread_provided = -1;
197 int               _Cmi_numpes;
198 int               _Cmi_mynode;    /* Which address space am I */
199 int               _Cmi_mynodesize;/* Number of processors in my address space */
200 int               _Cmi_numnodes;  /* Total number of address spaces */
201 int               _Cmi_numpes;    /* Total number of processors */
202 static int        Cmi_nodestart; /* First processor in this address space */
203 CpvDeclare(void*, CmiLocalQueue);
204
205 /*Network progress utility variables. Period controls the rate at
206   which the network poll is called */
207 CpvDeclare(unsigned , networkProgressCount);
208 int networkProgressPeriod;
209
210 int               idleblock = 0;
211
212 #define BLK_LEN  512
213
214 #if CMK_NODE_QUEUE_AVAILABLE
215 #define DGRAM_NODEMESSAGE   (0xFB)
216
217 #define NODE_BROADCAST_OTHERS (-1)
218 #define NODE_BROADCAST_ALL    (-2)
219 #endif
220
221 #if 0
222 static void **recdQueue_blk;
223 static unsigned int recdQueue_blk_len;
224 static unsigned int recdQueue_first;
225 static unsigned int recdQueue_len;
226 static void recdQueueInit(void);
227 static void recdQueueAddToBack(void *element);
228 static void *recdQueueRemoveFromFront(void);
229 #endif
230
231 static void ConverseRunPE(int everReturn);
232 static void CommunicationServer(int sleepTime);
233 static void CommunicationServerThread(int sleepTime);
234
235 typedef struct msg_list {
236      char *msg;
237      struct msg_list *next;
238      int size, destpe;
239
240 #if CMK_SMP_TRACE_COMMTHREAD
241         int srcpe;
242 #endif  
243         
244      MPI_Request req;
245 } SMSG_LIST;
246
247 int MsgQueueLen=0;
248 static int request_max;
249
250 static SMSG_LIST *sent_msgs=0;
251 static SMSG_LIST *end_sent=0;
252
253 static int Cmi_dim;
254
255 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
256
257 #if NODE_0_IS_CONVHOST
258 int inside_comm = 0;
259 #endif
260
261 void CmiAbort(const char *message);
262 static void PerrorExit(const char *msg);
263
264 void SendSpanningChildren(int size, char *msg);
265 void SendHypercube(int size, char *msg);
266
267 static void PerrorExit(const char *msg)
268 {
269   perror(msg);
270   exit(1);
271 }
272
273 extern unsigned char computeCheckSum(unsigned char *data, int len);
274
275 /**************************  TIMER FUNCTIONS **************************/
276
277 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
278
279 /* MPI calls are not threadsafe, even the timer on some machines */
280 static CmiNodeLock  timerLock = 0;
281 static int _absoluteTime = 0;
282 static double starttimer = 0;
283 static int _is_global = 0;
284
285 int CmiTimerIsSynchronized()
286 {
287   int  flag;
288   void *v;
289
290   /*  check if it using synchronized timer */
291   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
292     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
293   if (flag) {
294     _is_global = *(int*)v;
295     if (_is_global && CmiMyPe() == 0)
296       printf("Charm++> MPI timer is synchronized!\n");
297   }
298   return _is_global;
299 }
300
301 int CmiTimerAbsolute()
302 {       
303   return _absoluteTime;
304 }
305
306 double CmiStartTimer()
307 {
308   return 0.0;
309 }
310
311 double CmiInitTime()
312 {
313   return starttimer;
314 }
315
316 void CmiTimerInit(char **argv)
317 {
318   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
319
320   _is_global = CmiTimerIsSynchronized();
321
322   if (_is_global) {
323     if (CmiMyRank() == 0) {
324       double minTimer;
325 #if CMK_TIMER_USE_XT3_DCLOCK
326       starttimer = dclock();
327 #else
328       starttimer = MPI_Wtime();
329 #endif
330
331       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
332                                   MPI_COMM_WORLD );
333       starttimer = minTimer;
334     }
335   }
336   else {  /* we don't have a synchronous timer, set our own start time */
337     CmiBarrier();
338     CmiBarrier();
339     CmiBarrier();
340 #if CMK_TIMER_USE_XT3_DCLOCK
341     starttimer = dclock();
342 #else
343     starttimer = MPI_Wtime();
344 #endif
345   }
346
347 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
348   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
349     timerLock = CmiCreateLock();
350 #endif
351   CmiNodeAllBarrier();          /* for smp */
352 }
353
354 /**
355  * Since the timerLock is never created, and is
356  * always NULL, then all the if-condition inside
357  * the timer functions could be disabled right
358  * now in the case of SMP. --Chao Mei
359  */
360 double CmiTimer(void)
361 {
362   double t;
363 #if 0 && CMK_SMP
364   if (timerLock) CmiLock(timerLock);
365 #endif
366
367 #if CMK_TIMER_USE_XT3_DCLOCK
368   t = dclock();
369 #else
370   t = MPI_Wtime();
371 #endif
372
373 #if 0 && CMK_SMP
374   if (timerLock) CmiUnlock(timerLock);
375 #endif
376
377   return _absoluteTime?t: (t-starttimer);
378 }
379
380 double CmiWallTimer(void)
381 {
382   double t;
383 #if 0 && CMK_SMP
384   if (timerLock) CmiLock(timerLock);
385 #endif
386
387 #if CMK_TIMER_USE_XT3_DCLOCK
388   t = dclock();
389 #else
390   t = MPI_Wtime();
391 #endif
392
393 #if 0 && CMK_SMP
394   if (timerLock) CmiUnlock(timerLock);
395 #endif
396
397   return _absoluteTime? t: (t-starttimer);
398 }
399
400 double CmiCpuTimer(void)
401 {
402   double t;
403 #if 0 && CMK_SMP
404   if (timerLock) CmiLock(timerLock);
405 #endif
406 #if CMK_TIMER_USE_XT3_DCLOCK
407   t = dclock() - starttimer;
408 #else
409   t = MPI_Wtime() - starttimer;
410 #endif
411 #if 0 && CMK_SMP
412   if (timerLock) CmiUnlock(timerLock);
413 #endif
414   return t;
415 }
416
417 #endif
418
419 /* must be called on all ranks including comm thread in SMP */
420 int CmiBarrier()
421 {
422 #if CMK_SMP
423     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
424   CmiNodeAllBarrier();
425   if (CmiMyRank() == CmiMyNodeSize()) 
426 #else
427   if (CmiMyRank() == 0) 
428 #endif
429   {
430 /**
431  *  The call of CmiBarrier is usually before the initialization
432  *  of trace module of Charm++, therefore, the START_EVENT
433  *  and END_EVENT are disabled here. -Chao Mei
434  */     
435     /*START_EVENT();*/
436
437     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
438         CmiAbort("Timernit: MPI_Barrier failed!\n");
439
440     /*END_EVENT(10);*/
441   }
442   CmiNodeAllBarrier();
443   return 0;
444 }
445
446 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
447 int CmiBarrierZero()
448 {
449   int i;
450 #if CMK_SMP
451   if (CmiMyRank() == CmiMyNodeSize()) 
452 #else
453   if (CmiMyRank() == 0) 
454 #endif
455   {
456     char msg[1];
457     MPI_Status sts;
458     if (CmiMyNode() == 0)  {
459       for (i=0; i<CmiNumNodes()-1; i++) {
460          START_EVENT();
461
462          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
463             CmiPrintf("MPI_Recv failed!\n");
464
465          END_EVENT(30);
466       }
467     }
468     else {
469       START_EVENT();
470
471       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
472          printf("MPI_Send failed!\n");
473
474       END_EVENT(20);
475     }
476   }
477   CmiNodeAllBarrier();
478   return 0;
479 }
480
481 typedef struct ProcState {
482 #if MULTI_SENDQUEUE
483 PCQueue      sendMsgBuf;       /* per processor message sending queue */
484 #endif
485 CmiNodeLock  recvLock;              /* for cs->recv */
486 } ProcState;
487
488 static ProcState  *procState;
489
490 #if CMK_SMP
491
492 #if !MULTI_SENDQUEUE
493 static PCQueue sendMsgBuf;
494 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
495 #endif
496
497 #endif
498
499 /************************************************************
500  *
501  * Processor state structure
502  *
503  ************************************************************/
504
505 /* fake Cmi_charmrun_fd */
506 static int Cmi_charmrun_fd = 0;
507 #include "machine-smp.c"
508
509 CsvDeclare(CmiNodeState, NodeState);
510
511 #include "immediate.c"
512
513 #if CMK_SHARED_VARS_UNAVAILABLE
514 /************ non SMP **************/
515 static struct CmiStateStruct Cmi_state;
516 int _Cmi_mype;
517 int _Cmi_myrank;
518
519 void CmiMemLock() {}
520 void CmiMemUnlock() {}
521
522 #define CmiGetState() (&Cmi_state)
523 #define CmiGetStateN(n) (&Cmi_state)
524
525 void CmiYield(void) { sleep(0); }
526
527 static void CmiStartThreads(char **argv)
528 {
529   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
530   _Cmi_mype = Cmi_nodestart;
531   _Cmi_myrank = 0;
532 }
533 #endif  /* non smp */
534
535 /*Add a message to this processor's receive queue, pe is a rank */
536 void CmiPushPE(int pe,void *msg)
537 {
538   CmiState cs = CmiGetStateN(pe);
539   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
540 #if CMK_IMMEDIATE_MSG
541   if (CmiIsImmediate(msg)) {
542 /*
543 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
544     CmiHandleMessage(msg);
545 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
546 */
547     /**(CmiUInt2 *)msg = pe;*/
548     CMI_DEST_RANK(msg) = pe;
549     CmiPushImmediateMsg(msg);
550     return;
551   }
552 #endif
553
554 #if CMK_SMP
555   CmiLock(procState[pe].recvLock);
556 #endif
557   PCQueuePush(cs->recv,msg);
558 #if CMK_SMP
559   CmiUnlock(procState[pe].recvLock);
560 #endif
561   CmiIdleLock_addMessage(&cs->idle);
562   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
563 }
564
565 #if CMK_NODE_QUEUE_AVAILABLE
566 /*Add a message to this processor's receive queue */
567 static void CmiPushNode(void *msg)
568 {
569   MACHSTATE(3,"Pushing message into NodeRecv queue");
570 #if CMK_IMMEDIATE_MSG
571   if (CmiIsImmediate(msg)) {
572     CMI_DEST_RANK(msg) = 0;
573     CmiPushImmediateMsg(msg);
574     return;
575   }
576 #endif
577   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
578   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
579   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
580   {
581   CmiState cs=CmiGetStateN(0);
582   CmiIdleLock_addMessage(&cs->idle);
583   }
584 }
585 #endif
586
587 #ifndef CmiMyPe
588 int CmiMyPe(void)
589 {
590   return CmiGetState()->pe;
591 }
592 #endif
593
594 #ifndef CmiMyRank
595 int CmiMyRank(void)
596 {
597   return CmiGetState()->rank;
598 }
599 #endif
600
601 #ifndef CmiNodeFirst
602 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
603 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
604 #endif
605
606 #ifndef CmiNodeOf
607 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
608 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
609 #endif
610
611 static size_t CmiAllAsyncMsgsSent(void)
612 {
613    SMSG_LIST *msg_tmp = sent_msgs;
614    MPI_Status sts;
615    int done;
616
617    while(msg_tmp!=0) {
618     done = 0;
619     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
620       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
621     if(!done)
622       return 0;
623     msg_tmp = msg_tmp->next;
624 /*    MsgQueueLen--; ????? */
625    }
626    return 1;
627 }
628
629 int CmiAsyncMsgSent(CmiCommHandle c) {
630
631   SMSG_LIST *msg_tmp = sent_msgs;
632   int done;
633   MPI_Status sts;
634
635   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
636     msg_tmp = msg_tmp->next;
637   if(msg_tmp) {
638     done = 0;
639     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
640       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
641     return ((done)?1:0);
642   } else {
643     return 1;
644   }
645 }
646
647 void CmiReleaseCommHandle(CmiCommHandle c)
648 {
649   return;
650 }
651
652 #if CMK_BLUEGENEL
653 extern void MPID_Progress_test();
654 #endif
655
656 void CmiReleaseSentMessages(void)
657 {
658   SMSG_LIST *msg_tmp=sent_msgs;
659   SMSG_LIST *prev=0;
660   SMSG_LIST *temp;
661   int done;
662   MPI_Status sts;
663
664 #if CMK_BLUEGENEL
665   MPID_Progress_test();
666 #endif
667
668   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
669   while(msg_tmp!=0) {
670     done =0;
671 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
672     double startT = CmiWallTimer();
673 #endif
674     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
675       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
676     if(done) {
677       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
678       MsgQueueLen--;
679       /* Release the message */
680       temp = msg_tmp->next;
681       if(prev==0)  /* first message */
682         sent_msgs = temp;
683       else
684         prev->next = temp;
685       CmiFree(msg_tmp->msg);
686       CmiFree(msg_tmp);
687       msg_tmp = temp;
688     } else {
689       prev = msg_tmp;
690       msg_tmp = msg_tmp->next;
691     }
692 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
693     {
694     double endT = CmiWallTimer();
695     /* only record the event if it takes more than 1ms */
696     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
697     }
698 #endif
699   }
700   end_sent = prev;
701   MACHSTATE(2,"} CmiReleaseSentMessages end");
702 }
703
704 int PumpMsgs(void)
705 {
706   int nbytes, flg, res;
707   char *msg;
708   MPI_Status sts;
709   int recd=0;
710
711 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
712   int recvCnt=0;
713 #endif
714         
715 #if CMK_BLUEGENEL
716   MPID_Progress_test();
717 #endif
718
719   MACHSTATE(2,"PumpMsgs begin {");
720
721         
722   while(1) {
723 #if CMI_EXERT_RECV_CAP
724         if(recvCnt==RECV_CAP) break;
725 #elif CMI_DYNAMIC_EXERT_CAP
726         if(recvCnt == dynamicRecvCap) break;
727 #endif
728           
729     /* First check posted recvs then do  probe unmatched outstanding messages */
730 #if MPI_POST_RECV_COUNT > 0 
731     int completed_index=-1;
732     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
733         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
734     if(flg){
735         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
736             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
737
738         recd = 1;
739         msg = (char *) CmiAlloc(nbytes);
740         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
741         /* and repost the recv */
742
743         START_EVENT();
744
745         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
746             MPI_POST_RECV_SIZE,
747             MPI_BYTE,
748             MPI_ANY_SOURCE,
749             POST_RECV_TAG,
750             MPI_COMM_WORLD,
751             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
752                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
753
754         END_EVENT(50);
755
756         CpvAccess(Cmi_posted_recv_total)++;
757     }
758     else {
759         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
760         if(res != MPI_SUCCESS)
761         CmiAbort("MPI_Iprobe failed\n");
762         if(!flg) break;
763         recd = 1;
764         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
765         msg = (char *) CmiAlloc(nbytes);
766
767         START_EVENT();
768
769         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
770             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
771
772         END_EVENT(30);
773
774         CpvAccess(Cmi_unposted_recv_total)++;
775     }
776 #else
777     /* Original version */
778 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
779   double startT = CmiWallTimer(); 
780 #endif
781     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
782     if(res != MPI_SUCCESS)
783       CmiAbort("MPI_Iprobe failed\n");
784
785     if(!flg) break;
786 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
787     {
788     double endT = CmiWallTimer();
789     /* only trace the probe that last longer than 1ms */
790     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
791     }
792 #endif
793
794     recd = 1;
795     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
796     msg = (char *) CmiAlloc(nbytes);
797     
798     START_EVENT();
799
800     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
801       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
802
803     /*END_EVENT(30);*/
804
805 #endif
806
807 #if CMK_SMP_TRACE_COMMTHREAD
808         traceBeginCommOp(msg);
809         traceChangeLastTimestamp(CpvAccess(projTraceStart));
810         traceEndCommOp(msg);
811         #if CMI_MPI_TRACE_MOREDETAILED
812         char tmp[32];
813         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
814         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
815         #endif
816 #elif CMK_TRACE_COMMOVERHEAD
817         char tmp[32];
818         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
819         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
820 #endif
821         
822         
823     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
824     CMI_CHECK_CHECKSUM(msg, nbytes);
825 #if CMK_ERROR_CHECKING
826     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
827       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
828       CmiFree(msg);
829       CmiAbort("Abort!\n");
830       continue;
831     }
832 #endif
833         
834 #if CMK_BROADCAST_SPANNING_TREE
835     if (CMI_BROADCAST_ROOT(msg))
836       SendSpanningChildren(nbytes, msg);
837 #elif CMK_BROADCAST_HYPERCUBE
838     if (CMI_BROADCAST_ROOT(msg))
839       SendHypercube(nbytes, msg);
840 #endif
841         
842         /* In SMP mode, this push operation needs to be executed
843      * after forwarding broadcast messages. If it is executed
844      * earlier, then during the bcast msg forwarding period,    
845          * the msg could be already freed on the worker thread.
846          * As a result, the forwarded message could be wrong! 
847          * --Chao Mei
848          */
849 #if CMK_NODE_QUEUE_AVAILABLE
850     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
851       CmiPushNode(msg);
852     else
853 #endif
854         CmiPushPE(CMI_DEST_RANK(msg), msg);     
855         
856 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
857         recvCnt++;
858         /* check sendMsgBuf  to get the  number of messages that have not been sent */
859         /* MsgQueueLen indicates the number of messages that have not been released by MPI */
860         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD){
861                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
862         }
863 #endif  
864         
865   }
866
867   
868 #if CMK_IMMEDIATE_MSG && !CMK_SMP
869   CmiHandleImmediate();
870 #endif
871   
872   MACHSTATE(2,"} PumpMsgs end ");
873   return recd;
874 }
875
876 /* blocking version */
877 static void PumpMsgsBlocking(void)
878 {
879   static int maxbytes = 20000000;
880   static char *buf = NULL;
881   int nbytes, flg;
882   MPI_Status sts;
883   char *msg;
884   int recd=0;
885
886   if (!PCQueueEmpty(CmiGetState()->recv)) return;
887   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
888   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
889   if (sent_msgs)  return;
890
891 #if 0
892   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
893 #endif
894
895   if (buf == NULL) {
896     buf = (char *) CmiAlloc(maxbytes);
897     _MEMCHECK(buf);
898   }
899
900
901 #if MPI_POST_RECV_COUNT > 0
902 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
903 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
904 #endif
905
906   START_EVENT();
907
908   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
909       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
910
911   /*END_EVENT(30);*/
912     
913    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
914    msg = (char *) CmiAlloc(nbytes);
915    memcpy(msg, buf, nbytes);
916
917 #if CMK_SMP_TRACE_COMMTHREAD
918         traceBeginCommOp(msg);
919         traceChangeLastTimestamp(CpvAccess(projTraceStart));
920         traceEndCommOp(msg);
921         #if CMI_MPI_TRACE_MOREDETAILED
922         char tmp[32];
923         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
924         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
925         #endif
926 #endif
927
928 #if CMK_BROADCAST_SPANNING_TREE
929    if (CMI_BROADCAST_ROOT(msg))
930       SendSpanningChildren(nbytes, msg);
931 #elif CMK_BROADCAST_HYPERCUBE
932    if (CMI_BROADCAST_ROOT(msg))
933       SendHypercube(nbytes, msg);
934 #endif
935   
936         /* In SMP mode, this push operation needs to be executed
937      * after forwarding broadcast messages. If it is executed
938      * earlier, then during the bcast msg forwarding period,    
939          * the msg could be already freed on the worker thread.
940          * As a result, the forwarded message could be wrong! 
941          * --Chao Mei
942          */  
943 #if CMK_NODE_QUEUE_AVAILABLE
944    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
945       CmiPushNode(msg);
946    else
947 #endif
948       CmiPushPE(CMI_DEST_RANK(msg), msg);
949 }
950
951 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
952
953 #if CMK_SMP
954
955 static int inexit = 0;
956 static CmiNodeLock  exitLock = 0;
957
958 static int MsgQueueEmpty()
959 {
960   int i;
961 #if MULTI_SENDQUEUE
962   for (i=0; i<_Cmi_mynodesize; i++)
963     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
964 #else
965   return PCQueueEmpty(sendMsgBuf);
966 #endif
967   return 1;
968 }
969
970 static int SendMsgBuf();
971
972 /* test if all processors recv queues are empty */
973 static int RecvQueueEmpty()
974 {
975   int i;
976   for (i=0; i<_Cmi_mynodesize; i++) {
977     CmiState cs=CmiGetStateN(i);
978     if (!PCQueueEmpty(cs->recv)) return 0;
979   }
980   return 1;
981 }
982
983 /**
984 CommunicationServer calls MPI to send messages in the queues and probe message from network.
985 */
986
987 #define REPORT_COMM_METRICS 0
988 #if REPORT_COMM_METRICS
989 static double pumptime = 0.0;
990 static double releasetime = 0.0;
991 static double sendtime = 0.0;
992 #endif
993
994 static void CommunicationServer(int sleepTime)
995 {
996   int static count=0;
997 /*
998   count ++;
999   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
1000 */
1001 #if REPORT_COMM_METRICS
1002   double t1, t2, t3, t4;
1003   t1 = CmiWallTimer();
1004 #endif
1005   PumpMsgs();
1006 #if REPORT_COMM_METRICS
1007   t2 = CmiWallTimer();
1008 #endif
1009   CmiReleaseSentMessages();
1010 #if REPORT_COMM_METRICS
1011   t3 = CmiWallTimer();
1012 #endif
1013   SendMsgBuf();
1014 #if REPORT_COMM_METRICS
1015   t4 = CmiWallTimer();
1016   pumptime += (t2-t1);
1017   releasetime += (t3-t2);
1018   sendtime += (t4-t3);
1019 #endif
1020 /*
1021   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1022 */
1023   if (inexit == CmiMyNodeSize()) {
1024     MACHSTATE(2, "CommunicationServer exiting {");
1025 #if 0
1026     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1027 #endif
1028     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1029       CmiReleaseSentMessages();
1030       SendMsgBuf();
1031       PumpMsgs();
1032     }
1033     MACHSTATE(2, "CommunicationServer barrier begin {");
1034
1035     START_EVENT();
1036
1037     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1038       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1039
1040     END_EVENT(10);
1041
1042     MACHSTATE(2, "} CommunicationServer barrier end");
1043 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1044     if (CmiMyNode() == 0){
1045       CmiPrintf("End of program\n");
1046     }
1047 #endif
1048     MACHSTATE(2, "} CommunicationServer EXIT");
1049
1050     ConverseCommonExit();   
1051 #if REPORT_COMM_METRICS
1052     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1053 #endif
1054
1055 #if ! CMK_AUTOBUILD
1056     signal(SIGINT, signal_int);
1057     MPI_Finalize();
1058     #endif
1059     exit(0);
1060   }
1061 }
1062
1063 #endif
1064
1065 static void CommunicationServerThread(int sleepTime)
1066 {
1067 #if CMK_SMP
1068   CommunicationServer(sleepTime);
1069 #endif
1070 #if CMK_IMMEDIATE_MSG
1071   CmiHandleImmediate();
1072 #endif
1073 }
1074
1075 #if CMK_NODE_QUEUE_AVAILABLE
1076 char *CmiGetNonLocalNodeQ(void)
1077 {
1078   CmiState cs = CmiGetState();
1079   char *result = 0;
1080   CmiIdleLock_checkMessage(&cs->idle);
1081 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1082     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1083     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1084     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1085     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1086     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1087 /*  }  */
1088
1089   return result;
1090 }
1091 #endif
1092
1093 void *CmiGetNonLocal(void)
1094 {
1095   static int count=0;
1096   CmiState cs = CmiGetState();
1097   void *msg;
1098
1099 #if ! CMK_SMP
1100   if (CmiNumPes() == 1) return NULL;
1101 #endif
1102
1103   CmiIdleLock_checkMessage(&cs->idle);
1104   /* although it seems that lock is not needed, I found it crashes very often
1105      on mpi-smp without lock */
1106
1107 #if ! CMK_SMP
1108   CmiReleaseSentMessages();
1109   PumpMsgs();
1110 #endif
1111
1112   /* CmiLock(procState[cs->rank].recvLock); */
1113   msg =  PCQueuePop(cs->recv);
1114   /* CmiUnlock(procState[cs->rank].recvLock); */
1115
1116 /*
1117   if (msg) {
1118     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1119   else {
1120     count++;
1121     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1122   }
1123 */
1124 #if ! CMK_SMP
1125   if (no_outstanding_sends) {
1126     while (MsgQueueLen>0) {
1127       CmiReleaseSentMessages();
1128       PumpMsgs();
1129     }
1130   }
1131
1132   if(!msg) {
1133     CmiReleaseSentMessages();
1134     if (PumpMsgs())
1135       return  PCQueuePop(cs->recv);
1136     else
1137       return 0;
1138   }
1139 #endif
1140
1141   return msg;
1142 }
1143
1144 /* called in non-smp mode */
1145 void CmiNotifyIdle(void)
1146 {
1147   CmiReleaseSentMessages();
1148   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1149 }
1150
1151
1152 /********************************************************
1153     The call to probe immediate messages has been renamed to
1154     CmiMachineProgressImpl
1155 ******************************************************/
1156 /* user call to handle immediate message, only useful in non SMP version
1157    using polling method to schedule message.
1158 */
1159 /*
1160 #if CMK_IMMEDIATE_MSG
1161 void CmiProbeImmediateMsg()
1162 {
1163 #if !CMK_SMP
1164   PumpMsgs();
1165   CmiHandleImmediate();
1166 #endif
1167 }
1168 #endif
1169 */
1170
1171 /* Network progress function is used to poll the network when for
1172    messages. This flushes receive buffers on some  implementations*/
1173 #if CMK_MACHINE_PROGRESS_DEFINED
1174 void CmiMachineProgressImpl()
1175 {
1176 #if !CMK_SMP
1177     PumpMsgs();
1178 #if CMK_IMMEDIATE_MSG
1179     CmiHandleImmediate();
1180 #endif
1181 #else
1182     /*Not implemented yet. Communication server does not seem to be
1183       thread safe, so only communication thread call it */
1184     if (CmiMyRank() == CmiMyNodeSize())
1185         CommunicationServerThread(0);
1186 #endif
1187 }
1188 #endif
1189
1190 /********************* MESSAGE SEND FUNCTIONS ******************/
1191
1192 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1193
1194 static void CmiSendSelf(char *msg)
1195 {
1196 #if CMK_IMMEDIATE_MSG
1197     if (CmiIsImmediate(msg)) {
1198       /* CmiBecomeNonImmediate(msg); */
1199       CmiPushImmediateMsg(msg);
1200       CmiHandleImmediate();
1201       return;
1202     }
1203 #endif
1204     CQdCreate(CpvAccess(cQdState), 1);
1205     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1206 }
1207
1208 void CmiSyncSendFn(int destPE, int size, char *msg)
1209 {
1210   CmiState cs = CmiGetState();
1211   char *dupmsg = (char *) CmiAlloc(size);
1212   memcpy(dupmsg, msg, size);
1213
1214   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1215
1216   if (cs->pe==destPE) {
1217     CmiSendSelf(dupmsg);
1218   }
1219   else
1220     CmiAsyncSendFn_(destPE, size, dupmsg);
1221 }
1222
1223 #if CMK_SMP
1224
1225 /* called by communication thread in SMP */
1226 static int SendMsgBuf()
1227 {
1228   SMSG_LIST *msg_tmp;
1229   char *msg;
1230   int node, rank, size;
1231   int i;
1232   int sent = 0;
1233
1234 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1235         int sentCnt = 0;
1236 #endif  
1237         
1238   MACHSTATE(2,"SendMsgBuf begin {");
1239 #if MULTI_SENDQUEUE
1240   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1241   {
1242     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1243     {
1244       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1245 #else
1246     /* single message sending queue */
1247     /* CmiLock(sendMsgBufLock); */
1248     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1249     /* CmiUnlock(sendMsgBufLock); */
1250     while (NULL != msg_tmp)
1251     {
1252 #endif
1253       node = msg_tmp->destpe;
1254       size = msg_tmp->size;
1255       msg = msg_tmp->msg;
1256       msg_tmp->next = 0;
1257                 
1258 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1259       while (MsgQueueLen > request_max) {
1260                 CmiReleaseSentMessages();
1261                 PumpMsgs();
1262       }
1263 #endif
1264           
1265       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1266 #if CMK_ERROR_CHECKING
1267       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1268 #endif
1269       CMI_SET_CHECKSUM(msg, size);
1270
1271 #if MPI_POST_RECV_COUNT > 0
1272         if(size <= MPI_POST_RECV_SIZE){
1273
1274           START_EVENT();
1275           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1276                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1277
1278           STOP_EVENT(40);
1279         }
1280         else {
1281             START_EVENT();
1282             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1283                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1284             STOP_EVENT(40);
1285         }
1286 #else
1287         START_EVENT();
1288         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1289             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1290         /*END_EVENT(40);*/
1291 #endif
1292         
1293 #if CMK_SMP_TRACE_COMMTHREAD
1294         traceBeginCommOp(msg);
1295         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1296         /* traceSendMsgComm must execute after traceBeginCommOp because
1297          * we pretend we execute an entry method, and inside this we
1298          * pretend we will send another message. Otherwise how could
1299          * a message creation just before an entry method invocation?
1300          * If such logic is broken, the projections will not trace
1301          * messages correctly! -Chao Mei
1302          */
1303         traceSendMsgComm(msg);
1304         traceEndCommOp(msg);
1305         #if CMI_MPI_TRACE_MOREDETAILED
1306         char tmp[64];
1307         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1308         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1309         #endif
1310 #endif
1311                 
1312                 
1313       MACHSTATE(3,"}MPI_send end");
1314       MsgQueueLen++;
1315       if(sent_msgs==0)
1316         sent_msgs = msg_tmp;
1317       else
1318         end_sent->next = msg_tmp;
1319       end_sent = msg_tmp;
1320       sent=1;
1321           
1322 #if CMI_EXERT_SEND_CAP    
1323           if(++sentCnt == SEND_CAP) break;
1324 #elif CMI_DYNAMIC_EXERT_CAP
1325           if(++sentCnt == dynamicSendCap) break;
1326           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1327                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1328 #endif    
1329           
1330 #if ! MULTI_SENDQUEUE
1331       /* CmiLock(sendMsgBufLock); */
1332       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1333       /* CmiUnlock(sendMsgBufLock); */
1334 #endif
1335     }
1336 #if MULTI_SENDQUEUE
1337   }
1338 #endif
1339   MACHSTATE(2,"}SendMsgBuf end ");
1340   return sent;
1341 }
1342
1343 void EnqueueMsg(void *m, int size, int node)
1344 {
1345   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1346   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1347   msg_tmp->msg = m;
1348   msg_tmp->size = size;
1349   msg_tmp->destpe = node;
1350         
1351 #if CMK_SMP_TRACE_COMMTHREAD
1352         msg_tmp->srcpe = CmiMyPe();
1353 #endif  
1354
1355 #if MULTI_SENDQUEUE
1356   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1357 #else
1358   CmiLock(sendMsgBufLock);
1359   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1360   CmiUnlock(sendMsgBufLock);
1361 #endif
1362         
1363   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1364 }
1365
1366 #endif
1367
1368 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1369 {
1370   CmiState cs = CmiGetState();
1371   SMSG_LIST *msg_tmp;
1372   CmiUInt2  rank, node;
1373
1374   if(destPE == cs->pe) {
1375     char *dupmsg = (char *) CmiAlloc(size);
1376     memcpy(dupmsg, msg, size);
1377     CmiSendSelf(dupmsg);
1378     return 0;
1379   }
1380   CQdCreate(CpvAccess(cQdState), 1);
1381 #if CMK_SMP
1382   node = CmiNodeOf(destPE);
1383   rank = CmiRankOf(destPE);
1384   if (node == CmiMyNode())  {
1385     CmiPushPE(rank, msg);
1386     return 0;
1387   }
1388   CMI_DEST_RANK(msg) = rank;
1389   EnqueueMsg(msg, size, node);
1390   return 0;
1391 #else
1392   /* non smp */
1393   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1394   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1395   msg_tmp->msg = msg;
1396   msg_tmp->next = 0;
1397   while (MsgQueueLen > request_max) {
1398         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1399         CmiReleaseSentMessages();
1400         PumpMsgs();
1401   }
1402 #if CMK_ERROR_CHECKING
1403   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1404 #endif
1405   CMI_SET_CHECKSUM(msg, size);
1406
1407 #if MPI_POST_RECV_COUNT > 0
1408         if(size <= MPI_POST_RECV_SIZE){
1409
1410           START_EVENT();
1411           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1412                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1413           END_EVENT(40);
1414         }
1415         else {
1416           START_EVENT();
1417           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1418                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1419           END_EVENT(40);
1420         }
1421 #else
1422   START_EVENT();
1423   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1424     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1425   /*END_EVENT(40);*/
1426   #if CMK_TRACE_COMMOVERHEAD
1427         char tmp[64];
1428         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1429         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1430   #endif
1431 #endif
1432
1433   MsgQueueLen++;
1434   if(sent_msgs==0)
1435     sent_msgs = msg_tmp;
1436   else
1437     end_sent->next = msg_tmp;
1438   end_sent = msg_tmp;
1439   return (CmiCommHandle) &(msg_tmp->req);
1440 #endif              /* non-smp */
1441 }
1442
1443 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1444 {
1445   CMI_SET_BROADCAST_ROOT(msg, 0);
1446   CmiAsyncSendFn_(destPE, size, msg);
1447 }
1448
1449 void CmiFreeSendFn(int destPE, int size, char *msg)
1450 {
1451   CmiState cs = CmiGetState();
1452   CMI_SET_BROADCAST_ROOT(msg, 0);
1453
1454   if (cs->pe==destPE) {
1455     CmiSendSelf(msg);
1456   } else {
1457     CmiAsyncSendFn_(destPE, size, msg);
1458   }
1459 }
1460
1461 /*********************** BROADCAST FUNCTIONS **********************/
1462
1463 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1464 void CmiSyncSendFn1(int destPE, int size, char *msg)
1465 {
1466   CmiState cs = CmiGetState();
1467   char *dupmsg = (char *) CmiAlloc(size);
1468   memcpy(dupmsg, msg, size);
1469   if (cs->pe==destPE)
1470     CmiSendSelf(dupmsg);
1471   else
1472     CmiAsyncSendFn_(destPE, size, dupmsg);
1473 }
1474
1475 /* send msg to its spanning children in broadcast. G. Zheng */
1476 void SendSpanningChildren(int size, char *msg)
1477 {
1478   CmiState cs = CmiGetState();
1479   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1480   int startnode = CmiNodeOf(startpe);
1481   int i, exceptRank;
1482         
1483    /* first send msgs to other nodes */
1484   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1485   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1486     int nd = CmiMyNode()-startnode;
1487     if (nd<0) nd+=CmiNumNodes();
1488     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1489     if (nd > CmiNumNodes() - 1) break;
1490     nd += startnode;
1491     nd = nd%CmiNumNodes();
1492     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1493         #if CMK_SMP
1494         /* always send to the first rank of other nodes */
1495         char *newmsg = CmiCopyMsg(msg, size);
1496         CMI_DEST_RANK(newmsg) = 0;
1497     EnqueueMsg(newmsg, size, nd);
1498         #else
1499         CmiSyncSendFn1(nd, size, msg);
1500         #endif
1501   }
1502 #if CMK_SMP  
1503    /* second send msgs to my peers on this node */
1504   /* FIXME: now it's just a flat p2p send!! When node size is large,
1505    * it should also be sent in a tree
1506    */
1507    exceptRank = CMI_DEST_RANK(msg);
1508    for(i=0; i<exceptRank; i++){
1509            CmiPushPE(i, CmiCopyMsg(msg, size));
1510    }
1511    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1512            CmiPushPE(i, CmiCopyMsg(msg, size));
1513    }
1514 #endif
1515 }
1516
1517 #include <math.h>
1518
1519 /* send msg along the hypercube in broadcast. (Sameer) */
1520 void SendHypercube(int size, char *msg)
1521 {
1522   CmiState cs = CmiGetState();
1523   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1524   int startnode = CmiNodeOf(startpe);
1525   int i, exceptRank, cnt, tmp, relPE;
1526   int dims=0;
1527
1528   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1529   tmp = CmiNumNodes()-1;
1530   while(tmp>0){
1531           dims++;
1532           tmp = tmp >> 1;
1533   }
1534   if(CmiNumNodes()==1) dims=1;
1535   
1536    /* first send msgs to other nodes */  
1537   relPE = CmiMyNode()-startnode;
1538   if(relPE < 0) relPE += CmiNumNodes();
1539   cnt=0;
1540   tmp = relPE;
1541   /* count how many zeros (in binary format) relPE has */
1542   for(i=0; i<dims; i++, cnt++){
1543     if(tmp & 1 == 1) break;
1544     tmp = tmp >> 1;
1545   }
1546   
1547   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1548   for (i = cnt-1; i >= 0; i--) {
1549     int nd = relPE + (1 << i);
1550         if(nd >= CmiNumNodes()) continue;
1551         nd = (nd+startnode)%CmiNumNodes();
1552         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1553 #if CMK_SMP
1554     /* always send to the first rank of other nodes */
1555     char *newmsg = CmiCopyMsg(msg, size);
1556     CMI_DEST_RANK(newmsg) = 0;
1557     EnqueueMsg(newmsg, size, nd);
1558 #else
1559         CmiSyncSendFn1(nd, size, msg);
1560 #endif
1561   }
1562   
1563 #if CMK_SMP
1564    /* second send msgs to my peers on this node */
1565    /* FIXME: now it's just a flat p2p send!! When node size is large,
1566     * it should also be sent in a tree
1567     */
1568    exceptRank = CMI_DEST_RANK(msg);
1569    for(i=0; i<exceptRank; i++){
1570            CmiPushPE(i, CmiCopyMsg(msg, size));
1571    }
1572    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1573            CmiPushPE(i, CmiCopyMsg(msg, size));
1574    }
1575 #endif
1576 }
1577
1578 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1579 {
1580   CmiState cs = CmiGetState();
1581
1582 #if CMK_SMP     
1583   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1584   CMI_DEST_RANK(msg) = CmiMyRank();
1585 #endif
1586         
1587 #if CMK_BROADCAST_SPANNING_TREE
1588   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1589   SendSpanningChildren(size, msg);
1590
1591 #elif CMK_BROADCAST_HYPERCUBE
1592   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1593   SendHypercube(size, msg);
1594
1595 #else
1596   int i;
1597
1598   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1599     CmiSyncSendFn(i, size,msg) ;
1600   for ( i=0; i<cs->pe; i++ )
1601     CmiSyncSendFn(i, size,msg) ;
1602 #endif
1603
1604   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1605 }
1606
1607
1608 /*  FIXME: luckily async is never used  G. Zheng */
1609 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1610 {
1611   CmiState cs = CmiGetState();
1612   int i ;
1613
1614   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1615     CmiAsyncSendFn(i,size,msg) ;
1616   for ( i=0; i<cs->pe; i++ )
1617     CmiAsyncSendFn(i,size,msg) ;
1618
1619   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1620   CmiAbort("CmiAsyncBroadcastFn should never be called");
1621   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1622 }
1623
1624 void CmiFreeBroadcastFn(int size, char *msg)
1625 {
1626    CmiSyncBroadcastFn(size,msg);
1627    CmiFree(msg);
1628 }
1629
1630 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1631 {
1632
1633 #if CMK_SMP     
1634   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1635   CMI_DEST_RANK(msg) = CmiMyRank();
1636 #endif
1637
1638 #if CMK_BROADCAST_SPANNING_TREE
1639   CmiState cs = CmiGetState();
1640   CmiSyncSendFn(cs->pe, size,msg) ;
1641   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1642   SendSpanningChildren(size, msg);
1643
1644 #elif CMK_BROADCAST_HYPERCUBE
1645   CmiState cs = CmiGetState();
1646   CmiSyncSendFn(cs->pe, size,msg) ;
1647   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1648   SendHypercube(size, msg);
1649
1650 #else
1651     int i ;
1652
1653   for ( i=0; i<_Cmi_numpes; i++ )
1654     CmiSyncSendFn(i,size,msg) ;
1655 #endif
1656
1657   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1658 }
1659
1660 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1661 {
1662   int i ;
1663
1664   for ( i=1; i<_Cmi_numpes; i++ )
1665     CmiAsyncSendFn(i,size,msg) ;
1666
1667   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1668
1669   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1670 }
1671
1672 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1673 {
1674 #if CMK_SMP     
1675   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1676   CMI_DEST_RANK(msg) = CmiMyRank();
1677 #endif
1678
1679 #if CMK_BROADCAST_SPANNING_TREE
1680   CmiState cs = CmiGetState();
1681   CmiSyncSendFn(cs->pe, size,msg) ;
1682   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1683   SendSpanningChildren(size, msg);
1684
1685 #elif CMK_BROADCAST_HYPERCUBE
1686   CmiState cs = CmiGetState();
1687   CmiSyncSendFn(cs->pe, size,msg) ;
1688   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1689   SendHypercube(size, msg);
1690
1691 #else
1692   int i ;
1693
1694   for ( i=0; i<_Cmi_numpes; i++ )
1695     CmiSyncSendFn(i,size,msg) ;
1696 #endif
1697   CmiFree(msg) ;
1698   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1699 }
1700
1701 #if CMK_NODE_QUEUE_AVAILABLE
1702
1703 static void CmiSendNodeSelf(char *msg)
1704 {
1705 #if CMK_IMMEDIATE_MSG
1706 #if 0
1707     if (CmiIsImmediate(msg) && !_immRunning) {
1708       /*CmiHandleImmediateMessage(msg); */
1709       CmiPushImmediateMsg(msg);
1710       CmiHandleImmediate();
1711       return;
1712     }
1713 #endif
1714     if (CmiIsImmediate(msg))
1715     {
1716       CmiPushImmediateMsg(msg);
1717       if (!_immRunning) CmiHandleImmediate();
1718       return;
1719     }
1720 #endif
1721     CQdCreate(CpvAccess(cQdState), 1);
1722     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1723     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1724     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1725 }
1726
1727 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1728 {
1729   int i;
1730   SMSG_LIST *msg_tmp;
1731   char *dupmsg;
1732
1733   CMI_SET_BROADCAST_ROOT(msg, 0);
1734   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1735   switch (dstNode) {
1736   case NODE_BROADCAST_ALL:
1737     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1738   case NODE_BROADCAST_OTHERS:
1739     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1740     for (i=0; i<_Cmi_numnodes; i++)
1741       if (i!=_Cmi_mynode) {
1742         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1743       }
1744     break;
1745   default:
1746     dupmsg = (char *)CmiCopyMsg(msg,size);
1747     if(dstNode == _Cmi_mynode) {
1748       CmiSendNodeSelf(dupmsg);
1749     }
1750     else {
1751       CQdCreate(CpvAccess(cQdState), 1);
1752       EnqueueMsg(dupmsg, size, dstNode);
1753     }
1754   }
1755   return 0;
1756 }
1757
1758 void CmiSyncNodeSendFn(int p, int s, char *m)
1759 {
1760   CmiAsyncNodeSendFn(p, s, m);
1761 }
1762
1763 /* need */
1764 void CmiFreeNodeSendFn(int p, int s, char *m)
1765 {
1766   CmiAsyncNodeSendFn(p, s, m);
1767   CmiFree(m);
1768 }
1769
1770 /* need */
1771 void CmiSyncNodeBroadcastFn(int s, char *m)
1772 {
1773   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1774 }
1775
1776 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1777 {
1778 }
1779
1780 /* need */
1781 void CmiFreeNodeBroadcastFn(int s, char *m)
1782 {
1783   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1784   CmiFree(m);
1785 }
1786
1787 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1788 {
1789   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1790 }
1791
1792 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1793 {
1794   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1795 }
1796
1797 /* need */
1798 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1799 {
1800   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1801   CmiFree(m);
1802 }
1803 #endif
1804
1805 /************************** MAIN ***********************************/
1806 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1807
1808 void ConverseExit(void)
1809 {
1810 #if ! CMK_SMP
1811   while(!CmiAllAsyncMsgsSent()) {
1812     PumpMsgs();
1813     CmiReleaseSentMessages();
1814   }
1815   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1816     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1817
1818   ConverseCommonExit();
1819 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1820   if (CmiMyPe() == 0){
1821     CmiPrintf("End of program\n");
1822 #if MPI_POST_RECV_COUNT > 0
1823     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1824 #endif
1825 }
1826 #endif
1827 #if ! CMK_AUTOBUILD
1828   signal(SIGINT, signal_int);
1829   MPI_Finalize();
1830 #endif
1831   exit(0);
1832
1833 #else
1834     /* SMP version, communication thread will exit */
1835   ConverseCommonExit();
1836   /* atomic increment */
1837   CmiLock(exitLock);
1838   inexit++;
1839   CmiUnlock(exitLock);
1840   while (1) CmiYield();
1841 #endif
1842 }
1843
1844 static void registerMPITraceEvents() {
1845 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1846     traceRegisterUserEvent("MPI_Barrier", 10);
1847     traceRegisterUserEvent("MPI_Send", 20);
1848     traceRegisterUserEvent("MPI_Recv", 30);
1849     traceRegisterUserEvent("MPI_Isend", 40);
1850     traceRegisterUserEvent("MPI_Irecv", 50);
1851     traceRegisterUserEvent("MPI_Test", 60);
1852     traceRegisterUserEvent("MPI_Iprobe", 70);
1853 #endif
1854 }
1855
1856
1857 static char     **Cmi_argv;
1858 static char     **Cmi_argvcopy;
1859 static CmiStartFn Cmi_startfn;   /* The start function */
1860 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1861
1862 typedef struct {
1863   int sleepMs; /*Milliseconds to sleep while idle*/
1864   int nIdles; /*Number of times we've been idle in a row*/
1865   CmiState cs; /*Machine state*/
1866 } CmiIdleState;
1867
1868 static CmiIdleState *CmiNotifyGetState(void)
1869 {
1870   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1871   s->sleepMs=0;
1872   s->nIdles=0;
1873   s->cs=CmiGetState();
1874   return s;
1875 }
1876
1877 static void CmiNotifyBeginIdle(CmiIdleState *s)
1878 {
1879   s->sleepMs=0;
1880   s->nIdles=0;
1881 }
1882
1883 static void CmiNotifyStillIdle(CmiIdleState *s)
1884 {
1885 #if ! CMK_SMP
1886   CmiReleaseSentMessages();
1887   PumpMsgs();
1888 #else
1889 /*  CmiYield();  */
1890 #endif
1891
1892 #if 1
1893   {
1894   int nSpins=20; /*Number of times to spin before sleeping*/
1895   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1896   s->nIdles++;
1897   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1898     s->sleepMs+=2;
1899     if (s->sleepMs>10) s->sleepMs=10;
1900   }
1901   /*Comm. thread will listen on sockets-- just sleep*/
1902   if (s->sleepMs>0) {
1903     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1904     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1905     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1906   }
1907   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1908   }
1909 #endif
1910 }
1911
1912 #if MACHINE_DEBUG_LOG
1913 FILE *debugLog = NULL;
1914 #endif
1915
1916 static int machine_exit_idx;
1917 static void machine_exit(char *m) {
1918   EmergencyExit();
1919   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1920   fflush(stdout);
1921   CmiNodeBarrier();
1922   if (CmiMyRank() == 0) {
1923     MPI_Barrier(MPI_COMM_WORLD);
1924     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1925     MPI_Abort(MPI_COMM_WORLD, 1);
1926   } else {
1927     while (1) CmiYield();
1928   }
1929 }
1930
1931 static void KillOnAllSigs(int sigNo) {
1932   static int already_in_signal_handler = 0;
1933   char *m;
1934   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1935   already_in_signal_handler = 1;
1936 #if CMK_CCS_AVAILABLE
1937   if (CpvAccess(cmiArgDebugFlag)) {
1938     CpdNotify(CPD_SIGNAL, sigNo);
1939     CpdFreeze();
1940   }
1941 #endif
1942   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1943       "Signal: %d\n",CmiMyPe(),sigNo);
1944   CmiPrintStackTrace(1);
1945
1946   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1947   CmiSetHandler(m, machine_exit_idx);
1948   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1949   machine_exit(m);
1950 }
1951
1952 static void ConverseRunPE(int everReturn)
1953 {
1954   CmiIdleState *s=CmiNotifyGetState();
1955   CmiState cs;
1956   char** CmiMyArgv;
1957
1958   CmiNodeAllBarrier();
1959
1960   cs = CmiGetState();
1961   CpvInitialize(void *,CmiLocalQueue);
1962   CpvAccess(CmiLocalQueue) = cs->localqueue;
1963
1964   if (CmiMyRank())
1965     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1966   else
1967     CmiMyArgv=Cmi_argv;
1968
1969   CthInit(CmiMyArgv);
1970
1971   ConverseCommonInit(CmiMyArgv);
1972   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1973
1974 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1975   CpvInitialize(double, projTraceStart);
1976   /* only PE 0 needs to care about registration (to generate sts file). */
1977   if (CmiMyPe() == 0) {
1978     registerMachineUserEventsFunction(&registerMPITraceEvents);
1979   }
1980 #endif
1981
1982   /* initialize the network progress counter*/
1983   /* Network progress function is used to poll the network when for
1984      messages. This flushes receive buffers on some  implementations*/
1985   CpvInitialize(int , networkProgressCount);
1986   CpvAccess(networkProgressCount) = 0;
1987
1988 #if CMK_SMP
1989   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1990   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1991 #else
1992   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1993 #endif
1994
1995 #if MACHINE_DEBUG_LOG
1996   if (CmiMyRank() == 0) {
1997     char ln[200];
1998     sprintf(ln,"debugLog.%d",CmiMyNode());
1999     debugLog=fopen(ln,"w");
2000   }
2001 #endif
2002
2003   /* Converse initialization finishes, immediate messages can be processed.
2004      node barrier previously should take care of the node synchronization */
2005   _immediateReady = 1;
2006
2007   /* communication thread */
2008   if (CmiMyRank() == CmiMyNodeSize()) {
2009     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2010     while (1) CommunicationServerThread(5);
2011   }
2012   else {  /* worker thread */
2013   if (!everReturn) {
2014     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2015     if (Cmi_usrsched==0) CsdScheduler(-1);
2016     ConverseExit();
2017   }
2018   }
2019 }
2020
2021 static char *thread_level_tostring(int thread_level)
2022 {
2023 #if CMK_MPI_INIT_THREAD
2024   switch (thread_level) {
2025   case MPI_THREAD_SINGLE:
2026       return "MPI_THREAD_SINGLE";
2027   case MPI_THREAD_FUNNELED:
2028       return "MPI_THREAD_FUNNELED";
2029   case MPI_THREAD_SERIALIZED:
2030       return "MPI_THREAD_SERIALIZED";
2031   case MPI_THREAD_MULTIPLE :
2032       return "MPI_THREAD_MULTIPLE ";
2033   default: {
2034       char *str = (char*)malloc(5);
2035       sprintf(str,"%d", thread_level);
2036       return str;
2037       }
2038   }
2039   return  "unknown";
2040 #else
2041   char *str = (char*)malloc(5);
2042   sprintf(str,"%d", thread_level);
2043   return str;
2044 #endif
2045 }
2046
2047 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2048 {
2049   int n,i;
2050   int ver, subver;
2051   int provided;
2052   int thread_level;
2053
2054 #if MACHINE_DEBUG
2055   debugLog=NULL;
2056 #endif
2057 #if CMK_USE_HP_MAIN_FIX
2058 #if FOR_CPLUS
2059   _main(argc,argv);
2060 #endif
2061 #endif
2062
2063 #if CMK_MPI_INIT_THREAD
2064 #if CMK_SMP
2065   thread_level = MPI_THREAD_FUNNELED;
2066 #else
2067   thread_level = MPI_THREAD_SINGLE;
2068 #endif
2069   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2070   _thread_provided = provided;
2071 #else
2072   MPI_Init(&argc, &argv);
2073   thread_level = 0;
2074   provided = -1;
2075 #endif
2076   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2077   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2078
2079   MPI_Get_version(&ver, &subver);
2080   if (_Cmi_mynode == 0) {
2081     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2082   }
2083
2084   /* processor per node */
2085   _Cmi_mynodesize = 1;
2086   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2087     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2088 #if ! CMK_SMP
2089   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2090     CmiAbort("+ppn cannot be used in non SMP version!\n");
2091 #endif
2092   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2093   if (idleblock && _Cmi_mynode == 0) {
2094     printf("Charm++: Running in idle blocking mode.\n");
2095   }
2096
2097   /* setup signal handlers */
2098   signal(SIGSEGV, KillOnAllSigs);
2099   signal(SIGFPE, KillOnAllSigs);
2100   signal(SIGILL, KillOnAllSigs);
2101   signal_int = signal(SIGINT, KillOnAllSigs);
2102   signal(SIGTERM, KillOnAllSigs);
2103   signal(SIGABRT, KillOnAllSigs);
2104 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2105   signal(SIGQUIT, KillOnAllSigs);
2106   signal(SIGBUS, KillOnAllSigs);
2107 /*#     if CMK_HANDLE_SIGUSR
2108   signal(SIGUSR1, HandleUserSignals);
2109   signal(SIGUSR2, HandleUserSignals);
2110 #     endif*/
2111 #   endif /*UNIX*/
2112   
2113 #if CMK_NO_OUTSTANDING_SENDS
2114   no_outstanding_sends=1;
2115 #endif
2116   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2117     no_outstanding_sends = 1;
2118     if (_Cmi_mynode == 0)
2119       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2120         no_outstanding_sends?"":" not");
2121   }
2122   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2123   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2124   Cmi_argvcopy = CmiCopyArgs(argv);
2125   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2126   /* find dim = log2(numpes), to pretend we are a hypercube */
2127   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2128     Cmi_dim++ ;
2129  /* CmiSpanTreeInit();*/
2130   request_max=MAX_QLEN;
2131   CmiGetArgInt(argv,"+requestmax",&request_max);
2132   /*printf("request max=%d\n", request_max);*/
2133
2134   /* checksum flag */
2135   if (CmiGetArgFlag(argv,"+checksum")) {
2136 #if !CMK_OPTIMIZE
2137     checksum_flag = 1;
2138     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2139 #else
2140     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2141 #endif
2142   }
2143
2144   {
2145   int debug = CmiGetArgFlag(argv,"++debug");
2146   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2147   if (debug || debug_no_pause)
2148   {   /*Pause so user has a chance to start and attach debugger*/
2149 #if CMK_HAS_GETPID
2150     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2151     fflush(stdout);
2152     if (!debug_no_pause)
2153       sleep(15);
2154 #else
2155     printf("++debug ignored.\n");
2156 #endif
2157   }
2158   }
2159
2160 #if MPI_POST_RECV_COUNT > 0
2161
2162   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2163   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2164   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2165   CpvInitialize(char*,CmiPostedRecvBuffers);
2166
2167     /* Post some extra recvs to help out with incoming messages */
2168     /* On some MPIs the messages are unexpected and thus slow */
2169
2170     /* An array of request handles for posted recvs */
2171     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2172
2173     /* An array of buffers for posted recvs */
2174     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2175
2176     /* Post Recvs */
2177     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2178         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2179                     MPI_POST_RECV_SIZE,
2180                     MPI_BYTE,
2181                     MPI_ANY_SOURCE,
2182                     POST_RECV_TAG,
2183                     MPI_COMM_WORLD,
2184                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2185           CmiAbort("MPI_Irecv failed\n");
2186     }
2187
2188 #endif
2189
2190
2191
2192   /* CmiTimerInit(); */
2193
2194 #if 0
2195   CthInit(argv);
2196   ConverseCommonInit(argv);
2197
2198   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2199   if (initret==0) {
2200     fn(CmiGetArgc(argv), argv);
2201     if (usched==0) CsdScheduler(-1);
2202     ConverseExit();
2203   }
2204 #endif
2205
2206   CsvInitialize(CmiNodeState, NodeState);
2207   CmiNodeStateInit(&CsvAccess(NodeState));
2208
2209   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2210
2211   for (i=0; i<_Cmi_mynodesize+1; i++) {
2212 #if MULTI_SENDQUEUE
2213     procState[i].sendMsgBuf = PCQueueCreate();
2214 #endif
2215     procState[i].recvLock = CmiCreateLock();
2216   }
2217 #if CMK_SMP
2218 #if !MULTI_SENDQUEUE
2219   sendMsgBuf = PCQueueCreate();
2220   sendMsgBufLock = CmiCreateLock();
2221 #endif
2222   exitLock = CmiCreateLock();            /* exit count lock */
2223 #endif
2224
2225   /* Network progress function is used to poll the network when for
2226      messages. This flushes receive buffers on some  implementations*/
2227   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2228   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2229
2230   CmiStartThreads(argv);
2231   ConverseRunPE(initret);
2232 }
2233
2234 /***********************************************************************
2235  *
2236  * Abort function:
2237  *
2238  ************************************************************************/
2239
2240 void CmiAbort(const char *message)
2241 {
2242   char *m;
2243   /* if CharmDebug is attached simply try to send a message to it */
2244 #if CMK_CCS_AVAILABLE
2245   if (CpvAccess(cmiArgDebugFlag)) {
2246     CpdNotify(CPD_ABORT, message);
2247     CpdFreeze();
2248   }
2249 #endif  
2250   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2251         "Reason: %s\n",CmiMyPe(),message);
2252  /*  CmiError(message); */
2253   CmiPrintStackTrace(0);
2254   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2255   CmiSetHandler(m, machine_exit_idx);
2256   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2257   machine_exit(m);
2258   /* Program never reaches here */
2259   MPI_Abort(MPI_COMM_WORLD, 1);
2260 }
2261
2262
2263 #if 0
2264
2265 /* ****************************************************************** */
2266 /*    The following internal functions implement recd msg queue       */
2267 /* ****************************************************************** */
2268
2269 static void ** AllocBlock(unsigned int len)
2270 {
2271   void ** blk;
2272
2273   blk=(void **)CmiAlloc(len*sizeof(void *));
2274   if(blk==(void **)0) {
2275     CmiError("Cannot Allocate Memory!\n");
2276     MPI_Abort(MPI_COMM_WORLD, 1);
2277   }
2278   return blk;
2279 }
2280
2281 static void
2282 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2283 {
2284   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2285   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2286 }
2287
2288 void recdQueueInit(void)
2289 {
2290   recdQueue_blk = AllocBlock(BLK_LEN);
2291   recdQueue_blk_len = BLK_LEN;
2292   recdQueue_first = 0;
2293   recdQueue_len = 0;
2294 }
2295
2296 void recdQueueAddToBack(void *element)
2297 {
2298 #if NODE_0_IS_CONVHOST
2299   inside_comm = 1;
2300 #endif
2301   if(recdQueue_len==recdQueue_blk_len) {
2302     void **blk;
2303     recdQueue_blk_len *= 3;
2304     blk = AllocBlock(recdQueue_blk_len);
2305     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2306     CmiFree(recdQueue_blk);
2307     recdQueue_blk = blk;
2308     recdQueue_first = 0;
2309   }
2310   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2311 #if NODE_0_IS_CONVHOST
2312   inside_comm = 0;
2313 #endif
2314 }
2315
2316
2317 void * recdQueueRemoveFromFront(void)
2318 {
2319   if(recdQueue_len) {
2320     void *element;
2321     element = recdQueue_blk[recdQueue_first++];
2322     recdQueue_first %= recdQueue_blk_len;
2323     recdQueue_len--;
2324     return element;
2325   }
2326   return 0;
2327 }
2328
2329 #endif
2330
2331 /*@}*/