Modified and tested the post recv scheme for both non-SMP and SMP cases
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #define CMI_DYNAMIC_EXERT_CAP 0
51 /* This macro defines the max number of msgs in the sender msg buffer 
52  * that is allowed for recving operation to continue
53  */
54 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 4
55 #define CMI_DYNAMIC_MAXCAPSIZE 1000
56 #define CMI_DYNAMIC_SEND_CAPSIZE 4
57 #define CMI_DYNAMIC_RECV_CAPSIZE 3
58 /* initial values, -1 indiates there's no cap */
59 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
60 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
61
62 #if CMI_EXERT_SEND_CAP
63 #define SEND_CAP 3
64 #endif
65
66 #if CMI_EXERT_RECV_CAP
67 #define RECV_CAP 2
68 #endif
69
70
71 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
72 #define CMI_MPI_TRACE_MOREDETAILED 0
73 #undef CMI_MPI_TRACE_USEREVENTS
74 #define CMI_MPI_TRACE_USEREVENTS 1
75 #else
76 #undef CMK_SMP_TRACE_COMMTHREAD
77 #define CMK_SMP_TRACE_COMMTHREAD 0
78 #endif
79
80 #define CMK_TRACE_COMMOVERHEAD 0
81 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_TRACE_COMMOVERHEAD
86 #define CMK_TRACE_COMMOVERHEAD 0
87 #endif
88
89 #include "machine.h"
90
91 #include "pcqueue.h"
92
93 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
94
95 #if CMK_BLUEGENEL
96 #define MAX_QLEN 8
97 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
98 #else
99 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
100 #define MAX_QLEN 200
101 #endif
102
103 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
104 CpvStaticDeclare(double, projTraceStart);
105 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
106 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
107 #else
108 # define  START_EVENT()
109 # define  END_EVENT(x)
110 #endif
111
112 /*
113     To reduce the buffer used in broadcast and distribute the load from
114   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
115   spanning tree broadcast algorithm.
116     This will use the fourth short in message as an indicator of spanning tree
117   root.
118 */
119 #define CMK_BROADCAST_SPANNING_TREE    1
120 #define CMK_BROADCAST_HYPERCUBE        0
121
122 #define BROADCAST_SPANNING_FACTOR      4
123 /* The number of children used when a msg is broadcast inside a node */
124 #define BROADCAST_SPANNING_INTRA_FACTOR      8
125
126 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
127 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
128
129 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
130 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
131
132 /* FIXME: need a random number that everyone agrees ! */
133 #define CHARM_MAGIC_NUMBER               126
134
135 #if CMK_ERROR_CHECKING
136 static int checksum_flag = 0;
137 #define CMI_SET_CHECKSUM(msg, len)      \
138         if (checksum_flag)  {   \
139           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
140           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
141         }
142 #define CMI_CHECK_CHECKSUM(msg, len)    \
143         if (checksum_flag)      \
144           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
145             CmiAbort("Fatal error: checksum doesn't agree!\n");
146 #else
147 #define CMI_SET_CHECKSUM(msg, len)
148 #define CMI_CHECK_CHECKSUM(msg, len)
149 #endif
150
151 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
152 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
153 #else
154 #define CMI_SET_BROADCAST_ROOT(msg, root) 
155 #endif
156
157
158 /** 
159     If MPI_POST_RECV is defined, we provide default values for size 
160     and number of posted recieves. If MPI_POST_RECV_COUNT is set
161     then a default value for MPI_POST_RECV_SIZE is used if not specified
162     by the user.
163 */
164
165 #define MPI_POST_RECV 0
166 #if MPI_POST_RECV
167 #define MPI_POST_RECV_COUNT 10
168 #undef MPI_POST_RECV
169 #endif
170 #if MPI_POST_RECV_COUNT > 0
171 #ifndef MPI_POST_RECV_SIZE
172 #define MPI_POST_RECV_LOWERSIZE 2000
173 #define MPI_POST_RECV_UPPERSIZE 4000
174 #define MPI_POST_RECV_SIZE MPI_POST_RECV_UPPERSIZE
175 #endif
176 /* #undef  MPI_POST_RECV_DEBUG  */
177 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
178 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
179 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
180 CpvDeclare(char*,CmiPostedRecvBuffers);
181 #endif
182
183 /*
184  to avoid MPI's in order delivery, changing MPI Tag all the time
185 */
186 #define TAG     1375
187
188 #if MPI_POST_RECV_COUNT > 0
189 #define POST_RECV_TAG (TAG+1)
190 #define BARRIER_ZERO_TAG TAG
191 #else
192 #define BARRIER_ZERO_TAG     1375
193 #endif
194
195 #include <signal.h>
196 void (*signal_int)(int);
197
198 /*
199 static int mpi_tag = TAG;
200 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
201 */
202
203 static int        _thread_provided = -1;
204 int               _Cmi_numpes;
205 int               _Cmi_mynode;    /* Which address space am I */
206 int               _Cmi_mynodesize;/* Number of processors in my address space */
207 int               _Cmi_numnodes;  /* Total number of address spaces */
208 int               _Cmi_numpes;    /* Total number of processors */
209 static int        Cmi_nodestart; /* First processor in this address space */
210 CpvDeclare(void*, CmiLocalQueue);
211
212 /*Network progress utility variables. Period controls the rate at
213   which the network poll is called */
214 CpvDeclare(unsigned , networkProgressCount);
215 int networkProgressPeriod;
216
217 int               idleblock = 0;
218
219 #define BLK_LEN  512
220
221 #if CMK_NODE_QUEUE_AVAILABLE
222 #define DGRAM_NODEMESSAGE   (0xFB)
223
224 #define NODE_BROADCAST_OTHERS (-1)
225 #define NODE_BROADCAST_ALL    (-2)
226 #endif
227
228 #if 0
229 static void **recdQueue_blk;
230 static unsigned int recdQueue_blk_len;
231 static unsigned int recdQueue_first;
232 static unsigned int recdQueue_len;
233 static void recdQueueInit(void);
234 static void recdQueueAddToBack(void *element);
235 static void *recdQueueRemoveFromFront(void);
236 #endif
237
238 static void ConverseRunPE(int everReturn);
239 static void CommunicationServer(int sleepTime);
240 static void CommunicationServerThread(int sleepTime);
241
242 typedef struct msg_list {
243      char *msg;
244      struct msg_list *next;
245      int size, destpe;
246
247 #if CMK_SMP_TRACE_COMMTHREAD
248         int srcpe;
249 #endif  
250         
251      MPI_Request req;
252 } SMSG_LIST;
253
254 int MsgQueueLen=0;
255 static int request_max;
256
257 static SMSG_LIST *sent_msgs=0;
258 static SMSG_LIST *end_sent=0;
259
260 static int Cmi_dim;
261
262 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
263
264 #if NODE_0_IS_CONVHOST
265 int inside_comm = 0;
266 #endif
267
268 void CmiAbort(const char *message);
269 static void PerrorExit(const char *msg);
270
271 void SendSpanningChildren(int size, char *msg);
272 void SendHypercube(int size, char *msg);
273
274 static void PerrorExit(const char *msg)
275 {
276   perror(msg);
277   exit(1);
278 }
279
280 extern unsigned char computeCheckSum(unsigned char *data, int len);
281
282 /**************************  TIMER FUNCTIONS **************************/
283
284 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
285
286 /* MPI calls are not threadsafe, even the timer on some machines */
287 static CmiNodeLock  timerLock = 0;
288 static int _absoluteTime = 0;
289 static double starttimer = 0;
290 static int _is_global = 0;
291
292 int CmiTimerIsSynchronized()
293 {
294   int  flag;
295   void *v;
296
297   /*  check if it using synchronized timer */
298   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
299     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
300   if (flag) {
301     _is_global = *(int*)v;
302     if (_is_global && CmiMyPe() == 0)
303       printf("Charm++> MPI timer is synchronized\n");
304   }
305   return _is_global;
306 }
307
308 int CmiTimerAbsolute()
309 {       
310   return _absoluteTime;
311 }
312
313 double CmiStartTimer()
314 {
315   return 0.0;
316 }
317
318 double CmiInitTime()
319 {
320   return starttimer;
321 }
322
323 void CmiTimerInit(char **argv)
324 {
325   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
326   if (_absoluteTime && CmiMyPe() == 0)
327       printf("Charm++> absolute MPI timer is used\n");
328
329   _is_global = CmiTimerIsSynchronized();
330
331   if (_is_global) {
332     if (CmiMyRank() == 0) {
333       double minTimer;
334 #if CMK_TIMER_USE_XT3_DCLOCK
335       starttimer = dclock();
336 #else
337       starttimer = MPI_Wtime();
338 #endif
339
340       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
341                                   MPI_COMM_WORLD );
342       starttimer = minTimer;
343     }
344   }
345   else {  /* we don't have a synchronous timer, set our own start time */
346     CmiBarrier();
347     CmiBarrier();
348     CmiBarrier();
349 #if CMK_TIMER_USE_XT3_DCLOCK
350     starttimer = dclock();
351 #else
352     starttimer = MPI_Wtime();
353 #endif
354   }
355
356 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
357   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
358     timerLock = CmiCreateLock();
359 #endif
360   CmiNodeAllBarrier();          /* for smp */
361 }
362
363 /**
364  * Since the timerLock is never created, and is
365  * always NULL, then all the if-condition inside
366  * the timer functions could be disabled right
367  * now in the case of SMP. --Chao Mei
368  */
369 double CmiTimer(void)
370 {
371   double t;
372 #if 0 && CMK_SMP
373   if (timerLock) CmiLock(timerLock);
374 #endif
375
376 #if CMK_TIMER_USE_XT3_DCLOCK
377   t = dclock();
378 #else
379   t = MPI_Wtime();
380 #endif
381
382 #if 0 && CMK_SMP
383   if (timerLock) CmiUnlock(timerLock);
384 #endif
385
386   return _absoluteTime?t: (t-starttimer);
387 }
388
389 double CmiWallTimer(void)
390 {
391   double t;
392 #if 0 && CMK_SMP
393   if (timerLock) CmiLock(timerLock);
394 #endif
395
396 #if CMK_TIMER_USE_XT3_DCLOCK
397   t = dclock();
398 #else
399   t = MPI_Wtime();
400 #endif
401
402 #if 0 && CMK_SMP
403   if (timerLock) CmiUnlock(timerLock);
404 #endif
405
406   return _absoluteTime? t: (t-starttimer);
407 }
408
409 double CmiCpuTimer(void)
410 {
411   double t;
412 #if 0 && CMK_SMP
413   if (timerLock) CmiLock(timerLock);
414 #endif
415 #if CMK_TIMER_USE_XT3_DCLOCK
416   t = dclock() - starttimer;
417 #else
418   t = MPI_Wtime() - starttimer;
419 #endif
420 #if 0 && CMK_SMP
421   if (timerLock) CmiUnlock(timerLock);
422 #endif
423   return t;
424 }
425
426 #endif
427
428 /* must be called on all ranks including comm thread in SMP */
429 int CmiBarrier()
430 {
431 #if CMK_SMP
432     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
433   CmiNodeAllBarrier();
434   if (CmiMyRank() == CmiMyNodeSize()) 
435 #else
436   if (CmiMyRank() == 0) 
437 #endif
438   {
439 /**
440  *  The call of CmiBarrier is usually before the initialization
441  *  of trace module of Charm++, therefore, the START_EVENT
442  *  and END_EVENT are disabled here. -Chao Mei
443  */     
444     /*START_EVENT();*/
445
446     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
447         CmiAbort("Timernit: MPI_Barrier failed!\n");
448
449     /*END_EVENT(10);*/
450   }
451   CmiNodeAllBarrier();
452   return 0;
453 }
454
455 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
456 int CmiBarrierZero()
457 {
458   int i;
459 #if CMK_SMP
460   if (CmiMyRank() == CmiMyNodeSize()) 
461 #else
462   if (CmiMyRank() == 0) 
463 #endif
464   {
465     char msg[1];
466     MPI_Status sts;
467     if (CmiMyNode() == 0)  {
468       for (i=0; i<CmiNumNodes()-1; i++) {
469          START_EVENT();
470
471          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
472             CmiPrintf("MPI_Recv failed!\n");
473
474          END_EVENT(30);
475       }
476     }
477     else {
478       START_EVENT();
479
480       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
481          printf("MPI_Send failed!\n");
482
483       END_EVENT(20);
484     }
485   }
486   CmiNodeAllBarrier();
487   return 0;
488 }
489
490 typedef struct ProcState {
491 #if MULTI_SENDQUEUE
492 PCQueue      sendMsgBuf;       /* per processor message sending queue */
493 #endif
494 CmiNodeLock  recvLock;              /* for cs->recv */
495 } ProcState;
496
497 static ProcState  *procState;
498
499 #if CMK_SMP
500
501 #if !MULTI_SENDQUEUE
502 static PCQueue sendMsgBuf;
503 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
504 #endif
505
506 #endif
507
508 /************************************************************
509  *
510  * Processor state structure
511  *
512  ************************************************************/
513
514 /* fake Cmi_charmrun_fd */
515 static int Cmi_charmrun_fd = 0;
516 #include "machine-smp.c"
517
518 CsvDeclare(CmiNodeState, NodeState);
519
520 #include "immediate.c"
521
522 #if CMK_SHARED_VARS_UNAVAILABLE
523 /************ non SMP **************/
524 static struct CmiStateStruct Cmi_state;
525 int _Cmi_mype;
526 int _Cmi_myrank;
527
528 void CmiMemLock() {}
529 void CmiMemUnlock() {}
530
531 #define CmiGetState() (&Cmi_state)
532 #define CmiGetStateN(n) (&Cmi_state)
533
534 void CmiYield(void) { sleep(0); }
535
536 static void CmiStartThreads(char **argv)
537 {
538   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
539   _Cmi_mype = Cmi_nodestart;
540   _Cmi_myrank = 0;
541 }
542 #endif  /* non smp */
543
544 /*Add a message to this processor's receive queue, pe is a rank */
545 void CmiPushPE(int pe,void *msg)
546 {
547   CmiState cs = CmiGetStateN(pe);
548   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
549 #if CMK_IMMEDIATE_MSG
550   if (CmiIsImmediate(msg)) {
551 /*
552 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
553     CmiHandleMessage(msg);
554 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
555 */
556     /**(CmiUInt2 *)msg = pe;*/
557     CMI_DEST_RANK(msg) = pe;
558     CmiPushImmediateMsg(msg);
559     return;
560   }
561 #endif
562
563 #if CMK_SMP
564   CmiLock(procState[pe].recvLock);
565 #endif
566   PCQueuePush(cs->recv,msg);
567 #if CMK_SMP
568   CmiUnlock(procState[pe].recvLock);
569 #endif
570   CmiIdleLock_addMessage(&cs->idle);
571   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
572 }
573
574 #if CMK_NODE_QUEUE_AVAILABLE
575 /*Add a message to this processor's receive queue */
576 static void CmiPushNode(void *msg)
577 {
578   MACHSTATE(3,"Pushing message into NodeRecv queue");
579 #if CMK_IMMEDIATE_MSG
580   if (CmiIsImmediate(msg)) {
581     CMI_DEST_RANK(msg) = 0;
582     CmiPushImmediateMsg(msg);
583     return;
584   }
585 #endif
586   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
587   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
588   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
589   {
590   CmiState cs=CmiGetStateN(0);
591   CmiIdleLock_addMessage(&cs->idle);
592   }
593 }
594 #endif
595
596 #ifndef CmiMyPe
597 int CmiMyPe(void)
598 {
599   return CmiGetState()->pe;
600 }
601 #endif
602
603 #ifndef CmiMyRank
604 int CmiMyRank(void)
605 {
606   return CmiGetState()->rank;
607 }
608 #endif
609
610 #ifndef CmiNodeFirst
611 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
612 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
613 #endif
614
615 #ifndef CmiNodeOf
616 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
617 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
618 #endif
619
620 static size_t CmiAllAsyncMsgsSent(void)
621 {
622    SMSG_LIST *msg_tmp = sent_msgs;
623    MPI_Status sts;
624    int done;
625
626    while(msg_tmp!=0) {
627     done = 0;
628     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
629       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
630     if(!done)
631       return 0;
632     msg_tmp = msg_tmp->next;
633 /*    MsgQueueLen--; ????? */
634    }
635    return 1;
636 }
637
638 int CmiAsyncMsgSent(CmiCommHandle c) {
639
640   SMSG_LIST *msg_tmp = sent_msgs;
641   int done;
642   MPI_Status sts;
643
644   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
645     msg_tmp = msg_tmp->next;
646   if(msg_tmp) {
647     done = 0;
648     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
649       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
650     return ((done)?1:0);
651   } else {
652     return 1;
653   }
654 }
655
656 void CmiReleaseCommHandle(CmiCommHandle c)
657 {
658   return;
659 }
660
661 #if CMK_BLUEGENEL
662 extern void MPID_Progress_test();
663 #endif
664
665 void CmiReleaseSentMessages(void)
666 {
667   SMSG_LIST *msg_tmp=sent_msgs;
668   SMSG_LIST *prev=0;
669   SMSG_LIST *temp;
670   int done;
671   MPI_Status sts;
672
673 #if CMK_BLUEGENEL
674   MPID_Progress_test();
675 #endif
676
677   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
678   while(msg_tmp!=0) {
679     done =0;
680 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
681     double startT = CmiWallTimer();
682 #endif
683     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
684       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
685     if(done) {
686       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
687       MsgQueueLen--;
688       /* Release the message */
689       temp = msg_tmp->next;
690       if(prev==0)  /* first message */
691         sent_msgs = temp;
692       else
693         prev->next = temp;
694       CmiFree(msg_tmp->msg);
695       CmiFree(msg_tmp);
696       msg_tmp = temp;
697     } else {
698       prev = msg_tmp;
699       msg_tmp = msg_tmp->next;
700     }
701 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
702     {
703     double endT = CmiWallTimer();
704     /* only record the event if it takes more than 1ms */
705     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
706     }
707 #endif
708   }
709   end_sent = prev;
710   MACHSTATE(2,"} CmiReleaseSentMessages end");
711 }
712
713 int PumpMsgs(void)
714 {
715   int nbytes, flg, res;
716   char *msg;
717   MPI_Status sts;
718   int recd=0;
719
720 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
721   int recvCnt=0;
722 #endif
723         
724 #if CMK_BLUEGENEL
725   MPID_Progress_test();
726 #endif
727
728   MACHSTATE(2,"PumpMsgs begin {");
729
730 #if CMI_DYNAMIC_EXERT_CAP
731   dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
732 #endif
733         
734   while(1) {
735 #if CMI_EXERT_RECV_CAP
736         if(recvCnt==RECV_CAP) break;
737 #elif CMI_DYNAMIC_EXERT_CAP
738         if(recvCnt >= dynamicRecvCap) break;
739 #endif
740           
741     /* First check posted recvs then do  probe unmatched outstanding messages */
742 #if MPI_POST_RECV_COUNT > 0 
743     int completed_index=-1;
744     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
745         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
746     if(flg){
747         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
748             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
749
750                 recd = 1;
751         msg = (char *) CmiAlloc(nbytes);
752         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
753         /* and repost the recv */
754
755         START_EVENT();
756
757         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
758             MPI_POST_RECV_SIZE,
759             MPI_BYTE,
760             MPI_ANY_SOURCE,
761             POST_RECV_TAG,
762             MPI_COMM_WORLD,
763             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
764                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
765
766         END_EVENT(50);
767
768         CpvAccess(Cmi_posted_recv_total)++;
769     }
770     else {
771         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
772         if(res != MPI_SUCCESS)
773         CmiAbort("MPI_Iprobe failed\n");
774         if(!flg) break;
775         recd = 1;
776         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
777         msg = (char *) CmiAlloc(nbytes);
778
779         START_EVENT();
780
781         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
782             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
783
784         END_EVENT(30);
785
786         CpvAccess(Cmi_unposted_recv_total)++;
787     }
788 #else
789     /* Original version */
790 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
791   double startT = CmiWallTimer(); 
792 #endif
793     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
794     if(res != MPI_SUCCESS)
795       CmiAbort("MPI_Iprobe failed\n");
796
797     if(!flg) break;
798 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
799     {
800     double endT = CmiWallTimer();
801     /* only trace the probe that last longer than 1ms */
802     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
803     }
804 #endif
805
806     recd = 1;
807     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
808     msg = (char *) CmiAlloc(nbytes);
809     
810     START_EVENT();
811
812     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
813       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
814
815     /*END_EVENT(30);*/
816
817 #endif
818
819 #if CMK_SMP_TRACE_COMMTHREAD
820         traceBeginCommOp(msg);
821         traceChangeLastTimestamp(CpvAccess(projTraceStart));
822         traceEndCommOp(msg);
823         #if CMI_MPI_TRACE_MOREDETAILED
824         char tmp[32];
825         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
826         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
827         #endif
828 #elif CMK_TRACE_COMMOVERHEAD
829         char tmp[32];
830         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
831         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
832 #endif
833         
834         
835     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
836     CMI_CHECK_CHECKSUM(msg, nbytes);
837 #if CMK_ERROR_CHECKING
838     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
839       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
840       CmiFree(msg);
841       CmiAbort("Abort!\n");
842       continue;
843     }
844 #endif
845         
846 #if CMK_BROADCAST_SPANNING_TREE
847     if (CMI_BROADCAST_ROOT(msg))
848       SendSpanningChildren(nbytes, msg);
849 #elif CMK_BROADCAST_HYPERCUBE
850     if (CMI_BROADCAST_ROOT(msg))
851       SendHypercube(nbytes, msg);
852 #endif
853         
854         /* In SMP mode, this push operation needs to be executed
855      * after forwarding broadcast messages. If it is executed
856      * earlier, then during the bcast msg forwarding period,    
857          * the msg could be already freed on the worker thread.
858          * As a result, the forwarded message could be wrong! 
859          * --Chao Mei
860          */
861 #if CMK_NODE_QUEUE_AVAILABLE
862     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
863       CmiPushNode(msg);
864     else
865 #endif
866         CmiPushPE(CMI_DEST_RANK(msg), msg);     
867         
868 #if CMI_EXERT_RECV_CAP
869         recvCnt++;
870 #elif CMI_DYNAMIC_EXERT_CAP
871         recvCnt++;
872 #if CMK_SMP
873         /* check sendMsgBuf to get the  number of messages that have not been sent
874          * which is only available in SMP mode
875          * MsgQueueLen indicates the number of messages that have not been released 
876          * by MPI 
877          */
878         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
879                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
880                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
881         }
882 #else
883         /* MsgQueueLen indicates the number of messages that have not been released 
884          * by MPI 
885          */
886         if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
887                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
888         }
889 #endif
890
891 #endif  
892         
893   }
894
895   
896 #if CMK_IMMEDIATE_MSG && !CMK_SMP
897   CmiHandleImmediate();
898 #endif
899   
900   MACHSTATE(2,"} PumpMsgs end ");
901   return recd;
902 }
903
904 /* blocking version */
905 static void PumpMsgsBlocking(void)
906 {
907   static int maxbytes = 20000000;
908   static char *buf = NULL;
909   int nbytes, flg;
910   MPI_Status sts;
911   char *msg;
912   int recd=0;
913
914   if (!PCQueueEmpty(CmiGetState()->recv)) return;
915   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
916   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
917   if (sent_msgs)  return;
918
919 #if 0
920   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
921 #endif
922
923   if (buf == NULL) {
924     buf = (char *) CmiAlloc(maxbytes);
925     _MEMCHECK(buf);
926   }
927
928
929 #if MPI_POST_RECV_COUNT > 0
930 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
931 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
932 #endif
933
934   START_EVENT();
935
936   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
937       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
938
939   /*END_EVENT(30);*/
940     
941    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
942    msg = (char *) CmiAlloc(nbytes);
943    memcpy(msg, buf, nbytes);
944
945 #if CMK_SMP_TRACE_COMMTHREAD
946         traceBeginCommOp(msg);
947         traceChangeLastTimestamp(CpvAccess(projTraceStart));
948         traceEndCommOp(msg);
949         #if CMI_MPI_TRACE_MOREDETAILED
950         char tmp[32];
951         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
952         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
953         #endif
954 #endif
955
956 #if CMK_BROADCAST_SPANNING_TREE
957    if (CMI_BROADCAST_ROOT(msg))
958       SendSpanningChildren(nbytes, msg);
959 #elif CMK_BROADCAST_HYPERCUBE
960    if (CMI_BROADCAST_ROOT(msg))
961       SendHypercube(nbytes, msg);
962 #endif
963   
964         /* In SMP mode, this push operation needs to be executed
965      * after forwarding broadcast messages. If it is executed
966      * earlier, then during the bcast msg forwarding period,    
967          * the msg could be already freed on the worker thread.
968          * As a result, the forwarded message could be wrong! 
969          * --Chao Mei
970          */  
971 #if CMK_NODE_QUEUE_AVAILABLE
972    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
973       CmiPushNode(msg);
974    else
975 #endif
976       CmiPushPE(CMI_DEST_RANK(msg), msg);
977 }
978
979 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
980
981 #if CMK_SMP
982
983 static int inexit = 0;
984 static CmiNodeLock  exitLock = 0;
985
986 static int MsgQueueEmpty()
987 {
988   int i;
989 #if MULTI_SENDQUEUE
990   for (i=0; i<_Cmi_mynodesize; i++)
991     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
992 #else
993   return PCQueueEmpty(sendMsgBuf);
994 #endif
995   return 1;
996 }
997
998 static int SendMsgBuf();
999
1000 /* test if all processors recv queues are empty */
1001 static int RecvQueueEmpty()
1002 {
1003   int i;
1004   for (i=0; i<_Cmi_mynodesize; i++) {
1005     CmiState cs=CmiGetStateN(i);
1006     if (!PCQueueEmpty(cs->recv)) return 0;
1007   }
1008   return 1;
1009 }
1010
1011 /**
1012 CommunicationServer calls MPI to send messages in the queues and probe message from network.
1013 */
1014
1015 #define REPORT_COMM_METRICS 0
1016 #if REPORT_COMM_METRICS
1017 static double pumptime = 0.0;
1018 static double releasetime = 0.0;
1019 static double sendtime = 0.0;
1020 #endif
1021
1022 static void CommunicationServer(int sleepTime)
1023 {
1024   int static count=0;
1025 /*
1026   count ++;
1027   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
1028 */
1029 #if REPORT_COMM_METRICS
1030   double t1, t2, t3, t4;
1031   t1 = CmiWallTimer();
1032 #endif
1033   PumpMsgs();
1034 #if REPORT_COMM_METRICS
1035   t2 = CmiWallTimer();
1036 #endif
1037   CmiReleaseSentMessages();
1038 #if REPORT_COMM_METRICS
1039   t3 = CmiWallTimer();
1040 #endif
1041   SendMsgBuf();
1042 #if REPORT_COMM_METRICS
1043   t4 = CmiWallTimer();
1044   pumptime += (t2-t1);
1045   releasetime += (t3-t2);
1046   sendtime += (t4-t3);
1047 #endif
1048 /*
1049   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1050 */
1051   if (inexit == CmiMyNodeSize()) {
1052     MACHSTATE(2, "CommunicationServer exiting {");
1053 #if 0
1054     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1055 #endif
1056     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1057       CmiReleaseSentMessages();
1058       SendMsgBuf();
1059       PumpMsgs();
1060     }
1061     MACHSTATE(2, "CommunicationServer barrier begin {");
1062
1063     START_EVENT();
1064
1065     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1066       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1067
1068     END_EVENT(10);
1069
1070     MACHSTATE(2, "} CommunicationServer barrier end");
1071 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1072     if (CmiMyNode() == 0){
1073       CmiPrintf("End of program\n");
1074           #if MPI_POST_RECV_COUNT > 0
1075         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1076           #endif
1077     }
1078 #endif
1079     MACHSTATE(2, "} CommunicationServer EXIT");
1080
1081     ConverseCommonExit();   
1082 #if REPORT_COMM_METRICS
1083     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1084 #endif
1085
1086 #if ! CMK_AUTOBUILD
1087     signal(SIGINT, signal_int);
1088     MPI_Finalize();
1089     #endif
1090     exit(0);
1091   }
1092 }
1093
1094 #endif
1095
1096 static void CommunicationServerThread(int sleepTime)
1097 {
1098 #if CMK_SMP
1099   CommunicationServer(sleepTime);
1100 #endif
1101 #if CMK_IMMEDIATE_MSG
1102   CmiHandleImmediate();
1103 #endif
1104 }
1105
1106 #if CMK_NODE_QUEUE_AVAILABLE
1107 char *CmiGetNonLocalNodeQ(void)
1108 {
1109   CmiState cs = CmiGetState();
1110   char *result = 0;
1111   CmiIdleLock_checkMessage(&cs->idle);
1112 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1113     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1114     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1115     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1116     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1117     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1118 /*  }  */
1119
1120   return result;
1121 }
1122 #endif
1123
1124 void *CmiGetNonLocal(void)
1125 {
1126   static int count=0;
1127   CmiState cs = CmiGetState();
1128   void *msg;
1129
1130 #if ! CMK_SMP
1131   if (CmiNumPes() == 1) return NULL;
1132 #endif
1133
1134   CmiIdleLock_checkMessage(&cs->idle);
1135   /* although it seems that lock is not needed, I found it crashes very often
1136      on mpi-smp without lock */
1137
1138 #if ! CMK_SMP
1139   CmiReleaseSentMessages();
1140   PumpMsgs();
1141 #endif
1142
1143   /* CmiLock(procState[cs->rank].recvLock); */
1144   msg =  PCQueuePop(cs->recv);
1145   /* CmiUnlock(procState[cs->rank].recvLock); */
1146
1147 /*
1148   if (msg) {
1149     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1150   else {
1151     count++;
1152     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1153   }
1154 */
1155 #if ! CMK_SMP
1156   if (no_outstanding_sends) {
1157     while (MsgQueueLen>0) {
1158       CmiReleaseSentMessages();
1159       PumpMsgs();
1160     }
1161   }
1162
1163   if(!msg) {
1164     CmiReleaseSentMessages();
1165     if (PumpMsgs())
1166       return  PCQueuePop(cs->recv);
1167     else
1168       return 0;
1169   }
1170 #endif
1171
1172   return msg;
1173 }
1174
1175 /* called in non-smp mode */
1176 void CmiNotifyIdle(void)
1177 {
1178   CmiReleaseSentMessages();
1179   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1180 }
1181
1182
1183 /********************************************************
1184     The call to probe immediate messages has been renamed to
1185     CmiMachineProgressImpl
1186 ******************************************************/
1187 /* user call to handle immediate message, only useful in non SMP version
1188    using polling method to schedule message.
1189 */
1190 /*
1191 #if CMK_IMMEDIATE_MSG
1192 void CmiProbeImmediateMsg()
1193 {
1194 #if !CMK_SMP
1195   PumpMsgs();
1196   CmiHandleImmediate();
1197 #endif
1198 }
1199 #endif
1200 */
1201
1202 /* Network progress function is used to poll the network when for
1203    messages. This flushes receive buffers on some  implementations*/
1204 #if CMK_MACHINE_PROGRESS_DEFINED
1205 void CmiMachineProgressImpl()
1206 {
1207 #if !CMK_SMP
1208     PumpMsgs();
1209 #if CMK_IMMEDIATE_MSG
1210     CmiHandleImmediate();
1211 #endif
1212 #else
1213     /*Not implemented yet. Communication server does not seem to be
1214       thread safe, so only communication thread call it */
1215     if (CmiMyRank() == CmiMyNodeSize())
1216         CommunicationServerThread(0);
1217 #endif
1218 }
1219 #endif
1220
1221 /********************* MESSAGE SEND FUNCTIONS ******************/
1222
1223 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1224
1225 static void CmiSendSelf(char *msg)
1226 {
1227 #if CMK_IMMEDIATE_MSG
1228     if (CmiIsImmediate(msg)) {
1229       /* CmiBecomeNonImmediate(msg); */
1230       CmiPushImmediateMsg(msg);
1231       CmiHandleImmediate();
1232       return;
1233     }
1234 #endif
1235     CQdCreate(CpvAccess(cQdState), 1);
1236     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1237 }
1238
1239 void CmiSyncSendFn(int destPE, int size, char *msg)
1240 {
1241   CmiState cs = CmiGetState();
1242   char *dupmsg = (char *) CmiAlloc(size);
1243   memcpy(dupmsg, msg, size);
1244
1245   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1246
1247   if (cs->pe==destPE) {
1248     CmiSendSelf(dupmsg);
1249   }
1250   else
1251     CmiAsyncSendFn_(destPE, size, dupmsg);
1252 }
1253
1254 #if CMK_SMP
1255
1256 /* called by communication thread in SMP */
1257 static int SendMsgBuf()
1258 {
1259   SMSG_LIST *msg_tmp;
1260   char *msg;
1261   int node, rank, size;
1262   int i;
1263   int sent = 0;
1264
1265 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1266         int sentCnt = 0;
1267 #endif  
1268         
1269 #if CMI_DYNAMIC_EXERT_CAP
1270         dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
1271 #endif
1272         
1273   MACHSTATE(2,"SendMsgBuf begin {");
1274 #if MULTI_SENDQUEUE
1275   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1276   {
1277     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1278     {
1279       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1280 #else
1281     /* single message sending queue */
1282     /* CmiLock(sendMsgBufLock); */
1283     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1284     /* CmiUnlock(sendMsgBufLock); */
1285     while (NULL != msg_tmp)
1286     {
1287 #endif
1288       node = msg_tmp->destpe;
1289       size = msg_tmp->size;
1290       msg = msg_tmp->msg;
1291       msg_tmp->next = 0;
1292                 
1293 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1294       while (MsgQueueLen > request_max) {
1295                 CmiReleaseSentMessages();
1296                 PumpMsgs();
1297       }
1298 #endif
1299           
1300       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1301 #if CMK_ERROR_CHECKING
1302       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1303 #endif
1304       CMI_SET_CHECKSUM(msg, size);
1305
1306 #if MPI_POST_RECV_COUNT > 0
1307         if(size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE){
1308           START_EVENT();
1309           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1310                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1311           END_EVENT(40);
1312         }
1313         else {
1314             START_EVENT();
1315             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1316                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1317             END_EVENT(40);
1318         }
1319 #else
1320         START_EVENT();
1321         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1322             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1323         /*END_EVENT(40);*/
1324 #endif
1325         
1326 #if CMK_SMP_TRACE_COMMTHREAD
1327         traceBeginCommOp(msg);
1328         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1329         /* traceSendMsgComm must execute after traceBeginCommOp because
1330          * we pretend we execute an entry method, and inside this we
1331          * pretend we will send another message. Otherwise how could
1332          * a message creation just before an entry method invocation?
1333          * If such logic is broken, the projections will not trace
1334          * messages correctly! -Chao Mei
1335          */
1336         traceSendMsgComm(msg);
1337         traceEndCommOp(msg);
1338         #if CMI_MPI_TRACE_MOREDETAILED
1339         char tmp[64];
1340         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1341         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1342         #endif
1343 #endif
1344                 
1345                 
1346       MACHSTATE(3,"}MPI_send end");
1347       MsgQueueLen++;
1348       if(sent_msgs==0)
1349         sent_msgs = msg_tmp;
1350       else
1351         end_sent->next = msg_tmp;
1352       end_sent = msg_tmp;
1353       sent=1;
1354           
1355 #if CMI_EXERT_SEND_CAP    
1356           if(++sentCnt == SEND_CAP) break;
1357 #elif CMI_DYNAMIC_EXERT_CAP
1358           if(++sentCnt >= dynamicSendCap) break;
1359           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1360                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1361 #endif    
1362           
1363 #if ! MULTI_SENDQUEUE
1364       /* CmiLock(sendMsgBufLock); */
1365       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1366       /* CmiUnlock(sendMsgBufLock); */
1367 #endif
1368     }
1369 #if MULTI_SENDQUEUE
1370   }
1371 #endif
1372   MACHSTATE(2,"}SendMsgBuf end ");
1373   return sent;
1374 }
1375
1376 void EnqueueMsg(void *m, int size, int node)
1377 {
1378   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1379   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1380   msg_tmp->msg = m;
1381   msg_tmp->size = size;
1382   msg_tmp->destpe = node;
1383         
1384 #if CMK_SMP_TRACE_COMMTHREAD
1385         msg_tmp->srcpe = CmiMyPe();
1386 #endif  
1387
1388 #if MULTI_SENDQUEUE
1389   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1390 #else
1391   CmiLock(sendMsgBufLock);
1392   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1393   CmiUnlock(sendMsgBufLock);
1394 #endif
1395         
1396   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1397 }
1398
1399 #endif
1400
1401 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1402 {
1403   CmiState cs = CmiGetState();
1404   SMSG_LIST *msg_tmp;
1405   CmiUInt2  rank, node;
1406
1407   if(destPE == cs->pe) {
1408     char *dupmsg = (char *) CmiAlloc(size);
1409     memcpy(dupmsg, msg, size);
1410     CmiSendSelf(dupmsg);
1411     return 0;
1412   }
1413   CQdCreate(CpvAccess(cQdState), 1);
1414 #if CMK_SMP
1415   node = CmiNodeOf(destPE);
1416   rank = CmiRankOf(destPE);
1417   if (node == CmiMyNode())  {
1418     CmiPushPE(rank, msg);
1419     return 0;
1420   }
1421   CMI_DEST_RANK(msg) = rank;
1422   EnqueueMsg(msg, size, node);
1423   return 0;
1424 #else
1425   /* non smp */
1426   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1427   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1428   msg_tmp->msg = msg;
1429   msg_tmp->next = 0;
1430   while (MsgQueueLen > request_max) {
1431         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1432         CmiReleaseSentMessages();
1433         PumpMsgs();
1434   }
1435 #if CMK_ERROR_CHECKING
1436   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1437 #endif
1438   CMI_SET_CHECKSUM(msg, size);
1439
1440 #if MPI_POST_RECV_COUNT > 0
1441                 if(size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE){
1442           START_EVENT();
1443           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1444                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1445           END_EVENT(40);
1446         }
1447         else {
1448           START_EVENT();
1449           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1450                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1451           END_EVENT(40);
1452         }
1453 #else
1454   START_EVENT();
1455   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1456     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1457   /*END_EVENT(40);*/
1458   #if CMK_TRACE_COMMOVERHEAD
1459         char tmp[64];
1460         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1461         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1462   #endif
1463 #endif
1464
1465   MsgQueueLen++;
1466   if(sent_msgs==0)
1467     sent_msgs = msg_tmp;
1468   else
1469     end_sent->next = msg_tmp;
1470   end_sent = msg_tmp;
1471   return (CmiCommHandle) &(msg_tmp->req);
1472 #endif              /* non-smp */
1473 }
1474
1475 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1476 {
1477   CMI_SET_BROADCAST_ROOT(msg, 0);
1478   CmiAsyncSendFn_(destPE, size, msg);
1479 }
1480
1481 void CmiFreeSendFn(int destPE, int size, char *msg)
1482 {
1483   CmiState cs = CmiGetState();
1484   CMI_SET_BROADCAST_ROOT(msg, 0);
1485
1486   if (cs->pe==destPE) {
1487     CmiSendSelf(msg);
1488   } else {
1489     CmiAsyncSendFn_(destPE, size, msg);
1490   }
1491 }
1492
1493 /*********************** BROADCAST FUNCTIONS **********************/
1494
1495 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1496 void CmiSyncSendFn1(int destPE, int size, char *msg)
1497 {
1498   CmiState cs = CmiGetState();
1499   char *dupmsg = (char *) CmiAlloc(size);
1500   memcpy(dupmsg, msg, size);
1501   if (cs->pe==destPE)
1502     CmiSendSelf(dupmsg);
1503   else
1504     CmiAsyncSendFn_(destPE, size, dupmsg);
1505 }
1506
1507 /* send msg to its spanning children in broadcast. G. Zheng */
1508 void SendSpanningChildren(int size, char *msg)
1509 {
1510   CmiState cs = CmiGetState();
1511   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1512   int startnode = CmiNodeOf(startpe);
1513   int i, exceptRank;
1514         
1515    /* first send msgs to other nodes */
1516   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1517   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1518     int nd = CmiMyNode()-startnode;
1519     if (nd<0) nd+=CmiNumNodes();
1520     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1521     if (nd > CmiNumNodes() - 1) break;
1522     nd += startnode;
1523     nd = nd%CmiNumNodes();
1524     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1525 #if CMK_SMP
1526       /* always send to the first rank of other nodes */
1527     char *newmsg = CmiCopyMsg(msg, size);
1528     CMI_DEST_RANK(newmsg) = 0;
1529     EnqueueMsg(newmsg, size, nd);
1530 #else
1531     CmiSyncSendFn1(nd, size, msg);
1532 #endif
1533   }
1534 #if CMK_SMP  
1535    /* second send msgs to my peers on this node */
1536   /* FIXME: now it's just a flat p2p send!! When node size is large,
1537    * it should also be sent in a tree
1538    */
1539    exceptRank = CMI_DEST_RANK(msg);
1540    for(i=0; i<exceptRank; i++){
1541            CmiPushPE(i, CmiCopyMsg(msg, size));
1542    }
1543    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1544            CmiPushPE(i, CmiCopyMsg(msg, size));
1545    }
1546 #endif
1547 }
1548
1549 #include <math.h>
1550
1551 /* send msg along the hypercube in broadcast. (Sameer) */
1552 void SendHypercube(int size, char *msg)
1553 {
1554   CmiState cs = CmiGetState();
1555   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1556   int startnode = CmiNodeOf(startpe);
1557   int i, exceptRank, cnt, tmp, relPE;
1558   int dims=0;
1559
1560   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1561   tmp = CmiNumNodes()-1;
1562   while(tmp>0){
1563           dims++;
1564           tmp = tmp >> 1;
1565   }
1566   if(CmiNumNodes()==1) dims=1;
1567   
1568    /* first send msgs to other nodes */  
1569   relPE = CmiMyNode()-startnode;
1570   if(relPE < 0) relPE += CmiNumNodes();
1571   cnt=0;
1572   tmp = relPE;
1573   /* count how many zeros (in binary format) relPE has */
1574   for(i=0; i<dims; i++, cnt++){
1575     if(tmp & 1 == 1) break;
1576     tmp = tmp >> 1;
1577   }
1578   
1579   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1580   for (i = cnt-1; i >= 0; i--) {
1581     int nd = relPE + (1 << i);
1582         if(nd >= CmiNumNodes()) continue;
1583         nd = (nd+startnode)%CmiNumNodes();
1584         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1585 #if CMK_SMP
1586     /* always send to the first rank of other nodes */
1587     char *newmsg = CmiCopyMsg(msg, size);
1588     CMI_DEST_RANK(newmsg) = 0;
1589     EnqueueMsg(newmsg, size, nd);
1590 #else
1591         CmiSyncSendFn1(nd, size, msg);
1592 #endif
1593   }
1594   
1595 #if CMK_SMP
1596    /* second send msgs to my peers on this node */
1597    /* FIXME: now it's just a flat p2p send!! When node size is large,
1598     * it should also be sent in a tree
1599     */
1600    exceptRank = CMI_DEST_RANK(msg);
1601    for(i=0; i<exceptRank; i++){
1602            CmiPushPE(i, CmiCopyMsg(msg, size));
1603    }
1604    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1605            CmiPushPE(i, CmiCopyMsg(msg, size));
1606    }
1607 #endif
1608 }
1609
1610 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1611 {
1612   CmiState cs = CmiGetState();
1613
1614 #if CMK_SMP     
1615   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1616   CMI_DEST_RANK(msg) = CmiMyRank();
1617 #endif
1618         
1619 #if CMK_BROADCAST_SPANNING_TREE
1620   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1621   SendSpanningChildren(size, msg);
1622
1623 #elif CMK_BROADCAST_HYPERCUBE
1624   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1625   SendHypercube(size, msg);
1626
1627 #else
1628   int i;
1629
1630   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1631     CmiSyncSendFn(i, size,msg) ;
1632   for ( i=0; i<cs->pe; i++ )
1633     CmiSyncSendFn(i, size,msg) ;
1634 #endif
1635
1636   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1637 }
1638
1639
1640 /*  FIXME: luckily async is never used  G. Zheng */
1641 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1642 {
1643   CmiState cs = CmiGetState();
1644   int i ;
1645
1646   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1647     CmiAsyncSendFn(i,size,msg) ;
1648   for ( i=0; i<cs->pe; i++ )
1649     CmiAsyncSendFn(i,size,msg) ;
1650
1651   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1652   CmiAbort("CmiAsyncBroadcastFn should never be called");
1653   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1654 }
1655
1656 void CmiFreeBroadcastFn(int size, char *msg)
1657 {
1658    CmiSyncBroadcastFn(size,msg);
1659    CmiFree(msg);
1660 }
1661
1662 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1663 {
1664
1665 #if CMK_SMP     
1666   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1667   CMI_DEST_RANK(msg) = CmiMyRank();
1668 #endif
1669
1670 #if CMK_BROADCAST_SPANNING_TREE
1671   CmiState cs = CmiGetState();
1672   CmiSyncSendFn(cs->pe, size,msg) ;
1673   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1674   SendSpanningChildren(size, msg);
1675
1676 #elif CMK_BROADCAST_HYPERCUBE
1677   CmiState cs = CmiGetState();
1678   CmiSyncSendFn(cs->pe, size,msg) ;
1679   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1680   SendHypercube(size, msg);
1681
1682 #else
1683     int i ;
1684
1685   for ( i=0; i<_Cmi_numpes; i++ )
1686     CmiSyncSendFn(i,size,msg) ;
1687 #endif
1688
1689   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1690 }
1691
1692 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1693 {
1694   int i ;
1695
1696   for ( i=1; i<_Cmi_numpes; i++ )
1697     CmiAsyncSendFn(i,size,msg) ;
1698
1699   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1700
1701   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1702 }
1703
1704 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1705 {
1706 #if CMK_SMP     
1707   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1708   CMI_DEST_RANK(msg) = CmiMyRank();
1709 #endif
1710
1711 #if CMK_BROADCAST_SPANNING_TREE
1712   CmiState cs = CmiGetState();
1713   CmiSyncSendFn(cs->pe, size,msg) ;
1714   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1715   SendSpanningChildren(size, msg);
1716
1717 #elif CMK_BROADCAST_HYPERCUBE
1718   CmiState cs = CmiGetState();
1719   CmiSyncSendFn(cs->pe, size,msg) ;
1720   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1721   SendHypercube(size, msg);
1722
1723 #else
1724   int i ;
1725
1726   for ( i=0; i<_Cmi_numpes; i++ )
1727     CmiSyncSendFn(i,size,msg) ;
1728 #endif
1729   CmiFree(msg) ;
1730   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1731 }
1732
1733 #if CMK_NODE_QUEUE_AVAILABLE
1734
1735 static void CmiSendNodeSelf(char *msg)
1736 {
1737 #if CMK_IMMEDIATE_MSG
1738 #if 0
1739     if (CmiIsImmediate(msg) && !_immRunning) {
1740       /*CmiHandleImmediateMessage(msg); */
1741       CmiPushImmediateMsg(msg);
1742       CmiHandleImmediate();
1743       return;
1744     }
1745 #endif
1746     if (CmiIsImmediate(msg))
1747     {
1748       CmiPushImmediateMsg(msg);
1749       if (!_immRunning) CmiHandleImmediate();
1750       return;
1751     }
1752 #endif
1753     CQdCreate(CpvAccess(cQdState), 1);
1754     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1755     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1756     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1757 }
1758
1759 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1760 {
1761   int i;
1762   SMSG_LIST *msg_tmp;
1763   char *dupmsg;
1764
1765   CMI_SET_BROADCAST_ROOT(msg, 0);
1766   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1767   switch (dstNode) {
1768   case NODE_BROADCAST_ALL:
1769     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1770   case NODE_BROADCAST_OTHERS:
1771     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1772     for (i=0; i<_Cmi_numnodes; i++)
1773       if (i!=_Cmi_mynode) {
1774         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1775       }
1776     break;
1777   default:
1778     dupmsg = (char *)CmiCopyMsg(msg,size);
1779     if(dstNode == _Cmi_mynode) {
1780       CmiSendNodeSelf(dupmsg);
1781     }
1782     else {
1783       CQdCreate(CpvAccess(cQdState), 1);
1784       EnqueueMsg(dupmsg, size, dstNode);
1785     }
1786   }
1787   return 0;
1788 }
1789
1790 void CmiSyncNodeSendFn(int p, int s, char *m)
1791 {
1792   CmiAsyncNodeSendFn(p, s, m);
1793 }
1794
1795 /* need */
1796 void CmiFreeNodeSendFn(int p, int s, char *m)
1797 {
1798   CmiAsyncNodeSendFn(p, s, m);
1799   CmiFree(m);
1800 }
1801
1802 /* need */
1803 void CmiSyncNodeBroadcastFn(int s, char *m)
1804 {
1805   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1806 }
1807
1808 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1809 {
1810 }
1811
1812 /* need */
1813 void CmiFreeNodeBroadcastFn(int s, char *m)
1814 {
1815   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1816   CmiFree(m);
1817 }
1818
1819 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1820 {
1821   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1822 }
1823
1824 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1825 {
1826   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1827 }
1828
1829 /* need */
1830 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1831 {
1832   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1833   CmiFree(m);
1834 }
1835 #endif
1836
1837 /************************** MAIN ***********************************/
1838 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1839
1840 void ConverseExit(void)
1841 {
1842 #if ! CMK_SMP
1843   while(!CmiAllAsyncMsgsSent()) {
1844     PumpMsgs();
1845     CmiReleaseSentMessages();
1846   }
1847   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1848     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1849
1850   ConverseCommonExit();
1851 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1852   if (CmiMyPe() == 0){
1853     CmiPrintf("End of program\n");
1854 #if MPI_POST_RECV_COUNT > 0
1855     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1856 #endif
1857 }
1858 #endif
1859 #if ! CMK_AUTOBUILD
1860   signal(SIGINT, signal_int);
1861   MPI_Finalize();
1862 #endif
1863   exit(0);
1864
1865 #else
1866     /* SMP version, communication thread will exit */
1867   ConverseCommonExit();
1868   /* atomic increment */
1869   CmiLock(exitLock);
1870   inexit++;
1871   CmiUnlock(exitLock);
1872   while (1) CmiYield();
1873 #endif
1874 }
1875
1876 static void registerMPITraceEvents() {
1877 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1878     traceRegisterUserEvent("MPI_Barrier", 10);
1879     traceRegisterUserEvent("MPI_Send", 20);
1880     traceRegisterUserEvent("MPI_Recv", 30);
1881     traceRegisterUserEvent("MPI_Isend", 40);
1882     traceRegisterUserEvent("MPI_Irecv", 50);
1883     traceRegisterUserEvent("MPI_Test", 60);
1884     traceRegisterUserEvent("MPI_Iprobe", 70);
1885 #endif
1886 }
1887
1888
1889 static char     **Cmi_argv;
1890 static char     **Cmi_argvcopy;
1891 static CmiStartFn Cmi_startfn;   /* The start function */
1892 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1893
1894 typedef struct {
1895   int sleepMs; /*Milliseconds to sleep while idle*/
1896   int nIdles; /*Number of times we've been idle in a row*/
1897   CmiState cs; /*Machine state*/
1898 } CmiIdleState;
1899
1900 static CmiIdleState *CmiNotifyGetState(void)
1901 {
1902   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1903   s->sleepMs=0;
1904   s->nIdles=0;
1905   s->cs=CmiGetState();
1906   return s;
1907 }
1908
1909 static void CmiNotifyBeginIdle(CmiIdleState *s)
1910 {
1911   s->sleepMs=0;
1912   s->nIdles=0;
1913 }
1914
1915 static void CmiNotifyStillIdle(CmiIdleState *s)
1916 {
1917 #if ! CMK_SMP
1918   CmiReleaseSentMessages();
1919   PumpMsgs();
1920 #else
1921 /*  CmiYield();  */
1922 #endif
1923
1924 #if 1
1925   {
1926   int nSpins=20; /*Number of times to spin before sleeping*/
1927   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1928   s->nIdles++;
1929   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1930     s->sleepMs+=2;
1931     if (s->sleepMs>10) s->sleepMs=10;
1932   }
1933   /*Comm. thread will listen on sockets-- just sleep*/
1934   if (s->sleepMs>0) {
1935     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1936     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1937     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1938   }
1939   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1940   }
1941 #endif
1942 }
1943
1944 #if MACHINE_DEBUG_LOG
1945 FILE *debugLog = NULL;
1946 #endif
1947
1948 static int machine_exit_idx;
1949 static void machine_exit(char *m) {
1950   EmergencyExit();
1951   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1952   fflush(stdout);
1953   CmiNodeBarrier();
1954   if (CmiMyRank() == 0) {
1955     MPI_Barrier(MPI_COMM_WORLD);
1956     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1957     MPI_Abort(MPI_COMM_WORLD, 1);
1958   } else {
1959     while (1) CmiYield();
1960   }
1961 }
1962
1963 static void KillOnAllSigs(int sigNo) {
1964   static int already_in_signal_handler = 0;
1965   char *m;
1966   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1967   already_in_signal_handler = 1;
1968 #if CMK_CCS_AVAILABLE
1969   if (CpvAccess(cmiArgDebugFlag)) {
1970     CpdNotify(CPD_SIGNAL, sigNo);
1971     CpdFreeze();
1972   }
1973 #endif
1974   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1975       "Signal: %d\n",CmiMyPe(),sigNo);
1976   CmiPrintStackTrace(1);
1977
1978   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1979   CmiSetHandler(m, machine_exit_idx);
1980   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1981   machine_exit(m);
1982 }
1983
1984 static void ConverseRunPE(int everReturn)
1985 {
1986   CmiIdleState *s=CmiNotifyGetState();
1987   CmiState cs;
1988   char** CmiMyArgv;
1989
1990 #if MPI_POST_RECV_COUNT > 0
1991         int doInit = 1;
1992         int i;
1993         
1994 #if CMK_SMP
1995         if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1996 #endif
1997         
1998         /* Currently, in mpi smp, the main thread will be the comm thread, so
1999      *  only the comm thread should post recvs. Cpvs, however, need to be
2000          * created on rank 0 (the ptrs to the actual cpv memory), while
2001      * other ranks are busy waiting for this to finish. So cpv initialize
2002          * routines have to be called on every ranks, although they are only
2003          * useful on comm thread (whose rank is not zero) -Chao Mei
2004          */
2005         CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2006         CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2007         CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2008         CpvInitialize(char*,CmiPostedRecvBuffers);
2009
2010         if(doInit){
2011                 /* Post some extra recvs to help out with incoming messages */
2012                 /* On some MPIs the messages are unexpected and thus slow */
2013
2014                 /* An array of request handles for posted recvs */
2015                 CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2016
2017                 /* An array of buffers for posted recvs */
2018                 CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2019
2020                 /* Post Recvs */
2021                 for(i=0; i<MPI_POST_RECV_COUNT; i++){
2022                         printf("Pre post recv %d\n", i);
2023                         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2024                                                 MPI_POST_RECV_SIZE,
2025                                                 MPI_BYTE,
2026                                                 MPI_ANY_SOURCE,
2027                                                 POST_RECV_TAG,
2028                                                 MPI_COMM_WORLD,
2029                                 &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2030                                         CmiAbort("MPI_Irecv failed\n");
2031                 }
2032         }
2033 #endif
2034         
2035   CmiNodeAllBarrier();
2036         
2037   cs = CmiGetState();
2038   CpvInitialize(void *,CmiLocalQueue);
2039   CpvAccess(CmiLocalQueue) = cs->localqueue;
2040
2041   if (CmiMyRank())
2042     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
2043   else
2044     CmiMyArgv=Cmi_argv;
2045
2046   CthInit(CmiMyArgv);
2047
2048   ConverseCommonInit(CmiMyArgv);
2049   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
2050
2051 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2052   CpvInitialize(double, projTraceStart);
2053   /* only PE 0 needs to care about registration (to generate sts file). */
2054   if (CmiMyPe() == 0) {
2055     registerMachineUserEventsFunction(&registerMPITraceEvents);
2056   }
2057 #endif
2058
2059   /* initialize the network progress counter*/
2060   /* Network progress function is used to poll the network when for
2061      messages. This flushes receive buffers on some  implementations*/
2062   CpvInitialize(unsigned , networkProgressCount);
2063   CpvAccess(networkProgressCount) = 0;
2064
2065 #if CMK_SMP
2066   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2067   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2068 #else
2069   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
2070 #endif
2071
2072 #if MACHINE_DEBUG_LOG
2073   if (CmiMyRank() == 0) {
2074     char ln[200];
2075     sprintf(ln,"debugLog.%d",CmiMyNode());
2076     debugLog=fopen(ln,"w");
2077   }
2078 #endif
2079
2080   /* Converse initialization finishes, immediate messages can be processed.
2081      node barrier previously should take care of the node synchronization */
2082   _immediateReady = 1;
2083
2084   /* communication thread */
2085   if (CmiMyRank() == CmiMyNodeSize()) {
2086     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2087     while (1) CommunicationServerThread(5);
2088   }
2089   else {  /* worker thread */
2090   if (!everReturn) {
2091     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2092     if (Cmi_usrsched==0) CsdScheduler(-1);
2093     ConverseExit();
2094   }
2095   }
2096 }
2097
2098 static char *thread_level_tostring(int thread_level)
2099 {
2100 #if CMK_MPI_INIT_THREAD
2101   switch (thread_level) {
2102   case MPI_THREAD_SINGLE:
2103       return "MPI_THREAD_SINGLE";
2104   case MPI_THREAD_FUNNELED:
2105       return "MPI_THREAD_FUNNELED";
2106   case MPI_THREAD_SERIALIZED:
2107       return "MPI_THREAD_SERIALIZED";
2108   case MPI_THREAD_MULTIPLE :
2109       return "MPI_THREAD_MULTIPLE ";
2110   default: {
2111       char *str = (char*)malloc(5);
2112       sprintf(str,"%d", thread_level);
2113       return str;
2114       }
2115   }
2116   return  "unknown";
2117 #else
2118   char *str = (char*)malloc(5);
2119   sprintf(str,"%d", thread_level);
2120   return str;
2121 #endif
2122 }
2123
2124 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2125 {
2126   int n,i;
2127   int ver, subver;
2128   int provided;
2129   int thread_level;
2130
2131 #if MACHINE_DEBUG
2132   debugLog=NULL;
2133 #endif
2134 #if CMK_USE_HP_MAIN_FIX
2135 #if FOR_CPLUS
2136   _main(argc,argv);
2137 #endif
2138 #endif
2139
2140 #if CMK_MPI_INIT_THREAD
2141 #if CMK_SMP
2142   thread_level = MPI_THREAD_FUNNELED;
2143 #else
2144   thread_level = MPI_THREAD_SINGLE;
2145 #endif
2146   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2147   _thread_provided = provided;
2148 #else
2149   MPI_Init(&argc, &argv);
2150   thread_level = 0;
2151   provided = -1;
2152 #endif
2153   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2154   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2155
2156   MPI_Get_version(&ver, &subver);
2157   if (_Cmi_mynode == 0) {
2158     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2159   }
2160
2161   /* processor per node */
2162   _Cmi_mynodesize = 1;
2163   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2164     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2165 #if ! CMK_SMP
2166   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2167     CmiAbort("+ppn cannot be used in non SMP version!\n");
2168 #endif
2169   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2170   if (idleblock && _Cmi_mynode == 0) {
2171     printf("Charm++: Running in idle blocking mode.\n");
2172   }
2173
2174   /* setup signal handlers */
2175   signal(SIGSEGV, KillOnAllSigs);
2176   signal(SIGFPE, KillOnAllSigs);
2177   signal(SIGILL, KillOnAllSigs);
2178   signal_int = signal(SIGINT, KillOnAllSigs);
2179   signal(SIGTERM, KillOnAllSigs);
2180   signal(SIGABRT, KillOnAllSigs);
2181 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2182   signal(SIGQUIT, KillOnAllSigs);
2183   signal(SIGBUS, KillOnAllSigs);
2184 /*#     if CMK_HANDLE_SIGUSR
2185   signal(SIGUSR1, HandleUserSignals);
2186   signal(SIGUSR2, HandleUserSignals);
2187 #     endif*/
2188 #   endif /*UNIX*/
2189   
2190 #if CMK_NO_OUTSTANDING_SENDS
2191   no_outstanding_sends=1;
2192 #endif
2193   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2194     no_outstanding_sends = 1;
2195     if (_Cmi_mynode == 0)
2196       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2197         no_outstanding_sends?"":" not");
2198   }
2199   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2200   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2201   Cmi_argvcopy = CmiCopyArgs(argv);
2202   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2203   /* find dim = log2(numpes), to pretend we are a hypercube */
2204   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2205     Cmi_dim++ ;
2206  /* CmiSpanTreeInit();*/
2207   request_max=MAX_QLEN;
2208   CmiGetArgInt(argv,"+requestmax",&request_max);
2209   /*printf("request max=%d\n", request_max);*/
2210
2211   /* checksum flag */
2212   if (CmiGetArgFlag(argv,"+checksum")) {
2213 #if !CMK_OPTIMIZE
2214     checksum_flag = 1;
2215     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2216 #else
2217     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2218 #endif
2219   }
2220
2221   {
2222   int debug = CmiGetArgFlag(argv,"++debug");
2223   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2224   if (debug || debug_no_pause)
2225   {   /*Pause so user has a chance to start and attach debugger*/
2226 #if CMK_HAS_GETPID
2227     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2228     fflush(stdout);
2229     if (!debug_no_pause)
2230       sleep(15);
2231 #else
2232     printf("++debug ignored.\n");
2233 #endif
2234   }
2235   }
2236
2237   /* CmiTimerInit(); */
2238
2239 #if 0
2240   CthInit(argv);
2241   ConverseCommonInit(argv);
2242
2243   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2244   if (initret==0) {
2245     fn(CmiGetArgc(argv), argv);
2246     if (usched==0) CsdScheduler(-1);
2247     ConverseExit();
2248   }
2249 #endif
2250
2251   CsvInitialize(CmiNodeState, NodeState);
2252   CmiNodeStateInit(&CsvAccess(NodeState));
2253
2254   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2255
2256   for (i=0; i<_Cmi_mynodesize+1; i++) {
2257 #if MULTI_SENDQUEUE
2258     procState[i].sendMsgBuf = PCQueueCreate();
2259 #endif
2260     procState[i].recvLock = CmiCreateLock();
2261   }
2262 #if CMK_SMP
2263 #if !MULTI_SENDQUEUE
2264   sendMsgBuf = PCQueueCreate();
2265   sendMsgBufLock = CmiCreateLock();
2266 #endif
2267   exitLock = CmiCreateLock();            /* exit count lock */
2268 #endif
2269
2270   /* Network progress function is used to poll the network when for
2271      messages. This flushes receive buffers on some  implementations*/
2272   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2273   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2274
2275   CmiStartThreads(argv);
2276   ConverseRunPE(initret);
2277 }
2278
2279 /***********************************************************************
2280  *
2281  * Abort function:
2282  *
2283  ************************************************************************/
2284
2285 void CmiAbort(const char *message)
2286 {
2287   char *m;
2288   /* if CharmDebug is attached simply try to send a message to it */
2289 #if CMK_CCS_AVAILABLE
2290   if (CpvAccess(cmiArgDebugFlag)) {
2291     CpdNotify(CPD_ABORT, message);
2292     CpdFreeze();
2293   }
2294 #endif  
2295   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2296         "Reason: %s\n",CmiMyPe(),message);
2297  /*  CmiError(message); */
2298   CmiPrintStackTrace(0);
2299   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2300   CmiSetHandler(m, machine_exit_idx);
2301   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2302   machine_exit(m);
2303   /* Program never reaches here */
2304   MPI_Abort(MPI_COMM_WORLD, 1);
2305 }
2306
2307
2308 #if 0
2309
2310 /* ****************************************************************** */
2311 /*    The following internal functions implement recd msg queue       */
2312 /* ****************************************************************** */
2313
2314 static void ** AllocBlock(unsigned int len)
2315 {
2316   void ** blk;
2317
2318   blk=(void **)CmiAlloc(len*sizeof(void *));
2319   if(blk==(void **)0) {
2320     CmiError("Cannot Allocate Memory!\n");
2321     MPI_Abort(MPI_COMM_WORLD, 1);
2322   }
2323   return blk;
2324 }
2325
2326 static void
2327 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2328 {
2329   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2330   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2331 }
2332
2333 void recdQueueInit(void)
2334 {
2335   recdQueue_blk = AllocBlock(BLK_LEN);
2336   recdQueue_blk_len = BLK_LEN;
2337   recdQueue_first = 0;
2338   recdQueue_len = 0;
2339 }
2340
2341 void recdQueueAddToBack(void *element)
2342 {
2343 #if NODE_0_IS_CONVHOST
2344   inside_comm = 1;
2345 #endif
2346   if(recdQueue_len==recdQueue_blk_len) {
2347     void **blk;
2348     recdQueue_blk_len *= 3;
2349     blk = AllocBlock(recdQueue_blk_len);
2350     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2351     CmiFree(recdQueue_blk);
2352     recdQueue_blk = blk;
2353     recdQueue_first = 0;
2354   }
2355   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2356 #if NODE_0_IS_CONVHOST
2357   inside_comm = 0;
2358 #endif
2359 }
2360
2361
2362 void * recdQueueRemoveFromFront(void)
2363 {
2364   if(recdQueue_len) {
2365     void *element;
2366     element = recdQueue_blk[recdQueue_first++];
2367     recdQueue_first %= recdQueue_blk_len;
2368     recdQueue_len--;
2369     return element;
2370   }
2371   return 0;
2372 }
2373
2374 #endif
2375
2376 /*@}*/