c01d809d5d6699ba113d785a24eaf75c399d73b4
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #if CMK_SMP
51 /* currently only considering the smp case */
52 #define CMI_DYNAMIC_EXERT_CAP 0
53 /* This macro defines the max number of msgs in the sender msg buffer 
54  * that is allowed for recving operation to continue
55  */
56 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 8
57 #define CMI_DYNAMIC_MAXCAPSIZE 1000
58 #define CMI_DYNAMIC_SEND_CAPSIZE 6
59 #define CMI_DYNAMIC_RECV_CAPSIZE 6
60 /* initial values, -1 indiates there's no cap */
61 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
62 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
63 #endif
64
65 #if CMI_EXERT_SEND_CAP
66 #define SEND_CAP 3
67 #endif
68
69 #if CMI_EXERT_RECV_CAP
70 #define RECV_CAP 2
71 #endif
72
73
74 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
75 #define CMI_MPI_TRACE_MOREDETAILED 0
76 #undef CMI_MPI_TRACE_USEREVENTS
77 #define CMI_MPI_TRACE_USEREVENTS 1
78 #else
79 #undef CMK_SMP_TRACE_COMMTHREAD
80 #define CMK_SMP_TRACE_COMMTHREAD 0
81 #endif
82
83 #define CMK_TRACE_COMMOVERHEAD 0
84 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
85 #undef CMI_MPI_TRACE_USEREVENTS
86 #define CMI_MPI_TRACE_USEREVENTS 1
87 #else
88 #undef CMK_TRACE_COMMOVERHEAD
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #endif
91
92 #include "machine.h"
93
94 #include "pcqueue.h"
95
96 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
97
98 #if CMK_BLUEGENEL
99 #define MAX_QLEN 8
100 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
101 #else
102 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
103 #define MAX_QLEN 200
104 #endif
105
106 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
107 CpvStaticDeclare(double, projTraceStart);
108 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
109 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
110 #else
111 # define  START_EVENT()
112 # define  END_EVENT(x)
113 #endif
114
115 /*
116     To reduce the buffer used in broadcast and distribute the load from
117   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
118   spanning tree broadcast algorithm.
119     This will use the fourth short in message as an indicator of spanning tree
120   root.
121 */
122 #define CMK_BROADCAST_SPANNING_TREE    1
123 #define CMK_BROADCAST_HYPERCUBE        0
124
125 #define BROADCAST_SPANNING_FACTOR      4
126 /* The number of children used when a msg is broadcast inside a node */
127 #define BROADCAST_SPANNING_INTRA_FACTOR      8
128
129 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
130 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
131
132 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
133 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
134
135 /* FIXME: need a random number that everyone agrees ! */
136 #define CHARM_MAGIC_NUMBER               126
137
138 #if CMK_ERROR_CHECKING
139 static int checksum_flag = 0;
140 #define CMI_SET_CHECKSUM(msg, len)      \
141         if (checksum_flag)  {   \
142           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
143           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
144         }
145 #define CMI_CHECK_CHECKSUM(msg, len)    \
146         if (checksum_flag)      \
147           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
148             CmiAbort("Fatal error: checksum doesn't agree!\n");
149 #else
150 #define CMI_SET_CHECKSUM(msg, len)
151 #define CMI_CHECK_CHECKSUM(msg, len)
152 #endif
153
154 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
155 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
156 #else
157 #define CMI_SET_BROADCAST_ROOT(msg, root) 
158 #endif
159
160
161 /** 
162     If MPI_POST_RECV is defined, we provide default values for size 
163     and number of posted recieves. If MPI_POST_RECV_COUNT is set
164     then a default value for MPI_POST_RECV_SIZE is used if not specified
165     by the user.
166 */
167 #ifdef MPI_POST_RECV
168 #define MPI_POST_RECV_COUNT 10
169 #undef MPI_POST_RECV
170 #endif
171 #if MPI_POST_RECV_COUNT > 0
172 #warning "Using MPI posted receives which have not yet been tested"
173 #ifndef MPI_POST_RECV_SIZE
174 #define MPI_POST_RECV_SIZE 200
175 #endif
176 /* #undef  MPI_POST_RECV_DEBUG  */
177 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
178 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
179 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
180 CpvDeclare(char*,CmiPostedRecvBuffers);
181 #endif
182
183 /*
184  to avoid MPI's in order delivery, changing MPI Tag all the time
185 */
186 #define TAG     1375
187
188 #if MPI_POST_RECV_COUNT > 0
189 #define POST_RECV_TAG TAG+1
190 #define BARRIER_ZERO_TAG TAG
191 #else
192 #define BARRIER_ZERO_TAG     1375
193 #endif
194
195 #include <signal.h>
196 void (*signal_int)(int);
197
198 /*
199 static int mpi_tag = TAG;
200 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
201 */
202
203 static int        _thread_provided = -1;
204 int               _Cmi_numpes;
205 int               _Cmi_mynode;    /* Which address space am I */
206 int               _Cmi_mynodesize;/* Number of processors in my address space */
207 int               _Cmi_numnodes;  /* Total number of address spaces */
208 int               _Cmi_numpes;    /* Total number of processors */
209 static int        Cmi_nodestart; /* First processor in this address space */
210 CpvDeclare(void*, CmiLocalQueue);
211
212 /*Network progress utility variables. Period controls the rate at
213   which the network poll is called */
214 CpvDeclare(unsigned , networkProgressCount);
215 int networkProgressPeriod;
216
217 int               idleblock = 0;
218
219 #define BLK_LEN  512
220
221 #if CMK_NODE_QUEUE_AVAILABLE
222 #define DGRAM_NODEMESSAGE   (0xFB)
223
224 #define NODE_BROADCAST_OTHERS (-1)
225 #define NODE_BROADCAST_ALL    (-2)
226 #endif
227
228 #if 0
229 static void **recdQueue_blk;
230 static unsigned int recdQueue_blk_len;
231 static unsigned int recdQueue_first;
232 static unsigned int recdQueue_len;
233 static void recdQueueInit(void);
234 static void recdQueueAddToBack(void *element);
235 static void *recdQueueRemoveFromFront(void);
236 #endif
237
238 static void ConverseRunPE(int everReturn);
239 static void CommunicationServer(int sleepTime);
240 static void CommunicationServerThread(int sleepTime);
241
242 typedef struct msg_list {
243      char *msg;
244      struct msg_list *next;
245      int size, destpe;
246
247 #if CMK_SMP_TRACE_COMMTHREAD
248         int srcpe;
249 #endif  
250         
251      MPI_Request req;
252 } SMSG_LIST;
253
254 int MsgQueueLen=0;
255 static int request_max;
256
257 static SMSG_LIST *sent_msgs=0;
258 static SMSG_LIST *end_sent=0;
259
260 static int Cmi_dim;
261
262 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
263
264 #if NODE_0_IS_CONVHOST
265 int inside_comm = 0;
266 #endif
267
268 void CmiAbort(const char *message);
269 static void PerrorExit(const char *msg);
270
271 void SendSpanningChildren(int size, char *msg);
272 void SendHypercube(int size, char *msg);
273
274 static void PerrorExit(const char *msg)
275 {
276   perror(msg);
277   exit(1);
278 }
279
280 extern unsigned char computeCheckSum(unsigned char *data, int len);
281
282 /**************************  TIMER FUNCTIONS **************************/
283
284 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
285
286 /* MPI calls are not threadsafe, even the timer on some machines */
287 static CmiNodeLock  timerLock = 0;
288 static int _absoluteTime = 0;
289 static double starttimer = 0;
290 static int _is_global = 0;
291
292 int CmiTimerIsSynchronized()
293 {
294   int  flag;
295   void *v;
296
297   /*  check if it using synchronized timer */
298   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
299     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
300   if (flag) {
301     _is_global = *(int*)v;
302     if (_is_global && CmiMyPe() == 0)
303       printf("Charm++> MPI timer is synchronized!\n");
304   }
305   return _is_global;
306 }
307
308 int CmiTimerAbsolute()
309 {       
310   return _absoluteTime;
311 }
312
313 double CmiStartTimer()
314 {
315   return 0.0;
316 }
317
318 double CmiInitTime()
319 {
320   return starttimer;
321 }
322
323 void CmiTimerInit(char **argv)
324 {
325   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
326
327   _is_global = CmiTimerIsSynchronized();
328
329   if (_is_global) {
330     if (CmiMyRank() == 0) {
331       double minTimer;
332 #if CMK_TIMER_USE_XT3_DCLOCK
333       starttimer = dclock();
334 #else
335       starttimer = MPI_Wtime();
336 #endif
337
338       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
339                                   MPI_COMM_WORLD );
340       starttimer = minTimer;
341     }
342   }
343   else {  /* we don't have a synchronous timer, set our own start time */
344     CmiBarrier();
345     CmiBarrier();
346     CmiBarrier();
347 #if CMK_TIMER_USE_XT3_DCLOCK
348     starttimer = dclock();
349 #else
350     starttimer = MPI_Wtime();
351 #endif
352   }
353
354 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
355   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
356     timerLock = CmiCreateLock();
357 #endif
358   CmiNodeAllBarrier();          /* for smp */
359 }
360
361 /**
362  * Since the timerLock is never created, and is
363  * always NULL, then all the if-condition inside
364  * the timer functions could be disabled right
365  * now in the case of SMP. --Chao Mei
366  */
367 double CmiTimer(void)
368 {
369   double t;
370 #if 0 && CMK_SMP
371   if (timerLock) CmiLock(timerLock);
372 #endif
373
374 #if CMK_TIMER_USE_XT3_DCLOCK
375   t = dclock();
376 #else
377   t = MPI_Wtime();
378 #endif
379
380 #if 0 && CMK_SMP
381   if (timerLock) CmiUnlock(timerLock);
382 #endif
383
384   return _absoluteTime?t: (t-starttimer);
385 }
386
387 double CmiWallTimer(void)
388 {
389   double t;
390 #if 0 && CMK_SMP
391   if (timerLock) CmiLock(timerLock);
392 #endif
393
394 #if CMK_TIMER_USE_XT3_DCLOCK
395   t = dclock();
396 #else
397   t = MPI_Wtime();
398 #endif
399
400 #if 0 && CMK_SMP
401   if (timerLock) CmiUnlock(timerLock);
402 #endif
403
404   return _absoluteTime? t: (t-starttimer);
405 }
406
407 double CmiCpuTimer(void)
408 {
409   double t;
410 #if 0 && CMK_SMP
411   if (timerLock) CmiLock(timerLock);
412 #endif
413 #if CMK_TIMER_USE_XT3_DCLOCK
414   t = dclock() - starttimer;
415 #else
416   t = MPI_Wtime() - starttimer;
417 #endif
418 #if 0 && CMK_SMP
419   if (timerLock) CmiUnlock(timerLock);
420 #endif
421   return t;
422 }
423
424 #endif
425
426 /* must be called on all ranks including comm thread in SMP */
427 int CmiBarrier()
428 {
429 #if CMK_SMP
430     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
431   CmiNodeAllBarrier();
432   if (CmiMyRank() == CmiMyNodeSize()) 
433 #else
434   if (CmiMyRank() == 0) 
435 #endif
436   {
437 /**
438  *  The call of CmiBarrier is usually before the initialization
439  *  of trace module of Charm++, therefore, the START_EVENT
440  *  and END_EVENT are disabled here. -Chao Mei
441  */     
442     /*START_EVENT();*/
443
444     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
445         CmiAbort("Timernit: MPI_Barrier failed!\n");
446
447     /*END_EVENT(10);*/
448   }
449   CmiNodeAllBarrier();
450   return 0;
451 }
452
453 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
454 int CmiBarrierZero()
455 {
456   int i;
457 #if CMK_SMP
458   if (CmiMyRank() == CmiMyNodeSize()) 
459 #else
460   if (CmiMyRank() == 0) 
461 #endif
462   {
463     char msg[1];
464     MPI_Status sts;
465     if (CmiMyNode() == 0)  {
466       for (i=0; i<CmiNumNodes()-1; i++) {
467          START_EVENT();
468
469          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
470             CmiPrintf("MPI_Recv failed!\n");
471
472          END_EVENT(30);
473       }
474     }
475     else {
476       START_EVENT();
477
478       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
479          printf("MPI_Send failed!\n");
480
481       END_EVENT(20);
482     }
483   }
484   CmiNodeAllBarrier();
485   return 0;
486 }
487
488 typedef struct ProcState {
489 #if MULTI_SENDQUEUE
490 PCQueue      sendMsgBuf;       /* per processor message sending queue */
491 #endif
492 CmiNodeLock  recvLock;              /* for cs->recv */
493 } ProcState;
494
495 static ProcState  *procState;
496
497 #if CMK_SMP
498
499 #if !MULTI_SENDQUEUE
500 static PCQueue sendMsgBuf;
501 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
502 #endif
503
504 #endif
505
506 /************************************************************
507  *
508  * Processor state structure
509  *
510  ************************************************************/
511
512 /* fake Cmi_charmrun_fd */
513 static int Cmi_charmrun_fd = 0;
514 #include "machine-smp.c"
515
516 CsvDeclare(CmiNodeState, NodeState);
517
518 #include "immediate.c"
519
520 #if CMK_SHARED_VARS_UNAVAILABLE
521 /************ non SMP **************/
522 static struct CmiStateStruct Cmi_state;
523 int _Cmi_mype;
524 int _Cmi_myrank;
525
526 void CmiMemLock() {}
527 void CmiMemUnlock() {}
528
529 #define CmiGetState() (&Cmi_state)
530 #define CmiGetStateN(n) (&Cmi_state)
531
532 void CmiYield(void) { sleep(0); }
533
534 static void CmiStartThreads(char **argv)
535 {
536   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
537   _Cmi_mype = Cmi_nodestart;
538   _Cmi_myrank = 0;
539 }
540 #endif  /* non smp */
541
542 /*Add a message to this processor's receive queue, pe is a rank */
543 void CmiPushPE(int pe,void *msg)
544 {
545   CmiState cs = CmiGetStateN(pe);
546   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
547 #if CMK_IMMEDIATE_MSG
548   if (CmiIsImmediate(msg)) {
549 /*
550 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
551     CmiHandleMessage(msg);
552 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
553 */
554     /**(CmiUInt2 *)msg = pe;*/
555     CMI_DEST_RANK(msg) = pe;
556     CmiPushImmediateMsg(msg);
557     return;
558   }
559 #endif
560
561 #if CMK_SMP
562   CmiLock(procState[pe].recvLock);
563 #endif
564   PCQueuePush(cs->recv,msg);
565 #if CMK_SMP
566   CmiUnlock(procState[pe].recvLock);
567 #endif
568   CmiIdleLock_addMessage(&cs->idle);
569   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
570 }
571
572 #if CMK_NODE_QUEUE_AVAILABLE
573 /*Add a message to this processor's receive queue */
574 static void CmiPushNode(void *msg)
575 {
576   MACHSTATE(3,"Pushing message into NodeRecv queue");
577 #if CMK_IMMEDIATE_MSG
578   if (CmiIsImmediate(msg)) {
579     CMI_DEST_RANK(msg) = 0;
580     CmiPushImmediateMsg(msg);
581     return;
582   }
583 #endif
584   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
585   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
586   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
587   {
588   CmiState cs=CmiGetStateN(0);
589   CmiIdleLock_addMessage(&cs->idle);
590   }
591 }
592 #endif
593
594 #ifndef CmiMyPe
595 int CmiMyPe(void)
596 {
597   return CmiGetState()->pe;
598 }
599 #endif
600
601 #ifndef CmiMyRank
602 int CmiMyRank(void)
603 {
604   return CmiGetState()->rank;
605 }
606 #endif
607
608 #ifndef CmiNodeFirst
609 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
610 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
611 #endif
612
613 #ifndef CmiNodeOf
614 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
615 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
616 #endif
617
618 static size_t CmiAllAsyncMsgsSent(void)
619 {
620    SMSG_LIST *msg_tmp = sent_msgs;
621    MPI_Status sts;
622    int done;
623
624    while(msg_tmp!=0) {
625     done = 0;
626     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
627       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
628     if(!done)
629       return 0;
630     msg_tmp = msg_tmp->next;
631 /*    MsgQueueLen--; ????? */
632    }
633    return 1;
634 }
635
636 int CmiAsyncMsgSent(CmiCommHandle c) {
637
638   SMSG_LIST *msg_tmp = sent_msgs;
639   int done;
640   MPI_Status sts;
641
642   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
643     msg_tmp = msg_tmp->next;
644   if(msg_tmp) {
645     done = 0;
646     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
647       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
648     return ((done)?1:0);
649   } else {
650     return 1;
651   }
652 }
653
654 void CmiReleaseCommHandle(CmiCommHandle c)
655 {
656   return;
657 }
658
659 #if CMK_BLUEGENEL
660 extern void MPID_Progress_test();
661 #endif
662
663 void CmiReleaseSentMessages(void)
664 {
665   SMSG_LIST *msg_tmp=sent_msgs;
666   SMSG_LIST *prev=0;
667   SMSG_LIST *temp;
668   int done;
669   MPI_Status sts;
670
671 #if CMK_BLUEGENEL
672   MPID_Progress_test();
673 #endif
674
675   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
676   while(msg_tmp!=0) {
677     done =0;
678 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
679     double startT = CmiWallTimer();
680 #endif
681     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
682       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
683     if(done) {
684       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
685       MsgQueueLen--;
686       /* Release the message */
687       temp = msg_tmp->next;
688       if(prev==0)  /* first message */
689         sent_msgs = temp;
690       else
691         prev->next = temp;
692       CmiFree(msg_tmp->msg);
693       CmiFree(msg_tmp);
694       msg_tmp = temp;
695     } else {
696       prev = msg_tmp;
697       msg_tmp = msg_tmp->next;
698     }
699 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
700     {
701     double endT = CmiWallTimer();
702     /* only record the event if it takes more than 1ms */
703     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
704     }
705 #endif
706   }
707   end_sent = prev;
708   MACHSTATE(2,"} CmiReleaseSentMessages end");
709 }
710
711 int PumpMsgs(void)
712 {
713   int nbytes, flg, res;
714   char *msg;
715   MPI_Status sts;
716   int recd=0;
717
718 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
719   int recvCnt=0;
720 #endif
721         
722 #if CMK_BLUEGENEL
723   MPID_Progress_test();
724 #endif
725
726   MACHSTATE(2,"PumpMsgs begin {");
727
728 #if CMI_DYNAMIC_EXERT_CAP
729   dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
730 #endif
731         
732   while(1) {
733 #if CMI_EXERT_RECV_CAP
734         if(recvCnt==RECV_CAP) break;
735 #elif CMI_DYNAMIC_EXERT_CAP
736         if(recvCnt >= dynamicRecvCap) break;
737 #endif
738           
739     /* First check posted recvs then do  probe unmatched outstanding messages */
740 #if MPI_POST_RECV_COUNT > 0 
741     int completed_index=-1;
742     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
743         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
744     if(flg){
745         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
746             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
747
748         recd = 1;
749         msg = (char *) CmiAlloc(nbytes);
750         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
751         /* and repost the recv */
752
753         START_EVENT();
754
755         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
756             MPI_POST_RECV_SIZE,
757             MPI_BYTE,
758             MPI_ANY_SOURCE,
759             POST_RECV_TAG,
760             MPI_COMM_WORLD,
761             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
762                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
763
764         END_EVENT(50);
765
766         CpvAccess(Cmi_posted_recv_total)++;
767     }
768     else {
769         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
770         if(res != MPI_SUCCESS)
771         CmiAbort("MPI_Iprobe failed\n");
772         if(!flg) break;
773         recd = 1;
774         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
775         msg = (char *) CmiAlloc(nbytes);
776
777         START_EVENT();
778
779         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
780             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
781
782         END_EVENT(30);
783
784         CpvAccess(Cmi_unposted_recv_total)++;
785     }
786 #else
787     /* Original version */
788 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
789   double startT = CmiWallTimer(); 
790 #endif
791     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
792     if(res != MPI_SUCCESS)
793       CmiAbort("MPI_Iprobe failed\n");
794
795     if(!flg) break;
796 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
797     {
798     double endT = CmiWallTimer();
799     /* only trace the probe that last longer than 1ms */
800     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
801     }
802 #endif
803
804     recd = 1;
805     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
806     msg = (char *) CmiAlloc(nbytes);
807     
808     START_EVENT();
809
810     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
811       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
812
813     /*END_EVENT(30);*/
814
815 #endif
816
817 #if CMK_SMP_TRACE_COMMTHREAD
818         traceBeginCommOp(msg);
819         traceChangeLastTimestamp(CpvAccess(projTraceStart));
820         traceEndCommOp(msg);
821         #if CMI_MPI_TRACE_MOREDETAILED
822         char tmp[32];
823         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
824         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
825         #endif
826 #elif CMK_TRACE_COMMOVERHEAD
827         char tmp[32];
828         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
829         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
830 #endif
831         
832         
833     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
834     CMI_CHECK_CHECKSUM(msg, nbytes);
835 #if CMK_ERROR_CHECKING
836     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
837       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
838       CmiFree(msg);
839       CmiAbort("Abort!\n");
840       continue;
841     }
842 #endif
843         
844 #if CMK_BROADCAST_SPANNING_TREE
845     if (CMI_BROADCAST_ROOT(msg))
846       SendSpanningChildren(nbytes, msg);
847 #elif CMK_BROADCAST_HYPERCUBE
848     if (CMI_BROADCAST_ROOT(msg))
849       SendHypercube(nbytes, msg);
850 #endif
851         
852         /* In SMP mode, this push operation needs to be executed
853      * after forwarding broadcast messages. If it is executed
854      * earlier, then during the bcast msg forwarding period,    
855          * the msg could be already freed on the worker thread.
856          * As a result, the forwarded message could be wrong! 
857          * --Chao Mei
858          */
859 #if CMK_NODE_QUEUE_AVAILABLE
860     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
861       CmiPushNode(msg);
862     else
863 #endif
864         CmiPushPE(CMI_DEST_RANK(msg), msg);     
865         
866 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
867         recvCnt++;
868         /* check sendMsgBuf  to get the  number of messages that have not been sent */
869         /* MsgQueueLen indicates the number of messages that have not been released by MPI */
870         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
871                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
872                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
873         }
874 #endif  
875         
876   }
877
878   
879 #if CMK_IMMEDIATE_MSG && !CMK_SMP
880   CmiHandleImmediate();
881 #endif
882   
883   MACHSTATE(2,"} PumpMsgs end ");
884   return recd;
885 }
886
887 /* blocking version */
888 static void PumpMsgsBlocking(void)
889 {
890   static int maxbytes = 20000000;
891   static char *buf = NULL;
892   int nbytes, flg;
893   MPI_Status sts;
894   char *msg;
895   int recd=0;
896
897   if (!PCQueueEmpty(CmiGetState()->recv)) return;
898   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
899   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
900   if (sent_msgs)  return;
901
902 #if 0
903   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
904 #endif
905
906   if (buf == NULL) {
907     buf = (char *) CmiAlloc(maxbytes);
908     _MEMCHECK(buf);
909   }
910
911
912 #if MPI_POST_RECV_COUNT > 0
913 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
914 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
915 #endif
916
917   START_EVENT();
918
919   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
920       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
921
922   /*END_EVENT(30);*/
923     
924    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
925    msg = (char *) CmiAlloc(nbytes);
926    memcpy(msg, buf, nbytes);
927
928 #if CMK_SMP_TRACE_COMMTHREAD
929         traceBeginCommOp(msg);
930         traceChangeLastTimestamp(CpvAccess(projTraceStart));
931         traceEndCommOp(msg);
932         #if CMI_MPI_TRACE_MOREDETAILED
933         char tmp[32];
934         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
935         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
936         #endif
937 #endif
938
939 #if CMK_BROADCAST_SPANNING_TREE
940    if (CMI_BROADCAST_ROOT(msg))
941       SendSpanningChildren(nbytes, msg);
942 #elif CMK_BROADCAST_HYPERCUBE
943    if (CMI_BROADCAST_ROOT(msg))
944       SendHypercube(nbytes, msg);
945 #endif
946   
947         /* In SMP mode, this push operation needs to be executed
948      * after forwarding broadcast messages. If it is executed
949      * earlier, then during the bcast msg forwarding period,    
950          * the msg could be already freed on the worker thread.
951          * As a result, the forwarded message could be wrong! 
952          * --Chao Mei
953          */  
954 #if CMK_NODE_QUEUE_AVAILABLE
955    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
956       CmiPushNode(msg);
957    else
958 #endif
959       CmiPushPE(CMI_DEST_RANK(msg), msg);
960 }
961
962 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
963
964 #if CMK_SMP
965
966 static int inexit = 0;
967 static CmiNodeLock  exitLock = 0;
968
969 static int MsgQueueEmpty()
970 {
971   int i;
972 #if MULTI_SENDQUEUE
973   for (i=0; i<_Cmi_mynodesize; i++)
974     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
975 #else
976   return PCQueueEmpty(sendMsgBuf);
977 #endif
978   return 1;
979 }
980
981 static int SendMsgBuf();
982
983 /* test if all processors recv queues are empty */
984 static int RecvQueueEmpty()
985 {
986   int i;
987   for (i=0; i<_Cmi_mynodesize; i++) {
988     CmiState cs=CmiGetStateN(i);
989     if (!PCQueueEmpty(cs->recv)) return 0;
990   }
991   return 1;
992 }
993
994 /**
995 CommunicationServer calls MPI to send messages in the queues and probe message from network.
996 */
997
998 #define REPORT_COMM_METRICS 0
999 #if REPORT_COMM_METRICS
1000 static double pumptime = 0.0;
1001 static double releasetime = 0.0;
1002 static double sendtime = 0.0;
1003 #endif
1004
1005 static void CommunicationServer(int sleepTime)
1006 {
1007   int static count=0;
1008 /*
1009   count ++;
1010   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
1011 */
1012 #if REPORT_COMM_METRICS
1013   double t1, t2, t3, t4;
1014   t1 = CmiWallTimer();
1015 #endif
1016   PumpMsgs();
1017 #if REPORT_COMM_METRICS
1018   t2 = CmiWallTimer();
1019 #endif
1020   CmiReleaseSentMessages();
1021 #if REPORT_COMM_METRICS
1022   t3 = CmiWallTimer();
1023 #endif
1024   SendMsgBuf();
1025 #if REPORT_COMM_METRICS
1026   t4 = CmiWallTimer();
1027   pumptime += (t2-t1);
1028   releasetime += (t3-t2);
1029   sendtime += (t4-t3);
1030 #endif
1031 /*
1032   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1033 */
1034   if (inexit == CmiMyNodeSize()) {
1035     MACHSTATE(2, "CommunicationServer exiting {");
1036 #if 0
1037     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1038 #endif
1039     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1040       CmiReleaseSentMessages();
1041       SendMsgBuf();
1042       PumpMsgs();
1043     }
1044     MACHSTATE(2, "CommunicationServer barrier begin {");
1045
1046     START_EVENT();
1047
1048     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1049       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1050
1051     END_EVENT(10);
1052
1053     MACHSTATE(2, "} CommunicationServer barrier end");
1054 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1055     if (CmiMyNode() == 0){
1056       CmiPrintf("End of program\n");
1057     }
1058 #endif
1059     MACHSTATE(2, "} CommunicationServer EXIT");
1060
1061     ConverseCommonExit();   
1062 #if REPORT_COMM_METRICS
1063     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1064 #endif
1065
1066 #if ! CMK_AUTOBUILD
1067     signal(SIGINT, signal_int);
1068     MPI_Finalize();
1069     #endif
1070     exit(0);
1071   }
1072 }
1073
1074 #endif
1075
1076 static void CommunicationServerThread(int sleepTime)
1077 {
1078 #if CMK_SMP
1079   CommunicationServer(sleepTime);
1080 #endif
1081 #if CMK_IMMEDIATE_MSG
1082   CmiHandleImmediate();
1083 #endif
1084 }
1085
1086 #if CMK_NODE_QUEUE_AVAILABLE
1087 char *CmiGetNonLocalNodeQ(void)
1088 {
1089   CmiState cs = CmiGetState();
1090   char *result = 0;
1091   CmiIdleLock_checkMessage(&cs->idle);
1092 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1093     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1094     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1095     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1096     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1097     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1098 /*  }  */
1099
1100   return result;
1101 }
1102 #endif
1103
1104 void *CmiGetNonLocal(void)
1105 {
1106   static int count=0;
1107   CmiState cs = CmiGetState();
1108   void *msg;
1109
1110 #if ! CMK_SMP
1111   if (CmiNumPes() == 1) return NULL;
1112 #endif
1113
1114   CmiIdleLock_checkMessage(&cs->idle);
1115   /* although it seems that lock is not needed, I found it crashes very often
1116      on mpi-smp without lock */
1117
1118 #if ! CMK_SMP
1119   CmiReleaseSentMessages();
1120   PumpMsgs();
1121 #endif
1122
1123   /* CmiLock(procState[cs->rank].recvLock); */
1124   msg =  PCQueuePop(cs->recv);
1125   /* CmiUnlock(procState[cs->rank].recvLock); */
1126
1127 /*
1128   if (msg) {
1129     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1130   else {
1131     count++;
1132     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1133   }
1134 */
1135 #if ! CMK_SMP
1136   if (no_outstanding_sends) {
1137     while (MsgQueueLen>0) {
1138       CmiReleaseSentMessages();
1139       PumpMsgs();
1140     }
1141   }
1142
1143   if(!msg) {
1144     CmiReleaseSentMessages();
1145     if (PumpMsgs())
1146       return  PCQueuePop(cs->recv);
1147     else
1148       return 0;
1149   }
1150 #endif
1151
1152   return msg;
1153 }
1154
1155 /* called in non-smp mode */
1156 void CmiNotifyIdle(void)
1157 {
1158   CmiReleaseSentMessages();
1159   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1160 }
1161
1162
1163 /********************************************************
1164     The call to probe immediate messages has been renamed to
1165     CmiMachineProgressImpl
1166 ******************************************************/
1167 /* user call to handle immediate message, only useful in non SMP version
1168    using polling method to schedule message.
1169 */
1170 /*
1171 #if CMK_IMMEDIATE_MSG
1172 void CmiProbeImmediateMsg()
1173 {
1174 #if !CMK_SMP
1175   PumpMsgs();
1176   CmiHandleImmediate();
1177 #endif
1178 }
1179 #endif
1180 */
1181
1182 /* Network progress function is used to poll the network when for
1183    messages. This flushes receive buffers on some  implementations*/
1184 #if CMK_MACHINE_PROGRESS_DEFINED
1185 void CmiMachineProgressImpl()
1186 {
1187 #if !CMK_SMP
1188     PumpMsgs();
1189 #if CMK_IMMEDIATE_MSG
1190     CmiHandleImmediate();
1191 #endif
1192 #else
1193     /*Not implemented yet. Communication server does not seem to be
1194       thread safe, so only communication thread call it */
1195     if (CmiMyRank() == CmiMyNodeSize())
1196         CommunicationServerThread(0);
1197 #endif
1198 }
1199 #endif
1200
1201 /********************* MESSAGE SEND FUNCTIONS ******************/
1202
1203 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1204
1205 static void CmiSendSelf(char *msg)
1206 {
1207 #if CMK_IMMEDIATE_MSG
1208     if (CmiIsImmediate(msg)) {
1209       /* CmiBecomeNonImmediate(msg); */
1210       CmiPushImmediateMsg(msg);
1211       CmiHandleImmediate();
1212       return;
1213     }
1214 #endif
1215     CQdCreate(CpvAccess(cQdState), 1);
1216     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1217 }
1218
1219 void CmiSyncSendFn(int destPE, int size, char *msg)
1220 {
1221   CmiState cs = CmiGetState();
1222   char *dupmsg = (char *) CmiAlloc(size);
1223   memcpy(dupmsg, msg, size);
1224
1225   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1226
1227   if (cs->pe==destPE) {
1228     CmiSendSelf(dupmsg);
1229   }
1230   else
1231     CmiAsyncSendFn_(destPE, size, dupmsg);
1232 }
1233
1234 #if CMK_SMP
1235
1236 /* called by communication thread in SMP */
1237 static int SendMsgBuf()
1238 {
1239   SMSG_LIST *msg_tmp;
1240   char *msg;
1241   int node, rank, size;
1242   int i;
1243   int sent = 0;
1244
1245 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1246         int sentCnt = 0;
1247 #endif  
1248         
1249 #if CMI_DYNAMIC_EXERT_CAP
1250         dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
1251 #endif
1252         
1253   MACHSTATE(2,"SendMsgBuf begin {");
1254 #if MULTI_SENDQUEUE
1255   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1256   {
1257     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1258     {
1259       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1260 #else
1261     /* single message sending queue */
1262     /* CmiLock(sendMsgBufLock); */
1263     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1264     /* CmiUnlock(sendMsgBufLock); */
1265     while (NULL != msg_tmp)
1266     {
1267 #endif
1268       node = msg_tmp->destpe;
1269       size = msg_tmp->size;
1270       msg = msg_tmp->msg;
1271       msg_tmp->next = 0;
1272                 
1273 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1274       while (MsgQueueLen > request_max) {
1275                 CmiReleaseSentMessages();
1276                 PumpMsgs();
1277       }
1278 #endif
1279           
1280       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1281 #if CMK_ERROR_CHECKING
1282       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1283 #endif
1284       CMI_SET_CHECKSUM(msg, size);
1285
1286 #if MPI_POST_RECV_COUNT > 0
1287         if(size <= MPI_POST_RECV_SIZE){
1288
1289           START_EVENT();
1290           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1291                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1292
1293           STOP_EVENT(40);
1294         }
1295         else {
1296             START_EVENT();
1297             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1298                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1299             STOP_EVENT(40);
1300         }
1301 #else
1302         START_EVENT();
1303         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1304             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1305         /*END_EVENT(40);*/
1306 #endif
1307         
1308 #if CMK_SMP_TRACE_COMMTHREAD
1309         traceBeginCommOp(msg);
1310         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1311         /* traceSendMsgComm must execute after traceBeginCommOp because
1312          * we pretend we execute an entry method, and inside this we
1313          * pretend we will send another message. Otherwise how could
1314          * a message creation just before an entry method invocation?
1315          * If such logic is broken, the projections will not trace
1316          * messages correctly! -Chao Mei
1317          */
1318         traceSendMsgComm(msg);
1319         traceEndCommOp(msg);
1320         #if CMI_MPI_TRACE_MOREDETAILED
1321         char tmp[64];
1322         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1323         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1324         #endif
1325 #endif
1326                 
1327                 
1328       MACHSTATE(3,"}MPI_send end");
1329       MsgQueueLen++;
1330       if(sent_msgs==0)
1331         sent_msgs = msg_tmp;
1332       else
1333         end_sent->next = msg_tmp;
1334       end_sent = msg_tmp;
1335       sent=1;
1336           
1337 #if CMI_EXERT_SEND_CAP    
1338           if(++sentCnt == SEND_CAP) break;
1339 #elif CMI_DYNAMIC_EXERT_CAP
1340           if(++sentCnt >= dynamicSendCap) break;
1341           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1342                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1343 #endif    
1344           
1345 #if ! MULTI_SENDQUEUE
1346       /* CmiLock(sendMsgBufLock); */
1347       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1348       /* CmiUnlock(sendMsgBufLock); */
1349 #endif
1350     }
1351 #if MULTI_SENDQUEUE
1352   }
1353 #endif
1354   MACHSTATE(2,"}SendMsgBuf end ");
1355   return sent;
1356 }
1357
1358 void EnqueueMsg(void *m, int size, int node)
1359 {
1360   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1361   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1362   msg_tmp->msg = m;
1363   msg_tmp->size = size;
1364   msg_tmp->destpe = node;
1365         
1366 #if CMK_SMP_TRACE_COMMTHREAD
1367         msg_tmp->srcpe = CmiMyPe();
1368 #endif  
1369
1370 #if MULTI_SENDQUEUE
1371   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1372 #else
1373   CmiLock(sendMsgBufLock);
1374   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1375   CmiUnlock(sendMsgBufLock);
1376 #endif
1377         
1378   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1379 }
1380
1381 #endif
1382
1383 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1384 {
1385   CmiState cs = CmiGetState();
1386   SMSG_LIST *msg_tmp;
1387   CmiUInt2  rank, node;
1388
1389   if(destPE == cs->pe) {
1390     char *dupmsg = (char *) CmiAlloc(size);
1391     memcpy(dupmsg, msg, size);
1392     CmiSendSelf(dupmsg);
1393     return 0;
1394   }
1395   CQdCreate(CpvAccess(cQdState), 1);
1396 #if CMK_SMP
1397   node = CmiNodeOf(destPE);
1398   rank = CmiRankOf(destPE);
1399   if (node == CmiMyNode())  {
1400     CmiPushPE(rank, msg);
1401     return 0;
1402   }
1403   CMI_DEST_RANK(msg) = rank;
1404   EnqueueMsg(msg, size, node);
1405   return 0;
1406 #else
1407   /* non smp */
1408   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1409   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1410   msg_tmp->msg = msg;
1411   msg_tmp->next = 0;
1412   while (MsgQueueLen > request_max) {
1413         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1414         CmiReleaseSentMessages();
1415         PumpMsgs();
1416   }
1417 #if CMK_ERROR_CHECKING
1418   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1419 #endif
1420   CMI_SET_CHECKSUM(msg, size);
1421
1422 #if MPI_POST_RECV_COUNT > 0
1423         if(size <= MPI_POST_RECV_SIZE){
1424
1425           START_EVENT();
1426           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1427                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1428           END_EVENT(40);
1429         }
1430         else {
1431           START_EVENT();
1432           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1433                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1434           END_EVENT(40);
1435         }
1436 #else
1437   START_EVENT();
1438   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1439     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1440   /*END_EVENT(40);*/
1441   #if CMK_TRACE_COMMOVERHEAD
1442         char tmp[64];
1443         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1444         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1445   #endif
1446 #endif
1447
1448   MsgQueueLen++;
1449   if(sent_msgs==0)
1450     sent_msgs = msg_tmp;
1451   else
1452     end_sent->next = msg_tmp;
1453   end_sent = msg_tmp;
1454   return (CmiCommHandle) &(msg_tmp->req);
1455 #endif              /* non-smp */
1456 }
1457
1458 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1459 {
1460   CMI_SET_BROADCAST_ROOT(msg, 0);
1461   CmiAsyncSendFn_(destPE, size, msg);
1462 }
1463
1464 void CmiFreeSendFn(int destPE, int size, char *msg)
1465 {
1466   CmiState cs = CmiGetState();
1467   CMI_SET_BROADCAST_ROOT(msg, 0);
1468
1469   if (cs->pe==destPE) {
1470     CmiSendSelf(msg);
1471   } else {
1472     CmiAsyncSendFn_(destPE, size, msg);
1473   }
1474 }
1475
1476 /*********************** BROADCAST FUNCTIONS **********************/
1477
1478 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1479 void CmiSyncSendFn1(int destPE, int size, char *msg)
1480 {
1481   CmiState cs = CmiGetState();
1482   char *dupmsg = (char *) CmiAlloc(size);
1483   memcpy(dupmsg, msg, size);
1484   if (cs->pe==destPE)
1485     CmiSendSelf(dupmsg);
1486   else
1487     CmiAsyncSendFn_(destPE, size, dupmsg);
1488 }
1489
1490 /* send msg to its spanning children in broadcast. G. Zheng */
1491 void SendSpanningChildren(int size, char *msg)
1492 {
1493   CmiState cs = CmiGetState();
1494   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1495   int startnode = CmiNodeOf(startpe);
1496   int i, exceptRank;
1497         
1498    /* first send msgs to other nodes */
1499   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1500   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1501     int nd = CmiMyNode()-startnode;
1502     if (nd<0) nd+=CmiNumNodes();
1503     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1504     if (nd > CmiNumNodes() - 1) break;
1505     nd += startnode;
1506     nd = nd%CmiNumNodes();
1507     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1508         #if CMK_SMP
1509         /* always send to the first rank of other nodes */
1510         char *newmsg = CmiCopyMsg(msg, size);
1511         CMI_DEST_RANK(newmsg) = 0;
1512     EnqueueMsg(newmsg, size, nd);
1513         #else
1514         CmiSyncSendFn1(nd, size, msg);
1515         #endif
1516   }
1517 #if CMK_SMP  
1518    /* second send msgs to my peers on this node */
1519   /* FIXME: now it's just a flat p2p send!! When node size is large,
1520    * it should also be sent in a tree
1521    */
1522    exceptRank = CMI_DEST_RANK(msg);
1523    for(i=0; i<exceptRank; i++){
1524            CmiPushPE(i, CmiCopyMsg(msg, size));
1525    }
1526    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1527            CmiPushPE(i, CmiCopyMsg(msg, size));
1528    }
1529 #endif
1530 }
1531
1532 #include <math.h>
1533
1534 /* send msg along the hypercube in broadcast. (Sameer) */
1535 void SendHypercube(int size, char *msg)
1536 {
1537   CmiState cs = CmiGetState();
1538   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1539   int startnode = CmiNodeOf(startpe);
1540   int i, exceptRank, cnt, tmp, relPE;
1541   int dims=0;
1542
1543   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1544   tmp = CmiNumNodes()-1;
1545   while(tmp>0){
1546           dims++;
1547           tmp = tmp >> 1;
1548   }
1549   if(CmiNumNodes()==1) dims=1;
1550   
1551    /* first send msgs to other nodes */  
1552   relPE = CmiMyNode()-startnode;
1553   if(relPE < 0) relPE += CmiNumNodes();
1554   cnt=0;
1555   tmp = relPE;
1556   /* count how many zeros (in binary format) relPE has */
1557   for(i=0; i<dims; i++, cnt++){
1558     if(tmp & 1 == 1) break;
1559     tmp = tmp >> 1;
1560   }
1561   
1562   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1563   for (i = cnt-1; i >= 0; i--) {
1564     int nd = relPE + (1 << i);
1565         if(nd >= CmiNumNodes()) continue;
1566         nd = (nd+startnode)%CmiNumNodes();
1567         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1568 #if CMK_SMP
1569     /* always send to the first rank of other nodes */
1570     char *newmsg = CmiCopyMsg(msg, size);
1571     CMI_DEST_RANK(newmsg) = 0;
1572     EnqueueMsg(newmsg, size, nd);
1573 #else
1574         CmiSyncSendFn1(nd, size, msg);
1575 #endif
1576   }
1577   
1578 #if CMK_SMP
1579    /* second send msgs to my peers on this node */
1580    /* FIXME: now it's just a flat p2p send!! When node size is large,
1581     * it should also be sent in a tree
1582     */
1583    exceptRank = CMI_DEST_RANK(msg);
1584    for(i=0; i<exceptRank; i++){
1585            CmiPushPE(i, CmiCopyMsg(msg, size));
1586    }
1587    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1588            CmiPushPE(i, CmiCopyMsg(msg, size));
1589    }
1590 #endif
1591 }
1592
1593 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1594 {
1595   CmiState cs = CmiGetState();
1596
1597 #if CMK_SMP     
1598   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1599   CMI_DEST_RANK(msg) = CmiMyRank();
1600 #endif
1601         
1602 #if CMK_BROADCAST_SPANNING_TREE
1603   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1604   SendSpanningChildren(size, msg);
1605
1606 #elif CMK_BROADCAST_HYPERCUBE
1607   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1608   SendHypercube(size, msg);
1609
1610 #else
1611   int i;
1612
1613   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1614     CmiSyncSendFn(i, size,msg) ;
1615   for ( i=0; i<cs->pe; i++ )
1616     CmiSyncSendFn(i, size,msg) ;
1617 #endif
1618
1619   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1620 }
1621
1622
1623 /*  FIXME: luckily async is never used  G. Zheng */
1624 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1625 {
1626   CmiState cs = CmiGetState();
1627   int i ;
1628
1629   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1630     CmiAsyncSendFn(i,size,msg) ;
1631   for ( i=0; i<cs->pe; i++ )
1632     CmiAsyncSendFn(i,size,msg) ;
1633
1634   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1635   CmiAbort("CmiAsyncBroadcastFn should never be called");
1636   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1637 }
1638
1639 void CmiFreeBroadcastFn(int size, char *msg)
1640 {
1641    CmiSyncBroadcastFn(size,msg);
1642    CmiFree(msg);
1643 }
1644
1645 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1646 {
1647
1648 #if CMK_SMP     
1649   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1650   CMI_DEST_RANK(msg) = CmiMyRank();
1651 #endif
1652
1653 #if CMK_BROADCAST_SPANNING_TREE
1654   CmiState cs = CmiGetState();
1655   CmiSyncSendFn(cs->pe, size,msg) ;
1656   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1657   SendSpanningChildren(size, msg);
1658
1659 #elif CMK_BROADCAST_HYPERCUBE
1660   CmiState cs = CmiGetState();
1661   CmiSyncSendFn(cs->pe, size,msg) ;
1662   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1663   SendHypercube(size, msg);
1664
1665 #else
1666     int i ;
1667
1668   for ( i=0; i<_Cmi_numpes; i++ )
1669     CmiSyncSendFn(i,size,msg) ;
1670 #endif
1671
1672   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1673 }
1674
1675 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1676 {
1677   int i ;
1678
1679   for ( i=1; i<_Cmi_numpes; i++ )
1680     CmiAsyncSendFn(i,size,msg) ;
1681
1682   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1683
1684   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1685 }
1686
1687 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1688 {
1689 #if CMK_SMP     
1690   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1691   CMI_DEST_RANK(msg) = CmiMyRank();
1692 #endif
1693
1694 #if CMK_BROADCAST_SPANNING_TREE
1695   CmiState cs = CmiGetState();
1696   CmiSyncSendFn(cs->pe, size,msg) ;
1697   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1698   SendSpanningChildren(size, msg);
1699
1700 #elif CMK_BROADCAST_HYPERCUBE
1701   CmiState cs = CmiGetState();
1702   CmiSyncSendFn(cs->pe, size,msg) ;
1703   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1704   SendHypercube(size, msg);
1705
1706 #else
1707   int i ;
1708
1709   for ( i=0; i<_Cmi_numpes; i++ )
1710     CmiSyncSendFn(i,size,msg) ;
1711 #endif
1712   CmiFree(msg) ;
1713   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1714 }
1715
1716 #if CMK_NODE_QUEUE_AVAILABLE
1717
1718 static void CmiSendNodeSelf(char *msg)
1719 {
1720 #if CMK_IMMEDIATE_MSG
1721 #if 0
1722     if (CmiIsImmediate(msg) && !_immRunning) {
1723       /*CmiHandleImmediateMessage(msg); */
1724       CmiPushImmediateMsg(msg);
1725       CmiHandleImmediate();
1726       return;
1727     }
1728 #endif
1729     if (CmiIsImmediate(msg))
1730     {
1731       CmiPushImmediateMsg(msg);
1732       if (!_immRunning) CmiHandleImmediate();
1733       return;
1734     }
1735 #endif
1736     CQdCreate(CpvAccess(cQdState), 1);
1737     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1738     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1739     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1740 }
1741
1742 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1743 {
1744   int i;
1745   SMSG_LIST *msg_tmp;
1746   char *dupmsg;
1747
1748   CMI_SET_BROADCAST_ROOT(msg, 0);
1749   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1750   switch (dstNode) {
1751   case NODE_BROADCAST_ALL:
1752     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1753   case NODE_BROADCAST_OTHERS:
1754     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1755     for (i=0; i<_Cmi_numnodes; i++)
1756       if (i!=_Cmi_mynode) {
1757         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1758       }
1759     break;
1760   default:
1761     dupmsg = (char *)CmiCopyMsg(msg,size);
1762     if(dstNode == _Cmi_mynode) {
1763       CmiSendNodeSelf(dupmsg);
1764     }
1765     else {
1766       CQdCreate(CpvAccess(cQdState), 1);
1767       EnqueueMsg(dupmsg, size, dstNode);
1768     }
1769   }
1770   return 0;
1771 }
1772
1773 void CmiSyncNodeSendFn(int p, int s, char *m)
1774 {
1775   CmiAsyncNodeSendFn(p, s, m);
1776 }
1777
1778 /* need */
1779 void CmiFreeNodeSendFn(int p, int s, char *m)
1780 {
1781   CmiAsyncNodeSendFn(p, s, m);
1782   CmiFree(m);
1783 }
1784
1785 /* need */
1786 void CmiSyncNodeBroadcastFn(int s, char *m)
1787 {
1788   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1789 }
1790
1791 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1792 {
1793 }
1794
1795 /* need */
1796 void CmiFreeNodeBroadcastFn(int s, char *m)
1797 {
1798   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1799   CmiFree(m);
1800 }
1801
1802 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1803 {
1804   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1805 }
1806
1807 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1808 {
1809   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1810 }
1811
1812 /* need */
1813 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1814 {
1815   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1816   CmiFree(m);
1817 }
1818 #endif
1819
1820 /************************** MAIN ***********************************/
1821 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1822
1823 void ConverseExit(void)
1824 {
1825 #if ! CMK_SMP
1826   while(!CmiAllAsyncMsgsSent()) {
1827     PumpMsgs();
1828     CmiReleaseSentMessages();
1829   }
1830   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1831     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1832
1833   ConverseCommonExit();
1834 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1835   if (CmiMyPe() == 0){
1836     CmiPrintf("End of program\n");
1837 #if MPI_POST_RECV_COUNT > 0
1838     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1839 #endif
1840 }
1841 #endif
1842 #if ! CMK_AUTOBUILD
1843   signal(SIGINT, signal_int);
1844   MPI_Finalize();
1845 #endif
1846   exit(0);
1847
1848 #else
1849     /* SMP version, communication thread will exit */
1850   ConverseCommonExit();
1851   /* atomic increment */
1852   CmiLock(exitLock);
1853   inexit++;
1854   CmiUnlock(exitLock);
1855   while (1) CmiYield();
1856 #endif
1857 }
1858
1859 static void registerMPITraceEvents() {
1860 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1861     traceRegisterUserEvent("MPI_Barrier", 10);
1862     traceRegisterUserEvent("MPI_Send", 20);
1863     traceRegisterUserEvent("MPI_Recv", 30);
1864     traceRegisterUserEvent("MPI_Isend", 40);
1865     traceRegisterUserEvent("MPI_Irecv", 50);
1866     traceRegisterUserEvent("MPI_Test", 60);
1867     traceRegisterUserEvent("MPI_Iprobe", 70);
1868 #endif
1869 }
1870
1871
1872 static char     **Cmi_argv;
1873 static char     **Cmi_argvcopy;
1874 static CmiStartFn Cmi_startfn;   /* The start function */
1875 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1876
1877 typedef struct {
1878   int sleepMs; /*Milliseconds to sleep while idle*/
1879   int nIdles; /*Number of times we've been idle in a row*/
1880   CmiState cs; /*Machine state*/
1881 } CmiIdleState;
1882
1883 static CmiIdleState *CmiNotifyGetState(void)
1884 {
1885   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1886   s->sleepMs=0;
1887   s->nIdles=0;
1888   s->cs=CmiGetState();
1889   return s;
1890 }
1891
1892 static void CmiNotifyBeginIdle(CmiIdleState *s)
1893 {
1894   s->sleepMs=0;
1895   s->nIdles=0;
1896 }
1897
1898 static void CmiNotifyStillIdle(CmiIdleState *s)
1899 {
1900 #if ! CMK_SMP
1901   CmiReleaseSentMessages();
1902   PumpMsgs();
1903 #else
1904 /*  CmiYield();  */
1905 #endif
1906
1907 #if 1
1908   {
1909   int nSpins=20; /*Number of times to spin before sleeping*/
1910   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1911   s->nIdles++;
1912   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1913     s->sleepMs+=2;
1914     if (s->sleepMs>10) s->sleepMs=10;
1915   }
1916   /*Comm. thread will listen on sockets-- just sleep*/
1917   if (s->sleepMs>0) {
1918     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1919     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1920     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1921   }
1922   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1923   }
1924 #endif
1925 }
1926
1927 #if MACHINE_DEBUG_LOG
1928 FILE *debugLog = NULL;
1929 #endif
1930
1931 static int machine_exit_idx;
1932 static void machine_exit(char *m) {
1933   EmergencyExit();
1934   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1935   fflush(stdout);
1936   CmiNodeBarrier();
1937   if (CmiMyRank() == 0) {
1938     MPI_Barrier(MPI_COMM_WORLD);
1939     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1940     MPI_Abort(MPI_COMM_WORLD, 1);
1941   } else {
1942     while (1) CmiYield();
1943   }
1944 }
1945
1946 static void KillOnAllSigs(int sigNo) {
1947   static int already_in_signal_handler = 0;
1948   char *m;
1949   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1950   already_in_signal_handler = 1;
1951 #if CMK_CCS_AVAILABLE
1952   if (CpvAccess(cmiArgDebugFlag)) {
1953     CpdNotify(CPD_SIGNAL, sigNo);
1954     CpdFreeze();
1955   }
1956 #endif
1957   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1958       "Signal: %d\n",CmiMyPe(),sigNo);
1959   CmiPrintStackTrace(1);
1960
1961   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1962   CmiSetHandler(m, machine_exit_idx);
1963   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1964   machine_exit(m);
1965 }
1966
1967 static void ConverseRunPE(int everReturn)
1968 {
1969   CmiIdleState *s=CmiNotifyGetState();
1970   CmiState cs;
1971   char** CmiMyArgv;
1972
1973   CmiNodeAllBarrier();
1974
1975   cs = CmiGetState();
1976   CpvInitialize(void *,CmiLocalQueue);
1977   CpvAccess(CmiLocalQueue) = cs->localqueue;
1978
1979   if (CmiMyRank())
1980     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1981   else
1982     CmiMyArgv=Cmi_argv;
1983
1984   CthInit(CmiMyArgv);
1985
1986   ConverseCommonInit(CmiMyArgv);
1987   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1988
1989 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1990   CpvInitialize(double, projTraceStart);
1991   /* only PE 0 needs to care about registration (to generate sts file). */
1992   if (CmiMyPe() == 0) {
1993     registerMachineUserEventsFunction(&registerMPITraceEvents);
1994   }
1995 #endif
1996
1997   /* initialize the network progress counter*/
1998   /* Network progress function is used to poll the network when for
1999      messages. This flushes receive buffers on some  implementations*/
2000   CpvInitialize(int , networkProgressCount);
2001   CpvAccess(networkProgressCount) = 0;
2002
2003 #if CMK_SMP
2004   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2005   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2006 #else
2007   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
2008 #endif
2009
2010 #if MACHINE_DEBUG_LOG
2011   if (CmiMyRank() == 0) {
2012     char ln[200];
2013     sprintf(ln,"debugLog.%d",CmiMyNode());
2014     debugLog=fopen(ln,"w");
2015   }
2016 #endif
2017
2018   /* Converse initialization finishes, immediate messages can be processed.
2019      node barrier previously should take care of the node synchronization */
2020   _immediateReady = 1;
2021
2022   /* communication thread */
2023   if (CmiMyRank() == CmiMyNodeSize()) {
2024     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2025     while (1) CommunicationServerThread(5);
2026   }
2027   else {  /* worker thread */
2028   if (!everReturn) {
2029     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2030     if (Cmi_usrsched==0) CsdScheduler(-1);
2031     ConverseExit();
2032   }
2033   }
2034 }
2035
2036 static char *thread_level_tostring(int thread_level)
2037 {
2038 #if CMK_MPI_INIT_THREAD
2039   switch (thread_level) {
2040   case MPI_THREAD_SINGLE:
2041       return "MPI_THREAD_SINGLE";
2042   case MPI_THREAD_FUNNELED:
2043       return "MPI_THREAD_FUNNELED";
2044   case MPI_THREAD_SERIALIZED:
2045       return "MPI_THREAD_SERIALIZED";
2046   case MPI_THREAD_MULTIPLE :
2047       return "MPI_THREAD_MULTIPLE ";
2048   default: {
2049       char *str = (char*)malloc(5);
2050       sprintf(str,"%d", thread_level);
2051       return str;
2052       }
2053   }
2054   return  "unknown";
2055 #else
2056   char *str = (char*)malloc(5);
2057   sprintf(str,"%d", thread_level);
2058   return str;
2059 #endif
2060 }
2061
2062 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2063 {
2064   int n,i;
2065   int ver, subver;
2066   int provided;
2067   int thread_level;
2068
2069 #if MACHINE_DEBUG
2070   debugLog=NULL;
2071 #endif
2072 #if CMK_USE_HP_MAIN_FIX
2073 #if FOR_CPLUS
2074   _main(argc,argv);
2075 #endif
2076 #endif
2077
2078 #if CMK_MPI_INIT_THREAD
2079 #if CMK_SMP
2080   thread_level = MPI_THREAD_FUNNELED;
2081 #else
2082   thread_level = MPI_THREAD_SINGLE;
2083 #endif
2084   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2085   _thread_provided = provided;
2086 #else
2087   MPI_Init(&argc, &argv);
2088   thread_level = 0;
2089   provided = -1;
2090 #endif
2091   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2092   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2093
2094   MPI_Get_version(&ver, &subver);
2095   if (_Cmi_mynode == 0) {
2096     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2097   }
2098
2099   /* processor per node */
2100   _Cmi_mynodesize = 1;
2101   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2102     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2103 #if ! CMK_SMP
2104   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2105     CmiAbort("+ppn cannot be used in non SMP version!\n");
2106 #endif
2107   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2108   if (idleblock && _Cmi_mynode == 0) {
2109     printf("Charm++: Running in idle blocking mode.\n");
2110   }
2111
2112   /* setup signal handlers */
2113   signal(SIGSEGV, KillOnAllSigs);
2114   signal(SIGFPE, KillOnAllSigs);
2115   signal(SIGILL, KillOnAllSigs);
2116   signal_int = signal(SIGINT, KillOnAllSigs);
2117   signal(SIGTERM, KillOnAllSigs);
2118   signal(SIGABRT, KillOnAllSigs);
2119 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2120   signal(SIGQUIT, KillOnAllSigs);
2121   signal(SIGBUS, KillOnAllSigs);
2122 /*#     if CMK_HANDLE_SIGUSR
2123   signal(SIGUSR1, HandleUserSignals);
2124   signal(SIGUSR2, HandleUserSignals);
2125 #     endif*/
2126 #   endif /*UNIX*/
2127   
2128 #if CMK_NO_OUTSTANDING_SENDS
2129   no_outstanding_sends=1;
2130 #endif
2131   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2132     no_outstanding_sends = 1;
2133     if (_Cmi_mynode == 0)
2134       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2135         no_outstanding_sends?"":" not");
2136   }
2137   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2138   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2139   Cmi_argvcopy = CmiCopyArgs(argv);
2140   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2141   /* find dim = log2(numpes), to pretend we are a hypercube */
2142   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2143     Cmi_dim++ ;
2144  /* CmiSpanTreeInit();*/
2145   request_max=MAX_QLEN;
2146   CmiGetArgInt(argv,"+requestmax",&request_max);
2147   /*printf("request max=%d\n", request_max);*/
2148
2149   /* checksum flag */
2150   if (CmiGetArgFlag(argv,"+checksum")) {
2151 #if !CMK_OPTIMIZE
2152     checksum_flag = 1;
2153     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2154 #else
2155     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2156 #endif
2157   }
2158
2159   {
2160   int debug = CmiGetArgFlag(argv,"++debug");
2161   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2162   if (debug || debug_no_pause)
2163   {   /*Pause so user has a chance to start and attach debugger*/
2164 #if CMK_HAS_GETPID
2165     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2166     fflush(stdout);
2167     if (!debug_no_pause)
2168       sleep(15);
2169 #else
2170     printf("++debug ignored.\n");
2171 #endif
2172   }
2173   }
2174
2175 #if MPI_POST_RECV_COUNT > 0
2176
2177   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2178   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2179   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2180   CpvInitialize(char*,CmiPostedRecvBuffers);
2181
2182     /* Post some extra recvs to help out with incoming messages */
2183     /* On some MPIs the messages are unexpected and thus slow */
2184
2185     /* An array of request handles for posted recvs */
2186     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2187
2188     /* An array of buffers for posted recvs */
2189     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2190
2191     /* Post Recvs */
2192     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2193         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2194                     MPI_POST_RECV_SIZE,
2195                     MPI_BYTE,
2196                     MPI_ANY_SOURCE,
2197                     POST_RECV_TAG,
2198                     MPI_COMM_WORLD,
2199                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2200           CmiAbort("MPI_Irecv failed\n");
2201     }
2202
2203 #endif
2204
2205
2206
2207   /* CmiTimerInit(); */
2208
2209 #if 0
2210   CthInit(argv);
2211   ConverseCommonInit(argv);
2212
2213   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2214   if (initret==0) {
2215     fn(CmiGetArgc(argv), argv);
2216     if (usched==0) CsdScheduler(-1);
2217     ConverseExit();
2218   }
2219 #endif
2220
2221   CsvInitialize(CmiNodeState, NodeState);
2222   CmiNodeStateInit(&CsvAccess(NodeState));
2223
2224   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2225
2226   for (i=0; i<_Cmi_mynodesize+1; i++) {
2227 #if MULTI_SENDQUEUE
2228     procState[i].sendMsgBuf = PCQueueCreate();
2229 #endif
2230     procState[i].recvLock = CmiCreateLock();
2231   }
2232 #if CMK_SMP
2233 #if !MULTI_SENDQUEUE
2234   sendMsgBuf = PCQueueCreate();
2235   sendMsgBufLock = CmiCreateLock();
2236 #endif
2237   exitLock = CmiCreateLock();            /* exit count lock */
2238 #endif
2239
2240   /* Network progress function is used to poll the network when for
2241      messages. This flushes receive buffers on some  implementations*/
2242   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2243   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2244
2245   CmiStartThreads(argv);
2246   ConverseRunPE(initret);
2247 }
2248
2249 /***********************************************************************
2250  *
2251  * Abort function:
2252  *
2253  ************************************************************************/
2254
2255 void CmiAbort(const char *message)
2256 {
2257   char *m;
2258   /* if CharmDebug is attached simply try to send a message to it */
2259 #if CMK_CCS_AVAILABLE
2260   if (CpvAccess(cmiArgDebugFlag)) {
2261     CpdNotify(CPD_ABORT, message);
2262     CpdFreeze();
2263   }
2264 #endif  
2265   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2266         "Reason: %s\n",CmiMyPe(),message);
2267  /*  CmiError(message); */
2268   CmiPrintStackTrace(0);
2269   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2270   CmiSetHandler(m, machine_exit_idx);
2271   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2272   machine_exit(m);
2273   /* Program never reaches here */
2274   MPI_Abort(MPI_COMM_WORLD, 1);
2275 }
2276
2277
2278 #if 0
2279
2280 /* ****************************************************************** */
2281 /*    The following internal functions implement recd msg queue       */
2282 /* ****************************************************************** */
2283
2284 static void ** AllocBlock(unsigned int len)
2285 {
2286   void ** blk;
2287
2288   blk=(void **)CmiAlloc(len*sizeof(void *));
2289   if(blk==(void **)0) {
2290     CmiError("Cannot Allocate Memory!\n");
2291     MPI_Abort(MPI_COMM_WORLD, 1);
2292   }
2293   return blk;
2294 }
2295
2296 static void
2297 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2298 {
2299   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2300   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2301 }
2302
2303 void recdQueueInit(void)
2304 {
2305   recdQueue_blk = AllocBlock(BLK_LEN);
2306   recdQueue_blk_len = BLK_LEN;
2307   recdQueue_first = 0;
2308   recdQueue_len = 0;
2309 }
2310
2311 void recdQueueAddToBack(void *element)
2312 {
2313 #if NODE_0_IS_CONVHOST
2314   inside_comm = 1;
2315 #endif
2316   if(recdQueue_len==recdQueue_blk_len) {
2317     void **blk;
2318     recdQueue_blk_len *= 3;
2319     blk = AllocBlock(recdQueue_blk_len);
2320     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2321     CmiFree(recdQueue_blk);
2322     recdQueue_blk = blk;
2323     recdQueue_first = 0;
2324   }
2325   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2326 #if NODE_0_IS_CONVHOST
2327   inside_comm = 0;
2328 #endif
2329 }
2330
2331
2332 void * recdQueueRemoveFromFront(void)
2333 {
2334   if(recdQueue_len) {
2335     void *element;
2336     element = recdQueue_blk[recdQueue_first++];
2337     recdQueue_first %= recdQueue_blk_len;
2338     recdQueue_len--;
2339     return element;
2340   }
2341   return 0;
2342 }
2343
2344 #endif
2345
2346 /*@}*/