a mpi-smp that every worker threads send messages instead of going through comm thread.
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 static int thread_level;
164
165 /* =====Beginning of Declarations of Machine Specific Variables===== */
166 #include <signal.h>
167 void (*signal_int)(int);
168
169 static int _thread_provided = -1; /* Indicating MPI thread level */
170 static int idleblock = 0;
171
172 /* A simple list for msgs that have been sent by MPI_Isend */
173 typedef struct msg_list {
174     char *msg;
175     struct msg_list *next;
176     int size, destpe, mode;
177 #if CMK_SMP_TRACE_COMMTHREAD
178     int srcpe;
179 #endif
180     MPI_Request req;
181 } SMSG_LIST;
182
183 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
184 CpvStaticDeclare(SMSG_LIST *, end_sent);
185
186 CpvStaticDeclare(int, MsgQueueLen);
187 static int request_max;
188 /*FLAG: consume outstanding Isends in scheduler loop*/
189 static int no_outstanding_sends=0;
190
191 #if NODE_0_IS_CONVHOST
192 int inside_comm = 0;
193 #endif
194
195 typedef struct ProcState {
196 #if MULTI_SENDQUEUE
197     PCQueue      sendMsgBuf;       /* per processor message sending queue */
198 #endif
199     CmiNodeLock  recvLock;                  /* for cs->recv */
200 } ProcState;
201 static ProcState  *procState;
202
203 #if CMK_SMP && !MULTI_SENDQUEUE
204 static PCQueue sendMsgBuf;
205 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
206 #endif
207 /* =====End of Declarations of Machine Specific Variables===== */
208
209
210 /* =====Beginning of Declarations of Machine Specific Functions===== */
211 /* Utility functions */
212 #if CMK_BLUEGENEL
213 extern void MPID_Progress_test();
214 #endif
215 static size_t CmiAllAsyncMsgsSent(void);
216 static void CmiReleaseSentMessages(void);
217 static int PumpMsgs(void);
218 static void PumpMsgsBlocking(void);
219
220 #if CMK_SMP
221 static int MsgQueueEmpty();
222 static int RecvQueueEmpty();
223 static int SendMsgBuf();
224 static  void EnqueueMsg(void *m, int size, int node, int mode);
225 #endif
226
227 /* The machine-specific send function */
228 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
229 #define LrtsSendFunc MachineSpecificSendForMPI
230
231 /* ### Beginning of Machine-startup Related Functions ### */
232 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
233 #define LrtsInit MachineInitForMPI
234
235 static void MachinePreCommonInitForMPI(int everReturn);
236 static void MachinePostCommonInitForMPI(int everReturn);
237 #define LrtsPreCommonInit MachinePreCommonInitForMPI
238 #define LrtsPostCommonInit MachinePostCommonInitForMPI
239 /* ### End of Machine-startup Related Functions ### */
240
241 /* ### Beginning of Machine-running Related Functions ### */
242 static void AdvanceCommunicationForMPI();
243 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
244
245 static void DrainResourcesForMPI(); /* used when exit */
246 #define LrtsDrainResources DrainResourcesForMPI
247
248 static void MachineExitForMPI();
249 #define LrtsExit MachineExitForMPI
250 /* ### End of Machine-running Related Functions ### */
251
252 /* ### Beginning of Idle-state Related Functions ### */
253 void CmiNotifyIdleForMPI(void);
254 /* ### End of Idle-state Related Functions ### */
255
256 static void MachinePostNonLocalForMPI();
257 #define LrtsPostNonLocal MachinePostNonLocalForMPI
258
259 /* =====End of Declarations of Machine Specific Functions===== */
260
261 /**
262  *  Macros that overwrites the common codes, such as
263  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
264  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
265  *  CMK_OFFLOAD_BCAST_PROCESS etc.
266  */
267 #define CMK_HAS_SIZE_IN_MSGHDR 0
268 #include "machine-lrts.h"
269 #include "machine-common-core.c"
270
271 /* The machine specific msg-sending function */
272
273 #if CMK_SMP
274 static void EnqueueMsg(void *m, int size, int node, int mode) {
275     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
276     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
277     msg_tmp->msg = m;
278     msg_tmp->size = size;
279     msg_tmp->destpe = node;
280     msg_tmp->next = 0;
281     msg_tmp->mode = mode;
282
283 #if CMK_SMP_TRACE_COMMTHREAD
284     msg_tmp->srcpe = CmiMyPe();
285 #endif
286
287 #if MULTI_SENDQUEUE
288     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
289 #else
290     /*CmiLock(sendMsgBufLock);*/
291     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
292     /*CmiUnlock(sendMsgBufLock);*/
293 #endif
294
295     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
296 }
297 #endif
298
299 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
300 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
301     int node = smsg->destpe;
302     int size = smsg->size;
303     char *msg = smsg->msg;
304     int mode = smsg->mode;
305
306     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
307 #if CMK_ERROR_CHECKING
308     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
309     CMI_SET_CHECKSUM(msg, size);
310 #endif
311
312 #if MPI_POST_RECV
313     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
314         START_EVENT();
315         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
316             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
317         /*END_EVENT(40);*/
318     } else {
319         START_EVENT();
320         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
321             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
322         /*END_EVENT(40);*/
323     }
324 #else
325     START_EVENT();
326     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
327         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
328     /*END_EVENT(40);*/
329 #endif
330
331 #if CMK_SMP_TRACE_COMMTHREAD
332     traceBeginCommOp(msg);
333     traceChangeLastTimestamp(CpvAccess(projTraceStart));
334     /* traceSendMsgComm must execute after traceBeginCommOp because
335          * we pretend we execute an entry method, and inside this we
336          * pretend we will send another message. Otherwise how could
337          * a message creation just before an entry method invocation?
338          * If such logic is broken, the projections will not trace
339          * messages correctly! -Chao Mei
340          */
341     traceSendMsgComm(msg);
342     traceEndCommOp(msg);
343 #if CMI_MPI_TRACE_MOREDETAILED
344     char tmp[64];
345     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
346     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
347 #endif
348 #endif
349
350     MACHSTATE(3,"}MPI_send end");
351     CpvAccess(MsgQueueLen)++;
352     if (CpvAccess(sent_msgs)==0)
353         CpvAccess(sent_msgs) = smsg;
354     else
355         CpvAccess(end_sent)->next = smsg;
356     CpvAccess(end_sent) = smsg;
357
358 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
359     if (mode == P2P_SYNC || mode == P2P_ASYNC)
360     {
361     while (CpvAccess(MsgQueueLen) > request_max) {
362         CmiReleaseSentMessages();
363         PumpMsgs();
364     }
365     }
366 #endif
367
368     return (CmiCommHandle) &(smsg->req);
369 }
370
371 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
372     /* Ignoring the mode for MPI layer */
373
374     CmiState cs = CmiGetState();
375     SMSG_LIST *msg_tmp;
376     int  rank;
377
378     CmiAssert(destNode != CmiMyNode());
379 #if CMK_SMP
380     if (thread_level != MPI_THREAD_MULTIPLE) {
381       EnqueueMsg(msg, size, destNode, mode);
382       return 0;
383     }
384 #endif
385     /* non smp */
386     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
387     msg_tmp->msg = msg;
388     msg_tmp->destpe = destNode;
389     msg_tmp->size = size;
390     msg_tmp->next = 0;
391     msg_tmp->mode = mode;
392     return MPISendOneMsg(msg_tmp);
393 }
394
395 static size_t CmiAllAsyncMsgsSent(void) {
396     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
397     MPI_Status sts;
398     int done;
399
400     while (msg_tmp!=0) {
401         done = 0;
402         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
403             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
404         if (!done)
405             return 0;
406         msg_tmp = msg_tmp->next;
407         /*    MsgQueueLen--; ????? */
408     }
409     return 1;
410 }
411
412 int CmiAsyncMsgSent(CmiCommHandle c) {
413
414     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
415     int done;
416     MPI_Status sts;
417
418     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
419         msg_tmp = msg_tmp->next;
420     if (msg_tmp) {
421         done = 0;
422         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
423             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
424         return ((done)?1:0);
425     } else {
426         return 1;
427     }
428 }
429
430 void CmiReleaseCommHandle(CmiCommHandle c) {
431     return;
432 }
433
434 /* ######Beginning of functions related with communication progress ###### */
435 static void CmiReleaseSentMessages(void) {
436     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
437     SMSG_LIST *prev=0;
438     SMSG_LIST *temp;
439     int done;
440     MPI_Status sts;
441
442 #if CMK_BLUEGENEL
443     MPID_Progress_test();
444 #endif
445
446     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
447     while (msg_tmp!=0) {
448         done =0;
449 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
450         double startT = CmiWallTimer();
451 #endif
452         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
453             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
454         if (done) {
455             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
456             CpvAccess(MsgQueueLen)--;
457             /* Release the message */
458             temp = msg_tmp->next;
459             if (prev==0) /* first message */
460                 CpvAccess(sent_msgs) = temp;
461             else
462                 prev->next = temp;
463             CmiFree(msg_tmp->msg);
464             CmiFree(msg_tmp);
465             msg_tmp = temp;
466         } else {
467             prev = msg_tmp;
468             msg_tmp = msg_tmp->next;
469         }
470 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
471         {
472             double endT = CmiWallTimer();
473             /* only record the event if it takes more than 1ms */
474             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
475         }
476 #endif
477     }
478     CpvAccess(end_sent) = prev;
479     MACHSTATE(2,"} CmiReleaseSentMessages end");
480 }
481
482 static int PumpMsgs(void) {
483     int nbytes, flg, res;
484     char *msg;
485     MPI_Status sts;
486     int recd=0;
487
488 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
489     int recvCnt=0;
490 #endif
491
492 #if CMK_BLUEGENEL
493     MPID_Progress_test();
494 #endif
495
496     MACHSTATE(2,"PumpMsgs begin {");
497
498 #if CMI_DYNAMIC_EXERT_CAP
499     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
500 #endif
501
502     while (1) {
503 #if CMI_EXERT_RECV_CAP
504         if (recvCnt==RECV_CAP) break;
505 #elif CMI_DYNAMIC_EXERT_CAP
506         if (recvCnt >= dynamicRecvCap) break;
507 #endif
508
509         /* First check posted recvs then do  probe unmatched outstanding messages */
510 #if MPI_POST_RECV
511         int completed_index=-1;
512         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
513             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
514         if (flg) {
515             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
516                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
517
518             recd = 1;
519             msg = (char *) CmiAlloc(nbytes);
520             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
521             /* and repost the recv */
522
523             START_EVENT();
524
525             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
526                                            MPI_POST_RECV_SIZE,
527                                            MPI_BYTE,
528                                            MPI_ANY_SOURCE,
529                                            POST_RECV_TAG,
530                                            MPI_COMM_WORLD,
531                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
532                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
533
534             END_EVENT(50);
535
536             CpvAccess(Cmi_posted_recv_total)++;
537         } else {
538             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
539             if (res != MPI_SUCCESS)
540                 CmiAbort("MPI_Iprobe failed\n");
541             if (!flg) break;
542             recd = 1;
543             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
544             msg = (char *) CmiAlloc(nbytes);
545
546             START_EVENT();
547
548             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
549                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
550
551             END_EVENT(30);
552
553             CpvAccess(Cmi_unposted_recv_total)++;
554         }
555 #else
556         /* Original version */
557 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
558         double startT = CmiWallTimer();
559 #endif
560         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
561         if (res != MPI_SUCCESS)
562             CmiAbort("MPI_Iprobe failed\n");
563
564         if (!flg) break;
565 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
566         {
567             double endT = CmiWallTimer();
568             /* only trace the probe that last longer than 1ms */
569             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
570         }
571 #endif
572
573         recd = 1;
574         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
575         msg = (char *) CmiAlloc(nbytes);
576
577         START_EVENT();
578
579         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
580             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
581
582         /*END_EVENT(30);*/
583
584 #endif
585
586 #if CMK_SMP_TRACE_COMMTHREAD
587         traceBeginCommOp(msg);
588         traceChangeLastTimestamp(CpvAccess(projTraceStart));
589         traceEndCommOp(msg);
590 #if CMI_MPI_TRACE_MOREDETAILED
591         char tmp[32];
592         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
593         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
594 #endif
595 #elif CMK_TRACE_COMMOVERHEAD
596         char tmp[32];
597         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
598         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
599 #endif
600
601
602         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
603         CMI_CHECK_CHECKSUM(msg, nbytes);
604 #if CMK_ERROR_CHECKING
605         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
606             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
607             CmiFree(msg);
608             CmiAbort("Abort!\n");
609             continue;
610         }
611 #endif
612
613         handleOneRecvedMsg(nbytes, msg);
614
615 #if CMI_EXERT_RECV_CAP
616         recvCnt++;
617 #elif CMI_DYNAMIC_EXERT_CAP
618         recvCnt++;
619 #if CMK_SMP
620         /* check sendMsgBuf to get the  number of messages that have not been sent
621              * which is only available in SMP mode
622          * MsgQueueLen indicates the number of messages that have not been released
623              * by MPI
624              */
625         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
626                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
627             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
628         }
629 #else
630         /* MsgQueueLen indicates the number of messages that have not been released
631              * by MPI
632              */
633         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
634             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
635         }
636 #endif
637
638 #endif
639
640     }
641
642     MACHSTATE(2,"} PumpMsgs end ");
643     return recd;
644 }
645
646 /* blocking version */
647 static void PumpMsgsBlocking(void) {
648     static int maxbytes = 20000000;
649     static char *buf = NULL;
650     int nbytes, flg;
651     MPI_Status sts;
652     char *msg;
653     int recd=0;
654
655     if (!PCQueueEmpty(CmiGetState()->recv)) return;
656     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
657     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
658     if (CpvAccess(sent_msgs))  return;
659
660 #if 0
661     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
662 #endif
663
664     if (buf == NULL) {
665         buf = (char *) CmiAlloc(maxbytes);
666         _MEMCHECK(buf);
667     }
668
669
670 #if MPI_POST_RECV
671 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
672     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
673 #endif
674
675     START_EVENT();
676
677     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
678         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
679
680     /*END_EVENT(30);*/
681
682     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
683     msg = (char *) CmiAlloc(nbytes);
684     memcpy(msg, buf, nbytes);
685
686 #if CMK_SMP_TRACE_COMMTHREAD
687     traceBeginCommOp(msg);
688     traceChangeLastTimestamp(CpvAccess(projTraceStart));
689     traceEndCommOp(msg);
690 #if CMI_MPI_TRACE_MOREDETAILED
691     char tmp[32];
692     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
693     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
694 #endif
695 #endif
696
697     handleOneRecvedMsg(nbytes, msg);
698 }
699
700
701 #if CMK_SMP
702
703 /* called by communication thread in SMP */
704 static int SendMsgBuf() {
705     SMSG_LIST *msg_tmp;
706     char *msg;
707     int node, rank, size;
708     int i;
709     int sent = 0;
710
711 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
712     int sentCnt = 0;
713 #endif
714
715 #if CMI_DYNAMIC_EXERT_CAP
716     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
717 #endif
718
719     MACHSTATE(2,"SendMsgBuf begin {");
720 #if MULTI_SENDQUEUE
721     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
722         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
723             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
724 #else
725     /* single message sending queue */
726     /* CmiLock(sendMsgBufLock); */
727     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
728     /* CmiUnlock(sendMsgBufLock); */
729     while (NULL != msg_tmp) {
730 #endif
731             MPISendOneMsg(msg_tmp);
732             sent=1;
733
734 #if CMI_EXERT_SEND_CAP
735             if (++sentCnt == SEND_CAP) break;
736 #elif CMI_DYNAMIC_EXERT_CAP
737             if (++sentCnt >= dynamicSendCap) break;
738             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
739                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
740 #endif
741
742 #if ! MULTI_SENDQUEUE
743             /* CmiLock(sendMsgBufLock); */
744             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
745             /* CmiUnlock(sendMsgBufLock); */
746 #endif
747         }
748 #if MULTI_SENDQUEUE
749     }
750 #endif
751     MACHSTATE(2,"}SendMsgBuf end ");
752     return sent;
753 }
754
755 static int MsgQueueEmpty() {
756     int i;
757 #if MULTI_SENDQUEUE
758     for (i=0; i<_Cmi_mynodesize; i++)
759         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
760 #else
761     return PCQueueEmpty(sendMsgBuf);
762 #endif
763     return 1;
764 }
765
766 /* test if all processors recv queues are empty */
767 static int RecvQueueEmpty() {
768     int i;
769     for (i=0; i<_Cmi_mynodesize; i++) {
770         CmiState cs=CmiGetStateN(i);
771         if (!PCQueueEmpty(cs->recv)) return 0;
772     }
773     return 1;
774 }
775
776
777 #define REPORT_COMM_METRICS 0
778 #if REPORT_COMM_METRICS
779 static double pumptime = 0.0;
780                          static double releasetime = 0.0;
781                                                      static double sendtime = 0.0;
782 #endif
783
784 #endif //end of CMK_SMP
785
786 static void AdvanceCommunicationForMPI() {
787 #if REPORT_COMM_METRICS
788     double t1, t2, t3, t4;
789     t1 = CmiWallTimer();
790 #endif
791
792 #if CMK_SMP
793     PumpMsgs();
794
795 #if REPORT_COMM_METRICS
796     t2 = CmiWallTimer();
797 #endif
798
799     CmiReleaseSentMessages();
800 #if REPORT_COMM_METRICS
801     t3 = CmiWallTimer();
802 #endif
803
804     SendMsgBuf();
805
806 #if REPORT_COMM_METRICS
807     t4 = CmiWallTimer();
808     pumptime += (t2-t1);
809     releasetime += (t3-t2);
810     sendtime += (t4-t3);
811 #endif
812
813 #else /* non-SMP case */
814     CmiReleaseSentMessages();
815
816 #if REPORT_COMM_METRICS
817     t2 = CmiWallTimer();
818 #endif
819     PumpMsgs();
820
821 #if REPORT_COMM_METRICS
822     t3 = CmiWallTimer();
823     pumptime += (t3-t2);
824     releasetime += (t2-t1);
825 #endif
826
827 #endif /* end of #if CMK_SMP */
828 }
829 /* ######End of functions related with communication progress ###### */
830
831 static void MachinePostNonLocalForMPI() {
832 #if !CMK_SMP
833     if (no_outstanding_sends) {
834         while (MsgQueueLen>0) {
835             AdvanceCommunicationForMPI();
836         }
837     }
838
839     /* FIXME: I don't think the following codes are needed because
840      * it repeats the same job of the next call of CmiGetNonLocal
841      */
842 #if 0
843     if (!msg) {
844         CmiReleaseSentMessages();
845         if (PumpMsgs())
846             return  PCQueuePop(cs->recv);
847         else
848             return 0;
849     }
850 #endif
851 #else
852   if (thread_level == MPI_THREAD_MULTIPLE) {
853         CmiReleaseSentMessages();
854         SendMsgBuf();
855   }
856 #endif
857 }
858
859 /* Idle-state related functions: called in non-smp mode */
860 void CmiNotifyIdleForMPI(void) {
861     CmiReleaseSentMessages();
862     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
863 }
864
865 /* Network progress function is used to poll the network when for
866    messages. This flushes receive buffers on some  implementations*/
867 #if CMK_MACHINE_PROGRESS_DEFINED
868 void CmiMachineProgressImpl() {
869 #if !CMK_SMP
870     PumpMsgs();
871 #if CMK_IMMEDIATE_MSG
872     CmiHandleImmediate();
873 #endif
874 #else
875     /*Not implemented yet. Communication server does not seem to be
876       thread safe, so only communication thread call it */
877     if (CmiMyRank() == CmiMyNodeSize())
878         CommunicationServerThread(0);
879 #endif
880 }
881 #endif
882
883 /* ######Beginning of functions related with exiting programs###### */
884 void DrainResourcesForMPI() {
885 #if !CMK_SMP
886     while (!CmiAllAsyncMsgsSent()) {
887         PumpMsgs();
888         CmiReleaseSentMessages();
889     }
890 #else
891     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
892         CmiReleaseSentMessages();
893         SendMsgBuf();
894         PumpMsgs();
895     }
896 #endif
897     MACHSTATE(2, "Machine exit barrier begin {");
898     START_EVENT();
899     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
900         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
901     END_EVENT(10);
902     MACHSTATE(2, "} Machine exit barrier end");
903 }
904
905 void LrtsExit() {
906 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
907     int doPrint = 0;
908 #if CMK_SMP
909     if (CmiMyNode()==0) doPrint = 1;
910 #else
911     if (CmiMyPe()==0) doPrint = 1;
912 #endif
913
914     if (doPrint) {
915 #if MPI_POST_RECV
916         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
917 #endif
918     }
919 #endif
920
921 #if REPORT_COMM_METRICS
922 #if CMK_SMP
923     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
924               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
925               pumptime, releasetime, sendtime);
926 #else
927     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
928               CmiMyPe(), pumptime, releasetime, sendtime);
929 #endif
930 #endif
931
932 #if ! CMK_AUTOBUILD
933     signal(SIGINT, signal_int);
934     MPI_Finalize();
935 #endif
936     exit(0);
937 }
938
939 static int machine_exit_idx;
940 static void machine_exit(char *m) {
941     EmergencyExit();
942     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
943     fflush(stdout);
944     CmiNodeBarrier();
945     if (CmiMyRank() == 0) {
946         MPI_Barrier(MPI_COMM_WORLD);
947         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
948         MPI_Abort(MPI_COMM_WORLD, 1);
949     } else {
950         while (1) CmiYield();
951     }
952 }
953
954 static void KillOnAllSigs(int sigNo) {
955     static int already_in_signal_handler = 0;
956     char *m;
957     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
958     already_in_signal_handler = 1;
959 #if CMK_CCS_AVAILABLE
960     if (CpvAccess(cmiArgDebugFlag)) {
961         CpdNotify(CPD_SIGNAL, sigNo);
962         CpdFreeze();
963     }
964 #endif
965     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
966              "Signal: %d\n",CmiMyPe(),sigNo);
967     CmiPrintStackTrace(1);
968
969     m = CmiAlloc(CmiMsgHeaderSizeBytes);
970     CmiSetHandler(m, machine_exit_idx);
971     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
972     machine_exit(m);
973 }
974 /* ######End of functions related with exiting programs###### */
975
976
977 /* ######Beginning of functions related with starting programs###### */
978 static void registerMPITraceEvents() {
979 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
980     traceRegisterUserEvent("MPI_Barrier", 10);
981     traceRegisterUserEvent("MPI_Send", 20);
982     traceRegisterUserEvent("MPI_Recv", 30);
983     traceRegisterUserEvent("MPI_Isend", 40);
984     traceRegisterUserEvent("MPI_Irecv", 50);
985     traceRegisterUserEvent("MPI_Test", 60);
986     traceRegisterUserEvent("MPI_Iprobe", 70);
987 #endif
988 }
989
990 #if MACHINE_DEBUG_LOG
991 FILE *debugLog = NULL;
992 #endif
993
994 static char *thread_level_tostring(int thread_level) {
995 #if CMK_MPI_INIT_THREAD
996     switch (thread_level) {
997     case MPI_THREAD_SINGLE:
998         return "MPI_THREAD_SINGLE";
999     case MPI_THREAD_FUNNELED:
1000         return "MPI_THREAD_FUNNELED";
1001     case MPI_THREAD_SERIALIZED:
1002         return "MPI_THREAD_SERIALIZED";
1003     case MPI_THREAD_MULTIPLE :
1004         return "MPI_THREAD_MULTIPLE ";
1005     default: {
1006         char *str = (char*)malloc(5);
1007         sprintf(str,"%d", thread_level);
1008         return str;
1009     }
1010     }
1011     return  "unknown";
1012 #else
1013     char *str = (char*)malloc(5);
1014     sprintf(str,"%d", thread_level);
1015     return str;
1016 #endif
1017 }
1018
1019 /**
1020  *  Obtain the number of nodes, my node id, and consuming machine layer
1021  *  specific arguments
1022  */
1023 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1024     int n,i;
1025     int ver, subver;
1026     int provided;
1027     int myNID;
1028     int largc=*argc;
1029     char** largv=*argv;
1030
1031 #if MACHINE_DEBUG
1032     debugLog=NULL;
1033 #endif
1034 #if CMK_USE_HP_MAIN_FIX
1035 #if FOR_CPLUS
1036     _main(largc,largv);
1037 #endif
1038 #endif
1039
1040 #if CMK_MPI_INIT_THREAD
1041 #if CMK_SMP
1042     thread_level = MPI_THREAD_MULTIPLE;
1043 #else
1044     thread_level = MPI_THREAD_SINGLE;
1045 #endif
1046     MPI_Init_thread(argc, argv, thread_level, &provided);
1047     _thread_provided = provided;
1048 #else
1049     MPI_Init(argc, argv);
1050     thread_level = 0;
1051     provided = -1;
1052 #endif
1053     largc = *argc;
1054     largv = *argv;
1055     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1056     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1057
1058     myNID = *myNodeID;
1059
1060     MPI_Get_version(&ver, &subver);
1061     if (myNID == 0) {
1062         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1063     }
1064
1065     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1066     if (idleblock && _Cmi_mynode == 0) {
1067         printf("Charm++: Running in idle blocking mode.\n");
1068     }
1069
1070     /* setup signal handlers */
1071     signal(SIGSEGV, KillOnAllSigs);
1072     signal(SIGFPE, KillOnAllSigs);
1073     signal(SIGILL, KillOnAllSigs);
1074     signal_int = signal(SIGINT, KillOnAllSigs);
1075     signal(SIGTERM, KillOnAllSigs);
1076     signal(SIGABRT, KillOnAllSigs);
1077 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1078     signal(SIGQUIT, KillOnAllSigs);
1079     signal(SIGBUS, KillOnAllSigs);
1080 #   endif /*UNIX*/
1081
1082 #if CMK_NO_OUTSTANDING_SENDS
1083     no_outstanding_sends=1;
1084 #endif
1085     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1086         no_outstanding_sends = 1;
1087         if (myNID == 0)
1088             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1089                    no_outstanding_sends?"":" not");
1090     }
1091
1092     request_max=MAX_QLEN;
1093     CmiGetArgInt(largv,"+requestmax",&request_max);
1094     /*printf("request max=%d\n", request_max);*/
1095
1096 #if MPI_POST_RECV
1097     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1098     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1099     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1100     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1101     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1102     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1103     if (myNID==0) {
1104         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1105                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1106     }
1107 #endif
1108
1109 #if CMI_DYNAMIC_EXERT_CAP
1110     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1111     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1112     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1113     if (myNID==0) {
1114         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1115                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1116     }
1117 #endif
1118
1119     /* checksum flag */
1120     if (CmiGetArgFlag(largv,"+checksum")) {
1121 #if CMK_ERROR_CHECKING
1122         checksum_flag = 1;
1123         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1124 #else
1125         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1126 #endif
1127     }
1128
1129     {
1130         int debug = CmiGetArgFlag(largv,"++debug");
1131         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1132         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1133 #if CMK_HAS_GETPID
1134             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1135             fflush(stdout);
1136             if (!debug_no_pause)
1137                 sleep(15);
1138 #else
1139             printf("++debug ignored.\n");
1140 #endif
1141         }
1142     }
1143
1144     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1145     for (i=0; i<_Cmi_mynodesize+1; i++) {
1146 #if MULTI_SENDQUEUE
1147         procState[i].sendMsgBuf = PCQueueCreate();
1148 #endif
1149         procState[i].recvLock = CmiCreateLock();
1150     }
1151 #if CMK_SMP
1152 #if !MULTI_SENDQUEUE
1153     sendMsgBuf = PCQueueCreate();
1154     sendMsgBufLock = CmiCreateLock();
1155 #endif
1156 #endif
1157 }
1158
1159 static void MachinePreCommonInitForMPI(int everReturn) {
1160
1161 #if MPI_POST_RECV
1162     int doInit = 1;
1163     int i;
1164
1165 #if CMK_SMP
1166     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1167 #endif
1168
1169     /* Currently, in mpi smp, the main thread will be the comm thread, so
1170      *  only the comm thread should post recvs. Cpvs, however, need to be
1171      * created on rank 0 (the ptrs to the actual cpv memory), while
1172      * other ranks are busy waiting for this to finish. So cpv initialize
1173      * routines have to be called on every ranks, although they are only
1174      * useful on comm thread (whose rank is not zero) -Chao Mei
1175      */
1176     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1177     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1178     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1179     CpvInitialize(char*,CmiPostedRecvBuffers);
1180
1181     if (doInit) {
1182         /* Post some extra recvs to help out with incoming messages */
1183         /* On some MPIs the messages are unexpected and thus slow */
1184
1185         /* An array of request handles for posted recvs */
1186         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1187
1188         /* An array of buffers for posted recvs */
1189         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1190
1191         /* Post Recvs */
1192         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1193             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1194                                            MPI_POST_RECV_SIZE,
1195                                            MPI_BYTE,
1196                                            MPI_ANY_SOURCE,
1197                                            POST_RECV_TAG,
1198                                            MPI_COMM_WORLD,
1199                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1200                 CmiAbort("MPI_Irecv failed\n");
1201         }
1202     }
1203 #endif
1204
1205 }
1206
1207 static void MachinePostCommonInitForMPI(int everReturn) {
1208
1209     CpvInitialize(SMSG_LIST *, sent_msgs);
1210     CpvInitialize(SMSG_LIST *, end_sent);
1211     CpvInitialize(int, MsgQueueLen);
1212     CpvAccess(sent_msgs) = NULL;
1213     CpvAccess(end_sent) = NULL;
1214     CpvAccess(MsgQueueLen) = 0;
1215
1216     CmiIdleState *s=CmiNotifyGetState();
1217     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1218
1219 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1220     CpvInitialize(double, projTraceStart);
1221     /* only PE 0 needs to care about registration (to generate sts file). */
1222     if (CmiMyPe() == 0) {
1223         registerMachineUserEventsFunction(&registerMPITraceEvents);
1224     }
1225 #endif
1226
1227 #if CMK_SMP
1228     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1229     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1230     if (thread_level == MPI_THREAD_MULTIPLE)
1231       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1232 #else
1233     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1234 #endif
1235
1236 #if MACHINE_DEBUG_LOG
1237     if (CmiMyRank() == 0) {
1238         char ln[200];
1239         sprintf(ln,"debugLog.%d",CmiMyNode());
1240         debugLog=fopen(ln,"w");
1241     }
1242 #endif
1243 }
1244 /* ######End of functions related with starting programs###### */
1245
1246 /***********************************************************************
1247  *
1248  * Abort function:
1249  *
1250  ************************************************************************/
1251
1252 void CmiAbort(const char *message) {
1253     char *m;
1254     /* if CharmDebug is attached simply try to send a message to it */
1255 #if CMK_CCS_AVAILABLE
1256     if (CpvAccess(cmiArgDebugFlag)) {
1257         CpdNotify(CPD_ABORT, message);
1258         CpdFreeze();
1259     }
1260 #endif
1261     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1262              "Reason: %s\n",CmiMyPe(),message);
1263     /*  CmiError(message); */
1264     CmiPrintStackTrace(0);
1265     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1266     CmiSetHandler(m, machine_exit_idx);
1267     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1268     machine_exit(m);
1269     /* Program never reaches here */
1270     MPI_Abort(MPI_COMM_WORLD, 1);
1271 }
1272
1273 /**************************  TIMER FUNCTIONS **************************/
1274 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1275
1276 /* MPI calls are not threadsafe, even the timer on some machines */
1277 static CmiNodeLock  timerLock = 0;
1278                                 static int _absoluteTime = 0;
1279                                                            static double starttimer = 0;
1280                                                                                       static int _is_global = 0;
1281
1282 int CmiTimerIsSynchronized() {
1283     int  flag;
1284     void *v;
1285
1286     /*  check if it using synchronized timer */
1287     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1288         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1289     if (flag) {
1290         _is_global = *(int*)v;
1291         if (_is_global && CmiMyPe() == 0)
1292             printf("Charm++> MPI timer is synchronized\n");
1293     }
1294     return _is_global;
1295 }
1296
1297 int CmiTimerAbsolute() {
1298     return _absoluteTime;
1299 }
1300
1301 double CmiStartTimer() {
1302     return 0.0;
1303 }
1304
1305 double CmiInitTime() {
1306     return starttimer;
1307 }
1308
1309 void CmiTimerInit(char **argv) {
1310     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1311     if (_absoluteTime && CmiMyPe() == 0)
1312         printf("Charm++> absolute MPI timer is used\n");
1313
1314     _is_global = CmiTimerIsSynchronized();
1315
1316     if (_is_global) {
1317         if (CmiMyRank() == 0) {
1318             double minTimer;
1319 #if CMK_TIMER_USE_XT3_DCLOCK
1320             starttimer = dclock();
1321 #else
1322             starttimer = MPI_Wtime();
1323 #endif
1324
1325             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1326                           MPI_COMM_WORLD );
1327             starttimer = minTimer;
1328         }
1329     } else { /* we don't have a synchronous timer, set our own start time */
1330         CmiBarrier();
1331         CmiBarrier();
1332         CmiBarrier();
1333 #if CMK_TIMER_USE_XT3_DCLOCK
1334         starttimer = dclock();
1335 #else
1336         starttimer = MPI_Wtime();
1337 #endif
1338     }
1339
1340 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1341     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1342         timerLock = CmiCreateLock();
1343 #endif
1344     CmiNodeAllBarrier();          /* for smp */
1345 }
1346
1347 /**
1348  * Since the timerLock is never created, and is
1349  * always NULL, then all the if-condition inside
1350  * the timer functions could be disabled right
1351  * now in the case of SMP. --Chao Mei
1352  */
1353 double CmiTimer(void) {
1354     double t;
1355 #if 0 && CMK_SMP
1356     if (timerLock) CmiLock(timerLock);
1357 #endif
1358
1359 #if CMK_TIMER_USE_XT3_DCLOCK
1360     t = dclock();
1361 #else
1362     t = MPI_Wtime();
1363 #endif
1364
1365 #if 0 && CMK_SMP
1366     if (timerLock) CmiUnlock(timerLock);
1367 #endif
1368
1369     return _absoluteTime?t: (t-starttimer);
1370 }
1371
1372 double CmiWallTimer(void) {
1373     double t;
1374 #if 0 && CMK_SMP
1375     if (timerLock) CmiLock(timerLock);
1376 #endif
1377
1378 #if CMK_TIMER_USE_XT3_DCLOCK
1379     t = dclock();
1380 #else
1381     t = MPI_Wtime();
1382 #endif
1383
1384 #if 0 && CMK_SMP
1385     if (timerLock) CmiUnlock(timerLock);
1386 #endif
1387
1388     return _absoluteTime? t: (t-starttimer);
1389 }
1390
1391 double CmiCpuTimer(void) {
1392     double t;
1393 #if 0 && CMK_SMP
1394     if (timerLock) CmiLock(timerLock);
1395 #endif
1396 #if CMK_TIMER_USE_XT3_DCLOCK
1397     t = dclock() - starttimer;
1398 #else
1399     t = MPI_Wtime() - starttimer;
1400 #endif
1401 #if 0 && CMK_SMP
1402     if (timerLock) CmiUnlock(timerLock);
1403 #endif
1404     return t;
1405 }
1406
1407 #endif
1408
1409 /************Barrier Related Functions****************/
1410 /* must be called on all ranks including comm thread in SMP */
1411 int CmiBarrier() {
1412 #if CMK_SMP
1413     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1414     CmiNodeAllBarrier();
1415     if (CmiMyRank() == CmiMyNodeSize())
1416 #else
1417     if (CmiMyRank() == 0)
1418 #endif
1419     {
1420         /**
1421          *  The call of CmiBarrier is usually before the initialization
1422          *  of trace module of Charm++, therefore, the START_EVENT
1423          *  and END_EVENT are disabled here. -Chao Mei
1424          */
1425         /*START_EVENT();*/
1426
1427         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1428             CmiAbort("Timernit: MPI_Barrier failed!\n");
1429
1430         /*END_EVENT(10);*/
1431     }
1432     CmiNodeAllBarrier();
1433     return 0;
1434 }
1435
1436 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1437 int CmiBarrierZero() {
1438     int i;
1439 #if CMK_SMP
1440     if (CmiMyRank() == CmiMyNodeSize())
1441 #else
1442     if (CmiMyRank() == 0)
1443 #endif
1444     {
1445         char msg[1];
1446         MPI_Status sts;
1447         if (CmiMyNode() == 0)  {
1448             for (i=0; i<CmiNumNodes()-1; i++) {
1449                 START_EVENT();
1450
1451                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1452                     CmiPrintf("MPI_Recv failed!\n");
1453
1454                 END_EVENT(30);
1455             }
1456         } else {
1457             START_EVENT();
1458
1459             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1460                 printf("MPI_Send failed!\n");
1461
1462             END_EVENT(20);
1463         }
1464     }
1465     CmiNodeAllBarrier();
1466     return 0;
1467 }
1468
1469 /*@}*/
1470