Cleaned the codes for SendSpanningChildren so that the non-SMP version shares as...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #include "machine.h"
66
67 #include "pcqueue.h"
68
69 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
70
71 #if CMK_BLUEGENEL
72 #define MAX_QLEN 8
73 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
74 #else
75 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
76 #define MAX_QLEN 200
77 #endif
78
79 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && ! CMK_TRACE_IN_CHARM
80 CpvStaticDeclare(double, projTraceStart);
81 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
82 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
83 #else
84 # define  START_EVENT()
85 # define  END_EVENT(x)
86 #endif
87
88 /*
89     To reduce the buffer used in broadcast and distribute the load from
90   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
91   spanning tree broadcast algorithm.
92     This will use the fourth short in message as an indicator of spanning tree
93   root.
94 */
95 #define CMK_BROADCAST_SPANNING_TREE    1
96 #define CMK_BROADCAST_HYPERCUBE        0
97
98 #define BROADCAST_SPANNING_FACTOR      4
99 /* The number of children used when a msg is broadcast inside a node */
100 #define BROADCAST_SPANNING_INTRA_FACTOR      8
101
102 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
103 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
104
105 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
106 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
107
108 /* FIXME: need a random number that everyone agrees ! */
109 #define CHARM_MAGIC_NUMBER               126
110
111 #if !CMK_OPTIMIZE
112 static int checksum_flag = 0;
113 #define CMI_SET_CHECKSUM(msg, len)      \
114         if (checksum_flag)  {   \
115           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
116           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
117         }
118 #define CMI_CHECK_CHECKSUM(msg, len)    \
119         if (checksum_flag)      \
120           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
121             CmiAbort("Fatal error: checksum doesn't agree!\n");
122 #else
123 #define CMI_SET_CHECKSUM(msg, len)
124 #define CMI_CHECK_CHECKSUM(msg, len)
125 #endif
126
127 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
128 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
129 #endif
130
131
132 /** 
133     If MPI_POST_RECV is defined, we provide default values for size 
134     and number of posted recieves. If MPI_POST_RECV_COUNT is set
135     then a default value for MPI_POST_RECV_SIZE is used if not specified
136     by the user.
137 */
138 #ifdef MPI_POST_RECV
139 #define MPI_POST_RECV_COUNT 10
140 #undef MPI_POST_RECV
141 #endif
142 #if MPI_POST_RECV_COUNT > 0
143 #warning "Using MPI posted receives which have not yet been tested"
144 #ifndef MPI_POST_RECV_SIZE
145 #define MPI_POST_RECV_SIZE 200
146 #endif
147 /* #undef  MPI_POST_RECV_DEBUG  */
148 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
149 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
150 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
151 CpvDeclare(char*,CmiPostedRecvBuffers);
152 #endif
153
154 /*
155  to avoid MPI's in order delivery, changing MPI Tag all the time
156 */
157 #define TAG     1375
158
159 #if MPI_POST_RECV_COUNT > 0
160 #define POST_RECV_TAG TAG+1
161 #define BARRIER_ZERO_TAG TAG
162 #else
163 #define BARRIER_ZERO_TAG     1375
164 #endif
165
166 #include <signal.h>
167 void (*signal_int)(int);
168
169 /*
170 static int mpi_tag = TAG;
171 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
172 */
173
174 static int        _thread_provided = -1;
175 int               _Cmi_numpes;
176 int               _Cmi_mynode;    /* Which address space am I */
177 int               _Cmi_mynodesize;/* Number of processors in my address space */
178 int               _Cmi_numnodes;  /* Total number of address spaces */
179 int               _Cmi_numpes;    /* Total number of processors */
180 static int        Cmi_nodestart; /* First processor in this address space */
181 CpvDeclare(void*, CmiLocalQueue);
182
183 /*Network progress utility variables. Period controls the rate at
184   which the network poll is called */
185 CpvDeclare(unsigned , networkProgressCount);
186 int networkProgressPeriod;
187
188 int               idleblock = 0;
189
190 #define BLK_LEN  512
191
192 #if CMK_NODE_QUEUE_AVAILABLE
193 #define DGRAM_NODEMESSAGE   (0xFB)
194
195 #define NODE_BROADCAST_OTHERS (-1)
196 #define NODE_BROADCAST_ALL    (-2)
197 #endif
198
199 #if 0
200 static void **recdQueue_blk;
201 static unsigned int recdQueue_blk_len;
202 static unsigned int recdQueue_first;
203 static unsigned int recdQueue_len;
204 static void recdQueueInit(void);
205 static void recdQueueAddToBack(void *element);
206 static void *recdQueueRemoveFromFront(void);
207 #endif
208
209 static void ConverseRunPE(int everReturn);
210 static void CommunicationServer(int sleepTime);
211 static void CommunicationServerThread(int sleepTime);
212
213 typedef struct msg_list {
214      char *msg;
215      struct msg_list *next;
216      int size, destpe;
217
218 #if CMK_SMP_TRACE_COMMTHREAD
219         int srcpe;
220 #endif  
221         
222      MPI_Request req;
223 } SMSG_LIST;
224
225 int MsgQueueLen=0;
226 static int request_max;
227
228 static SMSG_LIST *sent_msgs=0;
229 static SMSG_LIST *end_sent=0;
230
231 static int Cmi_dim;
232
233 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
234
235 #if NODE_0_IS_CONVHOST
236 int inside_comm = 0;
237 #endif
238
239 void CmiAbort(const char *message);
240 static void PerrorExit(const char *msg);
241
242 void SendSpanningChildren(int size, char *msg);
243 void SendHypercube(int size, char *msg);
244
245 static void PerrorExit(const char *msg)
246 {
247   perror(msg);
248   exit(1);
249 }
250
251 extern unsigned char computeCheckSum(unsigned char *data, int len);
252
253 /**************************  TIMER FUNCTIONS **************************/
254
255 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
256
257 /* MPI calls are not threadsafe, even the timer on some machines */
258 static CmiNodeLock  timerLock = 0;
259 static double starttimer = 0;
260 static int _is_global = 0;
261
262 int CmiTimerIsSynchronized()
263 {
264   int  flag;
265   void *v;
266
267   /*  check if it using synchronized timer */
268   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
269     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
270   if (flag) {
271     _is_global = *(int*)v;
272     if (_is_global && CmiMyPe() == 0)
273       printf("Charm++> MPI timer is synchronized!\n");
274   }
275   return _is_global;
276 }
277
278 void CmiTimerInit()
279 {
280   _is_global = CmiTimerIsSynchronized();
281
282   if (_is_global) {
283     if (CmiMyRank() == 0) {
284       double minTimer;
285 #if CMK_TIMER_USE_XT3_DCLOCK
286       starttimer = dclock();
287 #else
288       starttimer = MPI_Wtime();
289 #endif
290
291       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
292                                   MPI_COMM_WORLD );
293       starttimer = minTimer;
294     }
295   }
296   else {  /* we don't have a synchronous timer, set our own start time */
297     CmiBarrier();
298     CmiBarrier();
299     CmiBarrier();
300 #if CMK_TIMER_USE_XT3_DCLOCK
301     starttimer = dclock();
302 #else
303     starttimer = MPI_Wtime();
304 #endif
305   }
306
307 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
308   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
309     timerLock = CmiCreateLock();
310 #endif
311   CmiNodeAllBarrier();          /* for smp */
312 }
313
314 /**
315  * Since the timerLock is never created, and is
316  * always NULL, then all the if-condition inside
317  * the timer functions could be disabled right
318  * now in the case of SMP. --Chao Mei
319  */
320 double CmiTimer(void)
321 {
322   double t;
323 #if 0 && CMK_SMP
324   if (timerLock) CmiLock(timerLock);
325 #endif
326 #if CMK_TIMER_USE_XT3_DCLOCK
327   t = dclock() - starttimer;
328 #else
329   t = MPI_Wtime() - starttimer;
330 #endif
331
332 #if 0 && CMK_SMP
333   if (timerLock) CmiUnlock(timerLock);
334 #endif
335
336   return t;
337 }
338
339 double CmiWallTimer(void)
340 {
341   double t;
342 #if 0 && CMK_SMP
343   if (timerLock) CmiLock(timerLock);
344 #endif
345 #if CMK_TIMER_USE_XT3_DCLOCK
346   t = dclock() - starttimer;
347 #else
348   t = MPI_Wtime() - starttimer;
349 #endif
350 #if 0 && CMK_SMP
351   if (timerLock) CmiUnlock(timerLock);
352 #endif
353   return t;
354 }
355
356 double CmiCpuTimer(void)
357 {
358   double t;
359 #if 0 && CMK_SMP
360   if (timerLock) CmiLock(timerLock);
361 #endif
362 #if CMK_TIMER_USE_XT3_DCLOCK
363   t = dclock() - starttimer;
364 #else
365   t = MPI_Wtime() - starttimer;
366 #endif
367 #if 0 && CMK_SMP
368   if (timerLock) CmiUnlock(timerLock);
369 #endif
370   return t;
371 }
372
373 #endif
374
375 /* must be called on all ranks including comm thread in SMP */
376 int CmiBarrier()
377 {
378 #if CMK_SMP
379     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
380   CmiNodeAllBarrier();
381   if (CmiMyRank() == CmiMyNodeSize()) 
382 #else
383   if (CmiMyRank() == 0) 
384 #endif
385   {
386 /**
387  *  The call of CmiBarrier is usually before the initialization
388  *  of trace module of Charm++, therefore, the START_EVENT
389  *  and END_EVENT are disabled here. -Chao Mei
390  */     
391     /*START_EVENT();*/
392
393     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
394         CmiAbort("Timernit: MPI_Barrier failed!\n");
395
396     /*END_EVENT(10);*/
397   }
398   CmiNodeAllBarrier();
399   return 0;
400 }
401
402 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
403 int CmiBarrierZero()
404 {
405   int i;
406 #if CMK_SMP
407   if (CmiMyRank() == CmiMyNodeSize()) 
408 #else
409   if (CmiMyRank() == 0) 
410 #endif
411   {
412     char msg[1];
413     MPI_Status sts;
414     if (CmiMyNode() == 0)  {
415       for (i=0; i<CmiNumNodes()-1; i++) {
416          START_EVENT();
417
418          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
419             CmiPrintf("MPI_Recv failed!\n");
420
421          END_EVENT(30);
422       }
423     }
424     else {
425       START_EVENT();
426
427       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
428          printf("MPI_Send failed!\n");
429
430       END_EVENT(20);
431     }
432   }
433   CmiNodeAllBarrier();
434   return 0;
435 }
436
437 typedef struct ProcState {
438 #if MULTI_SENDQUEUE
439 PCQueue      sendMsgBuf;       /* per processor message sending queue */
440 #endif
441 CmiNodeLock  recvLock;              /* for cs->recv */
442 } ProcState;
443
444 static ProcState  *procState;
445
446 #if CMK_SMP
447
448 #if !MULTI_SENDQUEUE
449 static PCQueue sendMsgBuf;
450 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
451 #endif
452
453 #endif
454
455 /************************************************************
456  *
457  * Processor state structure
458  *
459  ************************************************************/
460
461 /* fake Cmi_charmrun_fd */
462 static int Cmi_charmrun_fd = 0;
463 #include "machine-smp.c"
464
465 CsvDeclare(CmiNodeState, NodeState);
466
467 #include "immediate.c"
468
469 #if CMK_SHARED_VARS_UNAVAILABLE
470 /************ non SMP **************/
471 static struct CmiStateStruct Cmi_state;
472 int _Cmi_mype;
473 int _Cmi_myrank;
474
475 void CmiMemLock() {}
476 void CmiMemUnlock() {}
477
478 #define CmiGetState() (&Cmi_state)
479 #define CmiGetStateN(n) (&Cmi_state)
480
481 void CmiYield(void) { sleep(0); }
482
483 static void CmiStartThreads(char **argv)
484 {
485   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
486   _Cmi_mype = Cmi_nodestart;
487   _Cmi_myrank = 0;
488 }
489 #endif  /* non smp */
490
491 /*Add a message to this processor's receive queue, pe is a rank */
492 void CmiPushPE(int pe,void *msg)
493 {
494   CmiState cs = CmiGetStateN(pe);
495   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
496 #if CMK_IMMEDIATE_MSG
497   if (CmiIsImmediate(msg)) {
498 /*
499 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
500     CmiHandleMessage(msg);
501 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
502 */
503     /**(CmiUInt2 *)msg = pe;*/
504     CMI_DEST_RANK(msg) = pe;
505     CmiPushImmediateMsg(msg);
506     return;
507   }
508 #endif
509
510 #if CMK_SMP
511   CmiLock(procState[pe].recvLock);
512 #endif
513   PCQueuePush(cs->recv,msg);
514 #if CMK_SMP
515   CmiUnlock(procState[pe].recvLock);
516 #endif
517   CmiIdleLock_addMessage(&cs->idle);
518   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
519 }
520
521 #if CMK_NODE_QUEUE_AVAILABLE
522 /*Add a message to this processor's receive queue */
523 static void CmiPushNode(void *msg)
524 {
525   MACHSTATE(3,"Pushing message into NodeRecv queue");
526 #if CMK_IMMEDIATE_MSG
527   if (CmiIsImmediate(msg)) {
528     CMI_DEST_RANK(msg) = 0;
529     CmiPushImmediateMsg(msg);
530     return;
531   }
532 #endif
533   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
534   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
535   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
536   {
537   CmiState cs=CmiGetStateN(0);
538   CmiIdleLock_addMessage(&cs->idle);
539   }
540 }
541 #endif
542
543 #ifndef CmiMyPe
544 int CmiMyPe(void)
545 {
546   return CmiGetState()->pe;
547 }
548 #endif
549
550 #ifndef CmiMyRank
551 int CmiMyRank(void)
552 {
553   return CmiGetState()->rank;
554 }
555 #endif
556
557 #ifndef CmiNodeFirst
558 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
559 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
560 #endif
561
562 #ifndef CmiNodeOf
563 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
564 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
565 #endif
566
567 static size_t CmiAllAsyncMsgsSent(void)
568 {
569    SMSG_LIST *msg_tmp = sent_msgs;
570    MPI_Status sts;
571    int done;
572
573    while(msg_tmp!=0) {
574     done = 0;
575     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
576       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
577     if(!done)
578       return 0;
579     msg_tmp = msg_tmp->next;
580 /*    MsgQueueLen--; ????? */
581    }
582    return 1;
583 }
584
585 int CmiAsyncMsgSent(CmiCommHandle c) {
586
587   SMSG_LIST *msg_tmp = sent_msgs;
588   int done;
589   MPI_Status sts;
590
591   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
592     msg_tmp = msg_tmp->next;
593   if(msg_tmp) {
594     done = 0;
595     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
596       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
597     return ((done)?1:0);
598   } else {
599     return 1;
600   }
601 }
602
603 void CmiReleaseCommHandle(CmiCommHandle c)
604 {
605   return;
606 }
607
608 #if CMK_BLUEGENEL
609 extern void MPID_Progress_test();
610 #endif
611
612 void CmiReleaseSentMessages(void)
613 {
614   SMSG_LIST *msg_tmp=sent_msgs;
615   SMSG_LIST *prev=0;
616   SMSG_LIST *temp;
617   int done;
618   MPI_Status sts;
619
620 #if CMK_BLUEGENEL
621   MPID_Progress_test();
622 #endif
623
624   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
625   while(msg_tmp!=0) {
626     done =0;
627 #if CMK_SMP_TRACE_COMMTHREAD
628     double startT = CmiWallTimer();
629 #endif
630     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
631       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
632     if(done) {
633       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
634       MsgQueueLen--;
635       /* Release the message */
636       temp = msg_tmp->next;
637       if(prev==0)  /* first message */
638         sent_msgs = temp;
639       else
640         prev->next = temp;
641       CmiFree(msg_tmp->msg);
642       CmiFree(msg_tmp);
643       msg_tmp = temp;
644     } else {
645       prev = msg_tmp;
646       msg_tmp = msg_tmp->next;
647     }
648 #if CMK_SMP_TRACE_COMMTHREAD
649     traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, CmiWallTimer());
650 #endif
651   }
652   end_sent = prev;
653   MACHSTATE(2,"} CmiReleaseSentMessages end");
654 }
655
656 int PumpMsgs(void)
657 {
658   int nbytes, flg, res;
659   char *msg;
660   MPI_Status sts;
661   int recd=0;
662
663 #if CMI_EXERT_RECV_CAP
664   int recvCnt=0;
665 #endif
666         
667 #if CMK_BLUEGENEL
668   MPID_Progress_test();
669 #endif
670
671   MACHSTATE(2,"PumpMsgs begin {");
672
673         
674   while(1) {
675 #if CMI_EXERT_RECV_CAP
676         if(recvCnt==RECV_CAP) break;
677 #endif
678           
679     /* First check posted recvs then do  probe unmatched outstanding messages */
680 #if MPI_POST_RECV_COUNT > 0 
681     int completed_index=-1;
682     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
683         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
684     if(flg){
685         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
686             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
687
688         recd = 1;
689         msg = (char *) CmiAlloc(nbytes);
690         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
691         /* and repost the recv */
692
693         START_EVENT();
694
695         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
696             MPI_POST_RECV_SIZE,
697             MPI_BYTE,
698             MPI_ANY_SOURCE,
699             POST_RECV_TAG,
700             MPI_COMM_WORLD,
701             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
702                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
703
704         END_EVENT(50);
705
706         CpvAccess(Cmi_posted_recv_total)++;
707     }
708     else {
709         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
710         if(res != MPI_SUCCESS)
711         CmiAbort("MPI_Iprobe failed\n");
712         if(!flg) break;
713         recd = 1;
714         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
715         msg = (char *) CmiAlloc(nbytes);
716
717         START_EVENT();
718
719         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
720             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
721
722         END_EVENT(30);
723
724         CpvAccess(Cmi_unposted_recv_total)++;
725     }
726 #else
727     /* Original version */
728 #if CMK_SMP_TRACE_COMMTHREAD
729   double startT = CmiWallTimer(); 
730 #endif
731     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
732     if(res != MPI_SUCCESS)
733       CmiAbort("MPI_Iprobe failed\n");
734
735     if(!flg) break;
736 #if CMK_SMP_TRACE_COMMTHREAD
737     traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, CmiWallTimer());
738 #endif
739
740     recd = 1;
741     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
742     msg = (char *) CmiAlloc(nbytes);
743     
744     START_EVENT();
745
746     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
747       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
748
749     /*END_EVENT(30);*/
750
751 #endif
752
753 #if CMK_SMP_TRACE_COMMTHREAD
754         traceBeginCommOp(msg);
755         traceChangeLastTimestamp(CpvAccess(projTraceStart));
756         traceEndCommOp(msg);
757         #if CMI_MPI_TRACE_MOREDETAILED
758         char tmp[32];
759         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
760         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
761         #endif
762 #endif  
763         
764     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
765     CMI_CHECK_CHECKSUM(msg, nbytes);
766     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
767       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
768       CmiFree(msg);
769       CmiAbort("Abort!\n");
770       continue;
771     }
772         
773 #if CMK_BROADCAST_SPANNING_TREE
774     if (CMI_BROADCAST_ROOT(msg))
775       SendSpanningChildren(nbytes, msg);
776 #elif CMK_BROADCAST_HYPERCUBE
777     if (CMI_BROADCAST_ROOT(msg))
778       SendHypercube(nbytes, msg);
779 #endif
780         
781         /* In SMP mode, this push operation needs to be executed
782      * after forwarding broadcast messages. If it is executed
783      * earlier, then during the bcast msg forwarding period,    
784          * the msg could be already freed on the worker thread.
785          * As a result, the forwarded message could be wrong! 
786          * --Chao Mei
787          */
788 #if CMK_NODE_QUEUE_AVAILABLE
789     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
790       CmiPushNode(msg);
791     else
792 #endif
793         CmiPushPE(CMI_DEST_RANK(msg), msg);     
794         
795 #if CMI_EXERT_RECV_CAP
796         recvCnt++;
797 #endif  
798   }
799
800   
801 #if CMK_IMMEDIATE_MSG && !CMK_SMP
802   CmiHandleImmediate();
803 #endif
804   
805   MACHSTATE(2,"} PumpMsgs end ");
806   return recd;
807 }
808
809 /* blocking version */
810 static void PumpMsgsBlocking(void)
811 {
812   static int maxbytes = 20000000;
813   static char *buf = NULL;
814   int nbytes, flg;
815   MPI_Status sts;
816   char *msg;
817   int recd=0;
818
819   if (!PCQueueEmpty(CmiGetState()->recv)) return;
820   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
821   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
822   if (sent_msgs)  return;
823
824 #if 0
825   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
826 #endif
827
828   if (buf == NULL) {
829     buf = (char *) CmiAlloc(maxbytes);
830     _MEMCHECK(buf);
831   }
832
833
834 #if MPI_POST_RECV_COUNT > 0
835 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
836 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
837 #endif
838
839   START_EVENT();
840
841   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
842       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
843
844   /*END_EVENT(30);*/
845     
846    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
847    msg = (char *) CmiAlloc(nbytes);
848    memcpy(msg, buf, nbytes);
849
850 #if CMK_SMP_TRACE_COMMTHREAD
851         traceBeginCommOp(msg);
852         traceChangeLastTimestamp(CpvAccess(projTraceStart));
853         traceEndCommOp(msg);
854         #if CMI_MPI_TRACE_MOREDETAILED
855         char tmp[32];
856         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
857         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
858         #endif
859 #endif
860
861 #if CMK_BROADCAST_SPANNING_TREE
862    if (CMI_BROADCAST_ROOT(msg))
863       SendSpanningChildren(nbytes, msg);
864 #elif CMK_BROADCAST_HYPERCUBE
865    if (CMI_BROADCAST_ROOT(msg))
866       SendHypercube(nbytes, msg);
867 #endif
868   
869         /* In SMP mode, this push operation needs to be executed
870      * after forwarding broadcast messages. If it is executed
871      * earlier, then during the bcast msg forwarding period,    
872          * the msg could be already freed on the worker thread.
873          * As a result, the forwarded message could be wrong! 
874          * --Chao Mei
875          */  
876 #if CMK_NODE_QUEUE_AVAILABLE
877    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
878       CmiPushNode(msg);
879    else
880 #endif
881       CmiPushPE(CMI_DEST_RANK(msg), msg);
882 }
883
884 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
885
886 #if CMK_SMP
887
888 static int inexit = 0;
889 static CmiNodeLock  exitLock = 0;
890
891 static int MsgQueueEmpty()
892 {
893   int i;
894 #if MULTI_SENDQUEUE
895   for (i=0; i<_Cmi_mynodesize; i++)
896     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
897 #else
898   return PCQueueEmpty(sendMsgBuf);
899 #endif
900   return 1;
901 }
902
903 static int SendMsgBuf();
904
905 /* test if all processors recv queues are empty */
906 static int RecvQueueEmpty()
907 {
908   int i;
909   for (i=0; i<_Cmi_mynodesize; i++) {
910     CmiState cs=CmiGetStateN(i);
911     if (!PCQueueEmpty(cs->recv)) return 0;
912   }
913   return 1;
914 }
915
916 /**
917 CommunicationServer calls MPI to send messages in the queues and probe message from network.
918 */
919
920 #define REPORT_COMM_METRICS 0
921 #if REPORT_COMM_METRICS
922 static double pumptime = 0.0;
923 static double releasetime = 0.0;
924 static double sendtime = 0.0;
925 #endif
926
927 static void CommunicationServer(int sleepTime)
928 {
929   int static count=0;
930 /*
931   count ++;
932   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
933 */
934 #if REPORT_COMM_METRICS
935   double t1, t2, t3, t4;
936   t1 = CmiWallTimer();
937 #endif
938   PumpMsgs();
939 #if REPORT_COMM_METRICS
940   t2 = CmiWallTimer();
941 #endif
942   CmiReleaseSentMessages();
943 #if REPORT_COMM_METRICS
944   t3 = CmiWallTimer();
945 #endif
946   SendMsgBuf();
947 #if REPORT_COMM_METRICS
948   t4 = CmiWallTimer();
949   pumptime += (t2-t1);
950   releasetime += (t3-t2);
951   sendtime += (t4-t3);
952 #endif
953 /*
954   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
955 */
956   if (inexit == CmiMyNodeSize()) {
957     MACHSTATE(2, "CommunicationServer exiting {");
958 #if 0
959     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
960 #endif
961     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
962       CmiReleaseSentMessages();
963       SendMsgBuf();
964       PumpMsgs();
965     }
966     MACHSTATE(2, "CommunicationServer barrier begin {");
967
968     START_EVENT();
969
970     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
971       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
972
973     END_EVENT(10);
974
975     MACHSTATE(2, "} CommunicationServer barrier end");
976 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
977     if (CmiMyNode() == 0){
978       CmiPrintf("End of program\n");
979     }
980 #endif
981     MACHSTATE(2, "} CommunicationServer EXIT");
982
983     ConverseCommonExit();   
984 #if REPORT_COMM_METRICS
985     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
986 #endif
987
988 #if ! CMK_AUTOBUILD
989     signal(SIGINT, signal_int);
990     MPI_Finalize();
991     #endif
992     exit(0);
993   }
994 }
995
996 #endif
997
998 static void CommunicationServerThread(int sleepTime)
999 {
1000 #if CMK_SMP
1001   CommunicationServer(sleepTime);
1002 #endif
1003 #if CMK_IMMEDIATE_MSG
1004   CmiHandleImmediate();
1005 #endif
1006 }
1007
1008 #if CMK_NODE_QUEUE_AVAILABLE
1009 char *CmiGetNonLocalNodeQ(void)
1010 {
1011   CmiState cs = CmiGetState();
1012   char *result = 0;
1013   CmiIdleLock_checkMessage(&cs->idle);
1014 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1015     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1016     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1017     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1018     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1019     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1020 /*  }  */
1021
1022   return result;
1023 }
1024 #endif
1025
1026 void *CmiGetNonLocal(void)
1027 {
1028   static int count=0;
1029   CmiState cs = CmiGetState();
1030   void *msg;
1031
1032 #if ! CMK_SMP
1033   if (CmiNumPes() == 1) return NULL;
1034 #endif
1035
1036   CmiIdleLock_checkMessage(&cs->idle);
1037   /* although it seems that lock is not needed, I found it crashes very often
1038      on mpi-smp without lock */
1039
1040 #if ! CMK_SMP
1041   CmiReleaseSentMessages();
1042   PumpMsgs();
1043 #endif
1044
1045   /* CmiLock(procState[cs->rank].recvLock); */
1046   msg =  PCQueuePop(cs->recv);
1047   /* CmiUnlock(procState[cs->rank].recvLock); */
1048
1049 /*
1050   if (msg) {
1051     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1052   else {
1053     count++;
1054     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1055   }
1056 */
1057 #if ! CMK_SMP
1058   if (no_outstanding_sends) {
1059     while (MsgQueueLen>0) {
1060       CmiReleaseSentMessages();
1061       PumpMsgs();
1062     }
1063   }
1064
1065   if(!msg) {
1066     CmiReleaseSentMessages();
1067     if (PumpMsgs())
1068       return  PCQueuePop(cs->recv);
1069     else
1070       return 0;
1071   }
1072 #endif
1073
1074   return msg;
1075 }
1076
1077 /* called in non-smp mode */
1078 void CmiNotifyIdle(void)
1079 {
1080   CmiReleaseSentMessages();
1081   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1082 }
1083
1084
1085 /********************************************************
1086     The call to probe immediate messages has been renamed to
1087     CmiMachineProgressImpl
1088 ******************************************************/
1089 /* user call to handle immediate message, only useful in non SMP version
1090    using polling method to schedule message.
1091 */
1092 /*
1093 #if CMK_IMMEDIATE_MSG
1094 void CmiProbeImmediateMsg()
1095 {
1096 #if !CMK_SMP
1097   PumpMsgs();
1098   CmiHandleImmediate();
1099 #endif
1100 }
1101 #endif
1102 */
1103
1104 /* Network progress function is used to poll the network when for
1105    messages. This flushes receive buffers on some  implementations*/
1106 #if CMK_MACHINE_PROGRESS_DEFINED
1107 void CmiMachineProgressImpl()
1108 {
1109 #if !CMK_SMP
1110     PumpMsgs();
1111 #if CMK_IMMEDIATE_MSG
1112     CmiHandleImmediate();
1113 #endif
1114 #else
1115     /*Not implemented yet. Communication server does not seem to be
1116       thread safe, so only communication thread call it */
1117     if (CmiMyRank() == CmiMyNodeSize())
1118         CommunicationServerThread(0);
1119 #endif
1120 }
1121 #endif
1122
1123 /********************* MESSAGE SEND FUNCTIONS ******************/
1124
1125 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1126
1127 static void CmiSendSelf(char *msg)
1128 {
1129 #if CMK_IMMEDIATE_MSG
1130     if (CmiIsImmediate(msg)) {
1131       /* CmiBecomeNonImmediate(msg); */
1132       CmiPushImmediateMsg(msg);
1133       CmiHandleImmediate();
1134       return;
1135     }
1136 #endif
1137     CQdCreate(CpvAccess(cQdState), 1);
1138     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1139 }
1140
1141 void CmiSyncSendFn(int destPE, int size, char *msg)
1142 {
1143   CmiState cs = CmiGetState();
1144   char *dupmsg = (char *) CmiAlloc(size);
1145   memcpy(dupmsg, msg, size);
1146
1147   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1148
1149   if (cs->pe==destPE) {
1150     CmiSendSelf(dupmsg);
1151   }
1152   else
1153     CmiAsyncSendFn_(destPE, size, dupmsg);
1154 }
1155
1156 #if CMK_SMP
1157
1158 /* called by communication thread in SMP */
1159 static int SendMsgBuf()
1160 {
1161   SMSG_LIST *msg_tmp;
1162   char *msg;
1163   int node, rank, size;
1164   int i;
1165   int sent = 0;
1166
1167 #if CMI_EXERT_SEND_CAP
1168         int sentCnt = 0;
1169 #endif  
1170         
1171   MACHSTATE(2,"SendMsgBuf begin {");
1172 #if MULTI_SENDQUEUE
1173   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1174   {
1175     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1176     {
1177       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1178 #else
1179     /* single message sending queue */
1180     /* CmiLock(sendMsgBufLock); */
1181     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1182     /* CmiUnlock(sendMsgBufLock); */
1183     while (NULL != msg_tmp)
1184     {
1185 #endif
1186       node = msg_tmp->destpe;
1187       size = msg_tmp->size;
1188       msg = msg_tmp->msg;
1189       msg_tmp->next = 0;
1190       while (MsgQueueLen > request_max) {
1191         CmiReleaseSentMessages();
1192         PumpMsgs();
1193       }
1194       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1195       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1196       CMI_SET_CHECKSUM(msg, size);
1197
1198 #if MPI_POST_RECV_COUNT > 0
1199         if(size <= MPI_POST_RECV_SIZE){
1200
1201           START_EVENT();
1202           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1203                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1204
1205           STOP_EVENT(40);
1206         }
1207         else {
1208             START_EVENT();
1209             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1210                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1211             STOP_EVENT(40);
1212         }
1213 #else
1214         START_EVENT();
1215         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1216             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1217         /*END_EVENT(40);*/
1218 #endif
1219         
1220 #if CMK_SMP_TRACE_COMMTHREAD
1221         traceBeginCommOp(msg);
1222         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1223         /* traceSendMsgComm must execute after traceBeginCommOp because
1224          * we pretend we execute an entry method, and inside this we
1225          * pretend we will send another message. Otherwise how could
1226          * a message creation just before an entry method invocation?
1227          * If such logic is broken, the projections will not trace
1228          * messages correctly! -Chao Mei
1229          */
1230         traceSendMsgComm(msg);
1231         traceEndCommOp(msg);
1232         #if CMI_MPI_TRACE_MOREDETAILED
1233         char tmp[64];
1234         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1235         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1236         #endif
1237 #endif
1238                 
1239                 
1240       MACHSTATE(3,"}MPI_send end");
1241       MsgQueueLen++;
1242       if(sent_msgs==0)
1243         sent_msgs = msg_tmp;
1244       else
1245         end_sent->next = msg_tmp;
1246       end_sent = msg_tmp;
1247       sent=1;
1248           
1249 #if CMI_EXERT_SEND_CAP    
1250           if(++sentCnt == SEND_CAP) break;
1251 #endif    
1252           
1253 #if ! MULTI_SENDQUEUE
1254       /* CmiLock(sendMsgBufLock); */
1255       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1256       /* CmiUnlock(sendMsgBufLock); */
1257 #endif
1258     }
1259 #if MULTI_SENDQUEUE
1260   }
1261 #endif
1262   MACHSTATE(2,"}SendMsgBuf end ");
1263   return sent;
1264 }
1265
1266 void EnqueueMsg(void *m, int size, int node)
1267 {
1268   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1269   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1270   msg_tmp->msg = m;
1271   msg_tmp->size = size;
1272   msg_tmp->destpe = node;
1273         
1274 #if CMK_SMP_TRACE_COMMTHREAD
1275         msg_tmp->srcpe = CmiMyPe();
1276 #endif  
1277
1278 #if MULTI_SENDQUEUE
1279   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1280 #else
1281   CmiLock(sendMsgBufLock);
1282   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1283   CmiUnlock(sendMsgBufLock);
1284 #endif
1285         
1286   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1287 }
1288
1289 #endif
1290
1291 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1292 {
1293   CmiState cs = CmiGetState();
1294   SMSG_LIST *msg_tmp;
1295   CmiUInt2  rank, node;
1296
1297   if(destPE == cs->pe) {
1298     char *dupmsg = (char *) CmiAlloc(size);
1299     memcpy(dupmsg, msg, size);
1300     CmiSendSelf(dupmsg);
1301     return 0;
1302   }
1303   CQdCreate(CpvAccess(cQdState), 1);
1304 #if CMK_SMP
1305   node = CmiNodeOf(destPE);
1306   rank = CmiRankOf(destPE);
1307   if (node == CmiMyNode())  {
1308     CmiPushPE(rank, msg);
1309     return 0;
1310   }
1311   CMI_DEST_RANK(msg) = rank;
1312   EnqueueMsg(msg, size, node);
1313   return 0;
1314 #else
1315   /* non smp */
1316   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1317   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1318   msg_tmp->msg = msg;
1319   msg_tmp->next = 0;
1320   while (MsgQueueLen > request_max) {
1321         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1322         CmiReleaseSentMessages();
1323         PumpMsgs();
1324   }
1325   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1326   CMI_SET_CHECKSUM(msg, size);
1327
1328 #if MPI_POST_RECV_COUNT > 0
1329         if(size <= MPI_POST_RECV_SIZE){
1330
1331           START_EVENT();
1332           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1333                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1334           END_EVENT(40);
1335         }
1336         else {
1337           START_EVENT();
1338           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1339                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1340           END_EVENT(40);
1341         }
1342 #else
1343   START_EVENT();
1344   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1345     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1346   END_EVENT(40);
1347 #endif
1348
1349   MsgQueueLen++;
1350   if(sent_msgs==0)
1351     sent_msgs = msg_tmp;
1352   else
1353     end_sent->next = msg_tmp;
1354   end_sent = msg_tmp;
1355   return (CmiCommHandle) &(msg_tmp->req);
1356 #endif              /* non-smp */
1357 }
1358
1359 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1360 {
1361   CMI_SET_BROADCAST_ROOT(msg, 0);
1362   CmiAsyncSendFn_(destPE, size, msg);
1363 }
1364
1365 void CmiFreeSendFn(int destPE, int size, char *msg)
1366 {
1367   CmiState cs = CmiGetState();
1368   CMI_SET_BROADCAST_ROOT(msg, 0);
1369
1370   if (cs->pe==destPE) {
1371     CmiSendSelf(msg);
1372   } else {
1373     CmiAsyncSendFn_(destPE, size, msg);
1374   }
1375 }
1376
1377 /*********************** BROADCAST FUNCTIONS **********************/
1378
1379 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1380 void CmiSyncSendFn1(int destPE, int size, char *msg)
1381 {
1382   CmiState cs = CmiGetState();
1383   char *dupmsg = (char *) CmiAlloc(size);
1384   memcpy(dupmsg, msg, size);
1385   if (cs->pe==destPE)
1386     CmiSendSelf(dupmsg);
1387   else
1388     CmiAsyncSendFn_(destPE, size, dupmsg);
1389 }
1390
1391 /* send msg to its spanning children in broadcast. G. Zheng */
1392 void SendSpanningChildren(int size, char *msg)
1393 {
1394   CmiState cs = CmiGetState();
1395   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1396   int startnode = CmiNodeOf(startpe);
1397   int i, exceptRank;
1398         
1399    /* first send msgs to other nodes */
1400   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1401   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1402     int nd = CmiMyNode()-startnode;
1403     if (nd<0) nd+=CmiNumNodes();
1404     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1405     if (nd > CmiNumNodes() - 1) break;
1406     nd += startnode;
1407     nd = nd%CmiNumNodes();
1408     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1409         #if CMK_SMP
1410         /* always send to the first rank of other nodes */
1411         char *newmsg = CmiCopyMsg(msg, size);
1412         CMI_DEST_RANK(newmsg) = 0;
1413     EnqueueMsg(newmsg, size, nd);
1414         #else
1415         CmiSyncSendFn1(nd, size, msg);
1416         #endif
1417   }
1418 #if CMK_SMP  
1419    /* second send msgs to my peers on this node */
1420   /* FIXME: now it's just a flat p2p send!! When node size is large,
1421    * it should also be sent in a tree
1422    */
1423    exceptRank = CMI_DEST_RANK(msg);
1424    for(i=0; i<exceptRank; i++){
1425            CmiPushPE(i, CmiCopyMsg(msg, size));
1426    }
1427    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1428            CmiPushPE(i, CmiCopyMsg(msg, size));
1429    }
1430 #endif
1431 }
1432
1433 #include <math.h>
1434
1435 /* send msg along the hypercube in broadcast. (Sameer) */
1436 void SendHypercube(int size, char *msg)
1437 {
1438   CmiState cs = CmiGetState();
1439   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1440   int startnode = CmiNodeOf(startpe);
1441   int i, exceptRank, cnt, tmp, relPE;
1442   int dims=0;
1443
1444   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1445   tmp = CmiNumNodes()-1;
1446   while(tmp>0){
1447           dims++;
1448           tmp = tmp >> 1;
1449   }
1450   if(CmiNumNodes()==1) dims=1;
1451   
1452    /* first send msgs to other nodes */  
1453   relPE = CmiMyNode()-startnode;
1454   if(relPE < 0) relPE += CmiNumNodes();
1455   cnt=0;
1456   tmp = relPE;
1457   /* count how many zeros (in binary format) relPE has */
1458   for(i=0; i<dims; i++, cnt++){
1459     if(tmp & 1 == 1) break;
1460     tmp = tmp >> 1;
1461   }
1462   
1463   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1464   for (i = cnt-1; i >= 0; i--) {
1465     int nd = relPE + (1 << i);
1466         if(nd >= CmiNumNodes()) continue;
1467         nd = (nd+startnode)%CmiNumNodes();
1468         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1469 #if CMK_SMP
1470     /* always send to the first rank of other nodes */
1471     char *newmsg = CmiCopyMsg(msg, size);
1472     CMI_DEST_RANK(newmsg) = 0;
1473     EnqueueMsg(newmsg, size, nd);
1474 #else
1475         CmiSyncSendFn1(nd, size, msg);
1476 #endif
1477   }
1478   
1479 #if CMK_SMP
1480    /* second send msgs to my peers on this node */
1481    /* FIXME: now it's just a flat p2p send!! When node size is large,
1482     * it should also be sent in a tree
1483     */
1484    exceptRank = CMI_DEST_RANK(msg);
1485    for(i=0; i<exceptRank; i++){
1486            CmiPushPE(i, CmiCopyMsg(msg, size));
1487    }
1488    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1489            CmiPushPE(i, CmiCopyMsg(msg, size));
1490    }
1491 #endif
1492 }
1493
1494 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1495 {
1496   CmiState cs = CmiGetState();
1497
1498 #if CMK_SMP     
1499   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1500   CMI_DEST_RANK(msg) = CmiMyRank();
1501 #endif
1502         
1503 #if CMK_BROADCAST_SPANNING_TREE
1504   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1505   SendSpanningChildren(size, msg);
1506
1507 #elif CMK_BROADCAST_HYPERCUBE
1508   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1509   SendHypercube(size, msg);
1510
1511 #else
1512   int i;
1513
1514   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1515     CmiSyncSendFn(i, size,msg) ;
1516   for ( i=0; i<cs->pe; i++ )
1517     CmiSyncSendFn(i, size,msg) ;
1518 #endif
1519
1520   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1521 }
1522
1523
1524 /*  FIXME: luckily async is never used  G. Zheng */
1525 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1526 {
1527   CmiState cs = CmiGetState();
1528   int i ;
1529
1530   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1531     CmiAsyncSendFn(i,size,msg) ;
1532   for ( i=0; i<cs->pe; i++ )
1533     CmiAsyncSendFn(i,size,msg) ;
1534
1535   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1536   CmiAbort("CmiAsyncBroadcastFn should never be called");
1537   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1538 }
1539
1540 void CmiFreeBroadcastFn(int size, char *msg)
1541 {
1542    CmiSyncBroadcastFn(size,msg);
1543    CmiFree(msg);
1544 }
1545
1546 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1547 {
1548
1549 #if CMK_SMP     
1550   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1551   CMI_DEST_RANK(msg) = CmiMyRank();
1552 #endif
1553
1554 #if CMK_BROADCAST_SPANNING_TREE
1555   CmiState cs = CmiGetState();
1556   CmiSyncSendFn(cs->pe, size,msg) ;
1557   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1558   SendSpanningChildren(size, msg);
1559
1560 #elif CMK_BROADCAST_HYPERCUBE
1561   CmiState cs = CmiGetState();
1562   CmiSyncSendFn(cs->pe, size,msg) ;
1563   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1564   SendHypercube(size, msg);
1565
1566 #else
1567     int i ;
1568
1569   for ( i=0; i<_Cmi_numpes; i++ )
1570     CmiSyncSendFn(i,size,msg) ;
1571 #endif
1572
1573   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1574 }
1575
1576 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1577 {
1578   int i ;
1579
1580   for ( i=1; i<_Cmi_numpes; i++ )
1581     CmiAsyncSendFn(i,size,msg) ;
1582
1583   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1584
1585   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1586 }
1587
1588 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1589 {
1590 #if CMK_SMP     
1591   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1592   CMI_DEST_RANK(msg) = CmiMyRank();
1593 #endif
1594
1595 #if CMK_BROADCAST_SPANNING_TREE
1596   CmiState cs = CmiGetState();
1597   CmiSyncSendFn(cs->pe, size,msg) ;
1598   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1599   SendSpanningChildren(size, msg);
1600
1601 #elif CMK_BROADCAST_HYPERCUBE
1602   CmiState cs = CmiGetState();
1603   CmiSyncSendFn(cs->pe, size,msg) ;
1604   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1605   SendHypercube(size, msg);
1606
1607 #else
1608   int i ;
1609
1610   for ( i=0; i<_Cmi_numpes; i++ )
1611     CmiSyncSendFn(i,size,msg) ;
1612 #endif
1613   CmiFree(msg) ;
1614   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1615 }
1616
1617 #if CMK_NODE_QUEUE_AVAILABLE
1618
1619 static void CmiSendNodeSelf(char *msg)
1620 {
1621 #if CMK_IMMEDIATE_MSG
1622 #if 0
1623     if (CmiIsImmediate(msg) && !_immRunning) {
1624       /*CmiHandleImmediateMessage(msg); */
1625       CmiPushImmediateMsg(msg);
1626       CmiHandleImmediate();
1627       return;
1628     }
1629 #endif
1630     if (CmiIsImmediate(msg))
1631     {
1632       CmiPushImmediateMsg(msg);
1633       if (!_immRunning) CmiHandleImmediate();
1634       return;
1635     }
1636 #endif
1637     CQdCreate(CpvAccess(cQdState), 1);
1638     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1639     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1640     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1641 }
1642
1643 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1644 {
1645   int i;
1646   SMSG_LIST *msg_tmp;
1647   char *dupmsg;
1648
1649   CMI_SET_BROADCAST_ROOT(msg, 0);
1650   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1651   switch (dstNode) {
1652   case NODE_BROADCAST_ALL:
1653     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1654   case NODE_BROADCAST_OTHERS:
1655     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1656     for (i=0; i<_Cmi_numnodes; i++)
1657       if (i!=_Cmi_mynode) {
1658         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1659       }
1660     break;
1661   default:
1662     dupmsg = (char *)CmiCopyMsg(msg,size);
1663     if(dstNode == _Cmi_mynode) {
1664       CmiSendNodeSelf(dupmsg);
1665     }
1666     else {
1667       CQdCreate(CpvAccess(cQdState), 1);
1668       EnqueueMsg(dupmsg, size, dstNode);
1669     }
1670   }
1671   return 0;
1672 }
1673
1674 void CmiSyncNodeSendFn(int p, int s, char *m)
1675 {
1676   CmiAsyncNodeSendFn(p, s, m);
1677 }
1678
1679 /* need */
1680 void CmiFreeNodeSendFn(int p, int s, char *m)
1681 {
1682   CmiAsyncNodeSendFn(p, s, m);
1683   CmiFree(m);
1684 }
1685
1686 /* need */
1687 void CmiSyncNodeBroadcastFn(int s, char *m)
1688 {
1689   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1690 }
1691
1692 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1693 {
1694 }
1695
1696 /* need */
1697 void CmiFreeNodeBroadcastFn(int s, char *m)
1698 {
1699   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1700   CmiFree(m);
1701 }
1702
1703 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1704 {
1705   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1706 }
1707
1708 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1709 {
1710   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1711 }
1712
1713 /* need */
1714 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1715 {
1716   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1717   CmiFree(m);
1718 }
1719 #endif
1720
1721 /************************** MAIN ***********************************/
1722 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1723
1724 void ConverseExit(void)
1725 {
1726 #if ! CMK_SMP
1727   while(!CmiAllAsyncMsgsSent()) {
1728     PumpMsgs();
1729     CmiReleaseSentMessages();
1730   }
1731   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1732     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1733
1734   ConverseCommonExit();
1735 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1736   if (CmiMyPe() == 0){
1737     CmiPrintf("End of program\n");
1738 #if MPI_POST_RECV_COUNT > 0
1739     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1740 #endif
1741 }
1742 #endif
1743 #if ! CMK_AUTOBUILD
1744   signal(SIGINT, signal_int);
1745   MPI_Finalize();
1746 #endif
1747   exit(0);
1748
1749 #else
1750     /* SMP version, communication thread will exit */
1751   ConverseCommonExit();
1752   /* atomic increment */
1753   CmiLock(exitLock);
1754   inexit++;
1755   CmiUnlock(exitLock);
1756   while (1) CmiYield();
1757 #endif
1758 }
1759
1760 static void registerMPITraceEvents() {
1761 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1762     traceRegisterUserEvent("MPI_Barrier", 10);
1763     traceRegisterUserEvent("MPI_Send", 20);
1764     traceRegisterUserEvent("MPI_Recv", 30);
1765     traceRegisterUserEvent("MPI_Isend", 40);
1766     traceRegisterUserEvent("MPI_Irecv", 50);
1767     traceRegisterUserEvent("MPI_Test", 60);
1768     traceRegisterUserEvent("MPI_Iprobe", 70);
1769 #endif
1770 }
1771
1772
1773 static char     **Cmi_argv;
1774 static char     **Cmi_argvcopy;
1775 static CmiStartFn Cmi_startfn;   /* The start function */
1776 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1777
1778 typedef struct {
1779   int sleepMs; /*Milliseconds to sleep while idle*/
1780   int nIdles; /*Number of times we've been idle in a row*/
1781   CmiState cs; /*Machine state*/
1782 } CmiIdleState;
1783
1784 static CmiIdleState *CmiNotifyGetState(void)
1785 {
1786   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1787   s->sleepMs=0;
1788   s->nIdles=0;
1789   s->cs=CmiGetState();
1790   return s;
1791 }
1792
1793 static void CmiNotifyBeginIdle(CmiIdleState *s)
1794 {
1795   s->sleepMs=0;
1796   s->nIdles=0;
1797 }
1798
1799 static void CmiNotifyStillIdle(CmiIdleState *s)
1800 {
1801 #if ! CMK_SMP
1802   CmiReleaseSentMessages();
1803   PumpMsgs();
1804 #else
1805 /*  CmiYield();  */
1806 #endif
1807
1808 #if 1
1809   {
1810   int nSpins=20; /*Number of times to spin before sleeping*/
1811   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1812   s->nIdles++;
1813   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1814     s->sleepMs+=2;
1815     if (s->sleepMs>10) s->sleepMs=10;
1816   }
1817   /*Comm. thread will listen on sockets-- just sleep*/
1818   if (s->sleepMs>0) {
1819     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1820     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1821     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1822   }
1823   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1824   }
1825 #endif
1826 }
1827
1828 #if MACHINE_DEBUG_LOG
1829 FILE *debugLog = NULL;
1830 #endif
1831
1832 static int machine_exit_idx;
1833 static void machine_exit(char *m) {
1834   EmergencyExit();
1835   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1836   fflush(stdout);
1837   CmiNodeBarrier();
1838   if (CmiMyRank() == 0) {
1839     MPI_Barrier(MPI_COMM_WORLD);
1840     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1841     MPI_Abort(MPI_COMM_WORLD, 1);
1842   } else {
1843     while (1) CmiYield();
1844   }
1845 }
1846
1847 static void KillOnAllSigs(int sigNo) {
1848   static int already_in_signal_handler = 0;
1849   char *m;
1850   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1851   already_in_signal_handler = 1;
1852 #if CMK_CCS_AVAILABLE
1853   if (CpvAccess(cmiArgDebugFlag)) {
1854     CpdNotify(CPD_SIGNAL, sigNo);
1855     CpdFreeze();
1856   }
1857 #endif
1858   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1859       "Signal: %d\n",CmiMyPe(),sigNo);
1860   CmiPrintStackTrace(1);
1861
1862   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1863   CmiSetHandler(m, machine_exit_idx);
1864   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1865   machine_exit(m);
1866 }
1867
1868 static void ConverseRunPE(int everReturn)
1869 {
1870   CmiIdleState *s=CmiNotifyGetState();
1871   CmiState cs;
1872   char** CmiMyArgv;
1873
1874   CmiNodeAllBarrier();
1875
1876   cs = CmiGetState();
1877   CpvInitialize(void *,CmiLocalQueue);
1878   CpvAccess(CmiLocalQueue) = cs->localqueue;
1879
1880   if (CmiMyRank())
1881     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1882   else
1883     CmiMyArgv=Cmi_argv;
1884
1885   CthInit(CmiMyArgv);
1886
1887   ConverseCommonInit(CmiMyArgv);
1888   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1889
1890 #if CMI_MPI_TRACE_USEREVENTS && !CMK_OPTIMIZE && !CMK_TRACE_IN_CHARM
1891   CpvInitialize(double, projTraceStart);
1892   /* only PE 0 needs to care about registration (to generate sts file). */
1893   if (CmiMyPe() == 0) {
1894     registerMachineUserEventsFunction(&registerMPITraceEvents);
1895   }
1896 #endif
1897
1898   /* initialize the network progress counter*/
1899   /* Network progress function is used to poll the network when for
1900      messages. This flushes receive buffers on some  implementations*/
1901   CpvInitialize(int , networkProgressCount);
1902   CpvAccess(networkProgressCount) = 0;
1903
1904 #if CMK_SMP
1905   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1906   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1907 #else
1908   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1909 #endif
1910
1911 #if MACHINE_DEBUG_LOG
1912   if (CmiMyRank() == 0) {
1913     char ln[200];
1914     sprintf(ln,"debugLog.%d",CmiMyNode());
1915     debugLog=fopen(ln,"w");
1916   }
1917 #endif
1918
1919   /* Converse initialization finishes, immediate messages can be processed.
1920      node barrier previously should take care of the node synchronization */
1921   _immediateReady = 1;
1922
1923   /* communication thread */
1924   if (CmiMyRank() == CmiMyNodeSize()) {
1925     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1926     while (1) CommunicationServerThread(5);
1927   }
1928   else {  /* worker thread */
1929   if (!everReturn) {
1930     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1931     if (Cmi_usrsched==0) CsdScheduler(-1);
1932     ConverseExit();
1933   }
1934   }
1935 }
1936
1937 static char *thread_level_tostring(int thread_level)
1938 {
1939 #if CMK_MPI_INIT_THREAD
1940   switch (thread_level) {
1941   case MPI_THREAD_SINGLE:
1942       return "MPI_THREAD_SINGLE";
1943   case MPI_THREAD_FUNNELED:
1944       return "MPI_THREAD_FUNNELED";
1945   case MPI_THREAD_SERIALIZED:
1946       return "MPI_THREAD_SERIALIZED";
1947   case MPI_THREAD_MULTIPLE :
1948       return "MPI_THREAD_MULTIPLE ";
1949   default: {
1950       char *str = (char*)malloc(5);
1951       sprintf(str,"%d", thread_level);
1952       return str;
1953       }
1954   }
1955   return  "unknown";
1956 #else
1957   char *str = (char*)malloc(5);
1958   sprintf(str,"%d", thread_level);
1959   return str;
1960 #endif
1961 }
1962
1963 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1964 {
1965   int n,i;
1966   int ver, subver;
1967   int provided;
1968   int thread_level;
1969
1970 #if MACHINE_DEBUG
1971   debugLog=NULL;
1972 #endif
1973 #if CMK_USE_HP_MAIN_FIX
1974 #if FOR_CPLUS
1975   _main(argc,argv);
1976 #endif
1977 #endif
1978
1979 #if CMK_MPI_INIT_THREAD
1980 #if CMK_SMP
1981   thread_level = MPI_THREAD_FUNNELED;
1982 #else
1983   thread_level = MPI_THREAD_SINGLE;
1984 #endif
1985   MPI_Init_thread(&argc, &argv, thread_level, &provided);
1986   _thread_provided = provided;
1987 #else
1988   MPI_Init(&argc, &argv);
1989   thread_level = 0;
1990   provided = -1;
1991 #endif
1992   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
1993   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
1994
1995   MPI_Get_version(&ver, &subver);
1996   if (_Cmi_mynode == 0) {
1997     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1998   }
1999
2000   /* processor per node */
2001   _Cmi_mynodesize = 1;
2002   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2003     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2004 #if ! CMK_SMP
2005   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2006     CmiAbort("+ppn cannot be used in non SMP version!\n");
2007 #endif
2008   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2009   if (idleblock && _Cmi_mynode == 0) {
2010     printf("Charm++: Running in idle blocking mode.\n");
2011   }
2012
2013   /* setup signal handlers */
2014   signal(SIGSEGV, KillOnAllSigs);
2015   signal(SIGFPE, KillOnAllSigs);
2016   signal(SIGILL, KillOnAllSigs);
2017   signal_int = signal(SIGINT, KillOnAllSigs);
2018   signal(SIGTERM, KillOnAllSigs);
2019   signal(SIGABRT, KillOnAllSigs);
2020 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2021   signal(SIGQUIT, KillOnAllSigs);
2022   signal(SIGBUS, KillOnAllSigs);
2023 /*#     if CMK_HANDLE_SIGUSR
2024   signal(SIGUSR1, HandleUserSignals);
2025   signal(SIGUSR2, HandleUserSignals);
2026 #     endif*/
2027 #   endif /*UNIX*/
2028   
2029 #if CMK_NO_OUTSTANDING_SENDS
2030   no_outstanding_sends=1;
2031 #endif
2032   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2033     no_outstanding_sends = 1;
2034     if (_Cmi_mynode == 0)
2035       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2036         no_outstanding_sends?"":" not");
2037   }
2038   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2039   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2040   Cmi_argvcopy = CmiCopyArgs(argv);
2041   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2042   /* find dim = log2(numpes), to pretend we are a hypercube */
2043   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2044     Cmi_dim++ ;
2045  /* CmiSpanTreeInit();*/
2046   request_max=MAX_QLEN;
2047   CmiGetArgInt(argv,"+requestmax",&request_max);
2048   /*printf("request max=%d\n", request_max);*/
2049
2050   /* checksum flag */
2051   if (CmiGetArgFlag(argv,"+checksum")) {
2052 #if !CMK_OPTIMIZE
2053     checksum_flag = 1;
2054     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2055 #else
2056     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2057 #endif
2058   }
2059
2060   {
2061   int debug = CmiGetArgFlag(argv,"++debug");
2062   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2063   if (debug || debug_no_pause)
2064   {   /*Pause so user has a chance to start and attach debugger*/
2065 #if CMK_HAS_GETPID
2066     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2067     fflush(stdout);
2068     if (!debug_no_pause)
2069       sleep(15);
2070 #else
2071     printf("++debug ignored.\n");
2072 #endif
2073   }
2074   }
2075
2076 #if MPI_POST_RECV_COUNT > 0
2077
2078   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2079   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2080   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2081   CpvInitialize(char*,CmiPostedRecvBuffers);
2082
2083     /* Post some extra recvs to help out with incoming messages */
2084     /* On some MPIs the messages are unexpected and thus slow */
2085
2086     /* An array of request handles for posted recvs */
2087     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2088
2089     /* An array of buffers for posted recvs */
2090     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2091
2092     /* Post Recvs */
2093     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2094         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2095                     MPI_POST_RECV_SIZE,
2096                     MPI_BYTE,
2097                     MPI_ANY_SOURCE,
2098                     POST_RECV_TAG,
2099                     MPI_COMM_WORLD,
2100                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2101           CmiAbort("MPI_Irecv failed\n");
2102     }
2103
2104 #endif
2105
2106
2107
2108   /* CmiTimerInit(); */
2109
2110 #if 0
2111   CthInit(argv);
2112   ConverseCommonInit(argv);
2113
2114   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2115   if (initret==0) {
2116     fn(CmiGetArgc(argv), argv);
2117     if (usched==0) CsdScheduler(-1);
2118     ConverseExit();
2119   }
2120 #endif
2121
2122   CsvInitialize(CmiNodeState, NodeState);
2123   CmiNodeStateInit(&CsvAccess(NodeState));
2124
2125   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2126
2127   for (i=0; i<_Cmi_mynodesize+1; i++) {
2128 #if MULTI_SENDQUEUE
2129     procState[i].sendMsgBuf = PCQueueCreate();
2130 #endif
2131     procState[i].recvLock = CmiCreateLock();
2132   }
2133 #if CMK_SMP
2134 #if !MULTI_SENDQUEUE
2135   sendMsgBuf = PCQueueCreate();
2136   sendMsgBufLock = CmiCreateLock();
2137 #endif
2138   exitLock = CmiCreateLock();            /* exit count lock */
2139 #endif
2140
2141   /* Network progress function is used to poll the network when for
2142      messages. This flushes receive buffers on some  implementations*/
2143   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2144   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2145
2146   CmiStartThreads(argv);
2147   ConverseRunPE(initret);
2148 }
2149
2150 /***********************************************************************
2151  *
2152  * Abort function:
2153  *
2154  ************************************************************************/
2155
2156 void CmiAbort(const char *message)
2157 {
2158   char *m;
2159   /* if CharmDebug is attached simply try to send a message to it */
2160 #if CMK_CCS_AVAILABLE
2161   if (CpvAccess(cmiArgDebugFlag)) {
2162     CpdNotify(CPD_ABORT, message);
2163     CpdFreeze();
2164   }
2165 #endif  
2166   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2167         "Reason: %s\n",CmiMyPe(),message);
2168  /*  CmiError(message); */
2169   CmiPrintStackTrace(0);
2170   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2171   CmiSetHandler(m, machine_exit_idx);
2172   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2173   machine_exit(m);
2174   /* Program never reaches here */
2175   MPI_Abort(MPI_COMM_WORLD, 1);
2176 }
2177
2178
2179 #if 0
2180
2181 /* ****************************************************************** */
2182 /*    The following internal functions implement recd msg queue       */
2183 /* ****************************************************************** */
2184
2185 static void ** AllocBlock(unsigned int len)
2186 {
2187   void ** blk;
2188
2189   blk=(void **)CmiAlloc(len*sizeof(void *));
2190   if(blk==(void **)0) {
2191     CmiError("Cannot Allocate Memory!\n");
2192     MPI_Abort(MPI_COMM_WORLD, 1);
2193   }
2194   return blk;
2195 }
2196
2197 static void
2198 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2199 {
2200   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2201   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2202 }
2203
2204 void recdQueueInit(void)
2205 {
2206   recdQueue_blk = AllocBlock(BLK_LEN);
2207   recdQueue_blk_len = BLK_LEN;
2208   recdQueue_first = 0;
2209   recdQueue_len = 0;
2210 }
2211
2212 void recdQueueAddToBack(void *element)
2213 {
2214 #if NODE_0_IS_CONVHOST
2215   inside_comm = 1;
2216 #endif
2217   if(recdQueue_len==recdQueue_blk_len) {
2218     void **blk;
2219     recdQueue_blk_len *= 3;
2220     blk = AllocBlock(recdQueue_blk_len);
2221     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2222     CmiFree(recdQueue_blk);
2223     recdQueue_blk = blk;
2224     recdQueue_first = 0;
2225   }
2226   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2227 #if NODE_0_IS_CONVHOST
2228   inside_comm = 0;
2229 #endif
2230 }
2231
2232
2233 void * recdQueueRemoveFromFront(void)
2234 {
2235   if(recdQueue_len) {
2236     void *element;
2237     element = recdQueue_blk[recdQueue_first++];
2238     recdQueue_first %= recdQueue_blk_len;
2239     recdQueue_len--;
2240     return element;
2241   }
2242   return 0;
2243 }
2244
2245 #endif
2246
2247 /*@}*/