b6ede634a6b691b741a53d46d615a0f2e24b83ca
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207
208 /* =====Beginning of Declarations of Machine Specific Functions===== */
209 /* Utility functions */
210 #if CMK_BLUEGENEL
211 extern void MPID_Progress_test();
212 #endif
213 static size_t CmiAllAsyncMsgsSent(void);
214 static void CmiReleaseSentMessages(void);
215 static int PumpMsgs(void);
216 static void PumpMsgsBlocking(void);
217
218 #if CMK_SMP
219 static int MsgQueueEmpty();
220 static int RecvQueueEmpty();
221 static int SendMsgBuf();
222 static  void EnqueueMsg(void *m, int size, int node, int mode);
223 #endif
224
225 /* The machine-specific send function */
226 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
227 #define LrtsSendFunc MachineSpecificSendForMPI
228
229 /* ### Beginning of Machine-startup Related Functions ### */
230 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
231 #define LrtsInit MachineInitForMPI
232
233 static void MachinePreCommonInitForMPI(int everReturn);
234 static void MachinePostCommonInitForMPI(int everReturn);
235 #define LrtsPreCommonInit MachinePreCommonInitForMPI
236 #define LrtsPostCommonInit MachinePostCommonInitForMPI
237 /* ### End of Machine-startup Related Functions ### */
238
239 /* ### Beginning of Machine-running Related Functions ### */
240 static void AdvanceCommunicationForMPI();
241 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
242
243 static void DrainResourcesForMPI(); /* used when exit */
244 #define LrtsDrainResources DrainResourcesForMPI
245
246 static void MachineExitForMPI();
247 #define LrtsExit MachineExitForMPI
248 /* ### End of Machine-running Related Functions ### */
249
250 /* ### Beginning of Idle-state Related Functions ### */
251 void CmiNotifyIdleForMPI(void);
252 /* ### End of Idle-state Related Functions ### */
253
254 static void MachinePostNonLocalForMPI();
255 #define LrtsPostNonLocal MachinePostNonLocalForMPI
256
257 /* =====End of Declarations of Machine Specific Functions===== */
258
259 /**
260  *  Macros that overwrites the common codes, such as
261  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
262  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
263  *  CMK_OFFLOAD_BCAST_PROCESS etc.
264  */
265 #define CMK_HAS_SIZE_IN_MSGHDR 0
266 #include "machine-lrts.h"
267 #include "machine-common-core.c"
268
269 /* The machine specific msg-sending function */
270
271 #if CMK_SMP
272 static void EnqueueMsg(void *m, int size, int node, int mode) {
273     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
274     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
275     msg_tmp->msg = m;
276     msg_tmp->size = size;
277     msg_tmp->destpe = node;
278     msg_tmp->next = 0;
279     msg_tmp->mode = mode;
280
281 #if CMK_SMP_TRACE_COMMTHREAD
282     msg_tmp->srcpe = CmiMyPe();
283 #endif
284
285 #if MULTI_SENDQUEUE
286     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
287 #else
288     /*CmiLock(sendMsgBufLock);*/
289     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
290     /*CmiUnlock(sendMsgBufLock);*/
291 #endif
292
293     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
294 }
295 #endif
296
297 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
298 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
299     int node = smsg->destpe;
300     int size = smsg->size;
301     char *msg = smsg->msg;
302     int mode = smsg->mode;
303
304     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
305 #if CMK_ERROR_CHECKING
306     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
307     CMI_SET_CHECKSUM(msg, size);
308 #endif
309
310 #if MPI_POST_RECV
311     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
312         START_EVENT();
313         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
314             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
315         /*END_EVENT(40);*/
316     } else {
317         START_EVENT();
318         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
319             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
320         /*END_EVENT(40);*/
321     }
322 #else
323     START_EVENT();
324     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
325         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
326     /*END_EVENT(40);*/
327 #endif
328
329 #if CMK_SMP_TRACE_COMMTHREAD
330     traceBeginCommOp(msg);
331     traceChangeLastTimestamp(CpvAccess(projTraceStart));
332     /* traceSendMsgComm must execute after traceBeginCommOp because
333          * we pretend we execute an entry method, and inside this we
334          * pretend we will send another message. Otherwise how could
335          * a message creation just before an entry method invocation?
336          * If such logic is broken, the projections will not trace
337          * messages correctly! -Chao Mei
338          */
339     traceSendMsgComm(msg);
340     traceEndCommOp(msg);
341 #if CMI_MPI_TRACE_MOREDETAILED
342     char tmp[64];
343     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
344     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
345 #endif
346 #endif
347
348     MACHSTATE(3,"}MPI_send end");
349     CpvAccess(MsgQueueLen)++;
350     if (CpvAccess(sent_msgs)==0)
351         CpvAccess(sent_msgs) = smsg;
352     else
353         CpvAccess(end_sent)->next = smsg;
354     CpvAccess(end_sent) = smsg;
355
356 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
357     if (mode == P2P_SYNC || mode == P2P_ASYNC)
358     {
359     while (CpvAccess(MsgQueueLen) > request_max) {
360         CmiReleaseSentMessages();
361         PumpMsgs();
362     }
363     }
364 #endif
365
366     return (CmiCommHandle) &(smsg->req);
367 }
368
369 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
370     /* Ignoring the mode for MPI layer */
371
372     CmiState cs = CmiGetState();
373     SMSG_LIST *msg_tmp;
374     int  rank;
375
376     CmiAssert(destNode != CmiMyNode());
377 #if CMK_SMP
378     if (_thread_provided != MPI_THREAD_MULTIPLE) {
379       EnqueueMsg(msg, size, destNode, mode);
380       return 0;
381     }
382 #endif
383     /* non smp */
384     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
385     msg_tmp->msg = msg;
386     msg_tmp->destpe = destNode;
387     msg_tmp->size = size;
388     msg_tmp->next = 0;
389     msg_tmp->mode = mode;
390     return MPISendOneMsg(msg_tmp);
391 }
392
393 static size_t CmiAllAsyncMsgsSent(void) {
394     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
395     MPI_Status sts;
396     int done;
397
398     while (msg_tmp!=0) {
399         done = 0;
400         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
401             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
402         if (!done)
403             return 0;
404         msg_tmp = msg_tmp->next;
405         /*    MsgQueueLen--; ????? */
406     }
407     return 1;
408 }
409
410 int CmiAsyncMsgSent(CmiCommHandle c) {
411
412     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
413     int done;
414     MPI_Status sts;
415
416     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
417         msg_tmp = msg_tmp->next;
418     if (msg_tmp) {
419         done = 0;
420         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
421             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
422         return ((done)?1:0);
423     } else {
424         return 1;
425     }
426 }
427
428 void CmiReleaseCommHandle(CmiCommHandle c) {
429     return;
430 }
431
432 /* ######Beginning of functions related with communication progress ###### */
433 static void CmiReleaseSentMessages(void) {
434     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
435     SMSG_LIST *prev=0;
436     SMSG_LIST *temp;
437     int done;
438     MPI_Status sts;
439
440 #if CMK_BLUEGENEL
441     MPID_Progress_test();
442 #endif
443
444     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
445     while (msg_tmp!=0) {
446         done =0;
447 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
448         double startT = CmiWallTimer();
449 #endif
450         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
451             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
452         if (done) {
453             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
454             CpvAccess(MsgQueueLen)--;
455             /* Release the message */
456             temp = msg_tmp->next;
457             if (prev==0) /* first message */
458                 CpvAccess(sent_msgs) = temp;
459             else
460                 prev->next = temp;
461             CmiFree(msg_tmp->msg);
462             CmiFree(msg_tmp);
463             msg_tmp = temp;
464         } else {
465             prev = msg_tmp;
466             msg_tmp = msg_tmp->next;
467         }
468 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
469         {
470             double endT = CmiWallTimer();
471             /* only record the event if it takes more than 1ms */
472             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
473         }
474 #endif
475     }
476     CpvAccess(end_sent) = prev;
477     MACHSTATE(2,"} CmiReleaseSentMessages end");
478 }
479
480 static int PumpMsgs(void) {
481     int nbytes, flg, res;
482     char *msg;
483     MPI_Status sts;
484     int recd=0;
485
486 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
487     int recvCnt=0;
488 #endif
489
490 #if CMK_BLUEGENEL
491     MPID_Progress_test();
492 #endif
493
494     MACHSTATE(2,"PumpMsgs begin {");
495
496 #if CMI_DYNAMIC_EXERT_CAP
497     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
498 #endif
499
500     while (1) {
501 #if CMI_EXERT_RECV_CAP
502         if (recvCnt==RECV_CAP) break;
503 #elif CMI_DYNAMIC_EXERT_CAP
504         if (recvCnt >= dynamicRecvCap) break;
505 #endif
506
507         /* First check posted recvs then do  probe unmatched outstanding messages */
508 #if MPI_POST_RECV
509         int completed_index=-1;
510         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
511             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
512         if (flg) {
513             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
514                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
515
516             recd = 1;
517             msg = (char *) CmiAlloc(nbytes);
518             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
519             /* and repost the recv */
520
521             START_EVENT();
522
523             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
524                                            MPI_POST_RECV_SIZE,
525                                            MPI_BYTE,
526                                            MPI_ANY_SOURCE,
527                                            POST_RECV_TAG,
528                                            MPI_COMM_WORLD,
529                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
530                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
531
532             END_EVENT(50);
533
534             CpvAccess(Cmi_posted_recv_total)++;
535         } else {
536             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
537             if (res != MPI_SUCCESS)
538                 CmiAbort("MPI_Iprobe failed\n");
539             if (!flg) break;
540             recd = 1;
541             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
542             msg = (char *) CmiAlloc(nbytes);
543
544             START_EVENT();
545
546             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
547                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
548
549             END_EVENT(30);
550
551             CpvAccess(Cmi_unposted_recv_total)++;
552         }
553 #else
554         /* Original version */
555 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
556         double startT = CmiWallTimer();
557 #endif
558         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
559         if (res != MPI_SUCCESS)
560             CmiAbort("MPI_Iprobe failed\n");
561
562         if (!flg) break;
563 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
564         {
565             double endT = CmiWallTimer();
566             /* only trace the probe that last longer than 1ms */
567             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
568         }
569 #endif
570
571         recd = 1;
572         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
573         msg = (char *) CmiAlloc(nbytes);
574
575         START_EVENT();
576
577         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
578             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
579
580         /*END_EVENT(30);*/
581
582 #endif
583
584 #if CMK_SMP_TRACE_COMMTHREAD
585         traceBeginCommOp(msg);
586         traceChangeLastTimestamp(CpvAccess(projTraceStart));
587         traceEndCommOp(msg);
588 #if CMI_MPI_TRACE_MOREDETAILED
589         char tmp[32];
590         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
591         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
592 #endif
593 #elif CMK_TRACE_COMMOVERHEAD
594         char tmp[32];
595         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
596         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
597 #endif
598
599
600         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
601         CMI_CHECK_CHECKSUM(msg, nbytes);
602 #if CMK_ERROR_CHECKING
603         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
604             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
605             CmiFree(msg);
606             CmiAbort("Abort!\n");
607             continue;
608         }
609 #endif
610
611         handleOneRecvedMsg(nbytes, msg);
612
613 #if CMI_EXERT_RECV_CAP
614         recvCnt++;
615 #elif CMI_DYNAMIC_EXERT_CAP
616         recvCnt++;
617 #if CMK_SMP
618         /* check sendMsgBuf to get the  number of messages that have not been sent
619              * which is only available in SMP mode
620          * MsgQueueLen indicates the number of messages that have not been released
621              * by MPI
622              */
623         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
624                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
625             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
626         }
627 #else
628         /* MsgQueueLen indicates the number of messages that have not been released
629              * by MPI
630              */
631         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
632             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
633         }
634 #endif
635
636 #endif
637
638     }
639
640     MACHSTATE(2,"} PumpMsgs end ");
641     return recd;
642 }
643
644 /* blocking version */
645 static void PumpMsgsBlocking(void) {
646     static int maxbytes = 20000000;
647     static char *buf = NULL;
648     int nbytes, flg;
649     MPI_Status sts;
650     char *msg;
651     int recd=0;
652
653     if (!PCQueueEmpty(CmiGetState()->recv)) return;
654     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
655     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
656     if (CpvAccess(sent_msgs))  return;
657
658 #if 0
659     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
660 #endif
661
662     if (buf == NULL) {
663         buf = (char *) CmiAlloc(maxbytes);
664         _MEMCHECK(buf);
665     }
666
667
668 #if MPI_POST_RECV
669 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
670     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
671 #endif
672
673     START_EVENT();
674
675     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
676         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
677
678     /*END_EVENT(30);*/
679
680     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
681     msg = (char *) CmiAlloc(nbytes);
682     memcpy(msg, buf, nbytes);
683
684 #if CMK_SMP_TRACE_COMMTHREAD
685     traceBeginCommOp(msg);
686     traceChangeLastTimestamp(CpvAccess(projTraceStart));
687     traceEndCommOp(msg);
688 #if CMI_MPI_TRACE_MOREDETAILED
689     char tmp[32];
690     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
691     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
692 #endif
693 #endif
694
695     handleOneRecvedMsg(nbytes, msg);
696 }
697
698
699 #if CMK_SMP
700
701 /* called by communication thread in SMP */
702 static int SendMsgBuf() {
703     SMSG_LIST *msg_tmp;
704     char *msg;
705     int node, rank, size;
706     int i;
707     int sent = 0;
708
709 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
710     int sentCnt = 0;
711 #endif
712
713 #if CMI_DYNAMIC_EXERT_CAP
714     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
715 #endif
716
717     MACHSTATE(2,"SendMsgBuf begin {");
718 #if MULTI_SENDQUEUE
719     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
720         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
721             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
722 #else
723     /* single message sending queue */
724     /* CmiLock(sendMsgBufLock); */
725     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
726     /* CmiUnlock(sendMsgBufLock); */
727     while (NULL != msg_tmp) {
728 #endif
729             MPISendOneMsg(msg_tmp);
730             sent=1;
731
732 #if CMI_EXERT_SEND_CAP
733             if (++sentCnt == SEND_CAP) break;
734 #elif CMI_DYNAMIC_EXERT_CAP
735             if (++sentCnt >= dynamicSendCap) break;
736             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
737                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
738 #endif
739
740 #if ! MULTI_SENDQUEUE
741             /* CmiLock(sendMsgBufLock); */
742             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
743             /* CmiUnlock(sendMsgBufLock); */
744 #endif
745         }
746 #if MULTI_SENDQUEUE
747     }
748 #endif
749     MACHSTATE(2,"}SendMsgBuf end ");
750     return sent;
751 }
752
753 static int MsgQueueEmpty() {
754     int i;
755 #if MULTI_SENDQUEUE
756     for (i=0; i<_Cmi_mynodesize; i++)
757         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
758 #else
759     return PCQueueEmpty(sendMsgBuf);
760 #endif
761     return 1;
762 }
763
764 /* test if all processors recv queues are empty */
765 static int RecvQueueEmpty() {
766     int i;
767     for (i=0; i<_Cmi_mynodesize; i++) {
768         CmiState cs=CmiGetStateN(i);
769         if (!PCQueueEmpty(cs->recv)) return 0;
770     }
771     return 1;
772 }
773
774
775 #define REPORT_COMM_METRICS 0
776 #if REPORT_COMM_METRICS
777 static double pumptime = 0.0;
778                          static double releasetime = 0.0;
779                                                      static double sendtime = 0.0;
780 #endif
781
782 #endif //end of CMK_SMP
783
784 static void AdvanceCommunicationForMPI() {
785 #if REPORT_COMM_METRICS
786     double t1, t2, t3, t4;
787     t1 = CmiWallTimer();
788 #endif
789
790 #if CMK_SMP
791     PumpMsgs();
792
793 #if REPORT_COMM_METRICS
794     t2 = CmiWallTimer();
795 #endif
796
797     CmiReleaseSentMessages();
798 #if REPORT_COMM_METRICS
799     t3 = CmiWallTimer();
800 #endif
801
802     SendMsgBuf();
803
804 #if REPORT_COMM_METRICS
805     t4 = CmiWallTimer();
806     pumptime += (t2-t1);
807     releasetime += (t3-t2);
808     sendtime += (t4-t3);
809 #endif
810
811 #else /* non-SMP case */
812     CmiReleaseSentMessages();
813
814 #if REPORT_COMM_METRICS
815     t2 = CmiWallTimer();
816 #endif
817     PumpMsgs();
818
819 #if REPORT_COMM_METRICS
820     t3 = CmiWallTimer();
821     pumptime += (t3-t2);
822     releasetime += (t2-t1);
823 #endif
824
825 #endif /* end of #if CMK_SMP */
826 }
827 /* ######End of functions related with communication progress ###### */
828
829 static void MachinePostNonLocalForMPI() {
830 #if !CMK_SMP
831     if (no_outstanding_sends) {
832         while (MsgQueueLen>0) {
833             AdvanceCommunicationForMPI();
834         }
835     }
836
837     /* FIXME: I don't think the following codes are needed because
838      * it repeats the same job of the next call of CmiGetNonLocal
839      */
840 #if 0
841     if (!msg) {
842         CmiReleaseSentMessages();
843         if (PumpMsgs())
844             return  PCQueuePop(cs->recv);
845         else
846             return 0;
847     }
848 #endif
849 #else
850   if (_thread_provided == MPI_THREAD_MULTIPLE) {
851         CmiReleaseSentMessages();
852         SendMsgBuf();
853   }
854 #endif
855 }
856
857 /* Idle-state related functions: called in non-smp mode */
858 void CmiNotifyIdleForMPI(void) {
859     CmiReleaseSentMessages();
860     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
861 }
862
863 /* Network progress function is used to poll the network when for
864    messages. This flushes receive buffers on some  implementations*/
865 #if CMK_MACHINE_PROGRESS_DEFINED
866 void CmiMachineProgressImpl() {
867 #if !CMK_SMP
868     PumpMsgs();
869 #if CMK_IMMEDIATE_MSG
870     CmiHandleImmediate();
871 #endif
872 #else
873     /*Not implemented yet. Communication server does not seem to be
874       thread safe, so only communication thread call it */
875     if (CmiMyRank() == CmiMyNodeSize())
876         CommunicationServerThread(0);
877 #endif
878 }
879 #endif
880
881 /* ######Beginning of functions related with exiting programs###### */
882 void DrainResourcesForMPI() {
883 #if !CMK_SMP
884     while (!CmiAllAsyncMsgsSent()) {
885         PumpMsgs();
886         CmiReleaseSentMessages();
887     }
888 #else
889     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
890         CmiReleaseSentMessages();
891         SendMsgBuf();
892         PumpMsgs();
893     }
894 #endif
895     MACHSTATE(2, "Machine exit barrier begin {");
896     START_EVENT();
897     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
898         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
899     END_EVENT(10);
900     MACHSTATE(2, "} Machine exit barrier end");
901 }
902
903 void LrtsExit() {
904 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
905     int doPrint = 0;
906 #if CMK_SMP
907     if (CmiMyNode()==0) doPrint = 1;
908 #else
909     if (CmiMyPe()==0) doPrint = 1;
910 #endif
911
912     if (doPrint) {
913 #if MPI_POST_RECV
914         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
915 #endif
916     }
917 #endif
918
919 #if REPORT_COMM_METRICS
920 #if CMK_SMP
921     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
922               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
923               pumptime, releasetime, sendtime);
924 #else
925     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
926               CmiMyPe(), pumptime, releasetime, sendtime);
927 #endif
928 #endif
929
930 #if ! CMK_AUTOBUILD
931     signal(SIGINT, signal_int);
932     MPI_Finalize();
933 #endif
934     exit(0);
935 }
936
937 static int machine_exit_idx;
938 static void machine_exit(char *m) {
939     EmergencyExit();
940     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
941     fflush(stdout);
942     CmiNodeBarrier();
943     if (CmiMyRank() == 0) {
944         MPI_Barrier(MPI_COMM_WORLD);
945         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
946         MPI_Abort(MPI_COMM_WORLD, 1);
947     } else {
948         while (1) CmiYield();
949     }
950 }
951
952 static void KillOnAllSigs(int sigNo) {
953     static int already_in_signal_handler = 0;
954     char *m;
955     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
956     already_in_signal_handler = 1;
957 #if CMK_CCS_AVAILABLE
958     if (CpvAccess(cmiArgDebugFlag)) {
959         CpdNotify(CPD_SIGNAL, sigNo);
960         CpdFreeze();
961     }
962 #endif
963     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
964              "Signal: %d\n",CmiMyPe(),sigNo);
965     CmiPrintStackTrace(1);
966
967     m = CmiAlloc(CmiMsgHeaderSizeBytes);
968     CmiSetHandler(m, machine_exit_idx);
969     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
970     machine_exit(m);
971 }
972 /* ######End of functions related with exiting programs###### */
973
974
975 /* ######Beginning of functions related with starting programs###### */
976 static void registerMPITraceEvents() {
977 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
978     traceRegisterUserEvent("MPI_Barrier", 10);
979     traceRegisterUserEvent("MPI_Send", 20);
980     traceRegisterUserEvent("MPI_Recv", 30);
981     traceRegisterUserEvent("MPI_Isend", 40);
982     traceRegisterUserEvent("MPI_Irecv", 50);
983     traceRegisterUserEvent("MPI_Test", 60);
984     traceRegisterUserEvent("MPI_Iprobe", 70);
985 #endif
986 }
987
988 #if MACHINE_DEBUG_LOG
989 FILE *debugLog = NULL;
990 #endif
991
992 static char *thread_level_tostring(int thread_level) {
993 #if CMK_MPI_INIT_THREAD
994     switch (thread_level) {
995     case MPI_THREAD_SINGLE:
996         return "MPI_THREAD_SINGLE";
997     case MPI_THREAD_FUNNELED:
998         return "MPI_THREAD_FUNNELED";
999     case MPI_THREAD_SERIALIZED:
1000         return "MPI_THREAD_SERIALIZED";
1001     case MPI_THREAD_MULTIPLE :
1002         return "MPI_THREAD_MULTIPLE ";
1003     default: {
1004         char *str = (char*)malloc(5);
1005         sprintf(str,"%d", thread_level);
1006         return str;
1007     }
1008     }
1009     return  "unknown";
1010 #else
1011     char *str = (char*)malloc(5);
1012     sprintf(str,"%d", thread_level);
1013     return str;
1014 #endif
1015 }
1016
1017 /**
1018  *  Obtain the number of nodes, my node id, and consuming machine layer
1019  *  specific arguments
1020  */
1021 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1022     int n,i;
1023     int ver, subver;
1024     int provided;
1025     int thread_level;
1026     int myNID;
1027     int largc=*argc;
1028     char** largv=*argv;
1029
1030 #if MACHINE_DEBUG
1031     debugLog=NULL;
1032 #endif
1033 #if CMK_USE_HP_MAIN_FIX
1034 #if FOR_CPLUS
1035     _main(largc,largv);
1036 #endif
1037 #endif
1038
1039 #if CMK_MPI_INIT_THREAD
1040 #if CMK_SMP
1041     thread_level = MPI_THREAD_MULTIPLE;
1042 #else
1043     thread_level = MPI_THREAD_SINGLE;
1044 #endif
1045     MPI_Init_thread(argc, argv, thread_level, &provided);
1046     _thread_provided = provided;
1047 #else
1048     MPI_Init(argc, argv);
1049     thread_level = 0;
1050     provided = -1;
1051 #endif
1052     largc = *argc;
1053     largv = *argv;
1054     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1055     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1056
1057     myNID = *myNodeID;
1058
1059     MPI_Get_version(&ver, &subver);
1060     if (myNID == 0) {
1061         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1062     }
1063
1064     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1065     if (idleblock && _Cmi_mynode == 0) {
1066         printf("Charm++: Running in idle blocking mode.\n");
1067     }
1068
1069     /* setup signal handlers */
1070     signal(SIGSEGV, KillOnAllSigs);
1071     signal(SIGFPE, KillOnAllSigs);
1072     signal(SIGILL, KillOnAllSigs);
1073     signal_int = signal(SIGINT, KillOnAllSigs);
1074     signal(SIGTERM, KillOnAllSigs);
1075     signal(SIGABRT, KillOnAllSigs);
1076 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1077     signal(SIGQUIT, KillOnAllSigs);
1078     signal(SIGBUS, KillOnAllSigs);
1079 #   endif /*UNIX*/
1080
1081 #if CMK_NO_OUTSTANDING_SENDS
1082     no_outstanding_sends=1;
1083 #endif
1084     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1085         no_outstanding_sends = 1;
1086         if (myNID == 0)
1087             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1088                    no_outstanding_sends?"":" not");
1089     }
1090
1091     request_max=MAX_QLEN;
1092     CmiGetArgInt(largv,"+requestmax",&request_max);
1093     /*printf("request max=%d\n", request_max);*/
1094
1095 #if MPI_POST_RECV
1096     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1097     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1098     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1099     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1100     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1101     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1102     if (myNID==0) {
1103         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1104                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1105     }
1106 #endif
1107
1108 #if CMI_DYNAMIC_EXERT_CAP
1109     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1110     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1111     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1112     if (myNID==0) {
1113         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1114                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1115     }
1116 #endif
1117
1118     /* checksum flag */
1119     if (CmiGetArgFlag(largv,"+checksum")) {
1120 #if CMK_ERROR_CHECKING
1121         checksum_flag = 1;
1122         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1123 #else
1124         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1125 #endif
1126     }
1127
1128     {
1129         int debug = CmiGetArgFlag(largv,"++debug");
1130         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1131         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1132 #if CMK_HAS_GETPID
1133             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1134             fflush(stdout);
1135             if (!debug_no_pause)
1136                 sleep(15);
1137 #else
1138             printf("++debug ignored.\n");
1139 #endif
1140         }
1141     }
1142
1143     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1144     for (i=0; i<_Cmi_mynodesize+1; i++) {
1145 #if MULTI_SENDQUEUE
1146         procState[i].sendMsgBuf = PCQueueCreate();
1147 #endif
1148         procState[i].recvLock = CmiCreateLock();
1149     }
1150 #if CMK_SMP
1151 #if !MULTI_SENDQUEUE
1152     sendMsgBuf = PCQueueCreate();
1153     sendMsgBufLock = CmiCreateLock();
1154 #endif
1155 #endif
1156 }
1157
1158 static void MachinePreCommonInitForMPI(int everReturn) {
1159
1160 #if MPI_POST_RECV
1161     int doInit = 1;
1162     int i;
1163
1164 #if CMK_SMP
1165     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1166 #endif
1167
1168     /* Currently, in mpi smp, the main thread will be the comm thread, so
1169      *  only the comm thread should post recvs. Cpvs, however, need to be
1170      * created on rank 0 (the ptrs to the actual cpv memory), while
1171      * other ranks are busy waiting for this to finish. So cpv initialize
1172      * routines have to be called on every ranks, although they are only
1173      * useful on comm thread (whose rank is not zero) -Chao Mei
1174      */
1175     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1176     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1177     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1178     CpvInitialize(char*,CmiPostedRecvBuffers);
1179
1180     if (doInit) {
1181         /* Post some extra recvs to help out with incoming messages */
1182         /* On some MPIs the messages are unexpected and thus slow */
1183
1184         /* An array of request handles for posted recvs */
1185         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1186
1187         /* An array of buffers for posted recvs */
1188         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1189
1190         /* Post Recvs */
1191         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1192             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1193                                            MPI_POST_RECV_SIZE,
1194                                            MPI_BYTE,
1195                                            MPI_ANY_SOURCE,
1196                                            POST_RECV_TAG,
1197                                            MPI_COMM_WORLD,
1198                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1199                 CmiAbort("MPI_Irecv failed\n");
1200         }
1201     }
1202 #endif
1203
1204 }
1205
1206 static void MachinePostCommonInitForMPI(int everReturn) {
1207
1208     CpvInitialize(SMSG_LIST *, sent_msgs);
1209     CpvInitialize(SMSG_LIST *, end_sent);
1210     CpvInitialize(int, MsgQueueLen);
1211     CpvAccess(sent_msgs) = NULL;
1212     CpvAccess(end_sent) = NULL;
1213     CpvAccess(MsgQueueLen) = 0;
1214
1215     CmiIdleState *s=CmiNotifyGetState();
1216     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1217
1218 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1219     CpvInitialize(double, projTraceStart);
1220     /* only PE 0 needs to care about registration (to generate sts file). */
1221     if (CmiMyPe() == 0) {
1222         registerMachineUserEventsFunction(&registerMPITraceEvents);
1223     }
1224 #endif
1225
1226 #if CMK_SMP
1227     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1228     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1229     if (_thread_provided == MPI_THREAD_MULTIPLE)
1230       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1231 #else
1232     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1233 #endif
1234
1235 #if MACHINE_DEBUG_LOG
1236     if (CmiMyRank() == 0) {
1237         char ln[200];
1238         sprintf(ln,"debugLog.%d",CmiMyNode());
1239         debugLog=fopen(ln,"w");
1240     }
1241 #endif
1242 }
1243 /* ######End of functions related with starting programs###### */
1244
1245 /***********************************************************************
1246  *
1247  * Abort function:
1248  *
1249  ************************************************************************/
1250
1251 void CmiAbort(const char *message) {
1252     char *m;
1253     /* if CharmDebug is attached simply try to send a message to it */
1254 #if CMK_CCS_AVAILABLE
1255     if (CpvAccess(cmiArgDebugFlag)) {
1256         CpdNotify(CPD_ABORT, message);
1257         CpdFreeze();
1258     }
1259 #endif
1260     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1261              "Reason: %s\n",CmiMyPe(),message);
1262     /*  CmiError(message); */
1263     CmiPrintStackTrace(0);
1264     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1265     CmiSetHandler(m, machine_exit_idx);
1266     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1267     machine_exit(m);
1268     /* Program never reaches here */
1269     MPI_Abort(MPI_COMM_WORLD, 1);
1270 }
1271
1272 /**************************  TIMER FUNCTIONS **************************/
1273 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1274
1275 /* MPI calls are not threadsafe, even the timer on some machines */
1276 static CmiNodeLock  timerLock = 0;
1277                                 static int _absoluteTime = 0;
1278                                                            static double starttimer = 0;
1279                                                                                       static int _is_global = 0;
1280
1281 int CmiTimerIsSynchronized() {
1282     int  flag;
1283     void *v;
1284
1285     /*  check if it using synchronized timer */
1286     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1287         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1288     if (flag) {
1289         _is_global = *(int*)v;
1290         if (_is_global && CmiMyPe() == 0)
1291             printf("Charm++> MPI timer is synchronized\n");
1292     }
1293     return _is_global;
1294 }
1295
1296 int CmiTimerAbsolute() {
1297     return _absoluteTime;
1298 }
1299
1300 double CmiStartTimer() {
1301     return 0.0;
1302 }
1303
1304 double CmiInitTime() {
1305     return starttimer;
1306 }
1307
1308 void CmiTimerInit(char **argv) {
1309     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1310     if (_absoluteTime && CmiMyPe() == 0)
1311         printf("Charm++> absolute MPI timer is used\n");
1312
1313     _is_global = CmiTimerIsSynchronized();
1314
1315     if (_is_global) {
1316         if (CmiMyRank() == 0) {
1317             double minTimer;
1318 #if CMK_TIMER_USE_XT3_DCLOCK
1319             starttimer = dclock();
1320 #else
1321             starttimer = MPI_Wtime();
1322 #endif
1323
1324             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1325                           MPI_COMM_WORLD );
1326             starttimer = minTimer;
1327         }
1328     } else { /* we don't have a synchronous timer, set our own start time */
1329         CmiBarrier();
1330         CmiBarrier();
1331         CmiBarrier();
1332 #if CMK_TIMER_USE_XT3_DCLOCK
1333         starttimer = dclock();
1334 #else
1335         starttimer = MPI_Wtime();
1336 #endif
1337     }
1338
1339 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1340     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1341         timerLock = CmiCreateLock();
1342 #endif
1343     CmiNodeAllBarrier();          /* for smp */
1344 }
1345
1346 /**
1347  * Since the timerLock is never created, and is
1348  * always NULL, then all the if-condition inside
1349  * the timer functions could be disabled right
1350  * now in the case of SMP. --Chao Mei
1351  */
1352 double CmiTimer(void) {
1353     double t;
1354 #if 0 && CMK_SMP
1355     if (timerLock) CmiLock(timerLock);
1356 #endif
1357
1358 #if CMK_TIMER_USE_XT3_DCLOCK
1359     t = dclock();
1360 #else
1361     t = MPI_Wtime();
1362 #endif
1363
1364 #if 0 && CMK_SMP
1365     if (timerLock) CmiUnlock(timerLock);
1366 #endif
1367
1368     return _absoluteTime?t: (t-starttimer);
1369 }
1370
1371 double CmiWallTimer(void) {
1372     double t;
1373 #if 0 && CMK_SMP
1374     if (timerLock) CmiLock(timerLock);
1375 #endif
1376
1377 #if CMK_TIMER_USE_XT3_DCLOCK
1378     t = dclock();
1379 #else
1380     t = MPI_Wtime();
1381 #endif
1382
1383 #if 0 && CMK_SMP
1384     if (timerLock) CmiUnlock(timerLock);
1385 #endif
1386
1387     return _absoluteTime? t: (t-starttimer);
1388 }
1389
1390 double CmiCpuTimer(void) {
1391     double t;
1392 #if 0 && CMK_SMP
1393     if (timerLock) CmiLock(timerLock);
1394 #endif
1395 #if CMK_TIMER_USE_XT3_DCLOCK
1396     t = dclock() - starttimer;
1397 #else
1398     t = MPI_Wtime() - starttimer;
1399 #endif
1400 #if 0 && CMK_SMP
1401     if (timerLock) CmiUnlock(timerLock);
1402 #endif
1403     return t;
1404 }
1405
1406 #endif
1407
1408 /************Barrier Related Functions****************/
1409 /* must be called on all ranks including comm thread in SMP */
1410 int CmiBarrier() {
1411 #if CMK_SMP
1412     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1413     CmiNodeAllBarrier();
1414     if (CmiMyRank() == CmiMyNodeSize())
1415 #else
1416     if (CmiMyRank() == 0)
1417 #endif
1418     {
1419         /**
1420          *  The call of CmiBarrier is usually before the initialization
1421          *  of trace module of Charm++, therefore, the START_EVENT
1422          *  and END_EVENT are disabled here. -Chao Mei
1423          */
1424         /*START_EVENT();*/
1425
1426         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1427             CmiAbort("Timernit: MPI_Barrier failed!\n");
1428
1429         /*END_EVENT(10);*/
1430     }
1431     CmiNodeAllBarrier();
1432     return 0;
1433 }
1434
1435 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1436 int CmiBarrierZero() {
1437     int i;
1438 #if CMK_SMP
1439     if (CmiMyRank() == CmiMyNodeSize())
1440 #else
1441     if (CmiMyRank() == 0)
1442 #endif
1443     {
1444         char msg[1];
1445         MPI_Status sts;
1446         if (CmiMyNode() == 0)  {
1447             for (i=0; i<CmiNumNodes()-1; i++) {
1448                 START_EVENT();
1449
1450                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1451                     CmiPrintf("MPI_Recv failed!\n");
1452
1453                 END_EVENT(30);
1454             }
1455         } else {
1456             START_EVENT();
1457
1458             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1459                 printf("MPI_Send failed!\n");
1460
1461             END_EVENT(20);
1462         }
1463     }
1464     CmiNodeAllBarrier();
1465     return 0;
1466 }
1467
1468 /*@}*/
1469