d1fc184172220f398d33a1abd6f1c3440021757b
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #include "machine.h"
66
67 #include "pcqueue.h"
68
69 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
70
71 #if CMK_BLUEGENEL
72 #define MAX_QLEN 8
73 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
74 #else
75 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
76 #define MAX_QLEN 200
77 #endif
78
79 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && ! CMK_TRACE_IN_CHARM
80 CpvStaticDeclare(double, projTraceStart);
81 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
82 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
83 #else
84 # define  START_EVENT()
85 # define  END_EVENT(x)
86 #endif
87
88 /*
89     To reduce the buffer used in broadcast and distribute the load from
90   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
91   spanning tree broadcast algorithm.
92     This will use the fourth short in message as an indicator of spanning tree
93   root.
94 */
95 #if CMK_SMP
96 #define CMK_BROADCAST_SPANNING_TREE    0
97 #else
98 #define CMK_BROADCAST_SPANNING_TREE    1
99 #define CMK_BROADCAST_HYPERCUBE        0
100 #endif
101
102 #define BROADCAST_SPANNING_FACTOR      4
103
104 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
105 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
106
107 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
108 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
109
110 /* FIXME: need a random number that everyone agrees ! */
111 #define CHARM_MAGIC_NUMBER               126
112
113 #if !CMK_OPTIMIZE
114 static int checksum_flag = 0;
115 #define CMI_SET_CHECKSUM(msg, len)      \
116         if (checksum_flag)  {   \
117           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
118           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
119         }
120 #define CMI_CHECK_CHECKSUM(msg, len)    \
121         if (checksum_flag)      \
122           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
123             CmiAbort("Fatal error: checksum doesn't agree!\n");
124 #else
125 #define CMI_SET_CHECKSUM(msg, len)
126 #define CMI_CHECK_CHECKSUM(msg, len)
127 #endif
128
129 #if CMK_BROADCAST_SPANNING_TREE
130 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
131 #else
132 #  define CMI_SET_BROADCAST_ROOT(msg, root)
133 #endif
134
135 #if CMK_BROADCAST_HYPERCUBE
136 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
137 #else
138 #  define CMI_SET_CYCLE(msg, cycle)
139 #endif
140
141
142 /** 
143     If MPI_POST_RECV is defined, we provide default values for size 
144     and number of posted recieves. If MPI_POST_RECV_COUNT is set
145     then a default value for MPI_POST_RECV_SIZE is used if not specified
146     by the user.
147 */
148 #ifdef MPI_POST_RECV
149 #define MPI_POST_RECV_COUNT 10
150 #undef MPI_POST_RECV
151 #endif
152 #if MPI_POST_RECV_COUNT > 0
153 #warning "Using MPI posted receives which have not yet been tested"
154 #ifndef MPI_POST_RECV_SIZE
155 #define MPI_POST_RECV_SIZE 200
156 #endif
157 /* #undef  MPI_POST_RECV_DEBUG  */
158 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
159 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
160 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
161 CpvDeclare(char*,CmiPostedRecvBuffers);
162 #endif
163
164 /*
165  to avoid MPI's in order delivery, changing MPI Tag all the time
166 */
167 #define TAG     1375
168
169 #if MPI_POST_RECV_COUNT > 0
170 #define POST_RECV_TAG TAG+1
171 #define BARRIER_ZERO_TAG TAG
172 #else
173 #define BARRIER_ZERO_TAG     1375
174 #endif
175
176 #include <signal.h>
177 void (*signal_int)(int);
178
179 /*
180 static int mpi_tag = TAG;
181 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
182 */
183
184 static int        _thread_provided = -1;
185 int               _Cmi_numpes;
186 int               _Cmi_mynode;    /* Which address space am I */
187 int               _Cmi_mynodesize;/* Number of processors in my address space */
188 int               _Cmi_numnodes;  /* Total number of address spaces */
189 int               _Cmi_numpes;    /* Total number of processors */
190 static int        Cmi_nodestart; /* First processor in this address space */
191 CpvDeclare(void*, CmiLocalQueue);
192
193 /*Network progress utility variables. Period controls the rate at
194   which the network poll is called */
195 CpvDeclare(unsigned , networkProgressCount);
196 int networkProgressPeriod;
197
198 int               idleblock = 0;
199
200 #define BLK_LEN  512
201
202 #if CMK_NODE_QUEUE_AVAILABLE
203 #define DGRAM_NODEMESSAGE   (0xFB)
204
205 #define NODE_BROADCAST_OTHERS (-1)
206 #define NODE_BROADCAST_ALL    (-2)
207 #endif
208
209 #if 0
210 static void **recdQueue_blk;
211 static unsigned int recdQueue_blk_len;
212 static unsigned int recdQueue_first;
213 static unsigned int recdQueue_len;
214 static void recdQueueInit(void);
215 static void recdQueueAddToBack(void *element);
216 static void *recdQueueRemoveFromFront(void);
217 #endif
218
219 static void ConverseRunPE(int everReturn);
220 static void CommunicationServer(int sleepTime);
221 static void CommunicationServerThread(int sleepTime);
222
223 typedef struct msg_list {
224      char *msg;
225      struct msg_list *next;
226      int size, destpe;
227
228 #if CMK_SMP_TRACE_COMMTHREAD
229         int srcpe;
230 #endif  
231         
232      MPI_Request req;
233 } SMSG_LIST;
234
235 int MsgQueueLen=0;
236 static int request_max;
237
238 static SMSG_LIST *sent_msgs=0;
239 static SMSG_LIST *end_sent=0;
240
241 static int Cmi_dim;
242
243 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
244
245 #if NODE_0_IS_CONVHOST
246 int inside_comm = 0;
247 #endif
248
249 void CmiAbort(const char *message);
250 static void PerrorExit(const char *msg);
251
252 void SendSpanningChildren(int size, char *msg);
253 void SendHypercube(int size, char *msg);
254
255 static void PerrorExit(const char *msg)
256 {
257   perror(msg);
258   exit(1);
259 }
260
261 extern unsigned char computeCheckSum(unsigned char *data, int len);
262
263 /**************************  TIMER FUNCTIONS **************************/
264
265 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
266
267 /* MPI calls are not threadsafe, even the timer on some machines */
268 static CmiNodeLock  timerLock = 0;
269 static double starttimer = 0;
270 static int _is_global = 0;
271
272 int CmiTimerIsSynchronized()
273 {
274   int  flag;
275   void *v;
276
277   /*  check if it using synchronized timer */
278   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
279     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
280   if (flag) {
281     _is_global = *(int*)v;
282     if (_is_global && CmiMyPe() == 0)
283       printf("Charm++> MPI timer is synchronized!\n");
284   }
285   return _is_global;
286 }
287
288 void CmiTimerInit()
289 {
290   _is_global = CmiTimerIsSynchronized();
291
292   if (_is_global) {
293     if (CmiMyRank() == 0) {
294       double minTimer;
295 #if CMK_TIMER_USE_XT3_DCLOCK
296       starttimer = dclock();
297 #else
298       starttimer = MPI_Wtime();
299 #endif
300
301       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
302                                   MPI_COMM_WORLD );
303       starttimer = minTimer;
304     }
305   }
306   else {  /* we don't have a synchronous timer, set our own start time */
307     CmiBarrier();
308     CmiBarrier();
309     CmiBarrier();
310 #if CMK_TIMER_USE_XT3_DCLOCK
311     starttimer = dclock();
312 #else
313     starttimer = MPI_Wtime();
314 #endif
315   }
316
317 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
318   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
319     timerLock = CmiCreateLock();
320 #endif
321   CmiNodeAllBarrier();          /* for smp */
322 }
323
324 /**
325  * Since the timerLock is never created, and is
326  * always NULL, then all the if-condition inside
327  * the timer functions could be disabled right
328  * now in the case of SMP. --Chao Mei
329  */
330 double CmiTimer(void)
331 {
332   double t;
333 #if 0 && CMK_SMP
334   if (timerLock) CmiLock(timerLock);
335 #endif
336 #if CMK_TIMER_USE_XT3_DCLOCK
337   t = dclock() - starttimer;
338 #else
339   t = MPI_Wtime() - starttimer;
340 #endif
341
342 #if 0 && CMK_SMP
343   if (timerLock) CmiUnlock(timerLock);
344 #endif
345
346   return t;
347 }
348
349 double CmiWallTimer(void)
350 {
351   double t;
352 #if 0 && CMK_SMP
353   if (timerLock) CmiLock(timerLock);
354 #endif
355 #if CMK_TIMER_USE_XT3_DCLOCK
356   t = dclock() - starttimer;
357 #else
358   t = MPI_Wtime() - starttimer;
359 #endif
360 #if 0 && CMK_SMP
361   if (timerLock) CmiUnlock(timerLock);
362 #endif
363   return t;
364 }
365
366 double CmiCpuTimer(void)
367 {
368   double t;
369 #if 0 && CMK_SMP
370   if (timerLock) CmiLock(timerLock);
371 #endif
372 #if CMK_TIMER_USE_XT3_DCLOCK
373   t = dclock() - starttimer;
374 #else
375   t = MPI_Wtime() - starttimer;
376 #endif
377 #if 0 && CMK_SMP
378   if (timerLock) CmiUnlock(timerLock);
379 #endif
380   return t;
381 }
382
383 #endif
384
385 /* must be called on all ranks including comm thread in SMP */
386 int CmiBarrier()
387 {
388 #if CMK_SMP
389     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
390   CmiNodeAllBarrier();
391   if (CmiMyRank() == CmiMyNodeSize()) 
392 #else
393   if (CmiMyRank() == 0) 
394 #endif
395   {
396 /**
397  *  The call of CmiBarrier is usually before the initialization
398  *  of trace module of Charm++, therefore, the START_EVENT
399  *  and END_EVENT are disabled here. -Chao Mei
400  */     
401     /*START_EVENT();*/
402
403     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
404         CmiAbort("Timernit: MPI_Barrier failed!\n");
405
406     /*END_EVENT(10);*/
407   }
408   CmiNodeAllBarrier();
409   return 0;
410 }
411
412 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
413 int CmiBarrierZero()
414 {
415   int i;
416 #if CMK_SMP
417   if (CmiMyRank() == CmiMyNodeSize()) 
418 #else
419   if (CmiMyRank() == 0) 
420 #endif
421   {
422     char msg[1];
423     MPI_Status sts;
424     if (CmiMyNode() == 0)  {
425       for (i=0; i<CmiNumNodes()-1; i++) {
426          START_EVENT();
427
428          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
429             CmiPrintf("MPI_Recv failed!\n");
430
431          END_EVENT(30);
432       }
433     }
434     else {
435       START_EVENT();
436
437       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
438          printf("MPI_Send failed!\n");
439
440       END_EVENT(20);
441     }
442   }
443   CmiNodeAllBarrier();
444   return 0;
445 }
446
447 typedef struct ProcState {
448 #if MULTI_SENDQUEUE
449 PCQueue      sendMsgBuf;       /* per processor message sending queue */
450 #endif
451 CmiNodeLock  recvLock;              /* for cs->recv */
452 } ProcState;
453
454 static ProcState  *procState;
455
456 #if CMK_SMP
457
458 #if !MULTI_SENDQUEUE
459 static PCQueue sendMsgBuf;
460 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
461 #endif
462
463 #endif
464
465 /************************************************************
466  *
467  * Processor state structure
468  *
469  ************************************************************/
470
471 /* fake Cmi_charmrun_fd */
472 static int Cmi_charmrun_fd = 0;
473 #include "machine-smp.c"
474
475 CsvDeclare(CmiNodeState, NodeState);
476
477 #include "immediate.c"
478
479 #if CMK_SHARED_VARS_UNAVAILABLE
480 /************ non SMP **************/
481 static struct CmiStateStruct Cmi_state;
482 int _Cmi_mype;
483 int _Cmi_myrank;
484
485 void CmiMemLock() {}
486 void CmiMemUnlock() {}
487
488 #define CmiGetState() (&Cmi_state)
489 #define CmiGetStateN(n) (&Cmi_state)
490
491 void CmiYield(void) { sleep(0); }
492
493 static void CmiStartThreads(char **argv)
494 {
495   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
496   _Cmi_mype = Cmi_nodestart;
497   _Cmi_myrank = 0;
498 }
499 #endif  /* non smp */
500
501 /*Add a message to this processor's receive queue, pe is a rank */
502 void CmiPushPE(int pe,void *msg)
503 {
504   CmiState cs = CmiGetStateN(pe);
505   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
506 #if CMK_IMMEDIATE_MSG
507   if (CmiIsImmediate(msg)) {
508 /*
509 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
510     CmiHandleMessage(msg);
511 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
512 */
513     /**(CmiUInt2 *)msg = pe;*/
514     CMI_DEST_RANK(msg) = pe;
515     CmiPushImmediateMsg(msg);
516     return;
517   }
518 #endif
519
520 #if CMK_SMP
521   CmiLock(procState[pe].recvLock);
522 #endif
523   PCQueuePush(cs->recv,msg);
524 #if CMK_SMP
525   CmiUnlock(procState[pe].recvLock);
526 #endif
527   CmiIdleLock_addMessage(&cs->idle);
528   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
529 }
530
531 #if CMK_NODE_QUEUE_AVAILABLE
532 /*Add a message to this processor's receive queue */
533 static void CmiPushNode(void *msg)
534 {
535   MACHSTATE(3,"Pushing message into NodeRecv queue");
536 #if CMK_IMMEDIATE_MSG
537   if (CmiIsImmediate(msg)) {
538     CMI_DEST_RANK(msg) = 0;
539     CmiPushImmediateMsg(msg);
540     return;
541   }
542 #endif
543   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
544   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
545   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
546   {
547   CmiState cs=CmiGetStateN(0);
548   CmiIdleLock_addMessage(&cs->idle);
549   }
550 }
551 #endif
552
553 #ifndef CmiMyPe
554 int CmiMyPe(void)
555 {
556   return CmiGetState()->pe;
557 }
558 #endif
559
560 #ifndef CmiMyRank
561 int CmiMyRank(void)
562 {
563   return CmiGetState()->rank;
564 }
565 #endif
566
567 #ifndef CmiNodeFirst
568 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
569 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
570 #endif
571
572 #ifndef CmiNodeOf
573 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
574 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
575 #endif
576
577 static size_t CmiAllAsyncMsgsSent(void)
578 {
579    SMSG_LIST *msg_tmp = sent_msgs;
580    MPI_Status sts;
581    int done;
582
583    while(msg_tmp!=0) {
584     done = 0;
585     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
586       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
587     if(!done)
588       return 0;
589     msg_tmp = msg_tmp->next;
590 /*    MsgQueueLen--; ????? */
591    }
592    return 1;
593 }
594
595 int CmiAsyncMsgSent(CmiCommHandle c) {
596
597   SMSG_LIST *msg_tmp = sent_msgs;
598   int done;
599   MPI_Status sts;
600
601   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
602     msg_tmp = msg_tmp->next;
603   if(msg_tmp) {
604     done = 0;
605     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
606       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
607     return ((done)?1:0);
608   } else {
609     return 1;
610   }
611 }
612
613 void CmiReleaseCommHandle(CmiCommHandle c)
614 {
615   return;
616 }
617
618 #if CMK_BLUEGENEL
619 extern void MPID_Progress_test();
620 #endif
621
622 void CmiReleaseSentMessages(void)
623 {
624   SMSG_LIST *msg_tmp=sent_msgs;
625   SMSG_LIST *prev=0;
626   SMSG_LIST *temp;
627   int done;
628   MPI_Status sts;
629
630 #if CMK_BLUEGENEL
631   MPID_Progress_test();
632 #endif
633
634   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
635   while(msg_tmp!=0) {
636     done =0;
637 #if CMK_SMP_TRACE_COMMTHREAD
638     double startT = CmiWallTimer();
639 #endif
640     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
641       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
642     if(done) {
643       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
644       MsgQueueLen--;
645       /* Release the message */
646       temp = msg_tmp->next;
647       if(prev==0)  /* first message */
648         sent_msgs = temp;
649       else
650         prev->next = temp;
651       CmiFree(msg_tmp->msg);
652       CmiFree(msg_tmp);
653       msg_tmp = temp;
654     } else {
655       prev = msg_tmp;
656       msg_tmp = msg_tmp->next;
657     }
658 #if CMK_SMP_TRACE_COMMTHREAD
659     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
660 #endif
661   }
662   end_sent = prev;
663   MACHSTATE(2,"} CmiReleaseSentMessages end");
664 }
665
666 int PumpMsgs(void)
667 {
668   int nbytes, flg, res;
669   char *msg;
670   MPI_Status sts;
671   int recd=0;
672
673 #if CMI_EXERT_RECV_CAP
674   int recvCnt=0;
675 #endif
676         
677 #if CMK_BLUEGENEL
678   MPID_Progress_test();
679 #endif
680
681   MACHSTATE(2,"PumpMsgs begin {");
682
683         
684   while(1) {
685 #if CMI_EXERT_RECV_CAP
686         if(recvCnt==RECV_CAP) break;
687 #endif
688           
689     /* First check posted recvs then do  probe unmatched outstanding messages */
690 #if MPI_POST_RECV_COUNT > 0 
691     int completed_index=-1;
692     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
693         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
694     if(flg){
695         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
696             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
697
698         recd = 1;
699         msg = (char *) CmiAlloc(nbytes);
700         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
701         /* and repost the recv */
702
703         START_EVENT();
704
705         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
706             MPI_POST_RECV_SIZE,
707             MPI_BYTE,
708             MPI_ANY_SOURCE,
709             POST_RECV_TAG,
710             MPI_COMM_WORLD,
711             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
712                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
713
714         END_EVENT(50);
715
716         CpvAccess(Cmi_posted_recv_total)++;
717     }
718     else {
719         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
720         if(res != MPI_SUCCESS)
721         CmiAbort("MPI_Iprobe failed\n");
722         if(!flg) break;
723         recd = 1;
724         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
725         msg = (char *) CmiAlloc(nbytes);
726
727         START_EVENT();
728
729         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
730             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
731
732         END_EVENT(30);
733
734         CpvAccess(Cmi_unposted_recv_total)++;
735     }
736 #else
737     /* Original version */
738 #if CMK_SMP_TRACE_COMMTHREAD
739   double startT = CmiWallTimer(); 
740 #endif
741     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
742     if(res != MPI_SUCCESS)
743       CmiAbort("MPI_Iprobe failed\n");
744
745     if(!flg) break;
746 #if CMK_SMP_TRACE_COMMTHREAD
747     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
748 #endif
749
750     recd = 1;
751     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
752     msg = (char *) CmiAlloc(nbytes);
753     
754     START_EVENT();
755
756     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
757       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
758
759     /*END_EVENT(30);*/
760
761 #endif
762
763 #if CMK_SMP_TRACE_COMMTHREAD
764         traceBeginCommOp(msg);
765         traceChangeLastTimestamp(CpvAccess(projTraceStart));
766         traceEndCommOp(msg);
767         #if CMI_MPI_TRACE_MOREDETAILED
768         char tmp[32];
769         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
770         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
771         #endif
772 #endif  
773         
774     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
775     CMI_CHECK_CHECKSUM(msg, nbytes);
776     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
777       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
778       CmiFree(msg);
779       CmiAbort("Abort!\n");
780       continue;
781     }
782 #if CMK_NODE_QUEUE_AVAILABLE
783     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
784       CmiPushNode(msg);
785     else
786 #endif
787       CmiPushPE(CMI_DEST_RANK(msg), msg);
788
789 #if CMK_BROADCAST_SPANNING_TREE
790     if (CMI_BROADCAST_ROOT(msg))
791       SendSpanningChildren(nbytes, msg);
792 #elif CMK_BROADCAST_HYPERCUBE
793     if (CMI_GET_CYCLE(msg))
794       SendHypercube(nbytes, msg);
795 #endif
796         
797 #if CMI_EXERT_RECV_CAP
798         recvCnt++;
799 #endif  
800   }
801
802   
803 #if CMK_IMMEDIATE_MSG && !CMK_SMP
804   CmiHandleImmediate();
805 #endif
806   
807   MACHSTATE(2,"} PumpMsgs end ");
808   return recd;
809 }
810
811 /* blocking version */
812 static void PumpMsgsBlocking(void)
813 {
814   static int maxbytes = 20000000;
815   static char *buf = NULL;
816   int nbytes, flg;
817   MPI_Status sts;
818   char *msg;
819   int recd=0;
820
821   if (!PCQueueEmpty(CmiGetState()->recv)) return;
822   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
823   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
824   if (sent_msgs)  return;
825
826 #if 0
827   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
828 #endif
829
830   if (buf == NULL) {
831     buf = (char *) CmiAlloc(maxbytes);
832     _MEMCHECK(buf);
833   }
834
835
836 #if MPI_POST_RECV_COUNT > 0
837 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
838 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
839 #endif
840
841   START_EVENT();
842
843   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
844       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
845
846   /*END_EVENT(30);*/
847     
848    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
849    msg = (char *) CmiAlloc(nbytes);
850    memcpy(msg, buf, nbytes);
851
852 #if CMK_SMP_TRACE_COMMTHREAD
853         traceBeginCommOp(msg);
854         traceChangeLastTimestamp(CpvAccess(projTraceStart));
855         traceEndCommOp(msg);
856         #if CMI_MPI_TRACE_MOREDETAILED
857         char tmp[32];
858         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
859         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
860         #endif
861 #endif
862   
863 #if CMK_NODE_QUEUE_AVAILABLE
864    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
865       CmiPushNode(msg);
866    else
867 #endif
868       CmiPushPE(CMI_DEST_RANK(msg), msg);
869
870 #if CMK_BROADCAST_SPANNING_TREE
871    if (CMI_BROADCAST_ROOT(msg))
872       SendSpanningChildren(nbytes, msg);
873 #elif CMK_BROADCAST_HYPERCUBE
874    if (CMI_GET_CYCLE(msg))
875       SendHypercube(nbytes, msg);
876 #endif
877 }
878
879 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
880
881 #if CMK_SMP
882
883 static int inexit = 0;
884 static CmiNodeLock  exitLock = 0;
885
886 static int MsgQueueEmpty()
887 {
888   int i;
889 #if MULTI_SENDQUEUE
890   for (i=0; i<_Cmi_mynodesize; i++)
891     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
892 #else
893   return PCQueueEmpty(sendMsgBuf);
894 #endif
895   return 1;
896 }
897
898 static int SendMsgBuf();
899
900 /* test if all processors recv queues are empty */
901 static int RecvQueueEmpty()
902 {
903   int i;
904   for (i=0; i<_Cmi_mynodesize; i++) {
905     CmiState cs=CmiGetStateN(i);
906     if (!PCQueueEmpty(cs->recv)) return 0;
907   }
908   return 1;
909 }
910
911 /**
912 CommunicationServer calls MPI to send messages in the queues and probe message from network.
913 */
914
915 #define REPORT_COMM_METRICS 0
916 #if REPORT_COMM_METRICS
917 static double pumptime = 0.0;
918 static double releasetime = 0.0;
919 static double sendtime = 0.0;
920 #endif
921
922 static void CommunicationServer(int sleepTime)
923 {
924   int static count=0;
925 /*
926   count ++;
927   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
928 */
929 #if REPORT_COMM_METRICS
930   double t1, t2, t3, t4;
931   t1 = CmiWallTimer();
932 #endif
933   PumpMsgs();
934 #if REPORT_COMM_METRICS
935   t2 = CmiWallTimer();
936 #endif
937   CmiReleaseSentMessages();
938 #if REPORT_COMM_METRICS
939   t3 = CmiWallTimer();
940 #endif
941   SendMsgBuf();
942 #if REPORT_COMM_METRICS
943   t4 = CmiWallTimer();
944   pumptime += (t2-t1);
945   releasetime += (t3-t2);
946   sendtime += (t4-t3);
947 #endif
948 /*
949   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
950 */
951   if (inexit == CmiMyNodeSize()) {
952     MACHSTATE(2, "CommunicationServer exiting {");
953 #if 0
954     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
955 #endif
956     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
957       CmiReleaseSentMessages();
958       SendMsgBuf();
959       PumpMsgs();
960     }
961     MACHSTATE(2, "CommunicationServer barrier begin {");
962
963     START_EVENT();
964
965     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
966       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
967
968     END_EVENT(10);
969
970     MACHSTATE(2, "} CommunicationServer barrier end");
971 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
972     if (CmiMyNode() == 0){
973       CmiPrintf("End of program\n");
974     }
975 #endif
976     MACHSTATE(2, "} CommunicationServer EXIT");
977
978     ConverseCommonExit();   
979 #if REPORT_COMM_METRICS
980     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
981 #endif
982
983 #if ! CMK_AUTOBUILD
984     signal(SIGINT, signal_int);
985     MPI_Finalize();
986     #endif
987     exit(0);
988   }
989 }
990
991 #endif
992
993 static void CommunicationServerThread(int sleepTime)
994 {
995 #if CMK_SMP
996   CommunicationServer(sleepTime);
997 #endif
998 #if CMK_IMMEDIATE_MSG
999   CmiHandleImmediate();
1000 #endif
1001 }
1002
1003 #if CMK_NODE_QUEUE_AVAILABLE
1004 char *CmiGetNonLocalNodeQ(void)
1005 {
1006   CmiState cs = CmiGetState();
1007   char *result = 0;
1008   CmiIdleLock_checkMessage(&cs->idle);
1009 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1010     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1011     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1012     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1013     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1014     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1015 /*  }  */
1016   return result;
1017 }
1018 #endif
1019
1020 void *CmiGetNonLocal(void)
1021 {
1022   static int count=0;
1023   CmiState cs = CmiGetState();
1024   void *msg;
1025
1026 #if ! CMK_SMP
1027   if (CmiNumPes() == 1) return NULL;
1028 #endif
1029
1030   CmiIdleLock_checkMessage(&cs->idle);
1031   /* although it seems that lock is not needed, I found it crashes very often
1032      on mpi-smp without lock */
1033
1034 #if ! CMK_SMP
1035   CmiReleaseSentMessages();
1036   PumpMsgs();
1037 #endif
1038
1039   /* CmiLock(procState[cs->rank].recvLock); */
1040   msg =  PCQueuePop(cs->recv);
1041   /* CmiUnlock(procState[cs->rank].recvLock); */
1042
1043 /*
1044   if (msg) {
1045     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1046   else {
1047     count++;
1048     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1049   }
1050 */
1051 #if ! CMK_SMP
1052   if (no_outstanding_sends) {
1053     while (MsgQueueLen>0) {
1054       CmiReleaseSentMessages();
1055       PumpMsgs();
1056     }
1057   }
1058
1059   if(!msg) {
1060     CmiReleaseSentMessages();
1061     if (PumpMsgs())
1062       return  PCQueuePop(cs->recv);
1063     else
1064       return 0;
1065   }
1066 #endif
1067   return msg;
1068 }
1069
1070 /* called in non-smp mode */
1071 void CmiNotifyIdle(void)
1072 {
1073   CmiReleaseSentMessages();
1074   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1075 }
1076
1077
1078 /********************************************************
1079     The call to probe immediate messages has been renamed to
1080     CmiMachineProgressImpl
1081 ******************************************************/
1082 /* user call to handle immediate message, only useful in non SMP version
1083    using polling method to schedule message.
1084 */
1085 /*
1086 #if CMK_IMMEDIATE_MSG
1087 void CmiProbeImmediateMsg()
1088 {
1089 #if !CMK_SMP
1090   PumpMsgs();
1091   CmiHandleImmediate();
1092 #endif
1093 }
1094 #endif
1095 */
1096
1097 /* Network progress function is used to poll the network when for
1098    messages. This flushes receive buffers on some  implementations*/
1099 #if CMK_MACHINE_PROGRESS_DEFINED
1100 void CmiMachineProgressImpl()
1101 {
1102 #if !CMK_SMP
1103     PumpMsgs();
1104 #if CMK_IMMEDIATE_MSG
1105     CmiHandleImmediate();
1106 #endif
1107 #else
1108     /*Not implemented yet. Communication server does not seem to be
1109       thread safe, so only communication thread call it */
1110     if (CmiMyRank() == CmiMyNodeSize())
1111         CommunicationServerThread(0);
1112 #endif
1113 }
1114 #endif
1115
1116 /********************* MESSAGE SEND FUNCTIONS ******************/
1117
1118 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1119
1120 static void CmiSendSelf(char *msg)
1121 {
1122 #if CMK_IMMEDIATE_MSG
1123     if (CmiIsImmediate(msg)) {
1124       /* CmiBecomeNonImmediate(msg); */
1125       CmiPushImmediateMsg(msg);
1126       CmiHandleImmediate();
1127       return;
1128     }
1129 #endif
1130     CQdCreate(CpvAccess(cQdState), 1);
1131     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1132 }
1133
1134 void CmiSyncSendFn(int destPE, int size, char *msg)
1135 {
1136   CmiState cs = CmiGetState();
1137   char *dupmsg = (char *) CmiAlloc(size);
1138   memcpy(dupmsg, msg, size);
1139
1140   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1141
1142   if (cs->pe==destPE) {
1143     CmiSendSelf(dupmsg);
1144   }
1145   else
1146     CmiAsyncSendFn_(destPE, size, dupmsg);
1147 }
1148
1149 #if CMK_SMP
1150
1151 /* called by communication thread in SMP */
1152 static int SendMsgBuf()
1153 {
1154   SMSG_LIST *msg_tmp;
1155   char *msg;
1156   int node, rank, size;
1157   int i;
1158   int sent = 0;
1159
1160 #if CMI_EXERT_SEND_CAP
1161         int sentCnt = 0;
1162 #endif  
1163         
1164   MACHSTATE(2,"SendMsgBuf begin {");
1165 #if MULTI_SENDQUEUE
1166   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1167   {
1168     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1169     {
1170       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1171 #else
1172     /* single message sending queue */
1173     /* CmiLock(sendMsgBufLock); */
1174     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1175     /* CmiUnlock(sendMsgBufLock); */
1176     while (NULL != msg_tmp)
1177     {
1178 #endif
1179       node = msg_tmp->destpe;
1180       size = msg_tmp->size;
1181       msg = msg_tmp->msg;
1182       msg_tmp->next = 0;
1183       while (MsgQueueLen > request_max) {
1184         CmiReleaseSentMessages();
1185         PumpMsgs();
1186       }
1187       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1188       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1189       CMI_SET_CHECKSUM(msg, size);
1190
1191 #if MPI_POST_RECV_COUNT > 0
1192         if(size <= MPI_POST_RECV_SIZE){
1193
1194           START_EVENT();
1195           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1196                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1197
1198           STOP_EVENT(40);
1199         }
1200         else {
1201             START_EVENT();
1202             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1203                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1204             STOP_EVENT(40);
1205         }
1206 #else
1207         START_EVENT();
1208         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1209             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1210         /*END_EVENT(40);*/
1211 #endif
1212         
1213 #if CMK_SMP_TRACE_COMMTHREAD
1214         traceBeginCommOp(msg);
1215         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1216         /* traceSendMsgComm must execute after traceBeginCommOp because
1217          * we pretend we execute an entry method, and inside this we
1218          * pretend we will send another message. Otherwise how could
1219          * a message creation just before an entry method invocation?
1220          * If such logic is broken, the projections will not trace
1221          * messages correctly! -Chao Mei
1222          */
1223         traceSendMsgComm(msg);
1224         traceEndCommOp(msg);
1225         #if CMI_MPI_TRACE_MOREDETAILED
1226         char tmp[64];
1227         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1228         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1229         #endif
1230 #endif
1231                 
1232                 
1233       MACHSTATE(3,"}MPI_send end");
1234       MsgQueueLen++;
1235       if(sent_msgs==0)
1236         sent_msgs = msg_tmp;
1237       else
1238         end_sent->next = msg_tmp;
1239       end_sent = msg_tmp;
1240       sent=1;
1241           
1242 #if CMI_EXERT_SEND_CAP    
1243           if(++sentCnt == SEND_CAP) break;
1244 #endif    
1245           
1246 #if ! MULTI_SENDQUEUE
1247       /* CmiLock(sendMsgBufLock); */
1248       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1249       /* CmiUnlock(sendMsgBufLock); */
1250 #endif
1251     }
1252 #if MULTI_SENDQUEUE
1253   }
1254 #endif
1255   MACHSTATE(2,"}SendMsgBuf end ");
1256   return sent;
1257 }
1258
1259 void EnqueueMsg(void *m, int size, int node)
1260 {
1261   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1262   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1263   msg_tmp->msg = m;
1264   msg_tmp->size = size;
1265   msg_tmp->destpe = node;
1266         
1267 #if CMK_SMP_TRACE_COMMTHREAD
1268         msg_tmp->srcpe = CmiMyPe();
1269 #endif  
1270
1271 #if MULTI_SENDQUEUE
1272   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1273 #else
1274   CmiLock(sendMsgBufLock);
1275   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1276   CmiUnlock(sendMsgBufLock);
1277 #endif
1278         
1279   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1280 }
1281
1282 #endif
1283
1284 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1285 {
1286   CmiState cs = CmiGetState();
1287   SMSG_LIST *msg_tmp;
1288   CmiUInt2  rank, node;
1289
1290   if(destPE == cs->pe) {
1291     char *dupmsg = (char *) CmiAlloc(size);
1292     memcpy(dupmsg, msg, size);
1293     CmiSendSelf(dupmsg);
1294     return 0;
1295   }
1296   CQdCreate(CpvAccess(cQdState), 1);
1297 #if CMK_SMP
1298   node = CmiNodeOf(destPE);
1299   rank = CmiRankOf(destPE);
1300   if (node == CmiMyNode())  {
1301     CmiPushPE(rank, msg);
1302     return 0;
1303   }
1304   CMI_DEST_RANK(msg) = rank;
1305   EnqueueMsg(msg, size, node);
1306   return 0;
1307 #else
1308   /* non smp */
1309   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1310   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1311   msg_tmp->msg = msg;
1312   msg_tmp->next = 0;
1313   while (MsgQueueLen > request_max) {
1314         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1315         CmiReleaseSentMessages();
1316         PumpMsgs();
1317   }
1318   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1319   CMI_SET_CHECKSUM(msg, size);
1320
1321 #if MPI_POST_RECV_COUNT > 0
1322         if(size <= MPI_POST_RECV_SIZE){
1323
1324           START_EVENT();
1325           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1326                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1327           END_EVENT(40);
1328         }
1329         else {
1330           START_EVENT();
1331           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1332                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1333           END_EVENT(40);
1334         }
1335 #else
1336   START_EVENT();
1337   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1338     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1339   END_EVENT(40);
1340 #endif
1341
1342   MsgQueueLen++;
1343   if(sent_msgs==0)
1344     sent_msgs = msg_tmp;
1345   else
1346     end_sent->next = msg_tmp;
1347   end_sent = msg_tmp;
1348   return (CmiCommHandle) &(msg_tmp->req);
1349 #endif              /* non-smp */
1350 }
1351
1352 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1353 {
1354   CMI_SET_BROADCAST_ROOT(msg, 0);
1355   CmiAsyncSendFn_(destPE, size, msg);
1356 }
1357
1358 void CmiFreeSendFn(int destPE, int size, char *msg)
1359 {
1360   CmiState cs = CmiGetState();
1361   CMI_SET_BROADCAST_ROOT(msg, 0);
1362
1363   if (cs->pe==destPE) {
1364     CmiSendSelf(msg);
1365   } else {
1366     CmiAsyncSendFn_(destPE, size, msg);
1367   }
1368 }
1369
1370 /*********************** BROADCAST FUNCTIONS **********************/
1371
1372 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1373 void CmiSyncSendFn1(int destPE, int size, char *msg)
1374 {
1375   CmiState cs = CmiGetState();
1376   char *dupmsg = (char *) CmiAlloc(size);
1377   memcpy(dupmsg, msg, size);
1378   if (cs->pe==destPE)
1379     CmiSendSelf(dupmsg);
1380   else
1381     CmiAsyncSendFn_(destPE, size, dupmsg);
1382 }
1383
1384 /* send msg to its spanning children in broadcast. G. Zheng */
1385 void SendSpanningChildren(int size, char *msg)
1386 {
1387   CmiState cs = CmiGetState();
1388   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1389   int i;
1390
1391   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1392
1393   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1394     int p = cs->pe-startpe;
1395     if (p<0) p+=_Cmi_numpes;
1396     p = BROADCAST_SPANNING_FACTOR*p + i;
1397     if (p > _Cmi_numpes - 1) break;
1398     p += startpe;
1399     p = p%_Cmi_numpes;
1400     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1401     CmiSyncSendFn1(p, size, msg);
1402   }
1403 }
1404
1405 #include <math.h>
1406
1407 /* send msg along the hypercube in broadcast. (Sameer) */
1408 void SendHypercube(int size, char *msg)
1409 {
1410   CmiState cs = CmiGetState();
1411   int curcycle = CMI_GET_CYCLE(msg);
1412   int i;
1413
1414   double logp = CmiNumPes();
1415   logp = log(logp)/log(2.0);
1416   logp = ceil(logp);
1417
1418   /*  CmiPrintf("In hypercube\n"); */
1419
1420   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1421
1422   for (i = curcycle; i < logp; i++) {
1423     int p = cs->pe ^ (1 << i);
1424
1425     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1426
1427     if(p < CmiNumPes()) {
1428       CMI_SET_CYCLE(msg, i + 1);
1429       CmiSyncSendFn1(p, size, msg);
1430     }
1431   }
1432 }
1433
1434 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1435 {
1436   CmiState cs = CmiGetState();
1437 #if CMK_BROADCAST_SPANNING_TREE
1438   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1439   SendSpanningChildren(size, msg);
1440
1441 #elif CMK_BROADCAST_HYPERCUBE
1442   CMI_SET_CYCLE(msg, 0);
1443   SendHypercube(size, msg);
1444
1445 #else
1446   int i;
1447
1448   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1449     CmiSyncSendFn(i, size,msg) ;
1450   for ( i=0; i<cs->pe; i++ )
1451     CmiSyncSendFn(i, size,msg) ;
1452 #endif
1453
1454   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1455 }
1456
1457
1458 /*  FIXME: luckily async is never used  G. Zheng */
1459 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1460 {
1461   CmiState cs = CmiGetState();
1462   int i ;
1463
1464   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1465     CmiAsyncSendFn(i,size,msg) ;
1466   for ( i=0; i<cs->pe; i++ )
1467     CmiAsyncSendFn(i,size,msg) ;
1468
1469   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1470   CmiAbort("CmiAsyncBroadcastFn should never be called");
1471   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1472 }
1473
1474 void CmiFreeBroadcastFn(int size, char *msg)
1475 {
1476    CmiSyncBroadcastFn(size,msg);
1477    CmiFree(msg);
1478 }
1479
1480 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1481 {
1482
1483 #if CMK_BROADCAST_SPANNING_TREE
1484   CmiState cs = CmiGetState();
1485   CmiSyncSendFn(cs->pe, size,msg) ;
1486   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1487   SendSpanningChildren(size, msg);
1488
1489 #elif CMK_BROADCAST_HYPERCUBE
1490   CmiState cs = CmiGetState();
1491   CmiSyncSendFn(cs->pe, size,msg) ;
1492   CMI_SET_CYCLE(msg, 0);
1493   SendHypercube(size, msg);
1494
1495 #else
1496     int i ;
1497
1498   for ( i=0; i<_Cmi_numpes; i++ )
1499     CmiSyncSendFn(i,size,msg) ;
1500 #endif
1501
1502   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1503 }
1504
1505 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1506 {
1507   int i ;
1508
1509   for ( i=1; i<_Cmi_numpes; i++ )
1510     CmiAsyncSendFn(i,size,msg) ;
1511
1512   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1513
1514   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1515 }
1516
1517 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1518 {
1519
1520 #if CMK_BROADCAST_SPANNING_TREE
1521   CmiState cs = CmiGetState();
1522   CmiSyncSendFn(cs->pe, size,msg) ;
1523   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1524   SendSpanningChildren(size, msg);
1525
1526 #elif CMK_BROADCAST_HYPERCUBE
1527   CmiState cs = CmiGetState();
1528   CmiSyncSendFn(cs->pe, size,msg) ;
1529   CMI_SET_CYCLE(msg, 0);
1530   SendHypercube(size, msg);
1531
1532 #else
1533   int i ;
1534
1535   for ( i=0; i<_Cmi_numpes; i++ )
1536     CmiSyncSendFn(i,size,msg) ;
1537 #endif
1538   CmiFree(msg) ;
1539   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1540 }
1541
1542 #if CMK_NODE_QUEUE_AVAILABLE
1543
1544 static void CmiSendNodeSelf(char *msg)
1545 {
1546 #if CMK_IMMEDIATE_MSG
1547 #if 0
1548     if (CmiIsImmediate(msg) && !_immRunning) {
1549       /*CmiHandleImmediateMessage(msg); */
1550       CmiPushImmediateMsg(msg);
1551       CmiHandleImmediate();
1552       return;
1553     }
1554 #endif
1555     if (CmiIsImmediate(msg))
1556     {
1557       CmiPushImmediateMsg(msg);
1558       if (!_immRunning) CmiHandleImmediate();
1559       return;
1560     }
1561 #endif
1562     CQdCreate(CpvAccess(cQdState), 1);
1563     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1564     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1565     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1566 }
1567
1568 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1569 {
1570   int i;
1571   SMSG_LIST *msg_tmp;
1572   char *dupmsg;
1573
1574   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1575   switch (dstNode) {
1576   case NODE_BROADCAST_ALL:
1577     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1578   case NODE_BROADCAST_OTHERS:
1579     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1580     for (i=0; i<_Cmi_numnodes; i++)
1581       if (i!=_Cmi_mynode) {
1582         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1583       }
1584     break;
1585   default:
1586     dupmsg = (char *)CmiCopyMsg(msg,size);
1587     if(dstNode == _Cmi_mynode) {
1588       CmiSendNodeSelf(dupmsg);
1589     }
1590     else {
1591       CQdCreate(CpvAccess(cQdState), 1);
1592       EnqueueMsg(dupmsg, size, dstNode);
1593     }
1594   }
1595   return 0;
1596 }
1597
1598 void CmiSyncNodeSendFn(int p, int s, char *m)
1599 {
1600   CmiAsyncNodeSendFn(p, s, m);
1601 }
1602
1603 /* need */
1604 void CmiFreeNodeSendFn(int p, int s, char *m)
1605 {
1606   CmiAsyncNodeSendFn(p, s, m);
1607   CmiFree(m);
1608 }
1609
1610 /* need */
1611 void CmiSyncNodeBroadcastFn(int s, char *m)
1612 {
1613   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1614 }
1615
1616 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1617 {
1618 }
1619
1620 /* need */
1621 void CmiFreeNodeBroadcastFn(int s, char *m)
1622 {
1623   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1624   CmiFree(m);
1625 }
1626
1627 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1628 {
1629   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1630 }
1631
1632 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1633 {
1634   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1635 }
1636
1637 /* need */
1638 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1639 {
1640   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1641   CmiFree(m);
1642 }
1643 #endif
1644
1645 /************************** MAIN ***********************************/
1646 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1647
1648 void ConverseExit(void)
1649 {
1650 #if ! CMK_SMP
1651   while(!CmiAllAsyncMsgsSent()) {
1652     PumpMsgs();
1653     CmiReleaseSentMessages();
1654   }
1655   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1656     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1657
1658   ConverseCommonExit();
1659 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1660   if (CmiMyPe() == 0){
1661     CmiPrintf("End of program\n");
1662 #if MPI_POST_RECV_COUNT > 0
1663     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1664 #endif
1665 }
1666 #endif
1667 #if ! CMK_AUTOBUILD
1668   signal(SIGINT, signal_int);
1669   MPI_Finalize();
1670 #endif
1671   exit(0);
1672
1673 #else
1674     /* SMP version, communication thread will exit */
1675   ConverseCommonExit();
1676   /* atomic increment */
1677   CmiLock(exitLock);
1678   inexit++;
1679   CmiUnlock(exitLock);
1680   while (1) CmiYield();
1681 #endif
1682 }
1683
1684 static void registerMPITraceEvents() {
1685 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1686     traceRegisterUserEvent("MPI_Barrier", 10);
1687     traceRegisterUserEvent("MPI_Send", 20);
1688     traceRegisterUserEvent("MPI_Recv", 30);
1689     traceRegisterUserEvent("MPI_Isend", 40);
1690     traceRegisterUserEvent("MPI_Irecv", 50);
1691     traceRegisterUserEvent("MPI_Test", 60);
1692     traceRegisterUserEvent("MPI_Iprobe", 70);
1693 #endif
1694 }
1695
1696
1697 static char     **Cmi_argv;
1698 static char     **Cmi_argvcopy;
1699 static CmiStartFn Cmi_startfn;   /* The start function */
1700 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1701
1702 typedef struct {
1703   int sleepMs; /*Milliseconds to sleep while idle*/
1704   int nIdles; /*Number of times we've been idle in a row*/
1705   CmiState cs; /*Machine state*/
1706 } CmiIdleState;
1707
1708 static CmiIdleState *CmiNotifyGetState(void)
1709 {
1710   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1711   s->sleepMs=0;
1712   s->nIdles=0;
1713   s->cs=CmiGetState();
1714   return s;
1715 }
1716
1717 static void CmiNotifyBeginIdle(CmiIdleState *s)
1718 {
1719   s->sleepMs=0;
1720   s->nIdles=0;
1721 }
1722
1723 static void CmiNotifyStillIdle(CmiIdleState *s)
1724 {
1725 #if ! CMK_SMP
1726   CmiReleaseSentMessages();
1727   PumpMsgs();
1728 #else
1729 /*  CmiYield();  */
1730 #endif
1731
1732 #if 1
1733   {
1734   int nSpins=20; /*Number of times to spin before sleeping*/
1735   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1736   s->nIdles++;
1737   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1738     s->sleepMs+=2;
1739     if (s->sleepMs>10) s->sleepMs=10;
1740   }
1741   /*Comm. thread will listen on sockets-- just sleep*/
1742   if (s->sleepMs>0) {
1743     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1744     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1745     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1746   }
1747   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1748   }
1749 #endif
1750 }
1751
1752 #if MACHINE_DEBUG_LOG
1753 FILE *debugLog = NULL;
1754 #endif
1755
1756 static int machine_exit_idx;
1757 static void machine_exit(char *m) {
1758   EmergencyExit();
1759   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1760   fflush(stdout);
1761   CmiNodeBarrier();
1762   if (CmiMyRank() == 0) {
1763     MPI_Barrier(MPI_COMM_WORLD);
1764     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1765     MPI_Abort(MPI_COMM_WORLD, 1);
1766   } else {
1767     while (1) CmiYield();
1768   }
1769 }
1770
1771 static void KillOnAllSigs(int sigNo) {
1772   static int already_in_signal_handler = 0;
1773   char *m;
1774   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1775   already_in_signal_handler = 1;
1776 #if CMK_CCS_AVAILABLE
1777   if (CpvAccess(cmiArgDebugFlag)) {
1778     CpdNotify(CPD_SIGNAL, sigNo);
1779     CpdFreeze();
1780   }
1781 #endif
1782   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1783       "Signal: %d\n",CmiMyPe(),sigNo);
1784   CmiPrintStackTrace(1);
1785
1786   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1787   CmiSetHandler(m, machine_exit_idx);
1788   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1789   machine_exit(m);
1790 }
1791
1792 static void ConverseRunPE(int everReturn)
1793 {
1794   CmiIdleState *s=CmiNotifyGetState();
1795   CmiState cs;
1796   char** CmiMyArgv;
1797
1798   CmiNodeAllBarrier();
1799
1800   cs = CmiGetState();
1801   CpvInitialize(void *,CmiLocalQueue);
1802   CpvAccess(CmiLocalQueue) = cs->localqueue;
1803
1804   if (CmiMyRank())
1805     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1806   else
1807     CmiMyArgv=Cmi_argv;
1808
1809   CthInit(CmiMyArgv);
1810
1811   ConverseCommonInit(CmiMyArgv);
1812   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1813
1814 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1815   CpvInitialize(double, projTraceStart);
1816   /* only PE 0 needs to care about registration (to generate sts file). */
1817   if (CmiMyPe() == 0) {
1818     registerMachineUserEventsFunction(&registerMPITraceEvents);
1819   }
1820 #endif
1821
1822   /* initialize the network progress counter*/
1823   /* Network progress function is used to poll the network when for
1824      messages. This flushes receive buffers on some  implementations*/
1825   CpvInitialize(int , networkProgressCount);
1826   CpvAccess(networkProgressCount) = 0;
1827
1828 #if CMK_SMP
1829   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1830   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1831 #else
1832   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1833 #endif
1834
1835 #if MACHINE_DEBUG_LOG
1836   if (CmiMyRank() == 0) {
1837     char ln[200];
1838     sprintf(ln,"debugLog.%d",CmiMyNode());
1839     debugLog=fopen(ln,"w");
1840   }
1841 #endif
1842
1843   /* Converse initialization finishes, immediate messages can be processed.
1844      node barrier previously should take care of the node synchronization */
1845   _immediateReady = 1;
1846
1847   /* communication thread */
1848   if (CmiMyRank() == CmiMyNodeSize()) {
1849     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1850     while (1) CommunicationServerThread(5);
1851   }
1852   else {  /* worker thread */
1853   if (!everReturn) {
1854     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1855     if (Cmi_usrsched==0) CsdScheduler(-1);
1856     ConverseExit();
1857   }
1858   }
1859 }
1860
1861 static char *thread_level_tostring(int thread_level)
1862 {
1863 #if CMK_MPI_INIT_THREAD
1864   switch (thread_level) {
1865   case MPI_THREAD_SINGLE:
1866       return "MPI_THREAD_SINGLE";
1867   case MPI_THREAD_FUNNELED:
1868       return "MPI_THREAD_FUNNELED";
1869   case MPI_THREAD_SERIALIZED:
1870       return "MPI_THREAD_SERIALIZED";
1871   case MPI_THREAD_MULTIPLE :
1872       return "MPI_THREAD_MULTIPLE ";
1873   default: {
1874       char *str = (char*)malloc(5);
1875       sprintf(str,"%d", thread_level);
1876       return str;
1877       }
1878   }
1879   return  "unknown";
1880 #else
1881   char *str = (char*)malloc(5);
1882   sprintf(str,"%d", thread_level);
1883   return str;
1884 #endif
1885 }
1886
1887 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1888 {
1889   int n,i;
1890   int ver, subver;
1891   int provided;
1892   int thread_level;
1893
1894 #if MACHINE_DEBUG
1895   debugLog=NULL;
1896 #endif
1897 #if CMK_USE_HP_MAIN_FIX
1898 #if FOR_CPLUS
1899   _main(argc,argv);
1900 #endif
1901 #endif
1902
1903 #if CMK_MPI_INIT_THREAD
1904 #if CMK_SMP
1905   thread_level = MPI_THREAD_FUNNELED;
1906 #else
1907   thread_level = MPI_THREAD_SINGLE;
1908 #endif
1909   MPI_Init_thread(&argc, &argv, thread_level, &provided);
1910   _thread_provided = provided;
1911 #else
1912   MPI_Init(&argc, &argv);
1913   thread_level = 0;
1914   provided = -1;
1915 #endif
1916   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
1917   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
1918
1919   MPI_Get_version(&ver, &subver);
1920   if (_Cmi_mynode == 0) {
1921     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1922   }
1923
1924   /* processor per node */
1925   _Cmi_mynodesize = 1;
1926   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
1927     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
1928 #if ! CMK_SMP
1929   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
1930     CmiAbort("+ppn cannot be used in non SMP version!\n");
1931 #endif
1932   idleblock = CmiGetArgFlag(argv, "+idleblocking");
1933   if (idleblock && _Cmi_mynode == 0) {
1934     printf("Charm++: Running in idle blocking mode.\n");
1935   }
1936
1937   /* setup signal handlers */
1938   signal(SIGSEGV, KillOnAllSigs);
1939   signal(SIGFPE, KillOnAllSigs);
1940   signal(SIGILL, KillOnAllSigs);
1941   signal_int = signal(SIGINT, KillOnAllSigs);
1942   signal(SIGTERM, KillOnAllSigs);
1943   signal(SIGABRT, KillOnAllSigs);
1944 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1945   signal(SIGQUIT, KillOnAllSigs);
1946   signal(SIGBUS, KillOnAllSigs);
1947 /*#     if CMK_HANDLE_SIGUSR
1948   signal(SIGUSR1, HandleUserSignals);
1949   signal(SIGUSR2, HandleUserSignals);
1950 #     endif*/
1951 #   endif /*UNIX*/
1952   
1953 #if CMK_NO_OUTSTANDING_SENDS
1954   no_outstanding_sends=1;
1955 #endif
1956   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1957     no_outstanding_sends = 1;
1958     if (_Cmi_mynode == 0)
1959       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1960         no_outstanding_sends?"":" not");
1961   }
1962   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
1963   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
1964   Cmi_argvcopy = CmiCopyArgs(argv);
1965   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
1966   /* find dim = log2(numpes), to pretend we are a hypercube */
1967   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
1968     Cmi_dim++ ;
1969  /* CmiSpanTreeInit();*/
1970   request_max=MAX_QLEN;
1971   CmiGetArgInt(argv,"+requestmax",&request_max);
1972   /*printf("request max=%d\n", request_max);*/
1973
1974   /* checksum flag */
1975   if (CmiGetArgFlag(argv,"+checksum")) {
1976 #if !CMK_OPTIMIZE
1977     checksum_flag = 1;
1978     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1979 #else
1980     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1981 #endif
1982   }
1983
1984   {
1985   int debug = CmiGetArgFlag(argv,"++debug");
1986   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
1987   if (debug || debug_no_pause)
1988   {   /*Pause so user has a chance to start and attach debugger*/
1989 #if CMK_HAS_GETPID
1990     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
1991     fflush(stdout);
1992     if (!debug_no_pause)
1993       sleep(15);
1994 #else
1995     printf("++debug ignored.\n");
1996 #endif
1997   }
1998   }
1999
2000 #if MPI_POST_RECV_COUNT > 0
2001
2002   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2003   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2004   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2005   CpvInitialize(char*,CmiPostedRecvBuffers);
2006
2007     /* Post some extra recvs to help out with incoming messages */
2008     /* On some MPIs the messages are unexpected and thus slow */
2009
2010     /* An array of request handles for posted recvs */
2011     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2012
2013     /* An array of buffers for posted recvs */
2014     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2015
2016     /* Post Recvs */
2017     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2018         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2019                     MPI_POST_RECV_SIZE,
2020                     MPI_BYTE,
2021                     MPI_ANY_SOURCE,
2022                     POST_RECV_TAG,
2023                     MPI_COMM_WORLD,
2024                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2025           CmiAbort("MPI_Irecv failed\n");
2026     }
2027
2028 #endif
2029
2030
2031
2032   /* CmiTimerInit(); */
2033
2034 #if 0
2035   CthInit(argv);
2036   ConverseCommonInit(argv);
2037
2038   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2039   if (initret==0) {
2040     fn(CmiGetArgc(argv), argv);
2041     if (usched==0) CsdScheduler(-1);
2042     ConverseExit();
2043   }
2044 #endif
2045
2046   CsvInitialize(CmiNodeState, NodeState);
2047   CmiNodeStateInit(&CsvAccess(NodeState));
2048
2049   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2050
2051   for (i=0; i<_Cmi_mynodesize+1; i++) {
2052 #if MULTI_SENDQUEUE
2053     procState[i].sendMsgBuf = PCQueueCreate();
2054 #endif
2055     procState[i].recvLock = CmiCreateLock();
2056   }
2057 #if CMK_SMP
2058 #if !MULTI_SENDQUEUE
2059   sendMsgBuf = PCQueueCreate();
2060   sendMsgBufLock = CmiCreateLock();
2061 #endif
2062   exitLock = CmiCreateLock();            /* exit count lock */
2063 #endif
2064
2065   /* Network progress function is used to poll the network when for
2066      messages. This flushes receive buffers on some  implementations*/
2067   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2068   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2069
2070   CmiStartThreads(argv);
2071   ConverseRunPE(initret);
2072 }
2073
2074 /***********************************************************************
2075  *
2076  * Abort function:
2077  *
2078  ************************************************************************/
2079
2080 void CmiAbort(const char *message)
2081 {
2082   char *m;
2083   /* if CharmDebug is attached simply try to send a message to it */
2084 #if CMK_CCS_AVAILABLE
2085   if (CpvAccess(cmiArgDebugFlag)) {
2086     CpdNotify(CPD_ABORT, message);
2087     CpdFreeze();
2088   }
2089 #endif  
2090   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2091         "Reason: %s\n",CmiMyPe(),message);
2092  /*  CmiError(message); */
2093   CmiPrintStackTrace(0);
2094   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2095   CmiSetHandler(m, machine_exit_idx);
2096   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2097   machine_exit(m);
2098   /* Program never reaches here */
2099   MPI_Abort(MPI_COMM_WORLD, 1);
2100 }
2101
2102
2103 #if 0
2104
2105 /* ****************************************************************** */
2106 /*    The following internal functions implement recd msg queue       */
2107 /* ****************************************************************** */
2108
2109 static void ** AllocBlock(unsigned int len)
2110 {
2111   void ** blk;
2112
2113   blk=(void **)CmiAlloc(len*sizeof(void *));
2114   if(blk==(void **)0) {
2115     CmiError("Cannot Allocate Memory!\n");
2116     MPI_Abort(MPI_COMM_WORLD, 1);
2117   }
2118   return blk;
2119 }
2120
2121 static void
2122 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2123 {
2124   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2125   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2126 }
2127
2128 void recdQueueInit(void)
2129 {
2130   recdQueue_blk = AllocBlock(BLK_LEN);
2131   recdQueue_blk_len = BLK_LEN;
2132   recdQueue_first = 0;
2133   recdQueue_len = 0;
2134 }
2135
2136 void recdQueueAddToBack(void *element)
2137 {
2138 #if NODE_0_IS_CONVHOST
2139   inside_comm = 1;
2140 #endif
2141   if(recdQueue_len==recdQueue_blk_len) {
2142     void **blk;
2143     recdQueue_blk_len *= 3;
2144     blk = AllocBlock(recdQueue_blk_len);
2145     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2146     CmiFree(recdQueue_blk);
2147     recdQueue_blk = blk;
2148     recdQueue_first = 0;
2149   }
2150   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2151 #if NODE_0_IS_CONVHOST
2152   inside_comm = 0;
2153 #endif
2154 }
2155
2156
2157 void * recdQueueRemoveFromFront(void)
2158 {
2159   if(recdQueue_len) {
2160     void *element;
2161     element = recdQueue_blk[recdQueue_first++];
2162     recdQueue_first %= recdQueue_blk_len;
2163     recdQueue_len--;
2164     return element;
2165   }
2166   return 0;
2167 }
2168
2169 #endif
2170
2171 /*@}*/