enable dynamic cap in the non-SMP mode.
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #define CMI_DYNAMIC_EXERT_CAP 0
51 /* This macro defines the max number of msgs in the sender msg buffer 
52  * that is allowed for recving operation to continue
53  */
54 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 4
55 #define CMI_DYNAMIC_MAXCAPSIZE 1000
56 #define CMI_DYNAMIC_SEND_CAPSIZE 4
57 #define CMI_DYNAMIC_RECV_CAPSIZE 3
58 /* initial values, -1 indiates there's no cap */
59 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
60 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
61
62 #if CMI_EXERT_SEND_CAP
63 #define SEND_CAP 3
64 #endif
65
66 #if CMI_EXERT_RECV_CAP
67 #define RECV_CAP 2
68 #endif
69
70
71 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
72 #define CMI_MPI_TRACE_MOREDETAILED 0
73 #undef CMI_MPI_TRACE_USEREVENTS
74 #define CMI_MPI_TRACE_USEREVENTS 1
75 #else
76 #undef CMK_SMP_TRACE_COMMTHREAD
77 #define CMK_SMP_TRACE_COMMTHREAD 0
78 #endif
79
80 #define CMK_TRACE_COMMOVERHEAD 0
81 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_TRACE_COMMOVERHEAD
86 #define CMK_TRACE_COMMOVERHEAD 0
87 #endif
88
89 #include "machine.h"
90
91 #include "pcqueue.h"
92
93 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
94
95 #if CMK_BLUEGENEL
96 #define MAX_QLEN 8
97 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
98 #else
99 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
100 #define MAX_QLEN 200
101 #endif
102
103 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
104 CpvStaticDeclare(double, projTraceStart);
105 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
106 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
107 #else
108 # define  START_EVENT()
109 # define  END_EVENT(x)
110 #endif
111
112 /*
113     To reduce the buffer used in broadcast and distribute the load from
114   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
115   spanning tree broadcast algorithm.
116     This will use the fourth short in message as an indicator of spanning tree
117   root.
118 */
119 #define CMK_BROADCAST_SPANNING_TREE    1
120 #define CMK_BROADCAST_HYPERCUBE        0
121
122 #define BROADCAST_SPANNING_FACTOR      4
123 /* The number of children used when a msg is broadcast inside a node */
124 #define BROADCAST_SPANNING_INTRA_FACTOR      8
125
126 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
127 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
128
129 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
130 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
131
132 /* FIXME: need a random number that everyone agrees ! */
133 #define CHARM_MAGIC_NUMBER               126
134
135 #if CMK_ERROR_CHECKING
136 static int checksum_flag = 0;
137 #define CMI_SET_CHECKSUM(msg, len)      \
138         if (checksum_flag)  {   \
139           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
140           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
141         }
142 #define CMI_CHECK_CHECKSUM(msg, len)    \
143         if (checksum_flag)      \
144           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
145             CmiAbort("Fatal error: checksum doesn't agree!\n");
146 #else
147 #define CMI_SET_CHECKSUM(msg, len)
148 #define CMI_CHECK_CHECKSUM(msg, len)
149 #endif
150
151 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
152 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
153 #else
154 #define CMI_SET_BROADCAST_ROOT(msg, root) 
155 #endif
156
157
158 /** 
159     If MPI_POST_RECV is defined, we provide default values for size 
160     and number of posted recieves. If MPI_POST_RECV_COUNT is set
161     then a default value for MPI_POST_RECV_SIZE is used if not specified
162     by the user.
163 */
164 #ifdef MPI_POST_RECV
165 #define MPI_POST_RECV_COUNT 10
166 #undef MPI_POST_RECV
167 #endif
168 #if MPI_POST_RECV_COUNT > 0
169 #warning "Using MPI posted receives which have not yet been tested"
170 #ifndef MPI_POST_RECV_SIZE
171 #define MPI_POST_RECV_SIZE 200
172 #endif
173 /* #undef  MPI_POST_RECV_DEBUG  */
174 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
175 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
176 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
177 CpvDeclare(char*,CmiPostedRecvBuffers);
178 #endif
179
180 /*
181  to avoid MPI's in order delivery, changing MPI Tag all the time
182 */
183 #define TAG     1375
184
185 #if MPI_POST_RECV_COUNT > 0
186 #define POST_RECV_TAG TAG+1
187 #define BARRIER_ZERO_TAG TAG
188 #else
189 #define BARRIER_ZERO_TAG     1375
190 #endif
191
192 #include <signal.h>
193 void (*signal_int)(int);
194
195 /*
196 static int mpi_tag = TAG;
197 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
198 */
199
200 static int        _thread_provided = -1;
201 int               _Cmi_numpes;
202 int               _Cmi_mynode;    /* Which address space am I */
203 int               _Cmi_mynodesize;/* Number of processors in my address space */
204 int               _Cmi_numnodes;  /* Total number of address spaces */
205 int               _Cmi_numpes;    /* Total number of processors */
206 static int        Cmi_nodestart; /* First processor in this address space */
207 CpvDeclare(void*, CmiLocalQueue);
208
209 /*Network progress utility variables. Period controls the rate at
210   which the network poll is called */
211 CpvDeclare(unsigned , networkProgressCount);
212 int networkProgressPeriod;
213
214 int               idleblock = 0;
215
216 #define BLK_LEN  512
217
218 #if CMK_NODE_QUEUE_AVAILABLE
219 #define DGRAM_NODEMESSAGE   (0xFB)
220
221 #define NODE_BROADCAST_OTHERS (-1)
222 #define NODE_BROADCAST_ALL    (-2)
223 #endif
224
225 #if 0
226 static void **recdQueue_blk;
227 static unsigned int recdQueue_blk_len;
228 static unsigned int recdQueue_first;
229 static unsigned int recdQueue_len;
230 static void recdQueueInit(void);
231 static void recdQueueAddToBack(void *element);
232 static void *recdQueueRemoveFromFront(void);
233 #endif
234
235 static void ConverseRunPE(int everReturn);
236 static void CommunicationServer(int sleepTime);
237 static void CommunicationServerThread(int sleepTime);
238
239 typedef struct msg_list {
240      char *msg;
241      struct msg_list *next;
242      int size, destpe;
243
244 #if CMK_SMP_TRACE_COMMTHREAD
245         int srcpe;
246 #endif  
247         
248      MPI_Request req;
249 } SMSG_LIST;
250
251 int MsgQueueLen=0;
252 static int request_max;
253
254 static SMSG_LIST *sent_msgs=0;
255 static SMSG_LIST *end_sent=0;
256
257 static int Cmi_dim;
258
259 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
260
261 #if NODE_0_IS_CONVHOST
262 int inside_comm = 0;
263 #endif
264
265 void CmiAbort(const char *message);
266 static void PerrorExit(const char *msg);
267
268 void SendSpanningChildren(int size, char *msg);
269 void SendHypercube(int size, char *msg);
270
271 static void PerrorExit(const char *msg)
272 {
273   perror(msg);
274   exit(1);
275 }
276
277 extern unsigned char computeCheckSum(unsigned char *data, int len);
278
279 /**************************  TIMER FUNCTIONS **************************/
280
281 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
282
283 /* MPI calls are not threadsafe, even the timer on some machines */
284 static CmiNodeLock  timerLock = 0;
285 static int _absoluteTime = 0;
286 static double starttimer = 0;
287 static int _is_global = 0;
288
289 int CmiTimerIsSynchronized()
290 {
291   int  flag;
292   void *v;
293
294   /*  check if it using synchronized timer */
295   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
296     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
297   if (flag) {
298     _is_global = *(int*)v;
299     if (_is_global && CmiMyPe() == 0)
300       printf("Charm++> MPI timer is synchronized\n");
301   }
302   return _is_global;
303 }
304
305 int CmiTimerAbsolute()
306 {       
307   return _absoluteTime;
308 }
309
310 double CmiStartTimer()
311 {
312   return 0.0;
313 }
314
315 double CmiInitTime()
316 {
317   return starttimer;
318 }
319
320 void CmiTimerInit(char **argv)
321 {
322   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
323   if (_absoluteTime && CmiMyPe() == 0)
324       printf("Charm++> absolute MPI timer is used\n");
325
326   _is_global = CmiTimerIsSynchronized();
327
328   if (_is_global) {
329     if (CmiMyRank() == 0) {
330       double minTimer;
331 #if CMK_TIMER_USE_XT3_DCLOCK
332       starttimer = dclock();
333 #else
334       starttimer = MPI_Wtime();
335 #endif
336
337       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
338                                   MPI_COMM_WORLD );
339       starttimer = minTimer;
340     }
341   }
342   else {  /* we don't have a synchronous timer, set our own start time */
343     CmiBarrier();
344     CmiBarrier();
345     CmiBarrier();
346 #if CMK_TIMER_USE_XT3_DCLOCK
347     starttimer = dclock();
348 #else
349     starttimer = MPI_Wtime();
350 #endif
351   }
352
353 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
354   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
355     timerLock = CmiCreateLock();
356 #endif
357   CmiNodeAllBarrier();          /* for smp */
358 }
359
360 /**
361  * Since the timerLock is never created, and is
362  * always NULL, then all the if-condition inside
363  * the timer functions could be disabled right
364  * now in the case of SMP. --Chao Mei
365  */
366 double CmiTimer(void)
367 {
368   double t;
369 #if 0 && CMK_SMP
370   if (timerLock) CmiLock(timerLock);
371 #endif
372
373 #if CMK_TIMER_USE_XT3_DCLOCK
374   t = dclock();
375 #else
376   t = MPI_Wtime();
377 #endif
378
379 #if 0 && CMK_SMP
380   if (timerLock) CmiUnlock(timerLock);
381 #endif
382
383   return _absoluteTime?t: (t-starttimer);
384 }
385
386 double CmiWallTimer(void)
387 {
388   double t;
389 #if 0 && CMK_SMP
390   if (timerLock) CmiLock(timerLock);
391 #endif
392
393 #if CMK_TIMER_USE_XT3_DCLOCK
394   t = dclock();
395 #else
396   t = MPI_Wtime();
397 #endif
398
399 #if 0 && CMK_SMP
400   if (timerLock) CmiUnlock(timerLock);
401 #endif
402
403   return _absoluteTime? t: (t-starttimer);
404 }
405
406 double CmiCpuTimer(void)
407 {
408   double t;
409 #if 0 && CMK_SMP
410   if (timerLock) CmiLock(timerLock);
411 #endif
412 #if CMK_TIMER_USE_XT3_DCLOCK
413   t = dclock() - starttimer;
414 #else
415   t = MPI_Wtime() - starttimer;
416 #endif
417 #if 0 && CMK_SMP
418   if (timerLock) CmiUnlock(timerLock);
419 #endif
420   return t;
421 }
422
423 #endif
424
425 /* must be called on all ranks including comm thread in SMP */
426 int CmiBarrier()
427 {
428 #if CMK_SMP
429     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
430   CmiNodeAllBarrier();
431   if (CmiMyRank() == CmiMyNodeSize()) 
432 #else
433   if (CmiMyRank() == 0) 
434 #endif
435   {
436 /**
437  *  The call of CmiBarrier is usually before the initialization
438  *  of trace module of Charm++, therefore, the START_EVENT
439  *  and END_EVENT are disabled here. -Chao Mei
440  */     
441     /*START_EVENT();*/
442
443     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
444         CmiAbort("Timernit: MPI_Barrier failed!\n");
445
446     /*END_EVENT(10);*/
447   }
448   CmiNodeAllBarrier();
449   return 0;
450 }
451
452 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
453 int CmiBarrierZero()
454 {
455   int i;
456 #if CMK_SMP
457   if (CmiMyRank() == CmiMyNodeSize()) 
458 #else
459   if (CmiMyRank() == 0) 
460 #endif
461   {
462     char msg[1];
463     MPI_Status sts;
464     if (CmiMyNode() == 0)  {
465       for (i=0; i<CmiNumNodes()-1; i++) {
466          START_EVENT();
467
468          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
469             CmiPrintf("MPI_Recv failed!\n");
470
471          END_EVENT(30);
472       }
473     }
474     else {
475       START_EVENT();
476
477       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
478          printf("MPI_Send failed!\n");
479
480       END_EVENT(20);
481     }
482   }
483   CmiNodeAllBarrier();
484   return 0;
485 }
486
487 typedef struct ProcState {
488 #if MULTI_SENDQUEUE
489 PCQueue      sendMsgBuf;       /* per processor message sending queue */
490 #endif
491 CmiNodeLock  recvLock;              /* for cs->recv */
492 } ProcState;
493
494 static ProcState  *procState;
495
496 #if CMK_SMP
497
498 #if !MULTI_SENDQUEUE
499 static PCQueue sendMsgBuf;
500 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
501 #endif
502
503 #endif
504
505 /************************************************************
506  *
507  * Processor state structure
508  *
509  ************************************************************/
510
511 /* fake Cmi_charmrun_fd */
512 static int Cmi_charmrun_fd = 0;
513 #include "machine-smp.c"
514
515 CsvDeclare(CmiNodeState, NodeState);
516
517 #include "immediate.c"
518
519 #if CMK_SHARED_VARS_UNAVAILABLE
520 /************ non SMP **************/
521 static struct CmiStateStruct Cmi_state;
522 int _Cmi_mype;
523 int _Cmi_myrank;
524
525 void CmiMemLock() {}
526 void CmiMemUnlock() {}
527
528 #define CmiGetState() (&Cmi_state)
529 #define CmiGetStateN(n) (&Cmi_state)
530
531 void CmiYield(void) { sleep(0); }
532
533 static void CmiStartThreads(char **argv)
534 {
535   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
536   _Cmi_mype = Cmi_nodestart;
537   _Cmi_myrank = 0;
538 }
539 #endif  /* non smp */
540
541 /*Add a message to this processor's receive queue, pe is a rank */
542 void CmiPushPE(int pe,void *msg)
543 {
544   CmiState cs = CmiGetStateN(pe);
545   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
546 #if CMK_IMMEDIATE_MSG
547   if (CmiIsImmediate(msg)) {
548 /*
549 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
550     CmiHandleMessage(msg);
551 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
552 */
553     /**(CmiUInt2 *)msg = pe;*/
554     CMI_DEST_RANK(msg) = pe;
555     CmiPushImmediateMsg(msg);
556     return;
557   }
558 #endif
559
560 #if CMK_SMP
561   CmiLock(procState[pe].recvLock);
562 #endif
563   PCQueuePush(cs->recv,msg);
564 #if CMK_SMP
565   CmiUnlock(procState[pe].recvLock);
566 #endif
567   CmiIdleLock_addMessage(&cs->idle);
568   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
569 }
570
571 #if CMK_NODE_QUEUE_AVAILABLE
572 /*Add a message to this processor's receive queue */
573 static void CmiPushNode(void *msg)
574 {
575   MACHSTATE(3,"Pushing message into NodeRecv queue");
576 #if CMK_IMMEDIATE_MSG
577   if (CmiIsImmediate(msg)) {
578     CMI_DEST_RANK(msg) = 0;
579     CmiPushImmediateMsg(msg);
580     return;
581   }
582 #endif
583   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
584   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
585   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
586   {
587   CmiState cs=CmiGetStateN(0);
588   CmiIdleLock_addMessage(&cs->idle);
589   }
590 }
591 #endif
592
593 #ifndef CmiMyPe
594 int CmiMyPe(void)
595 {
596   return CmiGetState()->pe;
597 }
598 #endif
599
600 #ifndef CmiMyRank
601 int CmiMyRank(void)
602 {
603   return CmiGetState()->rank;
604 }
605 #endif
606
607 #ifndef CmiNodeFirst
608 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
609 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
610 #endif
611
612 #ifndef CmiNodeOf
613 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
614 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
615 #endif
616
617 static size_t CmiAllAsyncMsgsSent(void)
618 {
619    SMSG_LIST *msg_tmp = sent_msgs;
620    MPI_Status sts;
621    int done;
622
623    while(msg_tmp!=0) {
624     done = 0;
625     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
626       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
627     if(!done)
628       return 0;
629     msg_tmp = msg_tmp->next;
630 /*    MsgQueueLen--; ????? */
631    }
632    return 1;
633 }
634
635 int CmiAsyncMsgSent(CmiCommHandle c) {
636
637   SMSG_LIST *msg_tmp = sent_msgs;
638   int done;
639   MPI_Status sts;
640
641   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
642     msg_tmp = msg_tmp->next;
643   if(msg_tmp) {
644     done = 0;
645     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
646       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
647     return ((done)?1:0);
648   } else {
649     return 1;
650   }
651 }
652
653 void CmiReleaseCommHandle(CmiCommHandle c)
654 {
655   return;
656 }
657
658 #if CMK_BLUEGENEL
659 extern void MPID_Progress_test();
660 #endif
661
662 void CmiReleaseSentMessages(void)
663 {
664   SMSG_LIST *msg_tmp=sent_msgs;
665   SMSG_LIST *prev=0;
666   SMSG_LIST *temp;
667   int done;
668   MPI_Status sts;
669
670 #if CMK_BLUEGENEL
671   MPID_Progress_test();
672 #endif
673
674   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
675   while(msg_tmp!=0) {
676     done =0;
677 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
678     double startT = CmiWallTimer();
679 #endif
680     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
681       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
682     if(done) {
683       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
684       MsgQueueLen--;
685       /* Release the message */
686       temp = msg_tmp->next;
687       if(prev==0)  /* first message */
688         sent_msgs = temp;
689       else
690         prev->next = temp;
691       CmiFree(msg_tmp->msg);
692       CmiFree(msg_tmp);
693       msg_tmp = temp;
694     } else {
695       prev = msg_tmp;
696       msg_tmp = msg_tmp->next;
697     }
698 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
699     {
700     double endT = CmiWallTimer();
701     /* only record the event if it takes more than 1ms */
702     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
703     }
704 #endif
705   }
706   end_sent = prev;
707   MACHSTATE(2,"} CmiReleaseSentMessages end");
708 }
709
710 int PumpMsgs(void)
711 {
712   int nbytes, flg, res;
713   char *msg;
714   MPI_Status sts;
715   int recd=0;
716
717 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
718   int recvCnt=0;
719 #endif
720         
721 #if CMK_BLUEGENEL
722   MPID_Progress_test();
723 #endif
724
725   MACHSTATE(2,"PumpMsgs begin {");
726
727 #if CMI_DYNAMIC_EXERT_CAP
728   dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
729 #endif
730         
731   while(1) {
732 #if CMI_EXERT_RECV_CAP
733         if(recvCnt==RECV_CAP) break;
734 #elif CMI_DYNAMIC_EXERT_CAP
735         if(recvCnt >= dynamicRecvCap) break;
736 #endif
737           
738     /* First check posted recvs then do  probe unmatched outstanding messages */
739 #if MPI_POST_RECV_COUNT > 0 
740     int completed_index=-1;
741     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
742         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
743     if(flg){
744         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
745             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
746
747         recd = 1;
748         msg = (char *) CmiAlloc(nbytes);
749         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
750         /* and repost the recv */
751
752         START_EVENT();
753
754         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
755             MPI_POST_RECV_SIZE,
756             MPI_BYTE,
757             MPI_ANY_SOURCE,
758             POST_RECV_TAG,
759             MPI_COMM_WORLD,
760             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
761                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
762
763         END_EVENT(50);
764
765         CpvAccess(Cmi_posted_recv_total)++;
766     }
767     else {
768         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
769         if(res != MPI_SUCCESS)
770         CmiAbort("MPI_Iprobe failed\n");
771         if(!flg) break;
772         recd = 1;
773         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
774         msg = (char *) CmiAlloc(nbytes);
775
776         START_EVENT();
777
778         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
779             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
780
781         END_EVENT(30);
782
783         CpvAccess(Cmi_unposted_recv_total)++;
784     }
785 #else
786     /* Original version */
787 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
788   double startT = CmiWallTimer(); 
789 #endif
790     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
791     if(res != MPI_SUCCESS)
792       CmiAbort("MPI_Iprobe failed\n");
793
794     if(!flg) break;
795 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
796     {
797     double endT = CmiWallTimer();
798     /* only trace the probe that last longer than 1ms */
799     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
800     }
801 #endif
802
803     recd = 1;
804     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
805     msg = (char *) CmiAlloc(nbytes);
806     
807     START_EVENT();
808
809     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
810       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
811
812     /*END_EVENT(30);*/
813
814 #endif
815
816 #if CMK_SMP_TRACE_COMMTHREAD
817         traceBeginCommOp(msg);
818         traceChangeLastTimestamp(CpvAccess(projTraceStart));
819         traceEndCommOp(msg);
820         #if CMI_MPI_TRACE_MOREDETAILED
821         char tmp[32];
822         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
823         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
824         #endif
825 #elif CMK_TRACE_COMMOVERHEAD
826         char tmp[32];
827         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
828         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
829 #endif
830         
831         
832     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
833     CMI_CHECK_CHECKSUM(msg, nbytes);
834 #if CMK_ERROR_CHECKING
835     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
836       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
837       CmiFree(msg);
838       CmiAbort("Abort!\n");
839       continue;
840     }
841 #endif
842         
843 #if CMK_BROADCAST_SPANNING_TREE
844     if (CMI_BROADCAST_ROOT(msg))
845       SendSpanningChildren(nbytes, msg);
846 #elif CMK_BROADCAST_HYPERCUBE
847     if (CMI_BROADCAST_ROOT(msg))
848       SendHypercube(nbytes, msg);
849 #endif
850         
851         /* In SMP mode, this push operation needs to be executed
852      * after forwarding broadcast messages. If it is executed
853      * earlier, then during the bcast msg forwarding period,    
854          * the msg could be already freed on the worker thread.
855          * As a result, the forwarded message could be wrong! 
856          * --Chao Mei
857          */
858 #if CMK_NODE_QUEUE_AVAILABLE
859     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
860       CmiPushNode(msg);
861     else
862 #endif
863         CmiPushPE(CMI_DEST_RANK(msg), msg);     
864         
865 #if CMI_EXERT_RECV_CAP
866         recvCnt++;
867 #elif CMI_DYNAMIC_EXERT_CAP
868         recvCnt++;
869 #if CMK_SMP
870         /* check sendMsgBuf to get the  number of messages that have not been sent
871          * which is only available in SMP mode
872          * MsgQueueLen indicates the number of messages that have not been released 
873          * by MPI 
874          */
875         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
876                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
877                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
878         }
879 #else
880         /* MsgQueueLen indicates the number of messages that have not been released 
881          * by MPI 
882          */
883         if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
884                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
885         }
886 #endif
887
888 #endif  
889         
890   }
891
892   
893 #if CMK_IMMEDIATE_MSG && !CMK_SMP
894   CmiHandleImmediate();
895 #endif
896   
897   MACHSTATE(2,"} PumpMsgs end ");
898   return recd;
899 }
900
901 /* blocking version */
902 static void PumpMsgsBlocking(void)
903 {
904   static int maxbytes = 20000000;
905   static char *buf = NULL;
906   int nbytes, flg;
907   MPI_Status sts;
908   char *msg;
909   int recd=0;
910
911   if (!PCQueueEmpty(CmiGetState()->recv)) return;
912   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
913   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
914   if (sent_msgs)  return;
915
916 #if 0
917   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
918 #endif
919
920   if (buf == NULL) {
921     buf = (char *) CmiAlloc(maxbytes);
922     _MEMCHECK(buf);
923   }
924
925
926 #if MPI_POST_RECV_COUNT > 0
927 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
928 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
929 #endif
930
931   START_EVENT();
932
933   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
934       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
935
936   /*END_EVENT(30);*/
937     
938    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
939    msg = (char *) CmiAlloc(nbytes);
940    memcpy(msg, buf, nbytes);
941
942 #if CMK_SMP_TRACE_COMMTHREAD
943         traceBeginCommOp(msg);
944         traceChangeLastTimestamp(CpvAccess(projTraceStart));
945         traceEndCommOp(msg);
946         #if CMI_MPI_TRACE_MOREDETAILED
947         char tmp[32];
948         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
949         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
950         #endif
951 #endif
952
953 #if CMK_BROADCAST_SPANNING_TREE
954    if (CMI_BROADCAST_ROOT(msg))
955       SendSpanningChildren(nbytes, msg);
956 #elif CMK_BROADCAST_HYPERCUBE
957    if (CMI_BROADCAST_ROOT(msg))
958       SendHypercube(nbytes, msg);
959 #endif
960   
961         /* In SMP mode, this push operation needs to be executed
962      * after forwarding broadcast messages. If it is executed
963      * earlier, then during the bcast msg forwarding period,    
964          * the msg could be already freed on the worker thread.
965          * As a result, the forwarded message could be wrong! 
966          * --Chao Mei
967          */  
968 #if CMK_NODE_QUEUE_AVAILABLE
969    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
970       CmiPushNode(msg);
971    else
972 #endif
973       CmiPushPE(CMI_DEST_RANK(msg), msg);
974 }
975
976 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
977
978 #if CMK_SMP
979
980 static int inexit = 0;
981 static CmiNodeLock  exitLock = 0;
982
983 static int MsgQueueEmpty()
984 {
985   int i;
986 #if MULTI_SENDQUEUE
987   for (i=0; i<_Cmi_mynodesize; i++)
988     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
989 #else
990   return PCQueueEmpty(sendMsgBuf);
991 #endif
992   return 1;
993 }
994
995 static int SendMsgBuf();
996
997 /* test if all processors recv queues are empty */
998 static int RecvQueueEmpty()
999 {
1000   int i;
1001   for (i=0; i<_Cmi_mynodesize; i++) {
1002     CmiState cs=CmiGetStateN(i);
1003     if (!PCQueueEmpty(cs->recv)) return 0;
1004   }
1005   return 1;
1006 }
1007
1008 /**
1009 CommunicationServer calls MPI to send messages in the queues and probe message from network.
1010 */
1011
1012 #define REPORT_COMM_METRICS 0
1013 #if REPORT_COMM_METRICS
1014 static double pumptime = 0.0;
1015 static double releasetime = 0.0;
1016 static double sendtime = 0.0;
1017 #endif
1018
1019 static void CommunicationServer(int sleepTime)
1020 {
1021   int static count=0;
1022 /*
1023   count ++;
1024   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
1025 */
1026 #if REPORT_COMM_METRICS
1027   double t1, t2, t3, t4;
1028   t1 = CmiWallTimer();
1029 #endif
1030   PumpMsgs();
1031 #if REPORT_COMM_METRICS
1032   t2 = CmiWallTimer();
1033 #endif
1034   CmiReleaseSentMessages();
1035 #if REPORT_COMM_METRICS
1036   t3 = CmiWallTimer();
1037 #endif
1038   SendMsgBuf();
1039 #if REPORT_COMM_METRICS
1040   t4 = CmiWallTimer();
1041   pumptime += (t2-t1);
1042   releasetime += (t3-t2);
1043   sendtime += (t4-t3);
1044 #endif
1045 /*
1046   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1047 */
1048   if (inexit == CmiMyNodeSize()) {
1049     MACHSTATE(2, "CommunicationServer exiting {");
1050 #if 0
1051     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1052 #endif
1053     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1054       CmiReleaseSentMessages();
1055       SendMsgBuf();
1056       PumpMsgs();
1057     }
1058     MACHSTATE(2, "CommunicationServer barrier begin {");
1059
1060     START_EVENT();
1061
1062     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1063       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1064
1065     END_EVENT(10);
1066
1067     MACHSTATE(2, "} CommunicationServer barrier end");
1068 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1069     if (CmiMyNode() == 0){
1070       CmiPrintf("End of program\n");
1071     }
1072 #endif
1073     MACHSTATE(2, "} CommunicationServer EXIT");
1074
1075     ConverseCommonExit();   
1076 #if REPORT_COMM_METRICS
1077     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1078 #endif
1079
1080 #if ! CMK_AUTOBUILD
1081     signal(SIGINT, signal_int);
1082     MPI_Finalize();
1083     #endif
1084     exit(0);
1085   }
1086 }
1087
1088 #endif
1089
1090 static void CommunicationServerThread(int sleepTime)
1091 {
1092 #if CMK_SMP
1093   CommunicationServer(sleepTime);
1094 #endif
1095 #if CMK_IMMEDIATE_MSG
1096   CmiHandleImmediate();
1097 #endif
1098 }
1099
1100 #if CMK_NODE_QUEUE_AVAILABLE
1101 char *CmiGetNonLocalNodeQ(void)
1102 {
1103   CmiState cs = CmiGetState();
1104   char *result = 0;
1105   CmiIdleLock_checkMessage(&cs->idle);
1106 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1107     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1108     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1109     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1110     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1111     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1112 /*  }  */
1113
1114   return result;
1115 }
1116 #endif
1117
1118 void *CmiGetNonLocal(void)
1119 {
1120   static int count=0;
1121   CmiState cs = CmiGetState();
1122   void *msg;
1123
1124 #if ! CMK_SMP
1125   if (CmiNumPes() == 1) return NULL;
1126 #endif
1127
1128   CmiIdleLock_checkMessage(&cs->idle);
1129   /* although it seems that lock is not needed, I found it crashes very often
1130      on mpi-smp without lock */
1131
1132 #if ! CMK_SMP
1133   CmiReleaseSentMessages();
1134   PumpMsgs();
1135 #endif
1136
1137   /* CmiLock(procState[cs->rank].recvLock); */
1138   msg =  PCQueuePop(cs->recv);
1139   /* CmiUnlock(procState[cs->rank].recvLock); */
1140
1141 /*
1142   if (msg) {
1143     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1144   else {
1145     count++;
1146     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1147   }
1148 */
1149 #if ! CMK_SMP
1150   if (no_outstanding_sends) {
1151     while (MsgQueueLen>0) {
1152       CmiReleaseSentMessages();
1153       PumpMsgs();
1154     }
1155   }
1156
1157   if(!msg) {
1158     CmiReleaseSentMessages();
1159     if (PumpMsgs())
1160       return  PCQueuePop(cs->recv);
1161     else
1162       return 0;
1163   }
1164 #endif
1165
1166   return msg;
1167 }
1168
1169 /* called in non-smp mode */
1170 void CmiNotifyIdle(void)
1171 {
1172   CmiReleaseSentMessages();
1173   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1174 }
1175
1176
1177 /********************************************************
1178     The call to probe immediate messages has been renamed to
1179     CmiMachineProgressImpl
1180 ******************************************************/
1181 /* user call to handle immediate message, only useful in non SMP version
1182    using polling method to schedule message.
1183 */
1184 /*
1185 #if CMK_IMMEDIATE_MSG
1186 void CmiProbeImmediateMsg()
1187 {
1188 #if !CMK_SMP
1189   PumpMsgs();
1190   CmiHandleImmediate();
1191 #endif
1192 }
1193 #endif
1194 */
1195
1196 /* Network progress function is used to poll the network when for
1197    messages. This flushes receive buffers on some  implementations*/
1198 #if CMK_MACHINE_PROGRESS_DEFINED
1199 void CmiMachineProgressImpl()
1200 {
1201 #if !CMK_SMP
1202     PumpMsgs();
1203 #if CMK_IMMEDIATE_MSG
1204     CmiHandleImmediate();
1205 #endif
1206 #else
1207     /*Not implemented yet. Communication server does not seem to be
1208       thread safe, so only communication thread call it */
1209     if (CmiMyRank() == CmiMyNodeSize())
1210         CommunicationServerThread(0);
1211 #endif
1212 }
1213 #endif
1214
1215 /********************* MESSAGE SEND FUNCTIONS ******************/
1216
1217 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1218
1219 static void CmiSendSelf(char *msg)
1220 {
1221 #if CMK_IMMEDIATE_MSG
1222     if (CmiIsImmediate(msg)) {
1223       /* CmiBecomeNonImmediate(msg); */
1224       CmiPushImmediateMsg(msg);
1225       CmiHandleImmediate();
1226       return;
1227     }
1228 #endif
1229     CQdCreate(CpvAccess(cQdState), 1);
1230     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1231 }
1232
1233 void CmiSyncSendFn(int destPE, int size, char *msg)
1234 {
1235   CmiState cs = CmiGetState();
1236   char *dupmsg = (char *) CmiAlloc(size);
1237   memcpy(dupmsg, msg, size);
1238
1239   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1240
1241   if (cs->pe==destPE) {
1242     CmiSendSelf(dupmsg);
1243   }
1244   else
1245     CmiAsyncSendFn_(destPE, size, dupmsg);
1246 }
1247
1248 #if CMK_SMP
1249
1250 /* called by communication thread in SMP */
1251 static int SendMsgBuf()
1252 {
1253   SMSG_LIST *msg_tmp;
1254   char *msg;
1255   int node, rank, size;
1256   int i;
1257   int sent = 0;
1258
1259 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1260         int sentCnt = 0;
1261 #endif  
1262         
1263 #if CMI_DYNAMIC_EXERT_CAP
1264         dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
1265 #endif
1266         
1267   MACHSTATE(2,"SendMsgBuf begin {");
1268 #if MULTI_SENDQUEUE
1269   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1270   {
1271     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1272     {
1273       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1274 #else
1275     /* single message sending queue */
1276     /* CmiLock(sendMsgBufLock); */
1277     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1278     /* CmiUnlock(sendMsgBufLock); */
1279     while (NULL != msg_tmp)
1280     {
1281 #endif
1282       node = msg_tmp->destpe;
1283       size = msg_tmp->size;
1284       msg = msg_tmp->msg;
1285       msg_tmp->next = 0;
1286                 
1287 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1288       while (MsgQueueLen > request_max) {
1289                 CmiReleaseSentMessages();
1290                 PumpMsgs();
1291       }
1292 #endif
1293           
1294       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1295 #if CMK_ERROR_CHECKING
1296       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1297 #endif
1298       CMI_SET_CHECKSUM(msg, size);
1299
1300 #if MPI_POST_RECV_COUNT > 0
1301         if(size <= MPI_POST_RECV_SIZE){
1302
1303           START_EVENT();
1304           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1305                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1306
1307           STOP_EVENT(40);
1308         }
1309         else {
1310             START_EVENT();
1311             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1312                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1313             STOP_EVENT(40);
1314         }
1315 #else
1316         START_EVENT();
1317         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1318             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1319         /*END_EVENT(40);*/
1320 #endif
1321         
1322 #if CMK_SMP_TRACE_COMMTHREAD
1323         traceBeginCommOp(msg);
1324         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1325         /* traceSendMsgComm must execute after traceBeginCommOp because
1326          * we pretend we execute an entry method, and inside this we
1327          * pretend we will send another message. Otherwise how could
1328          * a message creation just before an entry method invocation?
1329          * If such logic is broken, the projections will not trace
1330          * messages correctly! -Chao Mei
1331          */
1332         traceSendMsgComm(msg);
1333         traceEndCommOp(msg);
1334         #if CMI_MPI_TRACE_MOREDETAILED
1335         char tmp[64];
1336         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1337         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1338         #endif
1339 #endif
1340                 
1341                 
1342       MACHSTATE(3,"}MPI_send end");
1343       MsgQueueLen++;
1344       if(sent_msgs==0)
1345         sent_msgs = msg_tmp;
1346       else
1347         end_sent->next = msg_tmp;
1348       end_sent = msg_tmp;
1349       sent=1;
1350           
1351 #if CMI_EXERT_SEND_CAP    
1352           if(++sentCnt == SEND_CAP) break;
1353 #elif CMI_DYNAMIC_EXERT_CAP
1354           if(++sentCnt >= dynamicSendCap) break;
1355           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1356                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1357 #endif    
1358           
1359 #if ! MULTI_SENDQUEUE
1360       /* CmiLock(sendMsgBufLock); */
1361       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1362       /* CmiUnlock(sendMsgBufLock); */
1363 #endif
1364     }
1365 #if MULTI_SENDQUEUE
1366   }
1367 #endif
1368   MACHSTATE(2,"}SendMsgBuf end ");
1369   return sent;
1370 }
1371
1372 void EnqueueMsg(void *m, int size, int node)
1373 {
1374   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1375   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1376   msg_tmp->msg = m;
1377   msg_tmp->size = size;
1378   msg_tmp->destpe = node;
1379         
1380 #if CMK_SMP_TRACE_COMMTHREAD
1381         msg_tmp->srcpe = CmiMyPe();
1382 #endif  
1383
1384 #if MULTI_SENDQUEUE
1385   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1386 #else
1387   CmiLock(sendMsgBufLock);
1388   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1389   CmiUnlock(sendMsgBufLock);
1390 #endif
1391         
1392   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1393 }
1394
1395 #endif
1396
1397 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1398 {
1399   CmiState cs = CmiGetState();
1400   SMSG_LIST *msg_tmp;
1401   CmiUInt2  rank, node;
1402
1403   if(destPE == cs->pe) {
1404     char *dupmsg = (char *) CmiAlloc(size);
1405     memcpy(dupmsg, msg, size);
1406     CmiSendSelf(dupmsg);
1407     return 0;
1408   }
1409   CQdCreate(CpvAccess(cQdState), 1);
1410 #if CMK_SMP
1411   node = CmiNodeOf(destPE);
1412   rank = CmiRankOf(destPE);
1413   if (node == CmiMyNode())  {
1414     CmiPushPE(rank, msg);
1415     return 0;
1416   }
1417   CMI_DEST_RANK(msg) = rank;
1418   EnqueueMsg(msg, size, node);
1419   return 0;
1420 #else
1421   /* non smp */
1422   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1423   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1424   msg_tmp->msg = msg;
1425   msg_tmp->next = 0;
1426   while (MsgQueueLen > request_max) {
1427         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1428         CmiReleaseSentMessages();
1429         PumpMsgs();
1430   }
1431 #if CMK_ERROR_CHECKING
1432   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1433 #endif
1434   CMI_SET_CHECKSUM(msg, size);
1435
1436 #if MPI_POST_RECV_COUNT > 0
1437         if(size <= MPI_POST_RECV_SIZE){
1438
1439           START_EVENT();
1440           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1441                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1442           END_EVENT(40);
1443         }
1444         else {
1445           START_EVENT();
1446           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1447                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1448           END_EVENT(40);
1449         }
1450 #else
1451   START_EVENT();
1452   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1453     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1454   /*END_EVENT(40);*/
1455   #if CMK_TRACE_COMMOVERHEAD
1456         char tmp[64];
1457         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1458         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1459   #endif
1460 #endif
1461
1462   MsgQueueLen++;
1463   if(sent_msgs==0)
1464     sent_msgs = msg_tmp;
1465   else
1466     end_sent->next = msg_tmp;
1467   end_sent = msg_tmp;
1468   return (CmiCommHandle) &(msg_tmp->req);
1469 #endif              /* non-smp */
1470 }
1471
1472 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1473 {
1474   CMI_SET_BROADCAST_ROOT(msg, 0);
1475   CmiAsyncSendFn_(destPE, size, msg);
1476 }
1477
1478 void CmiFreeSendFn(int destPE, int size, char *msg)
1479 {
1480   CmiState cs = CmiGetState();
1481   CMI_SET_BROADCAST_ROOT(msg, 0);
1482
1483   if (cs->pe==destPE) {
1484     CmiSendSelf(msg);
1485   } else {
1486     CmiAsyncSendFn_(destPE, size, msg);
1487   }
1488 }
1489
1490 /*********************** BROADCAST FUNCTIONS **********************/
1491
1492 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1493 void CmiSyncSendFn1(int destPE, int size, char *msg)
1494 {
1495   CmiState cs = CmiGetState();
1496   char *dupmsg = (char *) CmiAlloc(size);
1497   memcpy(dupmsg, msg, size);
1498   if (cs->pe==destPE)
1499     CmiSendSelf(dupmsg);
1500   else
1501     CmiAsyncSendFn_(destPE, size, dupmsg);
1502 }
1503
1504 /* send msg to its spanning children in broadcast. G. Zheng */
1505 void SendSpanningChildren(int size, char *msg)
1506 {
1507   CmiState cs = CmiGetState();
1508   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1509   int startnode = CmiNodeOf(startpe);
1510   int i, exceptRank;
1511         
1512    /* first send msgs to other nodes */
1513   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1514   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1515     int nd = CmiMyNode()-startnode;
1516     if (nd<0) nd+=CmiNumNodes();
1517     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1518     if (nd > CmiNumNodes() - 1) break;
1519     nd += startnode;
1520     nd = nd%CmiNumNodes();
1521     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1522 #if CMK_SMP
1523       /* always send to the first rank of other nodes */
1524     char *newmsg = CmiCopyMsg(msg, size);
1525     CMI_DEST_RANK(newmsg) = 0;
1526     EnqueueMsg(newmsg, size, nd);
1527 #else
1528     CmiSyncSendFn1(nd, size, msg);
1529 #endif
1530   }
1531 #if CMK_SMP  
1532    /* second send msgs to my peers on this node */
1533   /* FIXME: now it's just a flat p2p send!! When node size is large,
1534    * it should also be sent in a tree
1535    */
1536    exceptRank = CMI_DEST_RANK(msg);
1537    for(i=0; i<exceptRank; i++){
1538            CmiPushPE(i, CmiCopyMsg(msg, size));
1539    }
1540    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1541            CmiPushPE(i, CmiCopyMsg(msg, size));
1542    }
1543 #endif
1544 }
1545
1546 #include <math.h>
1547
1548 /* send msg along the hypercube in broadcast. (Sameer) */
1549 void SendHypercube(int size, char *msg)
1550 {
1551   CmiState cs = CmiGetState();
1552   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1553   int startnode = CmiNodeOf(startpe);
1554   int i, exceptRank, cnt, tmp, relPE;
1555   int dims=0;
1556
1557   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1558   tmp = CmiNumNodes()-1;
1559   while(tmp>0){
1560           dims++;
1561           tmp = tmp >> 1;
1562   }
1563   if(CmiNumNodes()==1) dims=1;
1564   
1565    /* first send msgs to other nodes */  
1566   relPE = CmiMyNode()-startnode;
1567   if(relPE < 0) relPE += CmiNumNodes();
1568   cnt=0;
1569   tmp = relPE;
1570   /* count how many zeros (in binary format) relPE has */
1571   for(i=0; i<dims; i++, cnt++){
1572     if(tmp & 1 == 1) break;
1573     tmp = tmp >> 1;
1574   }
1575   
1576   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1577   for (i = cnt-1; i >= 0; i--) {
1578     int nd = relPE + (1 << i);
1579         if(nd >= CmiNumNodes()) continue;
1580         nd = (nd+startnode)%CmiNumNodes();
1581         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1582 #if CMK_SMP
1583     /* always send to the first rank of other nodes */
1584     char *newmsg = CmiCopyMsg(msg, size);
1585     CMI_DEST_RANK(newmsg) = 0;
1586     EnqueueMsg(newmsg, size, nd);
1587 #else
1588         CmiSyncSendFn1(nd, size, msg);
1589 #endif
1590   }
1591   
1592 #if CMK_SMP
1593    /* second send msgs to my peers on this node */
1594    /* FIXME: now it's just a flat p2p send!! When node size is large,
1595     * it should also be sent in a tree
1596     */
1597    exceptRank = CMI_DEST_RANK(msg);
1598    for(i=0; i<exceptRank; i++){
1599            CmiPushPE(i, CmiCopyMsg(msg, size));
1600    }
1601    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1602            CmiPushPE(i, CmiCopyMsg(msg, size));
1603    }
1604 #endif
1605 }
1606
1607 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1608 {
1609   CmiState cs = CmiGetState();
1610
1611 #if CMK_SMP     
1612   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1613   CMI_DEST_RANK(msg) = CmiMyRank();
1614 #endif
1615         
1616 #if CMK_BROADCAST_SPANNING_TREE
1617   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1618   SendSpanningChildren(size, msg);
1619
1620 #elif CMK_BROADCAST_HYPERCUBE
1621   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1622   SendHypercube(size, msg);
1623
1624 #else
1625   int i;
1626
1627   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1628     CmiSyncSendFn(i, size,msg) ;
1629   for ( i=0; i<cs->pe; i++ )
1630     CmiSyncSendFn(i, size,msg) ;
1631 #endif
1632
1633   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1634 }
1635
1636
1637 /*  FIXME: luckily async is never used  G. Zheng */
1638 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1639 {
1640   CmiState cs = CmiGetState();
1641   int i ;
1642
1643   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1644     CmiAsyncSendFn(i,size,msg) ;
1645   for ( i=0; i<cs->pe; i++ )
1646     CmiAsyncSendFn(i,size,msg) ;
1647
1648   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1649   CmiAbort("CmiAsyncBroadcastFn should never be called");
1650   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1651 }
1652
1653 void CmiFreeBroadcastFn(int size, char *msg)
1654 {
1655    CmiSyncBroadcastFn(size,msg);
1656    CmiFree(msg);
1657 }
1658
1659 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1660 {
1661
1662 #if CMK_SMP     
1663   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1664   CMI_DEST_RANK(msg) = CmiMyRank();
1665 #endif
1666
1667 #if CMK_BROADCAST_SPANNING_TREE
1668   CmiState cs = CmiGetState();
1669   CmiSyncSendFn(cs->pe, size,msg) ;
1670   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1671   SendSpanningChildren(size, msg);
1672
1673 #elif CMK_BROADCAST_HYPERCUBE
1674   CmiState cs = CmiGetState();
1675   CmiSyncSendFn(cs->pe, size,msg) ;
1676   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1677   SendHypercube(size, msg);
1678
1679 #else
1680     int i ;
1681
1682   for ( i=0; i<_Cmi_numpes; i++ )
1683     CmiSyncSendFn(i,size,msg) ;
1684 #endif
1685
1686   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1687 }
1688
1689 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1690 {
1691   int i ;
1692
1693   for ( i=1; i<_Cmi_numpes; i++ )
1694     CmiAsyncSendFn(i,size,msg) ;
1695
1696   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1697
1698   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1699 }
1700
1701 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1702 {
1703 #if CMK_SMP     
1704   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1705   CMI_DEST_RANK(msg) = CmiMyRank();
1706 #endif
1707
1708 #if CMK_BROADCAST_SPANNING_TREE
1709   CmiState cs = CmiGetState();
1710   CmiSyncSendFn(cs->pe, size,msg) ;
1711   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1712   SendSpanningChildren(size, msg);
1713
1714 #elif CMK_BROADCAST_HYPERCUBE
1715   CmiState cs = CmiGetState();
1716   CmiSyncSendFn(cs->pe, size,msg) ;
1717   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1718   SendHypercube(size, msg);
1719
1720 #else
1721   int i ;
1722
1723   for ( i=0; i<_Cmi_numpes; i++ )
1724     CmiSyncSendFn(i,size,msg) ;
1725 #endif
1726   CmiFree(msg) ;
1727   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1728 }
1729
1730 #if CMK_NODE_QUEUE_AVAILABLE
1731
1732 static void CmiSendNodeSelf(char *msg)
1733 {
1734 #if CMK_IMMEDIATE_MSG
1735 #if 0
1736     if (CmiIsImmediate(msg) && !_immRunning) {
1737       /*CmiHandleImmediateMessage(msg); */
1738       CmiPushImmediateMsg(msg);
1739       CmiHandleImmediate();
1740       return;
1741     }
1742 #endif
1743     if (CmiIsImmediate(msg))
1744     {
1745       CmiPushImmediateMsg(msg);
1746       if (!_immRunning) CmiHandleImmediate();
1747       return;
1748     }
1749 #endif
1750     CQdCreate(CpvAccess(cQdState), 1);
1751     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1752     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1753     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1754 }
1755
1756 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1757 {
1758   int i;
1759   SMSG_LIST *msg_tmp;
1760   char *dupmsg;
1761
1762   CMI_SET_BROADCAST_ROOT(msg, 0);
1763   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1764   switch (dstNode) {
1765   case NODE_BROADCAST_ALL:
1766     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1767   case NODE_BROADCAST_OTHERS:
1768     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1769     for (i=0; i<_Cmi_numnodes; i++)
1770       if (i!=_Cmi_mynode) {
1771         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1772       }
1773     break;
1774   default:
1775     dupmsg = (char *)CmiCopyMsg(msg,size);
1776     if(dstNode == _Cmi_mynode) {
1777       CmiSendNodeSelf(dupmsg);
1778     }
1779     else {
1780       CQdCreate(CpvAccess(cQdState), 1);
1781       EnqueueMsg(dupmsg, size, dstNode);
1782     }
1783   }
1784   return 0;
1785 }
1786
1787 void CmiSyncNodeSendFn(int p, int s, char *m)
1788 {
1789   CmiAsyncNodeSendFn(p, s, m);
1790 }
1791
1792 /* need */
1793 void CmiFreeNodeSendFn(int p, int s, char *m)
1794 {
1795   CmiAsyncNodeSendFn(p, s, m);
1796   CmiFree(m);
1797 }
1798
1799 /* need */
1800 void CmiSyncNodeBroadcastFn(int s, char *m)
1801 {
1802   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1803 }
1804
1805 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1806 {
1807 }
1808
1809 /* need */
1810 void CmiFreeNodeBroadcastFn(int s, char *m)
1811 {
1812   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1813   CmiFree(m);
1814 }
1815
1816 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1817 {
1818   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1819 }
1820
1821 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1822 {
1823   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1824 }
1825
1826 /* need */
1827 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1828 {
1829   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1830   CmiFree(m);
1831 }
1832 #endif
1833
1834 /************************** MAIN ***********************************/
1835 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1836
1837 void ConverseExit(void)
1838 {
1839 #if ! CMK_SMP
1840   while(!CmiAllAsyncMsgsSent()) {
1841     PumpMsgs();
1842     CmiReleaseSentMessages();
1843   }
1844   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1845     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1846
1847   ConverseCommonExit();
1848 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1849   if (CmiMyPe() == 0){
1850     CmiPrintf("End of program\n");
1851 #if MPI_POST_RECV_COUNT > 0
1852     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1853 #endif
1854 }
1855 #endif
1856 #if ! CMK_AUTOBUILD
1857   signal(SIGINT, signal_int);
1858   MPI_Finalize();
1859 #endif
1860   exit(0);
1861
1862 #else
1863     /* SMP version, communication thread will exit */
1864   ConverseCommonExit();
1865   /* atomic increment */
1866   CmiLock(exitLock);
1867   inexit++;
1868   CmiUnlock(exitLock);
1869   while (1) CmiYield();
1870 #endif
1871 }
1872
1873 static void registerMPITraceEvents() {
1874 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1875     traceRegisterUserEvent("MPI_Barrier", 10);
1876     traceRegisterUserEvent("MPI_Send", 20);
1877     traceRegisterUserEvent("MPI_Recv", 30);
1878     traceRegisterUserEvent("MPI_Isend", 40);
1879     traceRegisterUserEvent("MPI_Irecv", 50);
1880     traceRegisterUserEvent("MPI_Test", 60);
1881     traceRegisterUserEvent("MPI_Iprobe", 70);
1882 #endif
1883 }
1884
1885
1886 static char     **Cmi_argv;
1887 static char     **Cmi_argvcopy;
1888 static CmiStartFn Cmi_startfn;   /* The start function */
1889 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1890
1891 typedef struct {
1892   int sleepMs; /*Milliseconds to sleep while idle*/
1893   int nIdles; /*Number of times we've been idle in a row*/
1894   CmiState cs; /*Machine state*/
1895 } CmiIdleState;
1896
1897 static CmiIdleState *CmiNotifyGetState(void)
1898 {
1899   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1900   s->sleepMs=0;
1901   s->nIdles=0;
1902   s->cs=CmiGetState();
1903   return s;
1904 }
1905
1906 static void CmiNotifyBeginIdle(CmiIdleState *s)
1907 {
1908   s->sleepMs=0;
1909   s->nIdles=0;
1910 }
1911
1912 static void CmiNotifyStillIdle(CmiIdleState *s)
1913 {
1914 #if ! CMK_SMP
1915   CmiReleaseSentMessages();
1916   PumpMsgs();
1917 #else
1918 /*  CmiYield();  */
1919 #endif
1920
1921 #if 1
1922   {
1923   int nSpins=20; /*Number of times to spin before sleeping*/
1924   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1925   s->nIdles++;
1926   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1927     s->sleepMs+=2;
1928     if (s->sleepMs>10) s->sleepMs=10;
1929   }
1930   /*Comm. thread will listen on sockets-- just sleep*/
1931   if (s->sleepMs>0) {
1932     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1933     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1934     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1935   }
1936   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1937   }
1938 #endif
1939 }
1940
1941 #if MACHINE_DEBUG_LOG
1942 FILE *debugLog = NULL;
1943 #endif
1944
1945 static int machine_exit_idx;
1946 static void machine_exit(char *m) {
1947   EmergencyExit();
1948   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1949   fflush(stdout);
1950   CmiNodeBarrier();
1951   if (CmiMyRank() == 0) {
1952     MPI_Barrier(MPI_COMM_WORLD);
1953     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1954     MPI_Abort(MPI_COMM_WORLD, 1);
1955   } else {
1956     while (1) CmiYield();
1957   }
1958 }
1959
1960 static void KillOnAllSigs(int sigNo) {
1961   static int already_in_signal_handler = 0;
1962   char *m;
1963   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1964   already_in_signal_handler = 1;
1965 #if CMK_CCS_AVAILABLE
1966   if (CpvAccess(cmiArgDebugFlag)) {
1967     CpdNotify(CPD_SIGNAL, sigNo);
1968     CpdFreeze();
1969   }
1970 #endif
1971   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1972       "Signal: %d\n",CmiMyPe(),sigNo);
1973   CmiPrintStackTrace(1);
1974
1975   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1976   CmiSetHandler(m, machine_exit_idx);
1977   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1978   machine_exit(m);
1979 }
1980
1981 static void ConverseRunPE(int everReturn)
1982 {
1983   CmiIdleState *s=CmiNotifyGetState();
1984   CmiState cs;
1985   char** CmiMyArgv;
1986
1987   CmiNodeAllBarrier();
1988
1989   cs = CmiGetState();
1990   CpvInitialize(void *,CmiLocalQueue);
1991   CpvAccess(CmiLocalQueue) = cs->localqueue;
1992
1993   if (CmiMyRank())
1994     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1995   else
1996     CmiMyArgv=Cmi_argv;
1997
1998   CthInit(CmiMyArgv);
1999
2000   ConverseCommonInit(CmiMyArgv);
2001   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
2002
2003 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2004   CpvInitialize(double, projTraceStart);
2005   /* only PE 0 needs to care about registration (to generate sts file). */
2006   if (CmiMyPe() == 0) {
2007     registerMachineUserEventsFunction(&registerMPITraceEvents);
2008   }
2009 #endif
2010
2011   /* initialize the network progress counter*/
2012   /* Network progress function is used to poll the network when for
2013      messages. This flushes receive buffers on some  implementations*/
2014   CpvInitialize(int , networkProgressCount);
2015   CpvAccess(networkProgressCount) = 0;
2016
2017 #if CMK_SMP
2018   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2019   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2020 #else
2021   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
2022 #endif
2023
2024 #if MACHINE_DEBUG_LOG
2025   if (CmiMyRank() == 0) {
2026     char ln[200];
2027     sprintf(ln,"debugLog.%d",CmiMyNode());
2028     debugLog=fopen(ln,"w");
2029   }
2030 #endif
2031
2032   /* Converse initialization finishes, immediate messages can be processed.
2033      node barrier previously should take care of the node synchronization */
2034   _immediateReady = 1;
2035
2036   /* communication thread */
2037   if (CmiMyRank() == CmiMyNodeSize()) {
2038     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2039     while (1) CommunicationServerThread(5);
2040   }
2041   else {  /* worker thread */
2042   if (!everReturn) {
2043     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2044     if (Cmi_usrsched==0) CsdScheduler(-1);
2045     ConverseExit();
2046   }
2047   }
2048 }
2049
2050 static char *thread_level_tostring(int thread_level)
2051 {
2052 #if CMK_MPI_INIT_THREAD
2053   switch (thread_level) {
2054   case MPI_THREAD_SINGLE:
2055       return "MPI_THREAD_SINGLE";
2056   case MPI_THREAD_FUNNELED:
2057       return "MPI_THREAD_FUNNELED";
2058   case MPI_THREAD_SERIALIZED:
2059       return "MPI_THREAD_SERIALIZED";
2060   case MPI_THREAD_MULTIPLE :
2061       return "MPI_THREAD_MULTIPLE ";
2062   default: {
2063       char *str = (char*)malloc(5);
2064       sprintf(str,"%d", thread_level);
2065       return str;
2066       }
2067   }
2068   return  "unknown";
2069 #else
2070   char *str = (char*)malloc(5);
2071   sprintf(str,"%d", thread_level);
2072   return str;
2073 #endif
2074 }
2075
2076 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2077 {
2078   int n,i;
2079   int ver, subver;
2080   int provided;
2081   int thread_level;
2082
2083 #if MACHINE_DEBUG
2084   debugLog=NULL;
2085 #endif
2086 #if CMK_USE_HP_MAIN_FIX
2087 #if FOR_CPLUS
2088   _main(argc,argv);
2089 #endif
2090 #endif
2091
2092 #if CMK_MPI_INIT_THREAD
2093 #if CMK_SMP
2094   thread_level = MPI_THREAD_FUNNELED;
2095 #else
2096   thread_level = MPI_THREAD_SINGLE;
2097 #endif
2098   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2099   _thread_provided = provided;
2100 #else
2101   MPI_Init(&argc, &argv);
2102   thread_level = 0;
2103   provided = -1;
2104 #endif
2105   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2106   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2107
2108   MPI_Get_version(&ver, &subver);
2109   if (_Cmi_mynode == 0) {
2110     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2111   }
2112
2113   /* processor per node */
2114   _Cmi_mynodesize = 1;
2115   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2116     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2117 #if ! CMK_SMP
2118   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2119     CmiAbort("+ppn cannot be used in non SMP version!\n");
2120 #endif
2121   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2122   if (idleblock && _Cmi_mynode == 0) {
2123     printf("Charm++: Running in idle blocking mode.\n");
2124   }
2125
2126   /* setup signal handlers */
2127   signal(SIGSEGV, KillOnAllSigs);
2128   signal(SIGFPE, KillOnAllSigs);
2129   signal(SIGILL, KillOnAllSigs);
2130   signal_int = signal(SIGINT, KillOnAllSigs);
2131   signal(SIGTERM, KillOnAllSigs);
2132   signal(SIGABRT, KillOnAllSigs);
2133 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2134   signal(SIGQUIT, KillOnAllSigs);
2135   signal(SIGBUS, KillOnAllSigs);
2136 /*#     if CMK_HANDLE_SIGUSR
2137   signal(SIGUSR1, HandleUserSignals);
2138   signal(SIGUSR2, HandleUserSignals);
2139 #     endif*/
2140 #   endif /*UNIX*/
2141   
2142 #if CMK_NO_OUTSTANDING_SENDS
2143   no_outstanding_sends=1;
2144 #endif
2145   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2146     no_outstanding_sends = 1;
2147     if (_Cmi_mynode == 0)
2148       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2149         no_outstanding_sends?"":" not");
2150   }
2151   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2152   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2153   Cmi_argvcopy = CmiCopyArgs(argv);
2154   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2155   /* find dim = log2(numpes), to pretend we are a hypercube */
2156   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2157     Cmi_dim++ ;
2158  /* CmiSpanTreeInit();*/
2159   request_max=MAX_QLEN;
2160   CmiGetArgInt(argv,"+requestmax",&request_max);
2161   /*printf("request max=%d\n", request_max);*/
2162
2163   /* checksum flag */
2164   if (CmiGetArgFlag(argv,"+checksum")) {
2165 #if !CMK_OPTIMIZE
2166     checksum_flag = 1;
2167     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2168 #else
2169     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2170 #endif
2171   }
2172
2173   {
2174   int debug = CmiGetArgFlag(argv,"++debug");
2175   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2176   if (debug || debug_no_pause)
2177   {   /*Pause so user has a chance to start and attach debugger*/
2178 #if CMK_HAS_GETPID
2179     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2180     fflush(stdout);
2181     if (!debug_no_pause)
2182       sleep(15);
2183 #else
2184     printf("++debug ignored.\n");
2185 #endif
2186   }
2187   }
2188
2189 #if MPI_POST_RECV_COUNT > 0
2190
2191   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2192   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2193   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2194   CpvInitialize(char*,CmiPostedRecvBuffers);
2195
2196     /* Post some extra recvs to help out with incoming messages */
2197     /* On some MPIs the messages are unexpected and thus slow */
2198
2199     /* An array of request handles for posted recvs */
2200     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2201
2202     /* An array of buffers for posted recvs */
2203     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2204
2205     /* Post Recvs */
2206     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2207         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2208                     MPI_POST_RECV_SIZE,
2209                     MPI_BYTE,
2210                     MPI_ANY_SOURCE,
2211                     POST_RECV_TAG,
2212                     MPI_COMM_WORLD,
2213                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2214           CmiAbort("MPI_Irecv failed\n");
2215     }
2216
2217 #endif
2218
2219
2220
2221   /* CmiTimerInit(); */
2222
2223 #if 0
2224   CthInit(argv);
2225   ConverseCommonInit(argv);
2226
2227   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2228   if (initret==0) {
2229     fn(CmiGetArgc(argv), argv);
2230     if (usched==0) CsdScheduler(-1);
2231     ConverseExit();
2232   }
2233 #endif
2234
2235   CsvInitialize(CmiNodeState, NodeState);
2236   CmiNodeStateInit(&CsvAccess(NodeState));
2237
2238   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2239
2240   for (i=0; i<_Cmi_mynodesize+1; i++) {
2241 #if MULTI_SENDQUEUE
2242     procState[i].sendMsgBuf = PCQueueCreate();
2243 #endif
2244     procState[i].recvLock = CmiCreateLock();
2245   }
2246 #if CMK_SMP
2247 #if !MULTI_SENDQUEUE
2248   sendMsgBuf = PCQueueCreate();
2249   sendMsgBufLock = CmiCreateLock();
2250 #endif
2251   exitLock = CmiCreateLock();            /* exit count lock */
2252 #endif
2253
2254   /* Network progress function is used to poll the network when for
2255      messages. This flushes receive buffers on some  implementations*/
2256   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2257   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2258
2259   CmiStartThreads(argv);
2260   ConverseRunPE(initret);
2261 }
2262
2263 /***********************************************************************
2264  *
2265  * Abort function:
2266  *
2267  ************************************************************************/
2268
2269 void CmiAbort(const char *message)
2270 {
2271   char *m;
2272   /* if CharmDebug is attached simply try to send a message to it */
2273 #if CMK_CCS_AVAILABLE
2274   if (CpvAccess(cmiArgDebugFlag)) {
2275     CpdNotify(CPD_ABORT, message);
2276     CpdFreeze();
2277   }
2278 #endif  
2279   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2280         "Reason: %s\n",CmiMyPe(),message);
2281  /*  CmiError(message); */
2282   CmiPrintStackTrace(0);
2283   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2284   CmiSetHandler(m, machine_exit_idx);
2285   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2286   machine_exit(m);
2287   /* Program never reaches here */
2288   MPI_Abort(MPI_COMM_WORLD, 1);
2289 }
2290
2291
2292 #if 0
2293
2294 /* ****************************************************************** */
2295 /*    The following internal functions implement recd msg queue       */
2296 /* ****************************************************************** */
2297
2298 static void ** AllocBlock(unsigned int len)
2299 {
2300   void ** blk;
2301
2302   blk=(void **)CmiAlloc(len*sizeof(void *));
2303   if(blk==(void **)0) {
2304     CmiError("Cannot Allocate Memory!\n");
2305     MPI_Abort(MPI_COMM_WORLD, 1);
2306   }
2307   return blk;
2308 }
2309
2310 static void
2311 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2312 {
2313   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2314   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2315 }
2316
2317 void recdQueueInit(void)
2318 {
2319   recdQueue_blk = AllocBlock(BLK_LEN);
2320   recdQueue_blk_len = BLK_LEN;
2321   recdQueue_first = 0;
2322   recdQueue_len = 0;
2323 }
2324
2325 void recdQueueAddToBack(void *element)
2326 {
2327 #if NODE_0_IS_CONVHOST
2328   inside_comm = 1;
2329 #endif
2330   if(recdQueue_len==recdQueue_blk_len) {
2331     void **blk;
2332     recdQueue_blk_len *= 3;
2333     blk = AllocBlock(recdQueue_blk_len);
2334     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2335     CmiFree(recdQueue_blk);
2336     recdQueue_blk = blk;
2337     recdQueue_first = 0;
2338   }
2339   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2340 #if NODE_0_IS_CONVHOST
2341   inside_comm = 0;
2342 #endif
2343 }
2344
2345
2346 void * recdQueueRemoveFromFront(void)
2347 {
2348   if(recdQueue_len) {
2349     void *element;
2350     element = recdQueue_blk[recdQueue_first++];
2351     recdQueue_first %= recdQueue_blk_len;
2352     recdQueue_len--;
2353     return element;
2354   }
2355   return 0;
2356 }
2357
2358 #endif
2359
2360 /*@}*/