minor change about a print
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207 #if CMK_MEM_CHECKPOINT
208 #define FAIL_TAG   1200
209 int num_workpes, total_pes;
210 int *petorank = NULL;
211 int  nextrank;
212 void mpi_end_spare();
213 #endif
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node, int mode);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define LrtsSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
238 #define LrtsInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define LrtsPreCommonInit MachinePreCommonInitForMPI
243 #define LrtsPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI();
248 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define LrtsDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define LrtsExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 static void MachinePostNonLocalForMPI();
262 #define LrtsPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-lrts.h"
274 #include "machine-common-core.c"
275
276 /* The machine specific msg-sending function */
277
278 #if CMK_SMP
279 static void EnqueueMsg(void *m, int size, int node, int mode) {
280     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
281     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
282     msg_tmp->msg = m;
283     msg_tmp->size = size;
284     msg_tmp->destpe = node;
285     msg_tmp->next = 0;
286     msg_tmp->mode = mode;
287
288 #if CMK_SMP_TRACE_COMMTHREAD
289     msg_tmp->srcpe = CmiMyPe();
290 #endif
291
292 #if MULTI_SENDQUEUE
293     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
294 #else
295     /*CmiLock(sendMsgBufLock);*/
296     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
297     /*CmiUnlock(sendMsgBufLock);*/
298 #endif
299
300     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
301 }
302 #endif
303
304 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
305 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
306     int node = smsg->destpe;
307     int size = smsg->size;
308     char *msg = smsg->msg;
309     int mode = smsg->mode;
310
311     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
312 #if CMK_ERROR_CHECKING
313     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
314     CMI_SET_CHECKSUM(msg, size);
315 #endif
316
317 #if MPI_POST_RECV
318     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
319         START_EVENT();
320         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
321             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
322         /*END_EVENT(40);*/
323     } else {
324         START_EVENT();
325         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
326             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
327         /*END_EVENT(40);*/
328     }
329 #else
330     START_EVENT();
331 #if CMK_MEM_CHECKPOINT
332     int dstrank = petorank[node];
333 #else
334     int dstrank = node;
335 #endif
336     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
337         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
338     /*END_EVENT(40);*/
339 #endif
340
341 #if CMK_SMP_TRACE_COMMTHREAD
342     traceBeginCommOp(msg);
343     traceChangeLastTimestamp(CpvAccess(projTraceStart));
344     /* traceSendMsgComm must execute after traceBeginCommOp because
345          * we pretend we execute an entry method, and inside this we
346          * pretend we will send another message. Otherwise how could
347          * a message creation just before an entry method invocation?
348          * If such logic is broken, the projections will not trace
349          * messages correctly! -Chao Mei
350          */
351     traceSendMsgComm(msg);
352     traceEndCommOp(msg);
353 #if CMI_MPI_TRACE_MOREDETAILED
354     char tmp[64];
355     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
356     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
357 #endif
358 #endif
359
360     MACHSTATE(3,"}MPI_send end");
361     CpvAccess(MsgQueueLen)++;
362     if (CpvAccess(sent_msgs)==0)
363         CpvAccess(sent_msgs) = smsg;
364     else
365         CpvAccess(end_sent)->next = smsg;
366     CpvAccess(end_sent) = smsg;
367
368 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
369     if (mode == P2P_SYNC || mode == P2P_ASYNC)
370     {
371     while (CpvAccess(MsgQueueLen) > request_max) {
372         CmiReleaseSentMessages();
373         PumpMsgs();
374     }
375     }
376 #endif
377
378     return (CmiCommHandle) &(smsg->req);
379 }
380
381 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
382     /* Ignoring the mode for MPI layer */
383
384     CmiState cs = CmiGetState();
385     SMSG_LIST *msg_tmp;
386     int  rank;
387
388     CmiAssert(destNode != CmiMyNode());
389 #if CMK_SMP
390     if (_thread_provided != MPI_THREAD_MULTIPLE) {
391       EnqueueMsg(msg, size, destNode, mode);
392       return 0;
393     }
394 #endif
395     /* non smp */
396     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
397     msg_tmp->msg = msg;
398     msg_tmp->destpe = destNode;
399     msg_tmp->size = size;
400     msg_tmp->next = 0;
401     msg_tmp->mode = mode;
402     return MPISendOneMsg(msg_tmp);
403 }
404
405 static size_t CmiAllAsyncMsgsSent(void) {
406     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
407     MPI_Status sts;
408     int done;
409
410     while (msg_tmp!=0) {
411         done = 0;
412         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
413             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
414         if (!done)
415             return 0;
416         msg_tmp = msg_tmp->next;
417         /*    MsgQueueLen--; ????? */
418     }
419     return 1;
420 }
421
422 int CmiAsyncMsgSent(CmiCommHandle c) {
423
424     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
425     int done;
426     MPI_Status sts;
427
428     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
429         msg_tmp = msg_tmp->next;
430     if (msg_tmp) {
431         done = 0;
432         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
433             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
434         return ((done)?1:0);
435     } else {
436         return 1;
437     }
438 }
439
440 void CmiReleaseCommHandle(CmiCommHandle c) {
441     return;
442 }
443
444 /* ######Beginning of functions related with communication progress ###### */
445 static void CmiReleaseSentMessages(void) {
446     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
447     SMSG_LIST *prev=0;
448     SMSG_LIST *temp;
449     int done;
450     MPI_Status sts;
451
452 #if CMK_BLUEGENEL
453     MPID_Progress_test();
454 #endif
455
456     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
457     while (msg_tmp!=0) {
458         done =0;
459 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
460         double startT = CmiWallTimer();
461 #endif
462         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
463             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
464         if (done) {
465             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
466             CpvAccess(MsgQueueLen)--;
467             /* Release the message */
468             temp = msg_tmp->next;
469             if (prev==0) /* first message */
470                 CpvAccess(sent_msgs) = temp;
471             else
472                 prev->next = temp;
473             CmiFree(msg_tmp->msg);
474             CmiFree(msg_tmp);
475             msg_tmp = temp;
476         } else {
477             prev = msg_tmp;
478             msg_tmp = msg_tmp->next;
479         }
480 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
481         {
482             double endT = CmiWallTimer();
483             /* only record the event if it takes more than 1ms */
484             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
485         }
486 #endif
487     }
488     CpvAccess(end_sent) = prev;
489     MACHSTATE(2,"} CmiReleaseSentMessages end");
490 }
491
492 static int PumpMsgs(void) {
493     int nbytes, flg, res;
494     char *msg;
495     MPI_Status sts;
496     int recd=0;
497
498 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
499     int recvCnt=0;
500 #endif
501
502 #if CMK_BLUEGENEL
503     MPID_Progress_test();
504 #endif
505
506     MACHSTATE(2,"PumpMsgs begin {");
507
508 #if CMI_DYNAMIC_EXERT_CAP
509     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
510 #endif
511
512     while (1) {
513 #if CMI_EXERT_RECV_CAP
514         if (recvCnt==RECV_CAP) break;
515 #elif CMI_DYNAMIC_EXERT_CAP
516         if (recvCnt >= dynamicRecvCap) break;
517 #endif
518
519         /* First check posted recvs then do  probe unmatched outstanding messages */
520 #if MPI_POST_RECV
521         int completed_index=-1;
522         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
523             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
524         if (flg) {
525             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
526                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
527
528             recd = 1;
529             msg = (char *) CmiAlloc(nbytes);
530             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
531             /* and repost the recv */
532
533             START_EVENT();
534
535             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
536                                            MPI_POST_RECV_SIZE,
537                                            MPI_BYTE,
538                                            MPI_ANY_SOURCE,
539                                            POST_RECV_TAG,
540                                            MPI_COMM_WORLD,
541                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
542                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
543
544             END_EVENT(50);
545
546             CpvAccess(Cmi_posted_recv_total)++;
547         } else {
548             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
549             if (res != MPI_SUCCESS)
550                 CmiAbort("MPI_Iprobe failed\n");
551             if (!flg) break;
552             recd = 1;
553             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
554             msg = (char *) CmiAlloc(nbytes);
555
556             START_EVENT();
557
558             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
559                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
560
561             END_EVENT(30);
562
563             CpvAccess(Cmi_unposted_recv_total)++;
564         }
565 #else
566         /* Original version */
567 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
568         double startT = CmiWallTimer();
569 #endif
570         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
571         if (res != MPI_SUCCESS)
572             CmiAbort("MPI_Iprobe failed\n");
573
574         if (!flg) break;
575 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
576         {
577             double endT = CmiWallTimer();
578             /* only trace the probe that last longer than 1ms */
579             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
580         }
581 #endif
582
583         recd = 1;
584         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
585         msg = (char *) CmiAlloc(nbytes);
586
587         START_EVENT();
588
589         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
590             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
591
592         /*END_EVENT(30);*/
593
594 #endif
595
596 #if CMK_SMP_TRACE_COMMTHREAD
597         traceBeginCommOp(msg);
598         traceChangeLastTimestamp(CpvAccess(projTraceStart));
599         traceEndCommOp(msg);
600 #if CMI_MPI_TRACE_MOREDETAILED
601         char tmp[32];
602         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
603         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
604 #endif
605 #elif CMK_TRACE_COMMOVERHEAD
606         char tmp[32];
607         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
608         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
609 #endif
610
611
612         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
613         CMI_CHECK_CHECKSUM(msg, nbytes);
614 #if CMK_ERROR_CHECKING
615         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
616             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
617             CmiFree(msg);
618             CmiAbort("Abort!\n");
619             continue;
620         }
621 #endif
622
623         handleOneRecvedMsg(nbytes, msg);
624
625 #if CMI_EXERT_RECV_CAP
626         recvCnt++;
627 #elif CMI_DYNAMIC_EXERT_CAP
628         recvCnt++;
629 #if CMK_SMP
630         /* check sendMsgBuf to get the  number of messages that have not been sent
631              * which is only available in SMP mode
632          * MsgQueueLen indicates the number of messages that have not been released
633              * by MPI
634              */
635         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
636                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
637             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
638         }
639 #else
640         /* MsgQueueLen indicates the number of messages that have not been released
641              * by MPI
642              */
643         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
644             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
645         }
646 #endif
647
648 #endif
649
650     }
651
652     MACHSTATE(2,"} PumpMsgs end ");
653     return recd;
654 }
655
656 /* blocking version */
657 static void PumpMsgsBlocking(void) {
658     static int maxbytes = 20000000;
659     static char *buf = NULL;
660     int nbytes, flg;
661     MPI_Status sts;
662     char *msg;
663     int recd=0;
664
665     if (!PCQueueEmpty(CmiGetState()->recv)) return;
666     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
667     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
668     if (CpvAccess(sent_msgs))  return;
669
670 #if 0
671     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
672 #endif
673
674     if (buf == NULL) {
675         buf = (char *) CmiAlloc(maxbytes);
676         _MEMCHECK(buf);
677     }
678
679
680 #if MPI_POST_RECV
681 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
682     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
683 #endif
684
685     START_EVENT();
686
687     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
688         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
689
690     /*END_EVENT(30);*/
691
692     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
693     msg = (char *) CmiAlloc(nbytes);
694     memcpy(msg, buf, nbytes);
695
696 #if CMK_SMP_TRACE_COMMTHREAD
697     traceBeginCommOp(msg);
698     traceChangeLastTimestamp(CpvAccess(projTraceStart));
699     traceEndCommOp(msg);
700 #if CMI_MPI_TRACE_MOREDETAILED
701     char tmp[32];
702     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
703     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
704 #endif
705 #endif
706
707     handleOneRecvedMsg(nbytes, msg);
708 }
709
710
711 #if CMK_SMP
712
713 /* called by communication thread in SMP */
714 static int SendMsgBuf() {
715     SMSG_LIST *msg_tmp;
716     char *msg;
717     int node, rank, size;
718     int i;
719     int sent = 0;
720
721 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
722     int sentCnt = 0;
723 #endif
724
725 #if CMI_DYNAMIC_EXERT_CAP
726     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
727 #endif
728
729     MACHSTATE(2,"SendMsgBuf begin {");
730 #if MULTI_SENDQUEUE
731     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
732         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
733             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
734 #else
735     /* single message sending queue */
736     /* CmiLock(sendMsgBufLock); */
737     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
738     /* CmiUnlock(sendMsgBufLock); */
739     while (NULL != msg_tmp) {
740 #endif
741             MPISendOneMsg(msg_tmp);
742             sent=1;
743
744 #if CMI_EXERT_SEND_CAP
745             if (++sentCnt == SEND_CAP) break;
746 #elif CMI_DYNAMIC_EXERT_CAP
747             if (++sentCnt >= dynamicSendCap) break;
748             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
749                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
750 #endif
751
752 #if ! MULTI_SENDQUEUE
753             /* CmiLock(sendMsgBufLock); */
754             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
755             /* CmiUnlock(sendMsgBufLock); */
756 #endif
757         }
758 #if MULTI_SENDQUEUE
759     }
760 #endif
761     MACHSTATE(2,"}SendMsgBuf end ");
762     return sent;
763 }
764
765 static int MsgQueueEmpty() {
766     int i;
767 #if MULTI_SENDQUEUE
768     for (i=0; i<_Cmi_mynodesize; i++)
769         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
770 #else
771     return PCQueueEmpty(sendMsgBuf);
772 #endif
773     return 1;
774 }
775
776 /* test if all processors recv queues are empty */
777 static int RecvQueueEmpty() {
778     int i;
779     for (i=0; i<_Cmi_mynodesize; i++) {
780         CmiState cs=CmiGetStateN(i);
781         if (!PCQueueEmpty(cs->recv)) return 0;
782     }
783     return 1;
784 }
785
786
787 #define REPORT_COMM_METRICS 0
788 #if REPORT_COMM_METRICS
789 static double pumptime = 0.0;
790                          static double releasetime = 0.0;
791                                                      static double sendtime = 0.0;
792 #endif
793
794 #endif //end of CMK_SMP
795
796 static void AdvanceCommunicationForMPI() {
797 #if REPORT_COMM_METRICS
798     double t1, t2, t3, t4;
799     t1 = CmiWallTimer();
800 #endif
801
802 #if CMK_SMP
803     PumpMsgs();
804
805 #if REPORT_COMM_METRICS
806     t2 = CmiWallTimer();
807 #endif
808
809     CmiReleaseSentMessages();
810 #if REPORT_COMM_METRICS
811     t3 = CmiWallTimer();
812 #endif
813
814     SendMsgBuf();
815
816 #if REPORT_COMM_METRICS
817     t4 = CmiWallTimer();
818     pumptime += (t2-t1);
819     releasetime += (t3-t2);
820     sendtime += (t4-t3);
821 #endif
822
823 #else /* non-SMP case */
824     CmiReleaseSentMessages();
825
826 #if REPORT_COMM_METRICS
827     t2 = CmiWallTimer();
828 #endif
829     PumpMsgs();
830
831 #if REPORT_COMM_METRICS
832     t3 = CmiWallTimer();
833     pumptime += (t3-t2);
834     releasetime += (t2-t1);
835 #endif
836
837 #endif /* end of #if CMK_SMP */
838 }
839 /* ######End of functions related with communication progress ###### */
840
841 static void MachinePostNonLocalForMPI() {
842 #if !CMK_SMP
843     if (no_outstanding_sends) {
844         while (CpvAccess(MsgQueueLen)>0) {
845             AdvanceCommunicationForMPI();
846         }
847     }
848
849     /* FIXME: I don't think the following codes are needed because
850      * it repeats the same job of the next call of CmiGetNonLocal
851      */
852 #if 0
853     if (!msg) {
854         CmiReleaseSentMessages();
855         if (PumpMsgs())
856             return  PCQueuePop(cs->recv);
857         else
858             return 0;
859     }
860 #endif
861 #else
862   if (_thread_provided == MPI_THREAD_MULTIPLE) {
863         CmiReleaseSentMessages();
864         SendMsgBuf();
865   }
866 #endif
867 }
868
869 /* Idle-state related functions: called in non-smp mode */
870 void CmiNotifyIdleForMPI(void) {
871     CmiReleaseSentMessages();
872     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
873 }
874
875 /* Network progress function is used to poll the network when for
876    messages. This flushes receive buffers on some  implementations*/
877 #if CMK_MACHINE_PROGRESS_DEFINED
878 void CmiMachineProgressImpl() {
879 #if !CMK_SMP
880     PumpMsgs();
881 #if CMK_IMMEDIATE_MSG
882     CmiHandleImmediate();
883 #endif
884 #else
885     /*Not implemented yet. Communication server does not seem to be
886       thread safe, so only communication thread call it */
887     if (CmiMyRank() == CmiMyNodeSize())
888         CommunicationServerThread(0);
889 #endif
890 }
891 #endif
892
893 /* ######Beginning of functions related with exiting programs###### */
894 void DrainResourcesForMPI() {
895 #if !CMK_SMP
896     while (!CmiAllAsyncMsgsSent()) {
897         PumpMsgs();
898         CmiReleaseSentMessages();
899     }
900 #else
901     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
902         CmiReleaseSentMessages();
903         SendMsgBuf();
904         PumpMsgs();
905     }
906 #endif
907 #if CMK_MEM_CHECKPOINT
908     if (CmiMyPe() == 0) mpi_end_spare();
909 #endif
910     MACHSTATE(2, "Machine exit barrier begin {");
911     START_EVENT();
912     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
913         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
914     END_EVENT(10);
915     MACHSTATE(2, "} Machine exit barrier end");
916 }
917
918 void LrtsExit() {
919 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
920     int doPrint = 0;
921 #if CMK_SMP
922     if (CmiMyNode()==0) doPrint = 1;
923 #else
924     if (CmiMyPe()==0) doPrint = 1;
925 #endif
926
927     if (doPrint) {
928 #if MPI_POST_RECV
929         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
930 #endif
931     }
932 #endif
933
934 #if REPORT_COMM_METRICS
935 #if CMK_SMP
936     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
937               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
938               pumptime, releasetime, sendtime);
939 #else
940     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
941               CmiMyPe(), pumptime, releasetime, sendtime);
942 #endif
943 #endif
944
945 #if ! CMK_AUTOBUILD
946     signal(SIGINT, signal_int);
947     MPI_Finalize();
948 #endif
949     exit(0);
950 }
951
952 static int machine_exit_idx;
953 static void machine_exit(char *m) {
954     EmergencyExit();
955     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
956     fflush(stdout);
957     CmiNodeBarrier();
958     if (CmiMyRank() == 0) {
959         MPI_Barrier(MPI_COMM_WORLD);
960         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
961         MPI_Abort(MPI_COMM_WORLD, 1);
962     } else {
963         while (1) CmiYield();
964     }
965 }
966
967 static void KillOnAllSigs(int sigNo) {
968     static int already_in_signal_handler = 0;
969     char *m;
970     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
971     already_in_signal_handler = 1;
972 #if CMK_CCS_AVAILABLE
973     if (CpvAccess(cmiArgDebugFlag)) {
974         CpdNotify(CPD_SIGNAL, sigNo);
975         CpdFreeze();
976     }
977 #endif
978     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
979              "Signal: %d\n",CmiMyPe(),sigNo);
980     CmiPrintStackTrace(1);
981
982     m = CmiAlloc(CmiMsgHeaderSizeBytes);
983     CmiSetHandler(m, machine_exit_idx);
984     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
985     machine_exit(m);
986 }
987 /* ######End of functions related with exiting programs###### */
988
989
990 /* ######Beginning of functions related with starting programs###### */
991 static void registerMPITraceEvents() {
992 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
993     traceRegisterUserEvent("MPI_Barrier", 10);
994     traceRegisterUserEvent("MPI_Send", 20);
995     traceRegisterUserEvent("MPI_Recv", 30);
996     traceRegisterUserEvent("MPI_Isend", 40);
997     traceRegisterUserEvent("MPI_Irecv", 50);
998     traceRegisterUserEvent("MPI_Test", 60);
999     traceRegisterUserEvent("MPI_Iprobe", 70);
1000 #endif
1001 }
1002
1003 #if MACHINE_DEBUG_LOG
1004 FILE *debugLog = NULL;
1005 #endif
1006
1007 static char *thread_level_tostring(int thread_level) {
1008 #if CMK_MPI_INIT_THREAD
1009     switch (thread_level) {
1010     case MPI_THREAD_SINGLE:
1011         return "MPI_THREAD_SINGLE";
1012     case MPI_THREAD_FUNNELED:
1013         return "MPI_THREAD_FUNNELED";
1014     case MPI_THREAD_SERIALIZED:
1015         return "MPI_THREAD_SERIALIZED";
1016     case MPI_THREAD_MULTIPLE :
1017         return "MPI_THREAD_MULTIPLE ";
1018     default: {
1019         char *str = (char*)malloc(5);
1020         sprintf(str,"%d", thread_level);
1021         return str;
1022     }
1023     }
1024     return  "unknown";
1025 #else
1026     char *str = (char*)malloc(5);
1027     sprintf(str,"%d", thread_level);
1028     return str;
1029 #endif
1030 }
1031
1032 /**
1033  *  Obtain the number of nodes, my node id, and consuming machine layer
1034  *  specific arguments
1035  */
1036 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1037     int n,i;
1038     int ver, subver;
1039     int provided;
1040     int thread_level;
1041     int myNID;
1042     int largc=*argc;
1043     char** largv=*argv;
1044
1045 #if MACHINE_DEBUG
1046     debugLog=NULL;
1047 #endif
1048 #if CMK_USE_HP_MAIN_FIX
1049 #if FOR_CPLUS
1050     _main(largc,largv);
1051 #endif
1052 #endif
1053
1054 #if CMK_MPI_INIT_THREAD
1055 #if CMK_SMP
1056     thread_level = MPI_THREAD_MULTIPLE;
1057 #else
1058     thread_level = MPI_THREAD_SINGLE;
1059 #endif
1060     MPI_Init_thread(argc, argv, thread_level, &provided);
1061     _thread_provided = provided;
1062 #else
1063     MPI_Init(argc, argv);
1064     thread_level = 0;
1065     provided = -1;
1066 #endif
1067     largc = *argc;
1068     largv = *argv;
1069     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1070     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1071
1072     myNID = *myNodeID;
1073
1074     MPI_Get_version(&ver, &subver);
1075     if (myNID == 0) {
1076         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1077     }
1078
1079     {
1080         int debug = CmiGetArgFlag(largv,"++debug");
1081         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1082         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1083 #if CMK_HAS_GETPID
1084             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1085             fflush(stdout);
1086             if (!debug_no_pause)
1087                 sleep(15);
1088 #else
1089             printf("++debug ignored.\n");
1090 #endif
1091         }
1092     }
1093
1094
1095 #if CMK_MEM_CHECKPOINT
1096     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1097        CmiAssert(num_workpes <= *numNodes);
1098        total_pes = *numNodes;
1099        *numNodes = num_workpes;
1100     }
1101     else
1102        total_pes = num_workpes = *numNodes;
1103     if (*myNodeID == 0)
1104        CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1105     petorank = (int *)malloc(sizeof(int) * num_workpes);
1106     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1107     nextrank = num_workpes;
1108
1109     if (*myNodeID >= num_workpes) {    /* is spare processor */
1110       MPI_Status sts;
1111       int vals[2];
1112       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1113       int newpe = vals[0];
1114       CpvAccess(_curRestartPhase) = vals[1];
1115
1116       if (newpe == -1) {
1117           MPI_Barrier(MPI_COMM_WORLD);
1118           MPI_Finalize();
1119           exit(0);
1120       }
1121
1122       CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
1123         /* update petorank */
1124       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,MPI_COMM_WORLD, &sts);
1125       nextrank = *myNodeID + 1;
1126       *myNodeID = newpe;
1127       myNID = newpe;
1128
1129        /* add +restartaftercrash to argv */
1130       char *phase_str;
1131       char **restart_argv;
1132       int i=0;
1133       while(largv[i]!= NULL) i++;
1134       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1135       i=0;
1136       while(largv[i]!= NULL){
1137                 restart_argv[i] = largv[i];
1138                 i++;
1139       }
1140       restart_argv[i] = "+restartaftercrash";
1141       phase_str = (char*)malloc(10);
1142       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1143       restart_argv[i+1]=phase_str;
1144       restart_argv[i+2]=NULL;
1145       *argv = restart_argv;
1146       *argc = i+2;
1147       largc = *argc;
1148       largv = *argv;
1149     }
1150 #endif
1151
1152     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1153     if (idleblock && _Cmi_mynode == 0) {
1154         printf("Charm++: Running in idle blocking mode.\n");
1155     }
1156
1157 #if CMK_CHARMDEBUG
1158     /* setup signal handlers */
1159     signal(SIGSEGV, KillOnAllSigs);
1160     signal(SIGFPE, KillOnAllSigs);
1161     signal(SIGILL, KillOnAllSigs);
1162     signal_int = signal(SIGINT, KillOnAllSigs);
1163     signal(SIGTERM, KillOnAllSigs);
1164     signal(SIGABRT, KillOnAllSigs);
1165 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1166     signal(SIGQUIT, KillOnAllSigs);
1167     signal(SIGBUS, KillOnAllSigs);
1168 #   endif /*UNIX*/
1169 #endif
1170
1171 #if CMK_NO_OUTSTANDING_SENDS
1172     no_outstanding_sends=1;
1173 #endif
1174     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1175         no_outstanding_sends = 1;
1176         if (myNID == 0)
1177             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1178                    no_outstanding_sends?"":" not");
1179     }
1180
1181     request_max=MAX_QLEN;
1182     CmiGetArgInt(largv,"+requestmax",&request_max);
1183     /*printf("request max=%d\n", request_max);*/
1184
1185 #if MPI_POST_RECV
1186     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1187     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1188     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1189     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1190     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1191     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1192     if (myNID==0) {
1193         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1194                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1195     }
1196 #endif
1197
1198 #if CMI_DYNAMIC_EXERT_CAP
1199     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1200     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1201     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1202     if (myNID==0) {
1203         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1204                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1205     }
1206 #endif
1207
1208     /* checksum flag */
1209     if (CmiGetArgFlag(largv,"+checksum")) {
1210 #if CMK_ERROR_CHECKING
1211         checksum_flag = 1;
1212         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1213 #else
1214         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1215 #endif
1216     }
1217
1218     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1219     for (i=0; i<_Cmi_mynodesize+1; i++) {
1220 #if MULTI_SENDQUEUE
1221         procState[i].sendMsgBuf = PCQueueCreate();
1222 #endif
1223         procState[i].recvLock = CmiCreateLock();
1224     }
1225 #if CMK_SMP
1226 #if !MULTI_SENDQUEUE
1227     sendMsgBuf = PCQueueCreate();
1228     sendMsgBufLock = CmiCreateLock();
1229 #endif
1230 #endif
1231 }
1232
1233 static void MachinePreCommonInitForMPI(int everReturn) {
1234
1235 #if MPI_POST_RECV
1236     int doInit = 1;
1237     int i;
1238
1239 #if CMK_SMP
1240     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1241 #endif
1242
1243     /* Currently, in mpi smp, the main thread will be the comm thread, so
1244      *  only the comm thread should post recvs. Cpvs, however, need to be
1245      * created on rank 0 (the ptrs to the actual cpv memory), while
1246      * other ranks are busy waiting for this to finish. So cpv initialize
1247      * routines have to be called on every ranks, although they are only
1248      * useful on comm thread (whose rank is not zero) -Chao Mei
1249      */
1250     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1251     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1252     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1253     CpvInitialize(char*,CmiPostedRecvBuffers);
1254
1255     if (doInit) {
1256         /* Post some extra recvs to help out with incoming messages */
1257         /* On some MPIs the messages are unexpected and thus slow */
1258
1259         /* An array of request handles for posted recvs */
1260         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1261
1262         /* An array of buffers for posted recvs */
1263         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1264
1265         /* Post Recvs */
1266         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1267             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1268                                            MPI_POST_RECV_SIZE,
1269                                            MPI_BYTE,
1270                                            MPI_ANY_SOURCE,
1271                                            POST_RECV_TAG,
1272                                            MPI_COMM_WORLD,
1273                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1274                 CmiAbort("MPI_Irecv failed\n");
1275         }
1276     }
1277 #endif
1278
1279 }
1280
1281 static void MachinePostCommonInitForMPI(int everReturn) {
1282
1283     CmiIdleState *s=CmiNotifyGetState();
1284
1285     CpvInitialize(SMSG_LIST *, sent_msgs);
1286     CpvInitialize(SMSG_LIST *, end_sent);
1287     CpvInitialize(int, MsgQueueLen);
1288     CpvAccess(sent_msgs) = NULL;
1289     CpvAccess(end_sent) = NULL;
1290     CpvAccess(MsgQueueLen) = 0;
1291
1292     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1293
1294 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1295     CpvInitialize(double, projTraceStart);
1296     /* only PE 0 needs to care about registration (to generate sts file). */
1297     if (CmiMyPe() == 0) {
1298         registerMachineUserEventsFunction(&registerMPITraceEvents);
1299     }
1300 #endif
1301
1302 #if CMK_SMP
1303     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1304     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1305     if (_thread_provided == MPI_THREAD_MULTIPLE)
1306       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1307 #else
1308     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1309 #endif
1310
1311 #if MACHINE_DEBUG_LOG
1312     if (CmiMyRank() == 0) {
1313         char ln[200];
1314         sprintf(ln,"debugLog.%d",CmiMyNode());
1315         debugLog=fopen(ln,"w");
1316     }
1317 #endif
1318 }
1319 /* ######End of functions related with starting programs###### */
1320
1321 /***********************************************************************
1322  *
1323  * Abort function:
1324  *
1325  ************************************************************************/
1326
1327 void CmiAbort(const char *message) {
1328     char *m;
1329     /* if CharmDebug is attached simply try to send a message to it */
1330 #if CMK_CCS_AVAILABLE
1331     if (CpvAccess(cmiArgDebugFlag)) {
1332         CpdNotify(CPD_ABORT, message);
1333         CpdFreeze();
1334     }
1335 #endif
1336     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1337              "Reason: %s\n",CmiMyPe(),message);
1338     /*  CmiError(message); */
1339     CmiPrintStackTrace(0);
1340     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1341     CmiSetHandler(m, machine_exit_idx);
1342     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1343     machine_exit(m);
1344     /* Program never reaches here */
1345     MPI_Abort(MPI_COMM_WORLD, 1);
1346 }
1347
1348 /**************************  TIMER FUNCTIONS **************************/
1349 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1350
1351 /* MPI calls are not threadsafe, even the timer on some machines */
1352 static CmiNodeLock  timerLock = 0;
1353                                 static int _absoluteTime = 0;
1354                                                            static double starttimer = 0;
1355                                                                                       static int _is_global = 0;
1356
1357 int CmiTimerIsSynchronized() {
1358     int  flag;
1359     void *v;
1360
1361     /*  check if it using synchronized timer */
1362     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1363         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1364     if (flag) {
1365         _is_global = *(int*)v;
1366         if (_is_global && CmiMyPe() == 0)
1367             printf("Charm++> MPI timer is synchronized\n");
1368     }
1369     return _is_global;
1370 }
1371
1372 int CmiTimerAbsolute() {
1373     return _absoluteTime;
1374 }
1375
1376 double CmiStartTimer() {
1377     return 0.0;
1378 }
1379
1380 double CmiInitTime() {
1381     return starttimer;
1382 }
1383
1384 void CmiTimerInit(char **argv) {
1385     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1386     if (_absoluteTime && CmiMyPe() == 0)
1387         printf("Charm++> absolute MPI timer is used\n");
1388
1389 #if ! CMK_MEM_CHECKPOINT
1390     _is_global = CmiTimerIsSynchronized();
1391 #else
1392     _is_global = 0;
1393 #endif
1394
1395     if (_is_global) {
1396         if (CmiMyRank() == 0) {
1397             double minTimer;
1398 #if CMK_TIMER_USE_XT3_DCLOCK
1399             starttimer = dclock();
1400 #else
1401             starttimer = MPI_Wtime();
1402 #endif
1403
1404             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1405                           MPI_COMM_WORLD );
1406             starttimer = minTimer;
1407         }
1408     } else { /* we don't have a synchronous timer, set our own start time */
1409 #if ! CMK_MEM_CHECKPOINT
1410         CmiBarrier();
1411         CmiBarrier();
1412         CmiBarrier();
1413 #endif
1414 #if CMK_TIMER_USE_XT3_DCLOCK
1415         starttimer = dclock();
1416 #else
1417         starttimer = MPI_Wtime();
1418 #endif
1419     }
1420
1421 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1422     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1423         timerLock = CmiCreateLock();
1424 #endif
1425     CmiNodeAllBarrier();          /* for smp */
1426 }
1427
1428 /**
1429  * Since the timerLock is never created, and is
1430  * always NULL, then all the if-condition inside
1431  * the timer functions could be disabled right
1432  * now in the case of SMP. --Chao Mei
1433  */
1434 double CmiTimer(void) {
1435     double t;
1436 #if 0 && CMK_SMP
1437     if (timerLock) CmiLock(timerLock);
1438 #endif
1439
1440 #if CMK_TIMER_USE_XT3_DCLOCK
1441     t = dclock();
1442 #else
1443     t = MPI_Wtime();
1444 #endif
1445
1446 #if 0 && CMK_SMP
1447     if (timerLock) CmiUnlock(timerLock);
1448 #endif
1449
1450     return _absoluteTime?t: (t-starttimer);
1451 }
1452
1453 double CmiWallTimer(void) {
1454     double t;
1455 #if 0 && CMK_SMP
1456     if (timerLock) CmiLock(timerLock);
1457 #endif
1458
1459 #if CMK_TIMER_USE_XT3_DCLOCK
1460     t = dclock();
1461 #else
1462     t = MPI_Wtime();
1463 #endif
1464
1465 #if 0 && CMK_SMP
1466     if (timerLock) CmiUnlock(timerLock);
1467 #endif
1468
1469     return _absoluteTime? t: (t-starttimer);
1470 }
1471
1472 double CmiCpuTimer(void) {
1473     double t;
1474 #if 0 && CMK_SMP
1475     if (timerLock) CmiLock(timerLock);
1476 #endif
1477 #if CMK_TIMER_USE_XT3_DCLOCK
1478     t = dclock() - starttimer;
1479 #else
1480     t = MPI_Wtime() - starttimer;
1481 #endif
1482 #if 0 && CMK_SMP
1483     if (timerLock) CmiUnlock(timerLock);
1484 #endif
1485     return t;
1486 }
1487
1488 #endif     /* CMK_TIMER_USE_SPECIAL */
1489
1490 /************Barrier Related Functions****************/
1491 /* must be called on all ranks including comm thread in SMP */
1492 int CmiBarrier() {
1493 #if CMK_SMP
1494     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1495     CmiNodeAllBarrier();
1496     if (CmiMyRank() == CmiMyNodeSize())
1497 #else
1498     if (CmiMyRank() == 0)
1499 #endif
1500     {
1501         /**
1502          *  The call of CmiBarrier is usually before the initialization
1503          *  of trace module of Charm++, therefore, the START_EVENT
1504          *  and END_EVENT are disabled here. -Chao Mei
1505          */
1506         /*START_EVENT();*/
1507
1508         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1509             CmiAbort("Timernit: MPI_Barrier failed!\n");
1510
1511         /*END_EVENT(10);*/
1512     }
1513     CmiNodeAllBarrier();
1514     return 0;
1515 }
1516
1517 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1518 int CmiBarrierZero() {
1519     int i;
1520 #if CMK_SMP
1521     if (CmiMyRank() == CmiMyNodeSize())
1522 #else
1523     if (CmiMyRank() == 0)
1524 #endif
1525     {
1526         char msg[1];
1527         MPI_Status sts;
1528         if (CmiMyNode() == 0)  {
1529             for (i=0; i<CmiNumNodes()-1; i++) {
1530                 START_EVENT();
1531
1532                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1533                     CmiPrintf("MPI_Recv failed!\n");
1534
1535                 END_EVENT(30);
1536             }
1537         } else {
1538             START_EVENT();
1539
1540             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1541                 printf("MPI_Send failed!\n");
1542
1543             END_EVENT(20);
1544         }
1545     }
1546     CmiNodeAllBarrier();
1547     return 0;
1548 }
1549
1550
1551 #if CMK_MEM_CHECKPOINT
1552
1553 void mpi_restart_crashed(int pe, int rank)
1554 {
1555     int vals[2];
1556     vals[0] = pe;
1557     vals[1] = CpvAccess(_curRestartPhase)+1;
1558     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1559     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1560 }
1561
1562 /* notify spare processors to exit */
1563 void mpi_end_spare()
1564 {
1565     int i;
1566     for (i=nextrank; i<total_pes; i++) {
1567         int vals[2] = {-1,-1};
1568         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,MPI_COMM_WORLD);
1569     }
1570 }
1571
1572 int find_spare_mpirank(int pe)
1573 {
1574     if (nextrank == total_pes) {
1575       CmiAbort("Charm++> No spare processor available.");
1576     }
1577     petorank[pe] = nextrank;
1578     nextrank++;
1579     return nextrank-1;
1580 }
1581
1582 void CkDieNow()
1583 {
1584     CmiPrintf("[%d] die now.\n", CmiMyPe());
1585
1586       /* release old messages */
1587     while (!CmiAllAsyncMsgsSent()) {
1588         PumpMsgs();
1589         CmiReleaseSentMessages();
1590     }
1591     MPI_Barrier(MPI_COMM_WORLD);
1592     MPI_Finalize();
1593     exit(0);
1594 }
1595
1596 #endif
1597
1598 /*@}*/
1599