c3b632933486ae153d82aa0bc7e187d41c61e9d7
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207
208 /* =====Beginning of Declarations of Machine Specific Functions===== */
209 /* Utility functions */
210 #if CMK_BLUEGENEL
211 extern void MPID_Progress_test();
212 #endif
213 static size_t CmiAllAsyncMsgsSent(void);
214 static void CmiReleaseSentMessages(void);
215 static int PumpMsgs(void);
216 static void PumpMsgsBlocking(void);
217
218 #if CMK_SMP
219 static int MsgQueueEmpty();
220 static int RecvQueueEmpty();
221 static int SendMsgBuf();
222 static  void EnqueueMsg(void *m, int size, int node, int mode);
223 #endif
224
225 /* The machine-specific send function */
226 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
227 #define LrtsSendFunc MachineSpecificSendForMPI
228
229 /* ### Beginning of Machine-startup Related Functions ### */
230 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
231 #define LrtsInit MachineInitForMPI
232
233 static void MachinePreCommonInitForMPI(int everReturn);
234 static void MachinePostCommonInitForMPI(int everReturn);
235 #define LrtsPreCommonInit MachinePreCommonInitForMPI
236 #define LrtsPostCommonInit MachinePostCommonInitForMPI
237 /* ### End of Machine-startup Related Functions ### */
238
239 /* ### Beginning of Machine-running Related Functions ### */
240 static void AdvanceCommunicationForMPI();
241 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
242
243 static void DrainResourcesForMPI(); /* used when exit */
244 #define LrtsDrainResources DrainResourcesForMPI
245
246 static void MachineExitForMPI();
247 #define LrtsExit MachineExitForMPI
248 /* ### End of Machine-running Related Functions ### */
249
250 /* ### Beginning of Idle-state Related Functions ### */
251 void CmiNotifyIdleForMPI(void);
252 /* ### End of Idle-state Related Functions ### */
253
254 static void MachinePostNonLocalForMPI();
255 #define LrtsPostNonLocal MachinePostNonLocalForMPI
256
257 /* =====End of Declarations of Machine Specific Functions===== */
258
259 /**
260  *  Macros that overwrites the common codes, such as
261  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
262  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
263  *  CMK_OFFLOAD_BCAST_PROCESS etc.
264  */
265 #define CMK_HAS_SIZE_IN_MSGHDR 0
266 #include "machine-lrts.h"
267 #include "machine-common-core.c"
268
269 /* The machine specific msg-sending function */
270
271 #if CMK_SMP
272 static void EnqueueMsg(void *m, int size, int node, int mode) {
273     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
274     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
275     msg_tmp->msg = m;
276     msg_tmp->size = size;
277     msg_tmp->destpe = node;
278     msg_tmp->next = 0;
279     msg_tmp->mode = mode;
280
281 #if CMK_SMP_TRACE_COMMTHREAD
282     msg_tmp->srcpe = CmiMyPe();
283 #endif
284
285 #if MULTI_SENDQUEUE
286     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
287 #else
288     /*CmiLock(sendMsgBufLock);*/
289     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
290     /*CmiUnlock(sendMsgBufLock);*/
291 #endif
292
293     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
294 }
295 #endif
296
297 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
298 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
299     int node = smsg->destpe;
300     int size = smsg->size;
301     char *msg = smsg->msg;
302     int mode = smsg->mode;
303
304     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
305 #if CMK_ERROR_CHECKING
306     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
307     CMI_SET_CHECKSUM(msg, size);
308 #endif
309
310 #if MPI_POST_RECV
311     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
312         START_EVENT();
313         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
314             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
315         /*END_EVENT(40);*/
316     } else {
317         START_EVENT();
318         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
319             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
320         /*END_EVENT(40);*/
321     }
322 #else
323     START_EVENT();
324     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
325         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
326     /*END_EVENT(40);*/
327 #endif
328
329 #if CMK_SMP_TRACE_COMMTHREAD
330     traceBeginCommOp(msg);
331     traceChangeLastTimestamp(CpvAccess(projTraceStart));
332     /* traceSendMsgComm must execute after traceBeginCommOp because
333          * we pretend we execute an entry method, and inside this we
334          * pretend we will send another message. Otherwise how could
335          * a message creation just before an entry method invocation?
336          * If such logic is broken, the projections will not trace
337          * messages correctly! -Chao Mei
338          */
339     traceSendMsgComm(msg);
340     traceEndCommOp(msg);
341 #if CMI_MPI_TRACE_MOREDETAILED
342     char tmp[64];
343     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
344     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
345 #endif
346 #endif
347
348     MACHSTATE(3,"}MPI_send end");
349     CpvAccess(MsgQueueLen)++;
350     if (CpvAccess(sent_msgs)==0)
351         CpvAccess(sent_msgs) = smsg;
352     else
353         CpvAccess(end_sent)->next = smsg;
354     CpvAccess(end_sent) = smsg;
355
356 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
357     if (mode == P2P_SYNC || mode == P2P_ASYNC)
358     {
359     while (CpvAccess(MsgQueueLen) > request_max) {
360         CmiReleaseSentMessages();
361         PumpMsgs();
362     }
363     }
364 #endif
365
366     return (CmiCommHandle) &(smsg->req);
367 }
368
369 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
370     /* Ignoring the mode for MPI layer */
371
372     CmiState cs = CmiGetState();
373     SMSG_LIST *msg_tmp;
374     int  rank;
375
376     CmiAssert(destNode != CmiMyNode());
377 #if CMK_SMP
378     if (_thread_provided != MPI_THREAD_MULTIPLE) {
379       EnqueueMsg(msg, size, destNode, mode);
380       return 0;
381     }
382 #endif
383     /* non smp */
384     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
385     msg_tmp->msg = msg;
386     msg_tmp->destpe = destNode;
387     msg_tmp->size = size;
388     msg_tmp->next = 0;
389     msg_tmp->mode = mode;
390     return MPISendOneMsg(msg_tmp);
391 }
392
393 static size_t CmiAllAsyncMsgsSent(void) {
394     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
395     MPI_Status sts;
396     int done;
397
398     while (msg_tmp!=0) {
399         done = 0;
400         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
401             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
402         if (!done)
403             return 0;
404         msg_tmp = msg_tmp->next;
405         /*    MsgQueueLen--; ????? */
406     }
407     return 1;
408 }
409
410 int CmiAsyncMsgSent(CmiCommHandle c) {
411
412     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
413     int done;
414     MPI_Status sts;
415
416     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
417         msg_tmp = msg_tmp->next;
418     if (msg_tmp) {
419         done = 0;
420         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
421             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
422         return ((done)?1:0);
423     } else {
424         return 1;
425     }
426 }
427
428 void CmiReleaseCommHandle(CmiCommHandle c) {
429     return;
430 }
431
432 /* ######Beginning of functions related with communication progress ###### */
433 static void CmiReleaseSentMessages(void) {
434     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
435     SMSG_LIST *prev=0;
436     SMSG_LIST *temp;
437     int done;
438     MPI_Status sts;
439
440 #if CMK_BLUEGENEL
441     MPID_Progress_test();
442 #endif
443
444     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
445     while (msg_tmp!=0) {
446         done =0;
447 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
448         double startT = CmiWallTimer();
449 #endif
450         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
451             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
452         if (done) {
453             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
454             CpvAccess(MsgQueueLen)--;
455             /* Release the message */
456             temp = msg_tmp->next;
457             if (prev==0) /* first message */
458                 CpvAccess(sent_msgs) = temp;
459             else
460                 prev->next = temp;
461             CmiFree(msg_tmp->msg);
462             CmiFree(msg_tmp);
463             msg_tmp = temp;
464         } else {
465             prev = msg_tmp;
466             msg_tmp = msg_tmp->next;
467         }
468 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
469         {
470             double endT = CmiWallTimer();
471             /* only record the event if it takes more than 1ms */
472             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
473         }
474 #endif
475     }
476     CpvAccess(end_sent) = prev;
477     MACHSTATE(2,"} CmiReleaseSentMessages end");
478 }
479
480 static int PumpMsgs(void) {
481     int nbytes, flg, res;
482     char *msg;
483     MPI_Status sts;
484     int recd=0;
485
486 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
487     int recvCnt=0;
488 #endif
489
490 #if CMK_BLUEGENEL
491     MPID_Progress_test();
492 #endif
493
494     MACHSTATE(2,"PumpMsgs begin {");
495
496 #if CMI_DYNAMIC_EXERT_CAP
497     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
498 #endif
499
500     while (1) {
501 #if CMI_EXERT_RECV_CAP
502         if (recvCnt==RECV_CAP) break;
503 #elif CMI_DYNAMIC_EXERT_CAP
504         if (recvCnt >= dynamicRecvCap) break;
505 #endif
506
507         /* First check posted recvs then do  probe unmatched outstanding messages */
508 #if MPI_POST_RECV
509         int completed_index=-1;
510         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
511             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
512         if (flg) {
513             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
514                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
515
516             recd = 1;
517             msg = (char *) CmiAlloc(nbytes);
518             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
519             /* and repost the recv */
520
521             START_EVENT();
522
523             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
524                                            MPI_POST_RECV_SIZE,
525                                            MPI_BYTE,
526                                            MPI_ANY_SOURCE,
527                                            POST_RECV_TAG,
528                                            MPI_COMM_WORLD,
529                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
530                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
531
532             END_EVENT(50);
533
534             CpvAccess(Cmi_posted_recv_total)++;
535         } else {
536             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
537             if (res != MPI_SUCCESS)
538                 CmiAbort("MPI_Iprobe failed\n");
539             if (!flg) break;
540             recd = 1;
541             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
542             msg = (char *) CmiAlloc(nbytes);
543
544             START_EVENT();
545
546             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
547                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
548
549             END_EVENT(30);
550
551             CpvAccess(Cmi_unposted_recv_total)++;
552         }
553 #else
554         /* Original version */
555 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
556         double startT = CmiWallTimer();
557 #endif
558         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
559         if (res != MPI_SUCCESS)
560             CmiAbort("MPI_Iprobe failed\n");
561
562         if (!flg) break;
563 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
564         {
565             double endT = CmiWallTimer();
566             /* only trace the probe that last longer than 1ms */
567             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
568         }
569 #endif
570
571         recd = 1;
572         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
573         msg = (char *) CmiAlloc(nbytes);
574
575         START_EVENT();
576
577         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
578             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
579
580         /*END_EVENT(30);*/
581
582 #endif
583
584 #if CMK_SMP_TRACE_COMMTHREAD
585         traceBeginCommOp(msg);
586         traceChangeLastTimestamp(CpvAccess(projTraceStart));
587         traceEndCommOp(msg);
588 #if CMI_MPI_TRACE_MOREDETAILED
589         char tmp[32];
590         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
591         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
592 #endif
593 #elif CMK_TRACE_COMMOVERHEAD
594         char tmp[32];
595         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
596         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
597 #endif
598
599
600         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
601         CMI_CHECK_CHECKSUM(msg, nbytes);
602 #if CMK_ERROR_CHECKING
603         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
604             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
605             CmiFree(msg);
606             CmiAbort("Abort!\n");
607             continue;
608         }
609 #endif
610
611         handleOneRecvedMsg(nbytes, msg);
612
613 #if CMI_EXERT_RECV_CAP
614         recvCnt++;
615 #elif CMI_DYNAMIC_EXERT_CAP
616         recvCnt++;
617 #if CMK_SMP
618         /* check sendMsgBuf to get the  number of messages that have not been sent
619              * which is only available in SMP mode
620          * MsgQueueLen indicates the number of messages that have not been released
621              * by MPI
622              */
623         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
624                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
625             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
626         }
627 #else
628         /* MsgQueueLen indicates the number of messages that have not been released
629              * by MPI
630              */
631         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
632             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
633         }
634 #endif
635
636 #endif
637
638     }
639
640     MACHSTATE(2,"} PumpMsgs end ");
641     return recd;
642 }
643
644 /* blocking version */
645 static void PumpMsgsBlocking(void) {
646     static int maxbytes = 20000000;
647     static char *buf = NULL;
648     int nbytes, flg;
649     MPI_Status sts;
650     char *msg;
651     int recd=0;
652
653     if (!PCQueueEmpty(CmiGetState()->recv)) return;
654     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
655     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
656     if (CpvAccess(sent_msgs))  return;
657
658 #if 0
659     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
660 #endif
661
662     if (buf == NULL) {
663         buf = (char *) CmiAlloc(maxbytes);
664         _MEMCHECK(buf);
665     }
666
667
668 #if MPI_POST_RECV
669 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
670     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
671 #endif
672
673     START_EVENT();
674
675     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
676         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
677
678     /*END_EVENT(30);*/
679
680     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
681     msg = (char *) CmiAlloc(nbytes);
682     memcpy(msg, buf, nbytes);
683
684 #if CMK_SMP_TRACE_COMMTHREAD
685     traceBeginCommOp(msg);
686     traceChangeLastTimestamp(CpvAccess(projTraceStart));
687     traceEndCommOp(msg);
688 #if CMI_MPI_TRACE_MOREDETAILED
689     char tmp[32];
690     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
691     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
692 #endif
693 #endif
694
695     handleOneRecvedMsg(nbytes, msg);
696 }
697
698
699 #if CMK_SMP
700
701 /* called by communication thread in SMP */
702 static int SendMsgBuf() {
703     SMSG_LIST *msg_tmp;
704     char *msg;
705     int node, rank, size;
706     int i;
707     int sent = 0;
708
709 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
710     int sentCnt = 0;
711 #endif
712
713 #if CMI_DYNAMIC_EXERT_CAP
714     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
715 #endif
716
717     MACHSTATE(2,"SendMsgBuf begin {");
718 #if MULTI_SENDQUEUE
719     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
720         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
721             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
722 #else
723     /* single message sending queue */
724     /* CmiLock(sendMsgBufLock); */
725     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
726     /* CmiUnlock(sendMsgBufLock); */
727     while (NULL != msg_tmp) {
728 #endif
729             MPISendOneMsg(msg_tmp);
730             sent=1;
731
732 #if CMI_EXERT_SEND_CAP
733             if (++sentCnt == SEND_CAP) break;
734 #elif CMI_DYNAMIC_EXERT_CAP
735             if (++sentCnt >= dynamicSendCap) break;
736             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
737                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
738 #endif
739
740 #if ! MULTI_SENDQUEUE
741             /* CmiLock(sendMsgBufLock); */
742             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
743             /* CmiUnlock(sendMsgBufLock); */
744 #endif
745         }
746 #if MULTI_SENDQUEUE
747     }
748 #endif
749     MACHSTATE(2,"}SendMsgBuf end ");
750     return sent;
751 }
752
753 static int MsgQueueEmpty() {
754     int i;
755 #if MULTI_SENDQUEUE
756     for (i=0; i<_Cmi_mynodesize; i++)
757         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
758 #else
759     return PCQueueEmpty(sendMsgBuf);
760 #endif
761     return 1;
762 }
763
764 /* test if all processors recv queues are empty */
765 static int RecvQueueEmpty() {
766     int i;
767     for (i=0; i<_Cmi_mynodesize; i++) {
768         CmiState cs=CmiGetStateN(i);
769         if (!PCQueueEmpty(cs->recv)) return 0;
770     }
771     return 1;
772 }
773
774
775 #define REPORT_COMM_METRICS 0
776 #if REPORT_COMM_METRICS
777 static double pumptime = 0.0;
778                          static double releasetime = 0.0;
779                                                      static double sendtime = 0.0;
780 #endif
781
782 #endif //end of CMK_SMP
783
784 static void AdvanceCommunicationForMPI() {
785 #if REPORT_COMM_METRICS
786     double t1, t2, t3, t4;
787     t1 = CmiWallTimer();
788 #endif
789
790 #if CMK_SMP
791     PumpMsgs();
792
793 #if REPORT_COMM_METRICS
794     t2 = CmiWallTimer();
795 #endif
796
797     CmiReleaseSentMessages();
798 #if REPORT_COMM_METRICS
799     t3 = CmiWallTimer();
800 #endif
801
802     SendMsgBuf();
803
804 #if REPORT_COMM_METRICS
805     t4 = CmiWallTimer();
806     pumptime += (t2-t1);
807     releasetime += (t3-t2);
808     sendtime += (t4-t3);
809 #endif
810
811 #else /* non-SMP case */
812     CmiReleaseSentMessages();
813
814 #if REPORT_COMM_METRICS
815     t2 = CmiWallTimer();
816 #endif
817     PumpMsgs();
818
819 #if REPORT_COMM_METRICS
820     t3 = CmiWallTimer();
821     pumptime += (t3-t2);
822     releasetime += (t2-t1);
823 #endif
824
825 #endif /* end of #if CMK_SMP */
826 }
827 /* ######End of functions related with communication progress ###### */
828
829 static void MachinePostNonLocalForMPI() {
830 #if !CMK_SMP
831     if (no_outstanding_sends) {
832         while (CpvAccess(MsgQueueLen)>0) {
833             AdvanceCommunicationForMPI();
834         }
835     }
836
837     /* FIXME: I don't think the following codes are needed because
838      * it repeats the same job of the next call of CmiGetNonLocal
839      */
840 #if 0
841     if (!msg) {
842         CmiReleaseSentMessages();
843         if (PumpMsgs())
844             return  PCQueuePop(cs->recv);
845         else
846             return 0;
847     }
848 #endif
849 #else
850   if (_thread_provided == MPI_THREAD_MULTIPLE) {
851         CmiReleaseSentMessages();
852         SendMsgBuf();
853   }
854 #endif
855 }
856
857 /* Idle-state related functions: called in non-smp mode */
858 void CmiNotifyIdleForMPI(void) {
859     CmiReleaseSentMessages();
860     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
861 }
862
863 /* Network progress function is used to poll the network when for
864    messages. This flushes receive buffers on some  implementations*/
865 #if CMK_MACHINE_PROGRESS_DEFINED
866 void CmiMachineProgressImpl() {
867 #if !CMK_SMP
868     PumpMsgs();
869 #if CMK_IMMEDIATE_MSG
870     CmiHandleImmediate();
871 #endif
872 #else
873     /*Not implemented yet. Communication server does not seem to be
874       thread safe, so only communication thread call it */
875     if (CmiMyRank() == CmiMyNodeSize())
876         CommunicationServerThread(0);
877 #endif
878 }
879 #endif
880
881 /* ######Beginning of functions related with exiting programs###### */
882 void DrainResourcesForMPI() {
883 #if !CMK_SMP
884     while (!CmiAllAsyncMsgsSent()) {
885         PumpMsgs();
886         CmiReleaseSentMessages();
887     }
888 #else
889     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
890         CmiReleaseSentMessages();
891         SendMsgBuf();
892         PumpMsgs();
893     }
894 #endif
895     MACHSTATE(2, "Machine exit barrier begin {");
896     START_EVENT();
897     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
898         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
899     END_EVENT(10);
900     MACHSTATE(2, "} Machine exit barrier end");
901 }
902
903 void LrtsExit() {
904 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
905     int doPrint = 0;
906 #if CMK_SMP
907     if (CmiMyNode()==0) doPrint = 1;
908 #else
909     if (CmiMyPe()==0) doPrint = 1;
910 #endif
911
912     if (doPrint) {
913 #if MPI_POST_RECV
914         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
915 #endif
916     }
917 #endif
918
919 #if REPORT_COMM_METRICS
920 #if CMK_SMP
921     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
922               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
923               pumptime, releasetime, sendtime);
924 #else
925     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
926               CmiMyPe(), pumptime, releasetime, sendtime);
927 #endif
928 #endif
929
930 #if ! CMK_AUTOBUILD
931     signal(SIGINT, signal_int);
932     MPI_Finalize();
933 #endif
934     exit(0);
935 }
936
937 static int machine_exit_idx;
938 static void machine_exit(char *m) {
939     EmergencyExit();
940     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
941     fflush(stdout);
942     CmiNodeBarrier();
943     if (CmiMyRank() == 0) {
944         MPI_Barrier(MPI_COMM_WORLD);
945         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
946         MPI_Abort(MPI_COMM_WORLD, 1);
947     } else {
948         while (1) CmiYield();
949     }
950 }
951
952 static void KillOnAllSigs(int sigNo) {
953     static int already_in_signal_handler = 0;
954     char *m;
955     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
956     already_in_signal_handler = 1;
957 #if CMK_CCS_AVAILABLE
958     if (CpvAccess(cmiArgDebugFlag)) {
959         CpdNotify(CPD_SIGNAL, sigNo);
960         CpdFreeze();
961     }
962 #endif
963     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
964              "Signal: %d\n",CmiMyPe(),sigNo);
965     CmiPrintStackTrace(1);
966
967     m = CmiAlloc(CmiMsgHeaderSizeBytes);
968     CmiSetHandler(m, machine_exit_idx);
969     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
970     machine_exit(m);
971 }
972 /* ######End of functions related with exiting programs###### */
973
974
975 /* ######Beginning of functions related with starting programs###### */
976 static void registerMPITraceEvents() {
977 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
978     traceRegisterUserEvent("MPI_Barrier", 10);
979     traceRegisterUserEvent("MPI_Send", 20);
980     traceRegisterUserEvent("MPI_Recv", 30);
981     traceRegisterUserEvent("MPI_Isend", 40);
982     traceRegisterUserEvent("MPI_Irecv", 50);
983     traceRegisterUserEvent("MPI_Test", 60);
984     traceRegisterUserEvent("MPI_Iprobe", 70);
985 #endif
986 }
987
988 #if MACHINE_DEBUG_LOG
989 FILE *debugLog = NULL;
990 #endif
991
992 static char *thread_level_tostring(int thread_level) {
993 #if CMK_MPI_INIT_THREAD
994     switch (thread_level) {
995     case MPI_THREAD_SINGLE:
996         return "MPI_THREAD_SINGLE";
997     case MPI_THREAD_FUNNELED:
998         return "MPI_THREAD_FUNNELED";
999     case MPI_THREAD_SERIALIZED:
1000         return "MPI_THREAD_SERIALIZED";
1001     case MPI_THREAD_MULTIPLE :
1002         return "MPI_THREAD_MULTIPLE ";
1003     default: {
1004         char *str = (char*)malloc(5);
1005         sprintf(str,"%d", thread_level);
1006         return str;
1007     }
1008     }
1009     return  "unknown";
1010 #else
1011     char *str = (char*)malloc(5);
1012     sprintf(str,"%d", thread_level);
1013     return str;
1014 #endif
1015 }
1016
1017 /**
1018  *  Obtain the number of nodes, my node id, and consuming machine layer
1019  *  specific arguments
1020  */
1021 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1022     int n,i;
1023     int ver, subver;
1024     int provided;
1025     int thread_level;
1026     int myNID;
1027     int largc=*argc;
1028     char** largv=*argv;
1029
1030 #if MACHINE_DEBUG
1031     debugLog=NULL;
1032 #endif
1033 #if CMK_USE_HP_MAIN_FIX
1034 #if FOR_CPLUS
1035     _main(largc,largv);
1036 #endif
1037 #endif
1038
1039 #if CMK_MPI_INIT_THREAD
1040 #if CMK_SMP
1041     thread_level = MPI_THREAD_MULTIPLE;
1042 #else
1043     thread_level = MPI_THREAD_SINGLE;
1044 #endif
1045     MPI_Init_thread(argc, argv, thread_level, &provided);
1046     _thread_provided = provided;
1047 #else
1048     MPI_Init(argc, argv);
1049     thread_level = 0;
1050     provided = -1;
1051 #endif
1052     largc = *argc;
1053     largv = *argv;
1054     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1055     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1056
1057     myNID = *myNodeID;
1058
1059     MPI_Get_version(&ver, &subver);
1060     if (myNID == 0) {
1061         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1062     }
1063
1064     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1065     if (idleblock && _Cmi_mynode == 0) {
1066         printf("Charm++: Running in idle blocking mode.\n");
1067     }
1068
1069 #if CMK_CHARMDEBUG
1070     /* setup signal handlers */
1071     signal(SIGSEGV, KillOnAllSigs);
1072     signal(SIGFPE, KillOnAllSigs);
1073     signal(SIGILL, KillOnAllSigs);
1074     signal_int = signal(SIGINT, KillOnAllSigs);
1075     signal(SIGTERM, KillOnAllSigs);
1076     signal(SIGABRT, KillOnAllSigs);
1077 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1078     signal(SIGQUIT, KillOnAllSigs);
1079     signal(SIGBUS, KillOnAllSigs);
1080 #   endif /*UNIX*/
1081 #endif
1082
1083 #if CMK_NO_OUTSTANDING_SENDS
1084     no_outstanding_sends=1;
1085 #endif
1086     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1087         no_outstanding_sends = 1;
1088         if (myNID == 0)
1089             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1090                    no_outstanding_sends?"":" not");
1091     }
1092
1093     request_max=MAX_QLEN;
1094     CmiGetArgInt(largv,"+requestmax",&request_max);
1095     /*printf("request max=%d\n", request_max);*/
1096
1097 #if MPI_POST_RECV
1098     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1099     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1100     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1101     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1102     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1103     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1104     if (myNID==0) {
1105         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1106                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1107     }
1108 #endif
1109
1110 #if CMI_DYNAMIC_EXERT_CAP
1111     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1112     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1113     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1114     if (myNID==0) {
1115         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1116                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1117     }
1118 #endif
1119
1120     /* checksum flag */
1121     if (CmiGetArgFlag(largv,"+checksum")) {
1122 #if CMK_ERROR_CHECKING
1123         checksum_flag = 1;
1124         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1125 #else
1126         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1127 #endif
1128     }
1129
1130     {
1131         int debug = CmiGetArgFlag(largv,"++debug");
1132         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1133         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1134 #if CMK_HAS_GETPID
1135             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1136             fflush(stdout);
1137             if (!debug_no_pause)
1138                 sleep(15);
1139 #else
1140             printf("++debug ignored.\n");
1141 #endif
1142         }
1143     }
1144
1145     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1146     for (i=0; i<_Cmi_mynodesize+1; i++) {
1147 #if MULTI_SENDQUEUE
1148         procState[i].sendMsgBuf = PCQueueCreate();
1149 #endif
1150         procState[i].recvLock = CmiCreateLock();
1151     }
1152 #if CMK_SMP
1153 #if !MULTI_SENDQUEUE
1154     sendMsgBuf = PCQueueCreate();
1155     sendMsgBufLock = CmiCreateLock();
1156 #endif
1157 #endif
1158 }
1159
1160 static void MachinePreCommonInitForMPI(int everReturn) {
1161
1162 #if MPI_POST_RECV
1163     int doInit = 1;
1164     int i;
1165
1166 #if CMK_SMP
1167     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1168 #endif
1169
1170     /* Currently, in mpi smp, the main thread will be the comm thread, so
1171      *  only the comm thread should post recvs. Cpvs, however, need to be
1172      * created on rank 0 (the ptrs to the actual cpv memory), while
1173      * other ranks are busy waiting for this to finish. So cpv initialize
1174      * routines have to be called on every ranks, although they are only
1175      * useful on comm thread (whose rank is not zero) -Chao Mei
1176      */
1177     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1178     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1179     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1180     CpvInitialize(char*,CmiPostedRecvBuffers);
1181
1182     if (doInit) {
1183         /* Post some extra recvs to help out with incoming messages */
1184         /* On some MPIs the messages are unexpected and thus slow */
1185
1186         /* An array of request handles for posted recvs */
1187         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1188
1189         /* An array of buffers for posted recvs */
1190         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1191
1192         /* Post Recvs */
1193         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1194             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1195                                            MPI_POST_RECV_SIZE,
1196                                            MPI_BYTE,
1197                                            MPI_ANY_SOURCE,
1198                                            POST_RECV_TAG,
1199                                            MPI_COMM_WORLD,
1200                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1201                 CmiAbort("MPI_Irecv failed\n");
1202         }
1203     }
1204 #endif
1205
1206 }
1207
1208 static void MachinePostCommonInitForMPI(int everReturn) {
1209
1210     CmiIdleState *s=CmiNotifyGetState();
1211
1212     CpvInitialize(SMSG_LIST *, sent_msgs);
1213     CpvInitialize(SMSG_LIST *, end_sent);
1214     CpvInitialize(int, MsgQueueLen);
1215     CpvAccess(sent_msgs) = NULL;
1216     CpvAccess(end_sent) = NULL;
1217     CpvAccess(MsgQueueLen) = 0;
1218
1219     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1220
1221 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1222     CpvInitialize(double, projTraceStart);
1223     /* only PE 0 needs to care about registration (to generate sts file). */
1224     if (CmiMyPe() == 0) {
1225         registerMachineUserEventsFunction(&registerMPITraceEvents);
1226     }
1227 #endif
1228
1229 #if CMK_SMP
1230     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1231     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1232     if (_thread_provided == MPI_THREAD_MULTIPLE)
1233       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1234 #else
1235     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1236 #endif
1237
1238 #if MACHINE_DEBUG_LOG
1239     if (CmiMyRank() == 0) {
1240         char ln[200];
1241         sprintf(ln,"debugLog.%d",CmiMyNode());
1242         debugLog=fopen(ln,"w");
1243     }
1244 #endif
1245 }
1246 /* ######End of functions related with starting programs###### */
1247
1248 /***********************************************************************
1249  *
1250  * Abort function:
1251  *
1252  ************************************************************************/
1253
1254 void CmiAbort(const char *message) {
1255     char *m;
1256     /* if CharmDebug is attached simply try to send a message to it */
1257 #if CMK_CCS_AVAILABLE
1258     if (CpvAccess(cmiArgDebugFlag)) {
1259         CpdNotify(CPD_ABORT, message);
1260         CpdFreeze();
1261     }
1262 #endif
1263     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1264              "Reason: %s\n",CmiMyPe(),message);
1265     /*  CmiError(message); */
1266     CmiPrintStackTrace(0);
1267     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1268     CmiSetHandler(m, machine_exit_idx);
1269     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1270     machine_exit(m);
1271     /* Program never reaches here */
1272     MPI_Abort(MPI_COMM_WORLD, 1);
1273 }
1274
1275 /**************************  TIMER FUNCTIONS **************************/
1276 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1277
1278 /* MPI calls are not threadsafe, even the timer on some machines */
1279 static CmiNodeLock  timerLock = 0;
1280                                 static int _absoluteTime = 0;
1281                                                            static double starttimer = 0;
1282                                                                                       static int _is_global = 0;
1283
1284 int CmiTimerIsSynchronized() {
1285     int  flag;
1286     void *v;
1287
1288     /*  check if it using synchronized timer */
1289     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1290         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1291     if (flag) {
1292         _is_global = *(int*)v;
1293         if (_is_global && CmiMyPe() == 0)
1294             printf("Charm++> MPI timer is synchronized\n");
1295     }
1296     return _is_global;
1297 }
1298
1299 int CmiTimerAbsolute() {
1300     return _absoluteTime;
1301 }
1302
1303 double CmiStartTimer() {
1304     return 0.0;
1305 }
1306
1307 double CmiInitTime() {
1308     return starttimer;
1309 }
1310
1311 void CmiTimerInit(char **argv) {
1312     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1313     if (_absoluteTime && CmiMyPe() == 0)
1314         printf("Charm++> absolute MPI timer is used\n");
1315
1316     _is_global = CmiTimerIsSynchronized();
1317
1318     if (_is_global) {
1319         if (CmiMyRank() == 0) {
1320             double minTimer;
1321 #if CMK_TIMER_USE_XT3_DCLOCK
1322             starttimer = dclock();
1323 #else
1324             starttimer = MPI_Wtime();
1325 #endif
1326
1327             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1328                           MPI_COMM_WORLD );
1329             starttimer = minTimer;
1330         }
1331     } else { /* we don't have a synchronous timer, set our own start time */
1332         CmiBarrier();
1333         CmiBarrier();
1334         CmiBarrier();
1335 #if CMK_TIMER_USE_XT3_DCLOCK
1336         starttimer = dclock();
1337 #else
1338         starttimer = MPI_Wtime();
1339 #endif
1340     }
1341
1342 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1343     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1344         timerLock = CmiCreateLock();
1345 #endif
1346     CmiNodeAllBarrier();          /* for smp */
1347 }
1348
1349 /**
1350  * Since the timerLock is never created, and is
1351  * always NULL, then all the if-condition inside
1352  * the timer functions could be disabled right
1353  * now in the case of SMP. --Chao Mei
1354  */
1355 double CmiTimer(void) {
1356     double t;
1357 #if 0 && CMK_SMP
1358     if (timerLock) CmiLock(timerLock);
1359 #endif
1360
1361 #if CMK_TIMER_USE_XT3_DCLOCK
1362     t = dclock();
1363 #else
1364     t = MPI_Wtime();
1365 #endif
1366
1367 #if 0 && CMK_SMP
1368     if (timerLock) CmiUnlock(timerLock);
1369 #endif
1370
1371     return _absoluteTime?t: (t-starttimer);
1372 }
1373
1374 double CmiWallTimer(void) {
1375     double t;
1376 #if 0 && CMK_SMP
1377     if (timerLock) CmiLock(timerLock);
1378 #endif
1379
1380 #if CMK_TIMER_USE_XT3_DCLOCK
1381     t = dclock();
1382 #else
1383     t = MPI_Wtime();
1384 #endif
1385
1386 #if 0 && CMK_SMP
1387     if (timerLock) CmiUnlock(timerLock);
1388 #endif
1389
1390     return _absoluteTime? t: (t-starttimer);
1391 }
1392
1393 double CmiCpuTimer(void) {
1394     double t;
1395 #if 0 && CMK_SMP
1396     if (timerLock) CmiLock(timerLock);
1397 #endif
1398 #if CMK_TIMER_USE_XT3_DCLOCK
1399     t = dclock() - starttimer;
1400 #else
1401     t = MPI_Wtime() - starttimer;
1402 #endif
1403 #if 0 && CMK_SMP
1404     if (timerLock) CmiUnlock(timerLock);
1405 #endif
1406     return t;
1407 }
1408
1409 #endif
1410
1411 /************Barrier Related Functions****************/
1412 /* must be called on all ranks including comm thread in SMP */
1413 int CmiBarrier() {
1414 #if CMK_SMP
1415     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1416     CmiNodeAllBarrier();
1417     if (CmiMyRank() == CmiMyNodeSize())
1418 #else
1419     if (CmiMyRank() == 0)
1420 #endif
1421     {
1422         /**
1423          *  The call of CmiBarrier is usually before the initialization
1424          *  of trace module of Charm++, therefore, the START_EVENT
1425          *  and END_EVENT are disabled here. -Chao Mei
1426          */
1427         /*START_EVENT();*/
1428
1429         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1430             CmiAbort("Timernit: MPI_Barrier failed!\n");
1431
1432         /*END_EVENT(10);*/
1433     }
1434     CmiNodeAllBarrier();
1435     return 0;
1436 }
1437
1438 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1439 int CmiBarrierZero() {
1440     int i;
1441 #if CMK_SMP
1442     if (CmiMyRank() == CmiMyNodeSize())
1443 #else
1444     if (CmiMyRank() == 0)
1445 #endif
1446     {
1447         char msg[1];
1448         MPI_Status sts;
1449         if (CmiMyNode() == 0)  {
1450             for (i=0; i<CmiNumNodes()-1; i++) {
1451                 START_EVENT();
1452
1453                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1454                     CmiPrintf("MPI_Recv failed!\n");
1455
1456                 END_EVENT(30);
1457             }
1458         } else {
1459             START_EVENT();
1460
1461             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1462                 printf("MPI_Send failed!\n");
1463
1464             END_EVENT(20);
1465         }
1466     }
1467     CmiNodeAllBarrier();
1468     return 0;
1469 }
1470
1471 /*@}*/
1472