Re-organized the code segments. In particular, the timer & barrier functions are...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #define CMI_DYNAMIC_EXERT_CAP 0
51 /* This macro defines the max number of msgs in the sender msg buffer 
52  * that is allowed for recving operation to continue
53  */
54 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 4
55 #define CMI_DYNAMIC_MAXCAPSIZE 1000
56 #define CMI_DYNAMIC_SEND_CAPSIZE 4
57 #define CMI_DYNAMIC_RECV_CAPSIZE 3
58 /* initial values, -1 indiates there's no cap */
59 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
60 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
61
62 #if CMI_EXERT_SEND_CAP
63 #define SEND_CAP 3
64 #endif
65
66 #if CMI_EXERT_RECV_CAP
67 #define RECV_CAP 2
68 #endif
69
70
71 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
72 #define CMI_MPI_TRACE_MOREDETAILED 0
73 #undef CMI_MPI_TRACE_USEREVENTS
74 #define CMI_MPI_TRACE_USEREVENTS 1
75 #else
76 #undef CMK_SMP_TRACE_COMMTHREAD
77 #define CMK_SMP_TRACE_COMMTHREAD 0
78 #endif
79
80 #define CMK_TRACE_COMMOVERHEAD 0
81 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_TRACE_COMMOVERHEAD
86 #define CMK_TRACE_COMMOVERHEAD 0
87 #endif
88
89 #include "machine.h"
90
91 #include "pcqueue.h"
92
93 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
94
95 #if CMK_BLUEGENEL
96 #define MAX_QLEN 8
97 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
98 #else
99 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
100 #define MAX_QLEN 200
101 #endif
102
103 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
104 CpvStaticDeclare(double, projTraceStart);
105 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
106 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
107 #else
108 # define  START_EVENT()
109 # define  END_EVENT(x)
110 #endif
111
112 /*
113     To reduce the buffer used in broadcast and distribute the load from
114   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
115   spanning tree broadcast algorithm.
116     This will use the fourth short in message as an indicator of spanning tree
117   root.
118 */
119 #define CMK_BROADCAST_SPANNING_TREE    1
120 #define CMK_BROADCAST_HYPERCUBE        0
121
122 #define BROADCAST_SPANNING_FACTOR      4
123 /* The number of children used when a msg is broadcast inside a node */
124 #define BROADCAST_SPANNING_INTRA_FACTOR      8
125
126 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
127 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
128
129 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
130 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
131
132 /* FIXME: need a random number that everyone agrees ! */
133 #define CHARM_MAGIC_NUMBER               126
134
135 #if CMK_ERROR_CHECKING
136 static int checksum_flag = 0;
137 #define CMI_SET_CHECKSUM(msg, len)      \
138         if (checksum_flag)  {   \
139           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
140           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
141         }
142 #define CMI_CHECK_CHECKSUM(msg, len)    \
143         if (checksum_flag)      \
144           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
145             CmiAbort("Fatal error: checksum doesn't agree!\n");
146 #else
147 #define CMI_SET_CHECKSUM(msg, len)
148 #define CMI_CHECK_CHECKSUM(msg, len)
149 #endif
150
151 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
152 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
153 #else
154 #define CMI_SET_BROADCAST_ROOT(msg, root) 
155 #endif
156
157
158 /** 
159     If MPI_POST_RECV is defined, we provide default values for size 
160     and number of posted recieves. If MPI_POST_RECV_COUNT is set
161     then a default value for MPI_POST_RECV_SIZE is used if not specified
162     by the user.
163 */
164
165 #define MPI_POST_RECV 0
166 #if MPI_POST_RECV
167 #define MPI_POST_RECV_COUNT 10
168 #undef MPI_POST_RECV
169 #endif
170 #if MPI_POST_RECV_COUNT > 0
171 #ifndef MPI_POST_RECV_SIZE
172 #define MPI_POST_RECV_LOWERSIZE 2000
173 #define MPI_POST_RECV_UPPERSIZE 4000
174 #define MPI_POST_RECV_SIZE MPI_POST_RECV_UPPERSIZE
175 #endif
176 /* #undef  MPI_POST_RECV_DEBUG  */
177 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
178 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
179 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
180 CpvDeclare(char*,CmiPostedRecvBuffers);
181 #endif
182
183 /*
184  to avoid MPI's in order delivery, changing MPI Tag all the time
185 */
186 #define TAG     1375
187
188 #if MPI_POST_RECV_COUNT > 0
189 #define POST_RECV_TAG (TAG+1)
190 #define BARRIER_ZERO_TAG TAG
191 #else
192 #define BARRIER_ZERO_TAG     1375
193 #endif
194
195 #define BLK_LEN  512
196
197 #if CMK_NODE_QUEUE_AVAILABLE
198 #define DGRAM_NODEMESSAGE   (0xFB)
199
200 #define NODE_BROADCAST_OTHERS (-1)
201 #define NODE_BROADCAST_ALL    (-2)
202 #endif
203
204 #include <signal.h>
205 void (*signal_int)(int);
206
207 /*
208 static int mpi_tag = TAG;
209 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
210 */
211
212 static int        _thread_provided = -1;
213 int               _Cmi_mynode;    /* Which address space am I */
214 int               _Cmi_mynodesize;/* Number of processors in my address space */
215 int               _Cmi_numnodes;  /* Total number of address spaces */
216 int               _Cmi_numpes;    /* Total number of processors */
217 static int        Cmi_nodestart; /* First processor in this address space */
218 CpvDeclare(void*, CmiLocalQueue);
219
220 /*Network progress utility variables. Period controls the rate at
221   which the network poll is called */
222 CpvDeclare(unsigned , networkProgressCount);
223 int networkProgressPeriod;
224
225 int               idleblock = 0;
226
227 #if 0
228 static void **recdQueue_blk;
229 static unsigned int recdQueue_blk_len;
230 static unsigned int recdQueue_first;
231 static unsigned int recdQueue_len;
232 static void recdQueueInit(void);
233 static void recdQueueAddToBack(void *element);
234 static void *recdQueueRemoveFromFront(void);
235 #endif
236
237 static void ConverseRunPE(int everReturn);
238 static void CommunicationServer(int sleepTime);
239 static void CommunicationServerThread(int sleepTime);
240
241 typedef struct msg_list {
242      char *msg;
243      struct msg_list *next;
244      int size, destpe;
245
246 #if CMK_SMP_TRACE_COMMTHREAD
247         int srcpe;
248 #endif  
249         
250      MPI_Request req;
251 } SMSG_LIST;
252
253 int MsgQueueLen=0;
254 static int request_max;
255
256 static SMSG_LIST *sent_msgs=0;
257 static SMSG_LIST *end_sent=0;
258
259 static int Cmi_dim;
260
261 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
262
263 #if NODE_0_IS_CONVHOST
264 int inside_comm = 0;
265 #endif
266
267 void CmiAbort(const char *message);
268 static void PerrorExit(const char *msg);
269
270 void SendSpanningChildren(int size, char *msg);
271 void SendHypercube(int size, char *msg);
272
273 static void PerrorExit(const char *msg)
274 {
275   perror(msg);
276   exit(1);
277 }
278
279 extern unsigned char computeCheckSum(unsigned char *data, int len);
280
281
282
283 typedef struct ProcState {
284 #if MULTI_SENDQUEUE
285 PCQueue      sendMsgBuf;       /* per processor message sending queue */
286 #endif
287 CmiNodeLock  recvLock;              /* for cs->recv */
288 } ProcState;
289
290 static ProcState  *procState;
291
292 #if CMK_SMP
293
294 #if !MULTI_SENDQUEUE
295 static PCQueue sendMsgBuf;
296 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
297 #endif
298
299 #endif
300
301 /************************************************************
302  *
303  * Processor state structure
304  *
305  ************************************************************/
306
307 /* fake Cmi_charmrun_fd */
308 static int Cmi_charmrun_fd = 0;
309 #include "machine-smp.c"
310
311 CsvDeclare(CmiNodeState, NodeState);
312
313 #include "immediate.c"
314
315 #if CMK_SHARED_VARS_UNAVAILABLE
316 /************ non SMP **************/
317 static struct CmiStateStruct Cmi_state;
318 int _Cmi_mype;
319 int _Cmi_myrank;
320
321 void CmiMemLock() {}
322 void CmiMemUnlock() {}
323
324 #define CmiGetState() (&Cmi_state)
325 #define CmiGetStateN(n) (&Cmi_state)
326
327 void CmiYield(void) { sleep(0); }
328
329 static void CmiStartThreads(char **argv)
330 {
331   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
332   _Cmi_mype = Cmi_nodestart;
333   _Cmi_myrank = 0;
334 }
335 #else   /* non smp */
336 int CmiMyPe(void)
337 {
338   return CmiGetState()->pe;
339 }
340 int CmiMyRank(void)
341 {
342   return CmiGetState()->rank;
343 }
344 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
345 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
346 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
347 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
348 #endif
349
350 /*Add a message to this processor's receive queue, pe is a rank */
351 void CmiPushPE(int pe,void *msg)
352 {
353   CmiState cs = CmiGetStateN(pe);
354   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
355 #if CMK_IMMEDIATE_MSG
356   if (CmiIsImmediate(msg)) {
357 /*
358 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
359     CmiHandleMessage(msg);
360 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
361 */
362     /**(CmiUInt2 *)msg = pe;*/
363     CMI_DEST_RANK(msg) = pe;
364     CmiPushImmediateMsg(msg);
365     return;
366   }
367 #endif
368
369 #if CMK_SMP
370   CmiLock(procState[pe].recvLock);
371 #endif
372   PCQueuePush(cs->recv,msg);
373 #if CMK_SMP
374   CmiUnlock(procState[pe].recvLock);
375 #endif
376   CmiIdleLock_addMessage(&cs->idle);
377   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
378 }
379
380 #if CMK_NODE_QUEUE_AVAILABLE
381 /*Add a message to this processor's receive queue */
382 static void CmiPushNode(void *msg)
383 {
384   MACHSTATE(3,"Pushing message into NodeRecv queue");
385 #if CMK_IMMEDIATE_MSG
386   if (CmiIsImmediate(msg)) {
387     CMI_DEST_RANK(msg) = 0;
388     CmiPushImmediateMsg(msg);
389     return;
390   }
391 #endif
392   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
393   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
394   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
395   {
396   CmiState cs=CmiGetStateN(0);
397   CmiIdleLock_addMessage(&cs->idle);
398   }
399 }
400 #endif
401
402
403 static size_t CmiAllAsyncMsgsSent(void)
404 {
405    SMSG_LIST *msg_tmp = sent_msgs;
406    MPI_Status sts;
407    int done;
408
409    while(msg_tmp!=0) {
410     done = 0;
411     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
412       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
413     if(!done)
414       return 0;
415     msg_tmp = msg_tmp->next;
416 /*    MsgQueueLen--; ????? */
417    }
418    return 1;
419 }
420
421 int CmiAsyncMsgSent(CmiCommHandle c) {
422
423   SMSG_LIST *msg_tmp = sent_msgs;
424   int done;
425   MPI_Status sts;
426
427   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
428     msg_tmp = msg_tmp->next;
429   if(msg_tmp) {
430     done = 0;
431     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
432       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
433     return ((done)?1:0);
434   } else {
435     return 1;
436   }
437 }
438
439 void CmiReleaseCommHandle(CmiCommHandle c)
440 {
441   return;
442 }
443
444 #if CMK_BLUEGENEL
445 extern void MPID_Progress_test();
446 #endif
447
448 void CmiReleaseSentMessages(void)
449 {
450   SMSG_LIST *msg_tmp=sent_msgs;
451   SMSG_LIST *prev=0;
452   SMSG_LIST *temp;
453   int done;
454   MPI_Status sts;
455
456 #if CMK_BLUEGENEL
457   MPID_Progress_test();
458 #endif
459
460   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
461   while(msg_tmp!=0) {
462     done =0;
463 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
464     double startT = CmiWallTimer();
465 #endif
466     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
467       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
468     if(done) {
469       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
470       MsgQueueLen--;
471       /* Release the message */
472       temp = msg_tmp->next;
473       if(prev==0)  /* first message */
474         sent_msgs = temp;
475       else
476         prev->next = temp;
477       CmiFree(msg_tmp->msg);
478       CmiFree(msg_tmp);
479       msg_tmp = temp;
480     } else {
481       prev = msg_tmp;
482       msg_tmp = msg_tmp->next;
483     }
484 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
485     {
486     double endT = CmiWallTimer();
487     /* only record the event if it takes more than 1ms */
488     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
489     }
490 #endif
491   }
492   end_sent = prev;
493   MACHSTATE(2,"} CmiReleaseSentMessages end");
494 }
495
496 int PumpMsgs(void)
497 {
498   int nbytes, flg, res;
499   char *msg;
500   MPI_Status sts;
501   int recd=0;
502
503 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
504   int recvCnt=0;
505 #endif
506         
507 #if CMK_BLUEGENEL
508   MPID_Progress_test();
509 #endif
510
511   MACHSTATE(2,"PumpMsgs begin {");
512
513 #if CMI_DYNAMIC_EXERT_CAP
514   dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
515 #endif
516         
517   while(1) {
518 #if CMI_EXERT_RECV_CAP
519         if(recvCnt==RECV_CAP) break;
520 #elif CMI_DYNAMIC_EXERT_CAP
521         if(recvCnt >= dynamicRecvCap) break;
522 #endif
523           
524     /* First check posted recvs then do  probe unmatched outstanding messages */
525 #if MPI_POST_RECV_COUNT > 0 
526     int completed_index=-1;
527     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
528         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
529     if(flg){
530         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
531             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
532
533                 recd = 1;
534         msg = (char *) CmiAlloc(nbytes);
535         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
536         /* and repost the recv */
537
538         START_EVENT();
539
540         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
541             MPI_POST_RECV_SIZE,
542             MPI_BYTE,
543             MPI_ANY_SOURCE,
544             POST_RECV_TAG,
545             MPI_COMM_WORLD,
546             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
547                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
548
549         END_EVENT(50);
550
551         CpvAccess(Cmi_posted_recv_total)++;
552     }
553     else {
554         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
555         if(res != MPI_SUCCESS)
556         CmiAbort("MPI_Iprobe failed\n");
557         if(!flg) break;
558         recd = 1;
559         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
560         msg = (char *) CmiAlloc(nbytes);
561
562         START_EVENT();
563
564         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
565             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
566
567         END_EVENT(30);
568
569         CpvAccess(Cmi_unposted_recv_total)++;
570     }
571 #else
572     /* Original version */
573 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
574   double startT = CmiWallTimer(); 
575 #endif
576     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
577     if(res != MPI_SUCCESS)
578       CmiAbort("MPI_Iprobe failed\n");
579
580     if(!flg) break;
581 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
582     {
583     double endT = CmiWallTimer();
584     /* only trace the probe that last longer than 1ms */
585     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
586     }
587 #endif
588
589     recd = 1;
590     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
591     msg = (char *) CmiAlloc(nbytes);
592     
593     START_EVENT();
594
595     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
596       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
597
598     /*END_EVENT(30);*/
599
600 #endif
601
602 #if CMK_SMP_TRACE_COMMTHREAD
603         traceBeginCommOp(msg);
604         traceChangeLastTimestamp(CpvAccess(projTraceStart));
605         traceEndCommOp(msg);
606         #if CMI_MPI_TRACE_MOREDETAILED
607         char tmp[32];
608         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
609         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
610         #endif
611 #elif CMK_TRACE_COMMOVERHEAD
612         char tmp[32];
613         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
614         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
615 #endif
616         
617         
618     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
619     CMI_CHECK_CHECKSUM(msg, nbytes);
620 #if CMK_ERROR_CHECKING
621     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
622       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
623       CmiFree(msg);
624       CmiAbort("Abort!\n");
625       continue;
626     }
627 #endif
628         
629 #if CMK_BROADCAST_SPANNING_TREE
630     if (CMI_BROADCAST_ROOT(msg))
631       SendSpanningChildren(nbytes, msg);
632 #elif CMK_BROADCAST_HYPERCUBE
633     if (CMI_BROADCAST_ROOT(msg))
634       SendHypercube(nbytes, msg);
635 #endif
636         
637         /* In SMP mode, this push operation needs to be executed
638      * after forwarding broadcast messages. If it is executed
639      * earlier, then during the bcast msg forwarding period,    
640          * the msg could be already freed on the worker thread.
641          * As a result, the forwarded message could be wrong! 
642          * --Chao Mei
643          */
644 #if CMK_NODE_QUEUE_AVAILABLE
645     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
646       CmiPushNode(msg);
647     else
648 #endif
649         CmiPushPE(CMI_DEST_RANK(msg), msg);     
650         
651 #if CMI_EXERT_RECV_CAP
652         recvCnt++;
653 #elif CMI_DYNAMIC_EXERT_CAP
654         recvCnt++;
655 #if CMK_SMP
656         /* check sendMsgBuf to get the  number of messages that have not been sent
657          * which is only available in SMP mode
658          * MsgQueueLen indicates the number of messages that have not been released 
659          * by MPI 
660          */
661         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
662                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
663                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
664         }
665 #else
666         /* MsgQueueLen indicates the number of messages that have not been released 
667          * by MPI 
668          */
669         if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
670                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
671         }
672 #endif
673
674 #endif  
675         
676   }
677
678   
679 #if CMK_IMMEDIATE_MSG && !CMK_SMP
680   CmiHandleImmediate();
681 #endif
682   
683   MACHSTATE(2,"} PumpMsgs end ");
684   return recd;
685 }
686
687 /* blocking version */
688 static void PumpMsgsBlocking(void)
689 {
690   static int maxbytes = 20000000;
691   static char *buf = NULL;
692   int nbytes, flg;
693   MPI_Status sts;
694   char *msg;
695   int recd=0;
696
697   if (!PCQueueEmpty(CmiGetState()->recv)) return;
698   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
699   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
700   if (sent_msgs)  return;
701
702 #if 0
703   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
704 #endif
705
706   if (buf == NULL) {
707     buf = (char *) CmiAlloc(maxbytes);
708     _MEMCHECK(buf);
709   }
710
711
712 #if MPI_POST_RECV_COUNT > 0
713 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
714 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
715 #endif
716
717   START_EVENT();
718
719   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
720       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
721
722   /*END_EVENT(30);*/
723     
724    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
725    msg = (char *) CmiAlloc(nbytes);
726    memcpy(msg, buf, nbytes);
727
728 #if CMK_SMP_TRACE_COMMTHREAD
729         traceBeginCommOp(msg);
730         traceChangeLastTimestamp(CpvAccess(projTraceStart));
731         traceEndCommOp(msg);
732         #if CMI_MPI_TRACE_MOREDETAILED
733         char tmp[32];
734         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
735         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
736         #endif
737 #endif
738
739 #if CMK_BROADCAST_SPANNING_TREE
740    if (CMI_BROADCAST_ROOT(msg))
741       SendSpanningChildren(nbytes, msg);
742 #elif CMK_BROADCAST_HYPERCUBE
743    if (CMI_BROADCAST_ROOT(msg))
744       SendHypercube(nbytes, msg);
745 #endif
746   
747         /* In SMP mode, this push operation needs to be executed
748      * after forwarding broadcast messages. If it is executed
749      * earlier, then during the bcast msg forwarding period,    
750          * the msg could be already freed on the worker thread.
751          * As a result, the forwarded message could be wrong! 
752          * --Chao Mei
753          */  
754 #if CMK_NODE_QUEUE_AVAILABLE
755    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
756       CmiPushNode(msg);
757    else
758 #endif
759       CmiPushPE(CMI_DEST_RANK(msg), msg);
760 }
761
762 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
763
764 #if CMK_SMP
765
766 static int inexit = 0;
767 static CmiNodeLock  exitLock = 0;
768
769 static int MsgQueueEmpty()
770 {
771   int i;
772 #if MULTI_SENDQUEUE
773   for (i=0; i<_Cmi_mynodesize; i++)
774     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
775 #else
776   return PCQueueEmpty(sendMsgBuf);
777 #endif
778   return 1;
779 }
780
781 static int SendMsgBuf();
782
783 /* test if all processors recv queues are empty */
784 static int RecvQueueEmpty()
785 {
786   int i;
787   for (i=0; i<_Cmi_mynodesize; i++) {
788     CmiState cs=CmiGetStateN(i);
789     if (!PCQueueEmpty(cs->recv)) return 0;
790   }
791   return 1;
792 }
793
794 /**
795 CommunicationServer calls MPI to send messages in the queues and probe message from network.
796 */
797
798 #define REPORT_COMM_METRICS 0
799 #if REPORT_COMM_METRICS
800 static double pumptime = 0.0;
801 static double releasetime = 0.0;
802 static double sendtime = 0.0;
803 #endif
804
805 static void CommunicationServer(int sleepTime)
806 {
807   int static count=0;
808 /*
809   count ++;
810   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
811 */
812 #if REPORT_COMM_METRICS
813   double t1, t2, t3, t4;
814   t1 = CmiWallTimer();
815 #endif
816   PumpMsgs();
817 #if REPORT_COMM_METRICS
818   t2 = CmiWallTimer();
819 #endif
820   CmiReleaseSentMessages();
821 #if REPORT_COMM_METRICS
822   t3 = CmiWallTimer();
823 #endif
824   SendMsgBuf();
825 #if REPORT_COMM_METRICS
826   t4 = CmiWallTimer();
827   pumptime += (t2-t1);
828   releasetime += (t3-t2);
829   sendtime += (t4-t3);
830 #endif
831 /*
832   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
833 */
834   if (inexit == CmiMyNodeSize()) {
835     MACHSTATE(2, "CommunicationServer exiting {");
836 #if 0
837     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
838 #endif
839     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
840       CmiReleaseSentMessages();
841       SendMsgBuf();
842       PumpMsgs();
843     }
844     MACHSTATE(2, "CommunicationServer barrier begin {");
845
846     START_EVENT();
847
848     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
849       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
850
851     END_EVENT(10);
852
853     MACHSTATE(2, "} CommunicationServer barrier end");
854 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
855     if (CmiMyNode() == 0){
856       CmiPrintf("End of program\n");
857           #if MPI_POST_RECV_COUNT > 0
858         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
859           #endif
860     }
861 #endif
862     MACHSTATE(2, "} CommunicationServer EXIT");
863
864     ConverseCommonExit();   
865 #if REPORT_COMM_METRICS
866     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
867 #endif
868
869 #if ! CMK_AUTOBUILD
870     signal(SIGINT, signal_int);
871     MPI_Finalize();
872     #endif
873     exit(0);
874   }
875 }
876
877 #endif
878
879 static void CommunicationServerThread(int sleepTime)
880 {
881 #if CMK_SMP
882   CommunicationServer(sleepTime);
883 #endif
884 #if CMK_IMMEDIATE_MSG
885   CmiHandleImmediate();
886 #endif
887 }
888
889 #if CMK_NODE_QUEUE_AVAILABLE
890 char *CmiGetNonLocalNodeQ(void)
891 {
892   CmiState cs = CmiGetState();
893   char *result = 0;
894   CmiIdleLock_checkMessage(&cs->idle);
895 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
896     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
897     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
898     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
899     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
900     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
901 /*  }  */
902
903   return result;
904 }
905 #endif
906
907 void *CmiGetNonLocal(void)
908 {
909   static int count=0;
910   CmiState cs = CmiGetState();
911   void *msg;
912
913 #if ! CMK_SMP
914   if (CmiNumPes() == 1) return NULL;
915 #endif
916
917   CmiIdleLock_checkMessage(&cs->idle);
918   /* although it seems that lock is not needed, I found it crashes very often
919      on mpi-smp without lock */
920
921 #if ! CMK_SMP
922   CmiReleaseSentMessages();
923   PumpMsgs();
924 #endif
925
926   /* CmiLock(procState[cs->rank].recvLock); */
927   msg =  PCQueuePop(cs->recv);
928   /* CmiUnlock(procState[cs->rank].recvLock); */
929
930 /*
931   if (msg) {
932     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
933   else {
934     count++;
935     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
936   }
937 */
938 #if ! CMK_SMP
939   if (no_outstanding_sends) {
940     while (MsgQueueLen>0) {
941       CmiReleaseSentMessages();
942       PumpMsgs();
943     }
944   }
945
946   if(!msg) {
947     CmiReleaseSentMessages();
948     if (PumpMsgs())
949       return  PCQueuePop(cs->recv);
950     else
951       return 0;
952   }
953 #endif
954
955   return msg;
956 }
957
958 /* called in non-smp mode */
959 void CmiNotifyIdle(void)
960 {
961   CmiReleaseSentMessages();
962   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
963 }
964
965
966 /********************************************************
967     The call to probe immediate messages has been renamed to
968     CmiMachineProgressImpl
969 ******************************************************/
970 /* user call to handle immediate message, only useful in non SMP version
971    using polling method to schedule message.
972 */
973 /*
974 #if CMK_IMMEDIATE_MSG
975 void CmiProbeImmediateMsg()
976 {
977 #if !CMK_SMP
978   PumpMsgs();
979   CmiHandleImmediate();
980 #endif
981 }
982 #endif
983 */
984
985 /* Network progress function is used to poll the network when for
986    messages. This flushes receive buffers on some  implementations*/
987 #if CMK_MACHINE_PROGRESS_DEFINED
988 void CmiMachineProgressImpl()
989 {
990 #if !CMK_SMP
991     PumpMsgs();
992 #if CMK_IMMEDIATE_MSG
993     CmiHandleImmediate();
994 #endif
995 #else
996     /*Not implemented yet. Communication server does not seem to be
997       thread safe, so only communication thread call it */
998     if (CmiMyRank() == CmiMyNodeSize())
999         CommunicationServerThread(0);
1000 #endif
1001 }
1002 #endif
1003
1004 /********************* MESSAGE SEND FUNCTIONS ******************/
1005
1006 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1007
1008 static void CmiSendSelf(char *msg)
1009 {
1010 #if CMK_IMMEDIATE_MSG
1011     if (CmiIsImmediate(msg)) {
1012       /* CmiBecomeNonImmediate(msg); */
1013       CmiPushImmediateMsg(msg);
1014       CmiHandleImmediate();
1015       return;
1016     }
1017 #endif
1018     CQdCreate(CpvAccess(cQdState), 1);
1019     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1020 }
1021
1022 void CmiSyncSendFn(int destPE, int size, char *msg)
1023 {
1024   CmiState cs = CmiGetState();
1025   char *dupmsg = (char *) CmiAlloc(size);
1026   memcpy(dupmsg, msg, size);
1027
1028   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1029
1030   if (cs->pe==destPE) {
1031     CmiSendSelf(dupmsg);
1032   }
1033   else
1034     CmiAsyncSendFn_(destPE, size, dupmsg);
1035 }
1036
1037 #if CMK_SMP
1038
1039 /* called by communication thread in SMP */
1040 static int SendMsgBuf()
1041 {
1042   SMSG_LIST *msg_tmp;
1043   char *msg;
1044   int node, rank, size;
1045   int i;
1046   int sent = 0;
1047
1048 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1049         int sentCnt = 0;
1050 #endif  
1051         
1052 #if CMI_DYNAMIC_EXERT_CAP
1053         dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
1054 #endif
1055         
1056   MACHSTATE(2,"SendMsgBuf begin {");
1057 #if MULTI_SENDQUEUE
1058   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1059   {
1060     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1061     {
1062       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1063 #else
1064     /* single message sending queue */
1065     /* CmiLock(sendMsgBufLock); */
1066     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1067     /* CmiUnlock(sendMsgBufLock); */
1068     while (NULL != msg_tmp)
1069     {
1070 #endif
1071       node = msg_tmp->destpe;
1072       size = msg_tmp->size;
1073       msg = msg_tmp->msg;
1074       msg_tmp->next = 0;
1075                 
1076 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1077       while (MsgQueueLen > request_max) {
1078                 CmiReleaseSentMessages();
1079                 PumpMsgs();
1080       }
1081 #endif
1082           
1083       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1084 #if CMK_ERROR_CHECKING
1085       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1086 #endif
1087       CMI_SET_CHECKSUM(msg, size);
1088
1089 #if MPI_POST_RECV_COUNT > 0
1090         if(size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE){
1091           START_EVENT();
1092           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1093                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1094           END_EVENT(40);
1095         }
1096         else {
1097             START_EVENT();
1098             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1099                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1100             END_EVENT(40);
1101         }
1102 #else
1103         START_EVENT();
1104         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1105             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1106         /*END_EVENT(40);*/
1107 #endif
1108         
1109 #if CMK_SMP_TRACE_COMMTHREAD
1110         traceBeginCommOp(msg);
1111         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1112         /* traceSendMsgComm must execute after traceBeginCommOp because
1113          * we pretend we execute an entry method, and inside this we
1114          * pretend we will send another message. Otherwise how could
1115          * a message creation just before an entry method invocation?
1116          * If such logic is broken, the projections will not trace
1117          * messages correctly! -Chao Mei
1118          */
1119         traceSendMsgComm(msg);
1120         traceEndCommOp(msg);
1121         #if CMI_MPI_TRACE_MOREDETAILED
1122         char tmp[64];
1123         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1124         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1125         #endif
1126 #endif
1127                 
1128                 
1129       MACHSTATE(3,"}MPI_send end");
1130       MsgQueueLen++;
1131       if(sent_msgs==0)
1132         sent_msgs = msg_tmp;
1133       else
1134         end_sent->next = msg_tmp;
1135       end_sent = msg_tmp;
1136       sent=1;
1137           
1138 #if CMI_EXERT_SEND_CAP    
1139           if(++sentCnt == SEND_CAP) break;
1140 #elif CMI_DYNAMIC_EXERT_CAP
1141           if(++sentCnt >= dynamicSendCap) break;
1142           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1143                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1144 #endif    
1145           
1146 #if ! MULTI_SENDQUEUE
1147       /* CmiLock(sendMsgBufLock); */
1148       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1149       /* CmiUnlock(sendMsgBufLock); */
1150 #endif
1151     }
1152 #if MULTI_SENDQUEUE
1153   }
1154 #endif
1155   MACHSTATE(2,"}SendMsgBuf end ");
1156   return sent;
1157 }
1158
1159 void EnqueueMsg(void *m, int size, int node)
1160 {
1161   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1162   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1163   msg_tmp->msg = m;
1164   msg_tmp->size = size;
1165   msg_tmp->destpe = node;
1166         
1167 #if CMK_SMP_TRACE_COMMTHREAD
1168         msg_tmp->srcpe = CmiMyPe();
1169 #endif  
1170
1171 #if MULTI_SENDQUEUE
1172   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1173 #else
1174   CmiLock(sendMsgBufLock);
1175   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1176   CmiUnlock(sendMsgBufLock);
1177 #endif
1178         
1179   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1180 }
1181
1182 #endif
1183
1184 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1185 {
1186   CmiState cs = CmiGetState();
1187   SMSG_LIST *msg_tmp;
1188   CmiUInt2  rank, node;
1189
1190   if(destPE == cs->pe) {
1191     char *dupmsg = (char *) CmiAlloc(size);
1192     memcpy(dupmsg, msg, size);
1193     CmiSendSelf(dupmsg);
1194     return 0;
1195   }
1196   CQdCreate(CpvAccess(cQdState), 1);
1197 #if CMK_SMP
1198   node = CmiNodeOf(destPE);
1199   rank = CmiRankOf(destPE);
1200   if (node == CmiMyNode())  {
1201     CmiPushPE(rank, msg);
1202     return 0;
1203   }
1204   CMI_DEST_RANK(msg) = rank;
1205   EnqueueMsg(msg, size, node);
1206   return 0;
1207 #else
1208   /* non smp */
1209   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1210   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1211   msg_tmp->msg = msg;
1212   msg_tmp->next = 0;
1213   while (MsgQueueLen > request_max) {
1214         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1215         CmiReleaseSentMessages();
1216         PumpMsgs();
1217   }
1218 #if CMK_ERROR_CHECKING
1219   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1220 #endif
1221   CMI_SET_CHECKSUM(msg, size);
1222
1223 #if MPI_POST_RECV_COUNT > 0
1224                 if(size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE){
1225           START_EVENT();
1226           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1227                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1228           END_EVENT(40);
1229         }
1230         else {
1231           START_EVENT();
1232           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1233                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1234           END_EVENT(40);
1235         }
1236 #else
1237   START_EVENT();
1238   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1239     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1240   /*END_EVENT(40);*/
1241   #if CMK_TRACE_COMMOVERHEAD
1242         char tmp[64];
1243         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1244         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1245   #endif
1246 #endif
1247
1248   MsgQueueLen++;
1249   if(sent_msgs==0)
1250     sent_msgs = msg_tmp;
1251   else
1252     end_sent->next = msg_tmp;
1253   end_sent = msg_tmp;
1254   return (CmiCommHandle) &(msg_tmp->req);
1255 #endif              /* non-smp */
1256 }
1257
1258 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1259 {
1260   CMI_SET_BROADCAST_ROOT(msg, 0);
1261   CmiAsyncSendFn_(destPE, size, msg);
1262 }
1263
1264 void CmiFreeSendFn(int destPE, int size, char *msg)
1265 {
1266   CmiState cs = CmiGetState();
1267   CMI_SET_BROADCAST_ROOT(msg, 0);
1268
1269   if (cs->pe==destPE) {
1270     CmiSendSelf(msg);
1271   } else {
1272     CmiAsyncSendFn_(destPE, size, msg);
1273   }
1274 }
1275
1276 /*********************** BROADCAST FUNCTIONS **********************/
1277
1278 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1279 void CmiSyncSendFn1(int destPE, int size, char *msg)
1280 {
1281   CmiState cs = CmiGetState();
1282   char *dupmsg = (char *) CmiAlloc(size);
1283   memcpy(dupmsg, msg, size);
1284   if (cs->pe==destPE)
1285     CmiSendSelf(dupmsg);
1286   else
1287     CmiAsyncSendFn_(destPE, size, dupmsg);
1288 }
1289
1290 /* send msg to its spanning children in broadcast. G. Zheng */
1291 void SendSpanningChildren(int size, char *msg)
1292 {
1293   CmiState cs = CmiGetState();
1294   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1295   int startnode = CmiNodeOf(startpe);
1296   int i, exceptRank;
1297         
1298    /* first send msgs to other nodes */
1299   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1300   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1301     int nd = CmiMyNode()-startnode;
1302     if (nd<0) nd+=CmiNumNodes();
1303     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1304     if (nd > CmiNumNodes() - 1) break;
1305     nd += startnode;
1306     nd = nd%CmiNumNodes();
1307     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1308 #if CMK_SMP
1309       /* always send to the first rank of other nodes */
1310     char *newmsg = CmiCopyMsg(msg, size);
1311     CMI_DEST_RANK(newmsg) = 0;
1312     EnqueueMsg(newmsg, size, nd);
1313 #else
1314     CmiSyncSendFn1(nd, size, msg);
1315 #endif
1316   }
1317 #if CMK_SMP  
1318    /* second send msgs to my peers on this node */
1319   /* FIXME: now it's just a flat p2p send!! When node size is large,
1320    * it should also be sent in a tree
1321    */
1322    exceptRank = CMI_DEST_RANK(msg);
1323    for(i=0; i<exceptRank; i++){
1324            CmiPushPE(i, CmiCopyMsg(msg, size));
1325    }
1326    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1327            CmiPushPE(i, CmiCopyMsg(msg, size));
1328    }
1329 #endif
1330 }
1331
1332 #include <math.h>
1333
1334 /* send msg along the hypercube in broadcast. (Sameer) */
1335 void SendHypercube(int size, char *msg)
1336 {
1337   CmiState cs = CmiGetState();
1338   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1339   int startnode = CmiNodeOf(startpe);
1340   int i, exceptRank, cnt, tmp, relPE;
1341   int dims=0;
1342
1343   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1344   tmp = CmiNumNodes()-1;
1345   while(tmp>0){
1346           dims++;
1347           tmp = tmp >> 1;
1348   }
1349   if(CmiNumNodes()==1) dims=1;
1350   
1351    /* first send msgs to other nodes */  
1352   relPE = CmiMyNode()-startnode;
1353   if(relPE < 0) relPE += CmiNumNodes();
1354   cnt=0;
1355   tmp = relPE;
1356   /* count how many zeros (in binary format) relPE has */
1357   for(i=0; i<dims; i++, cnt++){
1358     if(tmp & 1 == 1) break;
1359     tmp = tmp >> 1;
1360   }
1361   
1362   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1363   for (i = cnt-1; i >= 0; i--) {
1364     int nd = relPE + (1 << i);
1365         if(nd >= CmiNumNodes()) continue;
1366         nd = (nd+startnode)%CmiNumNodes();
1367         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1368 #if CMK_SMP
1369     /* always send to the first rank of other nodes */
1370     char *newmsg = CmiCopyMsg(msg, size);
1371     CMI_DEST_RANK(newmsg) = 0;
1372     EnqueueMsg(newmsg, size, nd);
1373 #else
1374         CmiSyncSendFn1(nd, size, msg);
1375 #endif
1376   }
1377   
1378 #if CMK_SMP
1379    /* second send msgs to my peers on this node */
1380    /* FIXME: now it's just a flat p2p send!! When node size is large,
1381     * it should also be sent in a tree
1382     */
1383    exceptRank = CMI_DEST_RANK(msg);
1384    for(i=0; i<exceptRank; i++){
1385            CmiPushPE(i, CmiCopyMsg(msg, size));
1386    }
1387    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1388            CmiPushPE(i, CmiCopyMsg(msg, size));
1389    }
1390 #endif
1391 }
1392
1393 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1394 {
1395   CmiState cs = CmiGetState();
1396
1397 #if CMK_SMP     
1398   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1399   CMI_DEST_RANK(msg) = CmiMyRank();
1400 #endif
1401         
1402 #if CMK_BROADCAST_SPANNING_TREE
1403   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1404   SendSpanningChildren(size, msg);
1405
1406 #elif CMK_BROADCAST_HYPERCUBE
1407   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1408   SendHypercube(size, msg);
1409
1410 #else
1411   int i;
1412
1413   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1414     CmiSyncSendFn(i, size,msg) ;
1415   for ( i=0; i<cs->pe; i++ )
1416     CmiSyncSendFn(i, size,msg) ;
1417 #endif
1418
1419   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1420 }
1421
1422
1423 /*  FIXME: luckily async is never used  G. Zheng */
1424 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1425 {
1426   CmiState cs = CmiGetState();
1427   int i ;
1428
1429   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1430     CmiAsyncSendFn(i,size,msg) ;
1431   for ( i=0; i<cs->pe; i++ )
1432     CmiAsyncSendFn(i,size,msg) ;
1433
1434   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1435   CmiAbort("CmiAsyncBroadcastFn should never be called");
1436   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1437 }
1438
1439 void CmiFreeBroadcastFn(int size, char *msg)
1440 {
1441    CmiSyncBroadcastFn(size,msg);
1442    CmiFree(msg);
1443 }
1444
1445 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1446 {
1447
1448 #if CMK_SMP     
1449   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1450   CMI_DEST_RANK(msg) = CmiMyRank();
1451 #endif
1452
1453 #if CMK_BROADCAST_SPANNING_TREE
1454   CmiState cs = CmiGetState();
1455   CmiSyncSendFn(cs->pe, size,msg) ;
1456   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1457   SendSpanningChildren(size, msg);
1458
1459 #elif CMK_BROADCAST_HYPERCUBE
1460   CmiState cs = CmiGetState();
1461   CmiSyncSendFn(cs->pe, size,msg) ;
1462   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1463   SendHypercube(size, msg);
1464
1465 #else
1466     int i ;
1467
1468   for ( i=0; i<_Cmi_numpes; i++ )
1469     CmiSyncSendFn(i,size,msg) ;
1470 #endif
1471
1472   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1473 }
1474
1475 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1476 {
1477   int i ;
1478
1479   for ( i=1; i<_Cmi_numpes; i++ )
1480     CmiAsyncSendFn(i,size,msg) ;
1481
1482   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1483
1484   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1485 }
1486
1487 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1488 {
1489 #if CMK_SMP     
1490   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1491   CMI_DEST_RANK(msg) = CmiMyRank();
1492 #endif
1493
1494 #if CMK_BROADCAST_SPANNING_TREE
1495   CmiState cs = CmiGetState();
1496   CmiSyncSendFn(cs->pe, size,msg) ;
1497   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1498   SendSpanningChildren(size, msg);
1499
1500 #elif CMK_BROADCAST_HYPERCUBE
1501   CmiState cs = CmiGetState();
1502   CmiSyncSendFn(cs->pe, size,msg) ;
1503   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1504   SendHypercube(size, msg);
1505
1506 #else
1507   int i ;
1508
1509   for ( i=0; i<_Cmi_numpes; i++ )
1510     CmiSyncSendFn(i,size,msg) ;
1511 #endif
1512   CmiFree(msg) ;
1513   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1514 }
1515
1516 #if CMK_NODE_QUEUE_AVAILABLE
1517
1518 static void CmiSendNodeSelf(char *msg)
1519 {
1520 #if CMK_IMMEDIATE_MSG
1521 #if 0
1522     if (CmiIsImmediate(msg) && !_immRunning) {
1523       /*CmiHandleImmediateMessage(msg); */
1524       CmiPushImmediateMsg(msg);
1525       CmiHandleImmediate();
1526       return;
1527     }
1528 #endif
1529     if (CmiIsImmediate(msg))
1530     {
1531       CmiPushImmediateMsg(msg);
1532       if (!_immRunning) CmiHandleImmediate();
1533       return;
1534     }
1535 #endif
1536     CQdCreate(CpvAccess(cQdState), 1);
1537     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1538     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1539     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1540 }
1541
1542 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1543 {
1544   int i;
1545   SMSG_LIST *msg_tmp;
1546   char *dupmsg;
1547
1548   CMI_SET_BROADCAST_ROOT(msg, 0);
1549   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1550   switch (dstNode) {
1551   case NODE_BROADCAST_ALL:
1552     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1553   case NODE_BROADCAST_OTHERS:
1554     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1555     for (i=0; i<_Cmi_numnodes; i++)
1556       if (i!=_Cmi_mynode) {
1557         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1558       }
1559     break;
1560   default:
1561     dupmsg = (char *)CmiCopyMsg(msg,size);
1562     if(dstNode == _Cmi_mynode) {
1563       CmiSendNodeSelf(dupmsg);
1564     }
1565     else {
1566       CQdCreate(CpvAccess(cQdState), 1);
1567       EnqueueMsg(dupmsg, size, dstNode);
1568     }
1569   }
1570   return 0;
1571 }
1572
1573 void CmiSyncNodeSendFn(int p, int s, char *m)
1574 {
1575   CmiAsyncNodeSendFn(p, s, m);
1576 }
1577
1578 /* need */
1579 void CmiFreeNodeSendFn(int p, int s, char *m)
1580 {
1581   CmiAsyncNodeSendFn(p, s, m);
1582   CmiFree(m);
1583 }
1584
1585 /* need */
1586 void CmiSyncNodeBroadcastFn(int s, char *m)
1587 {
1588   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1589 }
1590
1591 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1592 {
1593 }
1594
1595 /* need */
1596 void CmiFreeNodeBroadcastFn(int s, char *m)
1597 {
1598   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1599   CmiFree(m);
1600 }
1601
1602 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1603 {
1604   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1605 }
1606
1607 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1608 {
1609   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1610 }
1611
1612 /* need */
1613 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1614 {
1615   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1616   CmiFree(m);
1617 }
1618 #endif
1619
1620 /************************** MAIN ***********************************/
1621 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1622
1623 void ConverseExit(void)
1624 {
1625 #if ! CMK_SMP
1626   while(!CmiAllAsyncMsgsSent()) {
1627     PumpMsgs();
1628     CmiReleaseSentMessages();
1629   }
1630   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1631     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1632
1633   ConverseCommonExit();
1634 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1635   if (CmiMyPe() == 0){
1636     CmiPrintf("End of program\n");
1637 #if MPI_POST_RECV_COUNT > 0
1638     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1639 #endif
1640 }
1641 #endif
1642 #if ! CMK_AUTOBUILD
1643   signal(SIGINT, signal_int);
1644   MPI_Finalize();
1645 #endif
1646   exit(0);
1647
1648 #else
1649     /* SMP version, communication thread will exit */
1650   ConverseCommonExit();
1651   /* atomic increment */
1652   CmiLock(exitLock);
1653   inexit++;
1654   CmiUnlock(exitLock);
1655   while (1) CmiYield();
1656 #endif
1657 }
1658
1659 static void registerMPITraceEvents() {
1660 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1661     traceRegisterUserEvent("MPI_Barrier", 10);
1662     traceRegisterUserEvent("MPI_Send", 20);
1663     traceRegisterUserEvent("MPI_Recv", 30);
1664     traceRegisterUserEvent("MPI_Isend", 40);
1665     traceRegisterUserEvent("MPI_Irecv", 50);
1666     traceRegisterUserEvent("MPI_Test", 60);
1667     traceRegisterUserEvent("MPI_Iprobe", 70);
1668 #endif
1669 }
1670
1671
1672 static char     **Cmi_argv;
1673 static char     **Cmi_argvcopy;
1674 static CmiStartFn Cmi_startfn;   /* The start function */
1675 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1676
1677 typedef struct {
1678   int sleepMs; /*Milliseconds to sleep while idle*/
1679   int nIdles; /*Number of times we've been idle in a row*/
1680   CmiState cs; /*Machine state*/
1681 } CmiIdleState;
1682
1683 static CmiIdleState *CmiNotifyGetState(void)
1684 {
1685   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1686   s->sleepMs=0;
1687   s->nIdles=0;
1688   s->cs=CmiGetState();
1689   return s;
1690 }
1691
1692 static void CmiNotifyBeginIdle(CmiIdleState *s)
1693 {
1694   s->sleepMs=0;
1695   s->nIdles=0;
1696 }
1697
1698 static void CmiNotifyStillIdle(CmiIdleState *s)
1699 {
1700 #if ! CMK_SMP
1701   CmiReleaseSentMessages();
1702   PumpMsgs();
1703 #else
1704 /*  CmiYield();  */
1705 #endif
1706
1707 #if 1
1708   {
1709   int nSpins=20; /*Number of times to spin before sleeping*/
1710   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1711   s->nIdles++;
1712   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1713     s->sleepMs+=2;
1714     if (s->sleepMs>10) s->sleepMs=10;
1715   }
1716   /*Comm. thread will listen on sockets-- just sleep*/
1717   if (s->sleepMs>0) {
1718     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1719     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1720     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1721   }
1722   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1723   }
1724 #endif
1725 }
1726
1727 #if MACHINE_DEBUG_LOG
1728 FILE *debugLog = NULL;
1729 #endif
1730
1731 static int machine_exit_idx;
1732 static void machine_exit(char *m) {
1733   EmergencyExit();
1734   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1735   fflush(stdout);
1736   CmiNodeBarrier();
1737   if (CmiMyRank() == 0) {
1738     MPI_Barrier(MPI_COMM_WORLD);
1739     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1740     MPI_Abort(MPI_COMM_WORLD, 1);
1741   } else {
1742     while (1) CmiYield();
1743   }
1744 }
1745
1746 static void KillOnAllSigs(int sigNo) {
1747   static int already_in_signal_handler = 0;
1748   char *m;
1749   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1750   already_in_signal_handler = 1;
1751 #if CMK_CCS_AVAILABLE
1752   if (CpvAccess(cmiArgDebugFlag)) {
1753     CpdNotify(CPD_SIGNAL, sigNo);
1754     CpdFreeze();
1755   }
1756 #endif
1757   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1758       "Signal: %d\n",CmiMyPe(),sigNo);
1759   CmiPrintStackTrace(1);
1760
1761   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1762   CmiSetHandler(m, machine_exit_idx);
1763   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1764   machine_exit(m);
1765 }
1766
1767 static void ConverseRunPE(int everReturn)
1768 {
1769   CmiIdleState *s=CmiNotifyGetState();
1770   CmiState cs;
1771   char** CmiMyArgv;
1772
1773 #if MPI_POST_RECV_COUNT > 0
1774         int doInit = 1;
1775         int i;
1776         
1777 #if CMK_SMP
1778         if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1779 #endif
1780         
1781         /* Currently, in mpi smp, the main thread will be the comm thread, so
1782      *  only the comm thread should post recvs. Cpvs, however, need to be
1783          * created on rank 0 (the ptrs to the actual cpv memory), while
1784      * other ranks are busy waiting for this to finish. So cpv initialize
1785          * routines have to be called on every ranks, although they are only
1786          * useful on comm thread (whose rank is not zero) -Chao Mei
1787          */
1788         CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1789         CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1790         CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
1791         CpvInitialize(char*,CmiPostedRecvBuffers);
1792
1793         if(doInit){
1794                 /* Post some extra recvs to help out with incoming messages */
1795                 /* On some MPIs the messages are unexpected and thus slow */
1796
1797                 /* An array of request handles for posted recvs */
1798                 CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1799
1800                 /* An array of buffers for posted recvs */
1801                 CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1802
1803                 /* Post Recvs */
1804                 for(i=0; i<MPI_POST_RECV_COUNT; i++){
1805                         printf("Pre post recv %d\n", i);
1806                         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
1807                                                 MPI_POST_RECV_SIZE,
1808                                                 MPI_BYTE,
1809                                                 MPI_ANY_SOURCE,
1810                                                 POST_RECV_TAG,
1811                                                 MPI_COMM_WORLD,
1812                                 &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1813                                         CmiAbort("MPI_Irecv failed\n");
1814                 }
1815         }
1816 #endif
1817         
1818   CmiNodeAllBarrier();
1819         
1820   cs = CmiGetState();
1821   CpvInitialize(void *,CmiLocalQueue);
1822   CpvAccess(CmiLocalQueue) = cs->localqueue;
1823
1824   if (CmiMyRank())
1825     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1826   else
1827     CmiMyArgv=Cmi_argv;
1828
1829   CthInit(CmiMyArgv);
1830
1831   ConverseCommonInit(CmiMyArgv);
1832   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1833
1834 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1835   CpvInitialize(double, projTraceStart);
1836   /* only PE 0 needs to care about registration (to generate sts file). */
1837   if (CmiMyPe() == 0) {
1838     registerMachineUserEventsFunction(&registerMPITraceEvents);
1839   }
1840 #endif
1841
1842   /* initialize the network progress counter*/
1843   /* Network progress function is used to poll the network when for
1844      messages. This flushes receive buffers on some  implementations*/
1845   CpvInitialize(unsigned , networkProgressCount);
1846   CpvAccess(networkProgressCount) = 0;
1847
1848 #if CMK_SMP
1849   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1850   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1851 #else
1852   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1853 #endif
1854
1855 #if MACHINE_DEBUG_LOG
1856   if (CmiMyRank() == 0) {
1857     char ln[200];
1858     sprintf(ln,"debugLog.%d",CmiMyNode());
1859     debugLog=fopen(ln,"w");
1860   }
1861 #endif
1862
1863   /* Converse initialization finishes, immediate messages can be processed.
1864      node barrier previously should take care of the node synchronization */
1865   _immediateReady = 1;
1866
1867   /* communication thread */
1868   if (CmiMyRank() == CmiMyNodeSize()) {
1869     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1870     while (1) CommunicationServerThread(5);
1871   }
1872   else {  /* worker thread */
1873   if (!everReturn) {
1874     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1875     if (Cmi_usrsched==0) CsdScheduler(-1);
1876     ConverseExit();
1877   }
1878   }
1879 }
1880
1881 static char *thread_level_tostring(int thread_level)
1882 {
1883 #if CMK_MPI_INIT_THREAD
1884   switch (thread_level) {
1885   case MPI_THREAD_SINGLE:
1886       return "MPI_THREAD_SINGLE";
1887   case MPI_THREAD_FUNNELED:
1888       return "MPI_THREAD_FUNNELED";
1889   case MPI_THREAD_SERIALIZED:
1890       return "MPI_THREAD_SERIALIZED";
1891   case MPI_THREAD_MULTIPLE :
1892       return "MPI_THREAD_MULTIPLE ";
1893   default: {
1894       char *str = (char*)malloc(5);
1895       sprintf(str,"%d", thread_level);
1896       return str;
1897       }
1898   }
1899   return  "unknown";
1900 #else
1901   char *str = (char*)malloc(5);
1902   sprintf(str,"%d", thread_level);
1903   return str;
1904 #endif
1905 }
1906
1907 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1908 {
1909   int n,i;
1910   int ver, subver;
1911   int provided;
1912   int thread_level;
1913
1914 #if MACHINE_DEBUG
1915   debugLog=NULL;
1916 #endif
1917 #if CMK_USE_HP_MAIN_FIX
1918 #if FOR_CPLUS
1919   _main(argc,argv);
1920 #endif
1921 #endif
1922
1923 #if CMK_MPI_INIT_THREAD
1924 #if CMK_SMP
1925   thread_level = MPI_THREAD_FUNNELED;
1926 #else
1927   thread_level = MPI_THREAD_SINGLE;
1928 #endif
1929   MPI_Init_thread(&argc, &argv, thread_level, &provided);
1930   _thread_provided = provided;
1931 #else
1932   MPI_Init(&argc, &argv);
1933   thread_level = 0;
1934   provided = -1;
1935 #endif
1936   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
1937   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
1938
1939   MPI_Get_version(&ver, &subver);
1940   if (_Cmi_mynode == 0) {
1941     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1942   }
1943
1944   /* processor per node */
1945   _Cmi_mynodesize = 1;
1946   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
1947     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
1948 #if ! CMK_SMP
1949   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
1950     CmiAbort("+ppn cannot be used in non SMP version!\n");
1951 #endif
1952   idleblock = CmiGetArgFlag(argv, "+idleblocking");
1953   if (idleblock && _Cmi_mynode == 0) {
1954     printf("Charm++: Running in idle blocking mode.\n");
1955   }
1956
1957   /* setup signal handlers */
1958   signal(SIGSEGV, KillOnAllSigs);
1959   signal(SIGFPE, KillOnAllSigs);
1960   signal(SIGILL, KillOnAllSigs);
1961   signal_int = signal(SIGINT, KillOnAllSigs);
1962   signal(SIGTERM, KillOnAllSigs);
1963   signal(SIGABRT, KillOnAllSigs);
1964 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1965   signal(SIGQUIT, KillOnAllSigs);
1966   signal(SIGBUS, KillOnAllSigs);
1967 /*#     if CMK_HANDLE_SIGUSR
1968   signal(SIGUSR1, HandleUserSignals);
1969   signal(SIGUSR2, HandleUserSignals);
1970 #     endif*/
1971 #   endif /*UNIX*/
1972   
1973 #if CMK_NO_OUTSTANDING_SENDS
1974   no_outstanding_sends=1;
1975 #endif
1976   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1977     no_outstanding_sends = 1;
1978     if (_Cmi_mynode == 0)
1979       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1980         no_outstanding_sends?"":" not");
1981   }
1982   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
1983   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
1984   Cmi_argvcopy = CmiCopyArgs(argv);
1985   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
1986   /* find dim = log2(numpes), to pretend we are a hypercube */
1987   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
1988     Cmi_dim++ ;
1989  /* CmiSpanTreeInit();*/
1990   request_max=MAX_QLEN;
1991   CmiGetArgInt(argv,"+requestmax",&request_max);
1992   /*printf("request max=%d\n", request_max);*/
1993
1994   /* checksum flag */
1995   if (CmiGetArgFlag(argv,"+checksum")) {
1996 #if !CMK_OPTIMIZE
1997     checksum_flag = 1;
1998     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1999 #else
2000     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2001 #endif
2002   }
2003
2004   {
2005   int debug = CmiGetArgFlag(argv,"++debug");
2006   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2007   if (debug || debug_no_pause)
2008   {   /*Pause so user has a chance to start and attach debugger*/
2009 #if CMK_HAS_GETPID
2010     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2011     fflush(stdout);
2012     if (!debug_no_pause)
2013       sleep(15);
2014 #else
2015     printf("++debug ignored.\n");
2016 #endif
2017   }
2018   }
2019
2020   /* CmiTimerInit(); */
2021
2022 #if 0
2023   CthInit(argv);
2024   ConverseCommonInit(argv);
2025
2026   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2027   if (initret==0) {
2028     fn(CmiGetArgc(argv), argv);
2029     if (usched==0) CsdScheduler(-1);
2030     ConverseExit();
2031   }
2032 #endif
2033
2034   CsvInitialize(CmiNodeState, NodeState);
2035   CmiNodeStateInit(&CsvAccess(NodeState));
2036
2037   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2038
2039   for (i=0; i<_Cmi_mynodesize+1; i++) {
2040 #if MULTI_SENDQUEUE
2041     procState[i].sendMsgBuf = PCQueueCreate();
2042 #endif
2043     procState[i].recvLock = CmiCreateLock();
2044   }
2045 #if CMK_SMP
2046 #if !MULTI_SENDQUEUE
2047   sendMsgBuf = PCQueueCreate();
2048   sendMsgBufLock = CmiCreateLock();
2049 #endif
2050   exitLock = CmiCreateLock();            /* exit count lock */
2051 #endif
2052
2053   /* Network progress function is used to poll the network when for
2054      messages. This flushes receive buffers on some  implementations*/
2055   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2056   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2057
2058   CmiStartThreads(argv);
2059   ConverseRunPE(initret);
2060 }
2061
2062 /***********************************************************************
2063  *
2064  * Abort function:
2065  *
2066  ************************************************************************/
2067
2068 void CmiAbort(const char *message)
2069 {
2070   char *m;
2071   /* if CharmDebug is attached simply try to send a message to it */
2072 #if CMK_CCS_AVAILABLE
2073   if (CpvAccess(cmiArgDebugFlag)) {
2074     CpdNotify(CPD_ABORT, message);
2075     CpdFreeze();
2076   }
2077 #endif  
2078   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2079         "Reason: %s\n",CmiMyPe(),message);
2080  /*  CmiError(message); */
2081   CmiPrintStackTrace(0);
2082   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2083   CmiSetHandler(m, machine_exit_idx);
2084   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2085   machine_exit(m);
2086   /* Program never reaches here */
2087   MPI_Abort(MPI_COMM_WORLD, 1);
2088 }
2089
2090
2091 #if 0
2092
2093 /* ****************************************************************** */
2094 /*    The following internal functions implement recd msg queue       */
2095 /* ****************************************************************** */
2096
2097 static void ** AllocBlock(unsigned int len)
2098 {
2099   void ** blk;
2100
2101   blk=(void **)CmiAlloc(len*sizeof(void *));
2102   if(blk==(void **)0) {
2103     CmiError("Cannot Allocate Memory!\n");
2104     MPI_Abort(MPI_COMM_WORLD, 1);
2105   }
2106   return blk;
2107 }
2108
2109 static void
2110 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2111 {
2112   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2113   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2114 }
2115
2116 void recdQueueInit(void)
2117 {
2118   recdQueue_blk = AllocBlock(BLK_LEN);
2119   recdQueue_blk_len = BLK_LEN;
2120   recdQueue_first = 0;
2121   recdQueue_len = 0;
2122 }
2123
2124 void recdQueueAddToBack(void *element)
2125 {
2126 #if NODE_0_IS_CONVHOST
2127   inside_comm = 1;
2128 #endif
2129   if(recdQueue_len==recdQueue_blk_len) {
2130     void **blk;
2131     recdQueue_blk_len *= 3;
2132     blk = AllocBlock(recdQueue_blk_len);
2133     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2134     CmiFree(recdQueue_blk);
2135     recdQueue_blk = blk;
2136     recdQueue_first = 0;
2137   }
2138   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2139 #if NODE_0_IS_CONVHOST
2140   inside_comm = 0;
2141 #endif
2142 }
2143
2144
2145 void * recdQueueRemoveFromFront(void)
2146 {
2147   if(recdQueue_len) {
2148     void *element;
2149     element = recdQueue_blk[recdQueue_first++];
2150     recdQueue_first %= recdQueue_blk_len;
2151     recdQueue_len--;
2152     return element;
2153   }
2154   return 0;
2155 }
2156
2157 #endif
2158
2159 /**************************  TIMER FUNCTIONS **************************/
2160 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
2161
2162 /* MPI calls are not threadsafe, even the timer on some machines */
2163 static CmiNodeLock  timerLock = 0;
2164 static int _absoluteTime = 0;
2165 static double starttimer = 0;
2166 static int _is_global = 0;
2167
2168 int CmiTimerIsSynchronized()
2169 {
2170   int  flag;
2171   void *v;
2172
2173   /*  check if it using synchronized timer */
2174   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
2175     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
2176   if (flag) {
2177     _is_global = *(int*)v;
2178     if (_is_global && CmiMyPe() == 0)
2179       printf("Charm++> MPI timer is synchronized\n");
2180   }
2181   return _is_global;
2182 }
2183
2184 int CmiTimerAbsolute()
2185 {       
2186   return _absoluteTime;
2187 }
2188
2189 double CmiStartTimer()
2190 {
2191   return 0.0;
2192 }
2193
2194 double CmiInitTime()
2195 {
2196   return starttimer;
2197 }
2198
2199 void CmiTimerInit(char **argv)
2200 {
2201   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
2202   if (_absoluteTime && CmiMyPe() == 0)
2203       printf("Charm++> absolute MPI timer is used\n");
2204
2205   _is_global = CmiTimerIsSynchronized();
2206
2207   if (_is_global) {
2208     if (CmiMyRank() == 0) {
2209       double minTimer;
2210 #if CMK_TIMER_USE_XT3_DCLOCK
2211       starttimer = dclock();
2212 #else
2213       starttimer = MPI_Wtime();
2214 #endif
2215
2216       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
2217                                   MPI_COMM_WORLD );
2218       starttimer = minTimer;
2219     }
2220   }
2221   else {  /* we don't have a synchronous timer, set our own start time */
2222     CmiBarrier();
2223     CmiBarrier();
2224     CmiBarrier();
2225 #if CMK_TIMER_USE_XT3_DCLOCK
2226     starttimer = dclock();
2227 #else
2228     starttimer = MPI_Wtime();
2229 #endif
2230   }
2231
2232 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
2233   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
2234     timerLock = CmiCreateLock();
2235 #endif
2236   CmiNodeAllBarrier();          /* for smp */
2237 }
2238
2239 /**
2240  * Since the timerLock is never created, and is
2241  * always NULL, then all the if-condition inside
2242  * the timer functions could be disabled right
2243  * now in the case of SMP. --Chao Mei
2244  */
2245 double CmiTimer(void)
2246 {
2247   double t;
2248 #if 0 && CMK_SMP
2249   if (timerLock) CmiLock(timerLock);
2250 #endif
2251
2252 #if CMK_TIMER_USE_XT3_DCLOCK
2253   t = dclock();
2254 #else
2255   t = MPI_Wtime();
2256 #endif
2257
2258 #if 0 && CMK_SMP
2259   if (timerLock) CmiUnlock(timerLock);
2260 #endif
2261
2262   return _absoluteTime?t: (t-starttimer);
2263 }
2264
2265 double CmiWallTimer(void)
2266 {
2267   double t;
2268 #if 0 && CMK_SMP
2269   if (timerLock) CmiLock(timerLock);
2270 #endif
2271
2272 #if CMK_TIMER_USE_XT3_DCLOCK
2273   t = dclock();
2274 #else
2275   t = MPI_Wtime();
2276 #endif
2277
2278 #if 0 && CMK_SMP
2279   if (timerLock) CmiUnlock(timerLock);
2280 #endif
2281
2282   return _absoluteTime? t: (t-starttimer);
2283 }
2284
2285 double CmiCpuTimer(void)
2286 {
2287   double t;
2288 #if 0 && CMK_SMP
2289   if (timerLock) CmiLock(timerLock);
2290 #endif
2291 #if CMK_TIMER_USE_XT3_DCLOCK
2292   t = dclock() - starttimer;
2293 #else
2294   t = MPI_Wtime() - starttimer;
2295 #endif
2296 #if 0 && CMK_SMP
2297   if (timerLock) CmiUnlock(timerLock);
2298 #endif
2299   return t;
2300 }
2301
2302 #endif
2303
2304 /* must be called on all ranks including comm thread in SMP */
2305 int CmiBarrier()
2306 {
2307 #if CMK_SMP
2308     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
2309   CmiNodeAllBarrier();
2310   if (CmiMyRank() == CmiMyNodeSize()) 
2311 #else
2312   if (CmiMyRank() == 0) 
2313 #endif
2314   {
2315 /**
2316  *  The call of CmiBarrier is usually before the initialization
2317  *  of trace module of Charm++, therefore, the START_EVENT
2318  *  and END_EVENT are disabled here. -Chao Mei
2319  */     
2320     /*START_EVENT();*/
2321
2322     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
2323         CmiAbort("Timernit: MPI_Barrier failed!\n");
2324
2325     /*END_EVENT(10);*/
2326   }
2327   CmiNodeAllBarrier();
2328   return 0;
2329 }
2330
2331 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
2332 int CmiBarrierZero()
2333 {
2334   int i;
2335 #if CMK_SMP
2336   if (CmiMyRank() == CmiMyNodeSize()) 
2337 #else
2338   if (CmiMyRank() == 0) 
2339 #endif
2340   {
2341     char msg[1];
2342     MPI_Status sts;
2343     if (CmiMyNode() == 0)  {
2344       for (i=0; i<CmiNumNodes()-1; i++) {
2345          START_EVENT();
2346
2347          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
2348             CmiPrintf("MPI_Recv failed!\n");
2349
2350          END_EVENT(30);
2351       }
2352     }
2353     else {
2354       START_EVENT();
2355
2356       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
2357          printf("MPI_Send failed!\n");
2358
2359       END_EVENT(20);
2360     }
2361   }
2362   CmiNodeAllBarrier();
2363   return 0;
2364 }
2365
2366 /*@}*/