fix a bug when multiple crashes. The spare processors need to be updated with the...
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207 #if CMK_MEM_CHECKPOINT
208 #define FAIL_TAG   1200
209 int num_workpes, total_pes;
210 int *petorank = NULL;
211 int  nextrank;
212 void mpi_end_spare();
213 #endif
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node, int mode);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define LrtsSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
238 #define LrtsInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define LrtsPreCommonInit MachinePreCommonInitForMPI
243 #define LrtsPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI();
248 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define LrtsDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define LrtsExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 static void MachinePostNonLocalForMPI();
262 #define LrtsPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-lrts.h"
274 #include "machine-common-core.c"
275
276 /* The machine specific msg-sending function */
277
278 #if CMK_SMP
279 static void EnqueueMsg(void *m, int size, int node, int mode) {
280     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
281     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
282     msg_tmp->msg = m;
283     msg_tmp->size = size;
284     msg_tmp->destpe = node;
285     msg_tmp->next = 0;
286     msg_tmp->mode = mode;
287
288 #if CMK_SMP_TRACE_COMMTHREAD
289     msg_tmp->srcpe = CmiMyPe();
290 #endif
291
292 #if MULTI_SENDQUEUE
293     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
294 #else
295     /*CmiLock(sendMsgBufLock);*/
296     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
297     /*CmiUnlock(sendMsgBufLock);*/
298 #endif
299
300     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
301 }
302 #endif
303
304 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
305 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
306     int node = smsg->destpe;
307     int size = smsg->size;
308     char *msg = smsg->msg;
309     int mode = smsg->mode;
310
311     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
312 #if CMK_ERROR_CHECKING
313     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
314     CMI_SET_CHECKSUM(msg, size);
315 #endif
316
317 #if MPI_POST_RECV
318     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
319         START_EVENT();
320         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
321             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
322         /*END_EVENT(40);*/
323     } else {
324         START_EVENT();
325         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
326             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
327         /*END_EVENT(40);*/
328     }
329 #else
330     START_EVENT();
331 #if CMK_MEM_CHECKPOINT
332     int dstrank = petorank[node];
333 #else
334     int dstrank = node;
335 #endif
336     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
337         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
338     /*END_EVENT(40);*/
339 #endif
340
341 #if CMK_SMP_TRACE_COMMTHREAD
342     traceBeginCommOp(msg);
343     traceChangeLastTimestamp(CpvAccess(projTraceStart));
344     /* traceSendMsgComm must execute after traceBeginCommOp because
345          * we pretend we execute an entry method, and inside this we
346          * pretend we will send another message. Otherwise how could
347          * a message creation just before an entry method invocation?
348          * If such logic is broken, the projections will not trace
349          * messages correctly! -Chao Mei
350          */
351     traceSendMsgComm(msg);
352     traceEndCommOp(msg);
353 #if CMI_MPI_TRACE_MOREDETAILED
354     char tmp[64];
355     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
356     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
357 #endif
358 #endif
359
360     MACHSTATE(3,"}MPI_send end");
361     CpvAccess(MsgQueueLen)++;
362     if (CpvAccess(sent_msgs)==0)
363         CpvAccess(sent_msgs) = smsg;
364     else
365         CpvAccess(end_sent)->next = smsg;
366     CpvAccess(end_sent) = smsg;
367
368 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
369     if (mode == P2P_SYNC || mode == P2P_ASYNC)
370     {
371     while (CpvAccess(MsgQueueLen) > request_max) {
372         CmiReleaseSentMessages();
373         PumpMsgs();
374     }
375     }
376 #endif
377
378     return (CmiCommHandle) &(smsg->req);
379 }
380
381 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
382     /* Ignoring the mode for MPI layer */
383
384     CmiState cs = CmiGetState();
385     SMSG_LIST *msg_tmp;
386     int  rank;
387
388     CmiAssert(destNode != CmiMyNode());
389 #if CMK_SMP
390     if (_thread_provided != MPI_THREAD_MULTIPLE) {
391       EnqueueMsg(msg, size, destNode, mode);
392       return 0;
393     }
394 #endif
395     /* non smp */
396     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
397     msg_tmp->msg = msg;
398     msg_tmp->destpe = destNode;
399     msg_tmp->size = size;
400     msg_tmp->next = 0;
401     msg_tmp->mode = mode;
402     return MPISendOneMsg(msg_tmp);
403 }
404
405 static size_t CmiAllAsyncMsgsSent(void) {
406     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
407     MPI_Status sts;
408     int done;
409
410     while (msg_tmp!=0) {
411         done = 0;
412         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
413             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
414         if (!done)
415             return 0;
416         msg_tmp = msg_tmp->next;
417         /*    MsgQueueLen--; ????? */
418     }
419     return 1;
420 }
421
422 int CmiAsyncMsgSent(CmiCommHandle c) {
423
424     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
425     int done;
426     MPI_Status sts;
427
428     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
429         msg_tmp = msg_tmp->next;
430     if (msg_tmp) {
431         done = 0;
432         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
433             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
434         return ((done)?1:0);
435     } else {
436         return 1;
437     }
438 }
439
440 void CmiReleaseCommHandle(CmiCommHandle c) {
441     return;
442 }
443
444 /* ######Beginning of functions related with communication progress ###### */
445 static void CmiReleaseSentMessages(void) {
446     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
447     SMSG_LIST *prev=0;
448     SMSG_LIST *temp;
449     int done;
450     MPI_Status sts;
451
452 #if CMK_BLUEGENEL
453     MPID_Progress_test();
454 #endif
455
456     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
457     while (msg_tmp!=0) {
458         done =0;
459 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
460         double startT = CmiWallTimer();
461 #endif
462         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
463             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
464         if (done) {
465             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
466             CpvAccess(MsgQueueLen)--;
467             /* Release the message */
468             temp = msg_tmp->next;
469             if (prev==0) /* first message */
470                 CpvAccess(sent_msgs) = temp;
471             else
472                 prev->next = temp;
473             CmiFree(msg_tmp->msg);
474             CmiFree(msg_tmp);
475             msg_tmp = temp;
476         } else {
477             prev = msg_tmp;
478             msg_tmp = msg_tmp->next;
479         }
480 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
481         {
482             double endT = CmiWallTimer();
483             /* only record the event if it takes more than 1ms */
484             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
485         }
486 #endif
487     }
488     CpvAccess(end_sent) = prev;
489     MACHSTATE(2,"} CmiReleaseSentMessages end");
490 }
491
492 static int PumpMsgs(void) {
493     int nbytes, flg, res;
494     char *msg;
495     MPI_Status sts;
496     int recd=0;
497
498 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
499     int recvCnt=0;
500 #endif
501
502 #if CMK_BLUEGENEL
503     MPID_Progress_test();
504 #endif
505
506     MACHSTATE(2,"PumpMsgs begin {");
507
508 #if CMI_DYNAMIC_EXERT_CAP
509     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
510 #endif
511
512     while (1) {
513 #if CMI_EXERT_RECV_CAP
514         if (recvCnt==RECV_CAP) break;
515 #elif CMI_DYNAMIC_EXERT_CAP
516         if (recvCnt >= dynamicRecvCap) break;
517 #endif
518
519         /* First check posted recvs then do  probe unmatched outstanding messages */
520 #if MPI_POST_RECV
521         int completed_index=-1;
522         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
523             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
524         if (flg) {
525             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
526                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
527
528             recd = 1;
529             msg = (char *) CmiAlloc(nbytes);
530             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
531             /* and repost the recv */
532
533             START_EVENT();
534
535             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
536                                            MPI_POST_RECV_SIZE,
537                                            MPI_BYTE,
538                                            MPI_ANY_SOURCE,
539                                            POST_RECV_TAG,
540                                            MPI_COMM_WORLD,
541                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
542                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
543
544             END_EVENT(50);
545
546             CpvAccess(Cmi_posted_recv_total)++;
547         } else {
548             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
549             if (res != MPI_SUCCESS)
550                 CmiAbort("MPI_Iprobe failed\n");
551             if (!flg) break;
552             recd = 1;
553             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
554             msg = (char *) CmiAlloc(nbytes);
555
556             START_EVENT();
557
558             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
559                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
560
561             END_EVENT(30);
562
563             CpvAccess(Cmi_unposted_recv_total)++;
564         }
565 #else
566         /* Original version */
567 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
568         double startT = CmiWallTimer();
569 #endif
570         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
571         if (res != MPI_SUCCESS)
572             CmiAbort("MPI_Iprobe failed\n");
573
574         if (!flg) break;
575 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
576         {
577             double endT = CmiWallTimer();
578             /* only trace the probe that last longer than 1ms */
579             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
580         }
581 #endif
582
583         recd = 1;
584         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
585         msg = (char *) CmiAlloc(nbytes);
586
587         START_EVENT();
588
589         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
590             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
591
592         /*END_EVENT(30);*/
593
594 #endif
595
596 #if CMK_SMP_TRACE_COMMTHREAD
597         traceBeginCommOp(msg);
598         traceChangeLastTimestamp(CpvAccess(projTraceStart));
599         traceEndCommOp(msg);
600 #if CMI_MPI_TRACE_MOREDETAILED
601         char tmp[32];
602         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
603         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
604 #endif
605 #elif CMK_TRACE_COMMOVERHEAD
606         char tmp[32];
607         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
608         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
609 #endif
610
611
612         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
613         CMI_CHECK_CHECKSUM(msg, nbytes);
614 #if CMK_ERROR_CHECKING
615         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
616             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
617             CmiFree(msg);
618             CmiAbort("Abort!\n");
619             continue;
620         }
621 #endif
622
623         handleOneRecvedMsg(nbytes, msg);
624
625 #if CMI_EXERT_RECV_CAP
626         recvCnt++;
627 #elif CMI_DYNAMIC_EXERT_CAP
628         recvCnt++;
629 #if CMK_SMP
630         /* check sendMsgBuf to get the  number of messages that have not been sent
631              * which is only available in SMP mode
632          * MsgQueueLen indicates the number of messages that have not been released
633              * by MPI
634              */
635         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
636                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
637             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
638         }
639 #else
640         /* MsgQueueLen indicates the number of messages that have not been released
641              * by MPI
642              */
643         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
644             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
645         }
646 #endif
647
648 #endif
649
650     }
651
652     MACHSTATE(2,"} PumpMsgs end ");
653     return recd;
654 }
655
656 /* blocking version */
657 static void PumpMsgsBlocking(void) {
658     static int maxbytes = 20000000;
659     static char *buf = NULL;
660     int nbytes, flg;
661     MPI_Status sts;
662     char *msg;
663     int recd=0;
664
665     if (!PCQueueEmpty(CmiGetState()->recv)) return;
666     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
667     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
668     if (CpvAccess(sent_msgs))  return;
669
670 #if 0
671     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
672 #endif
673
674     if (buf == NULL) {
675         buf = (char *) CmiAlloc(maxbytes);
676         _MEMCHECK(buf);
677     }
678
679
680 #if MPI_POST_RECV
681 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
682     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
683 #endif
684
685     START_EVENT();
686
687     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
688         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
689
690     /*END_EVENT(30);*/
691
692     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
693     msg = (char *) CmiAlloc(nbytes);
694     memcpy(msg, buf, nbytes);
695
696 #if CMK_SMP_TRACE_COMMTHREAD
697     traceBeginCommOp(msg);
698     traceChangeLastTimestamp(CpvAccess(projTraceStart));
699     traceEndCommOp(msg);
700 #if CMI_MPI_TRACE_MOREDETAILED
701     char tmp[32];
702     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
703     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
704 #endif
705 #endif
706
707     handleOneRecvedMsg(nbytes, msg);
708 }
709
710
711 #if CMK_SMP
712
713 /* called by communication thread in SMP */
714 static int SendMsgBuf() {
715     SMSG_LIST *msg_tmp;
716     char *msg;
717     int node, rank, size;
718     int i;
719     int sent = 0;
720
721 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
722     int sentCnt = 0;
723 #endif
724
725 #if CMI_DYNAMIC_EXERT_CAP
726     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
727 #endif
728
729     MACHSTATE(2,"SendMsgBuf begin {");
730 #if MULTI_SENDQUEUE
731     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
732         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
733             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
734 #else
735     /* single message sending queue */
736     /* CmiLock(sendMsgBufLock); */
737     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
738     /* CmiUnlock(sendMsgBufLock); */
739     while (NULL != msg_tmp) {
740 #endif
741             MPISendOneMsg(msg_tmp);
742             sent=1;
743
744 #if CMI_EXERT_SEND_CAP
745             if (++sentCnt == SEND_CAP) break;
746 #elif CMI_DYNAMIC_EXERT_CAP
747             if (++sentCnt >= dynamicSendCap) break;
748             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
749                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
750 #endif
751
752 #if ! MULTI_SENDQUEUE
753             /* CmiLock(sendMsgBufLock); */
754             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
755             /* CmiUnlock(sendMsgBufLock); */
756 #endif
757         }
758 #if MULTI_SENDQUEUE
759     }
760 #endif
761     MACHSTATE(2,"}SendMsgBuf end ");
762     return sent;
763 }
764
765 static int MsgQueueEmpty() {
766     int i;
767 #if MULTI_SENDQUEUE
768     for (i=0; i<_Cmi_mynodesize; i++)
769         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
770 #else
771     return PCQueueEmpty(sendMsgBuf);
772 #endif
773     return 1;
774 }
775
776 /* test if all processors recv queues are empty */
777 static int RecvQueueEmpty() {
778     int i;
779     for (i=0; i<_Cmi_mynodesize; i++) {
780         CmiState cs=CmiGetStateN(i);
781         if (!PCQueueEmpty(cs->recv)) return 0;
782     }
783     return 1;
784 }
785
786
787 #define REPORT_COMM_METRICS 0
788 #if REPORT_COMM_METRICS
789 static double pumptime = 0.0;
790                          static double releasetime = 0.0;
791                                                      static double sendtime = 0.0;
792 #endif
793
794 #endif //end of CMK_SMP
795
796 static void AdvanceCommunicationForMPI() {
797 #if REPORT_COMM_METRICS
798     double t1, t2, t3, t4;
799     t1 = CmiWallTimer();
800 #endif
801
802 #if CMK_SMP
803     PumpMsgs();
804
805 #if REPORT_COMM_METRICS
806     t2 = CmiWallTimer();
807 #endif
808
809     CmiReleaseSentMessages();
810 #if REPORT_COMM_METRICS
811     t3 = CmiWallTimer();
812 #endif
813
814     SendMsgBuf();
815
816 #if REPORT_COMM_METRICS
817     t4 = CmiWallTimer();
818     pumptime += (t2-t1);
819     releasetime += (t3-t2);
820     sendtime += (t4-t3);
821 #endif
822
823 #else /* non-SMP case */
824     CmiReleaseSentMessages();
825
826 #if REPORT_COMM_METRICS
827     t2 = CmiWallTimer();
828 #endif
829     PumpMsgs();
830
831 #if REPORT_COMM_METRICS
832     t3 = CmiWallTimer();
833     pumptime += (t3-t2);
834     releasetime += (t2-t1);
835 #endif
836
837 #endif /* end of #if CMK_SMP */
838 }
839 /* ######End of functions related with communication progress ###### */
840
841 static void MachinePostNonLocalForMPI() {
842 #if !CMK_SMP
843     if (no_outstanding_sends) {
844         while (CpvAccess(MsgQueueLen)>0) {
845             AdvanceCommunicationForMPI();
846         }
847     }
848
849     /* FIXME: I don't think the following codes are needed because
850      * it repeats the same job of the next call of CmiGetNonLocal
851      */
852 #if 0
853     if (!msg) {
854         CmiReleaseSentMessages();
855         if (PumpMsgs())
856             return  PCQueuePop(cs->recv);
857         else
858             return 0;
859     }
860 #endif
861 #else
862   if (_thread_provided == MPI_THREAD_MULTIPLE) {
863         CmiReleaseSentMessages();
864         SendMsgBuf();
865   }
866 #endif
867 }
868
869 /* Idle-state related functions: called in non-smp mode */
870 void CmiNotifyIdleForMPI(void) {
871     CmiReleaseSentMessages();
872     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
873 }
874
875 /* Network progress function is used to poll the network when for
876    messages. This flushes receive buffers on some  implementations*/
877 #if CMK_MACHINE_PROGRESS_DEFINED
878 void CmiMachineProgressImpl() {
879 #if !CMK_SMP
880     PumpMsgs();
881 #if CMK_IMMEDIATE_MSG
882     CmiHandleImmediate();
883 #endif
884 #else
885     /*Not implemented yet. Communication server does not seem to be
886       thread safe, so only communication thread call it */
887     if (CmiMyRank() == CmiMyNodeSize())
888         CommunicationServerThread(0);
889 #endif
890 }
891 #endif
892
893 /* ######Beginning of functions related with exiting programs###### */
894 void DrainResourcesForMPI() {
895 #if !CMK_SMP
896     while (!CmiAllAsyncMsgsSent()) {
897         PumpMsgs();
898         CmiReleaseSentMessages();
899     }
900 #else
901     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
902         CmiReleaseSentMessages();
903         SendMsgBuf();
904         PumpMsgs();
905     }
906 #endif
907 #if CMK_MEM_CHECKPOINT
908     if (CmiMyPe() == 0) mpi_end_spare();
909 #endif
910     MACHSTATE(2, "Machine exit barrier begin {");
911     START_EVENT();
912     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
913         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
914     END_EVENT(10);
915     MACHSTATE(2, "} Machine exit barrier end");
916 }
917
918 void LrtsExit() {
919 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
920     int doPrint = 0;
921 #if CMK_SMP
922     if (CmiMyNode()==0) doPrint = 1;
923 #else
924     if (CmiMyPe()==0) doPrint = 1;
925 #endif
926
927     if (doPrint) {
928 #if MPI_POST_RECV
929         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
930 #endif
931     }
932 #endif
933
934 #if REPORT_COMM_METRICS
935 #if CMK_SMP
936     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
937               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
938               pumptime, releasetime, sendtime);
939 #else
940     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
941               CmiMyPe(), pumptime, releasetime, sendtime);
942 #endif
943 #endif
944
945 #if ! CMK_AUTOBUILD
946     signal(SIGINT, signal_int);
947     MPI_Finalize();
948 #endif
949     exit(0);
950 }
951
952 static int machine_exit_idx;
953 static void machine_exit(char *m) {
954     EmergencyExit();
955     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
956     fflush(stdout);
957     CmiNodeBarrier();
958     if (CmiMyRank() == 0) {
959         MPI_Barrier(MPI_COMM_WORLD);
960         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
961         MPI_Abort(MPI_COMM_WORLD, 1);
962     } else {
963         while (1) CmiYield();
964     }
965 }
966
967 static void KillOnAllSigs(int sigNo) {
968     static int already_in_signal_handler = 0;
969     char *m;
970     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
971     already_in_signal_handler = 1;
972 #if CMK_CCS_AVAILABLE
973     if (CpvAccess(cmiArgDebugFlag)) {
974         CpdNotify(CPD_SIGNAL, sigNo);
975         CpdFreeze();
976     }
977 #endif
978     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
979              "Signal: %d\n",CmiMyPe(),sigNo);
980     CmiPrintStackTrace(1);
981
982     m = CmiAlloc(CmiMsgHeaderSizeBytes);
983     CmiSetHandler(m, machine_exit_idx);
984     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
985     machine_exit(m);
986 }
987 /* ######End of functions related with exiting programs###### */
988
989
990 /* ######Beginning of functions related with starting programs###### */
991 static void registerMPITraceEvents() {
992 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
993     traceRegisterUserEvent("MPI_Barrier", 10);
994     traceRegisterUserEvent("MPI_Send", 20);
995     traceRegisterUserEvent("MPI_Recv", 30);
996     traceRegisterUserEvent("MPI_Isend", 40);
997     traceRegisterUserEvent("MPI_Irecv", 50);
998     traceRegisterUserEvent("MPI_Test", 60);
999     traceRegisterUserEvent("MPI_Iprobe", 70);
1000 #endif
1001 }
1002
1003 #if MACHINE_DEBUG_LOG
1004 FILE *debugLog = NULL;
1005 #endif
1006
1007 static char *thread_level_tostring(int thread_level) {
1008 #if CMK_MPI_INIT_THREAD
1009     switch (thread_level) {
1010     case MPI_THREAD_SINGLE:
1011         return "MPI_THREAD_SINGLE";
1012     case MPI_THREAD_FUNNELED:
1013         return "MPI_THREAD_FUNNELED";
1014     case MPI_THREAD_SERIALIZED:
1015         return "MPI_THREAD_SERIALIZED";
1016     case MPI_THREAD_MULTIPLE :
1017         return "MPI_THREAD_MULTIPLE ";
1018     default: {
1019         char *str = (char*)malloc(5);
1020         sprintf(str,"%d", thread_level);
1021         return str;
1022     }
1023     }
1024     return  "unknown";
1025 #else
1026     char *str = (char*)malloc(5);
1027     sprintf(str,"%d", thread_level);
1028     return str;
1029 #endif
1030 }
1031
1032 /**
1033  *  Obtain the number of nodes, my node id, and consuming machine layer
1034  *  specific arguments
1035  */
1036 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1037     int n,i;
1038     int ver, subver;
1039     int provided;
1040     int thread_level;
1041     int myNID;
1042     int largc=*argc;
1043     char** largv=*argv;
1044
1045 #if MACHINE_DEBUG
1046     debugLog=NULL;
1047 #endif
1048 #if CMK_USE_HP_MAIN_FIX
1049 #if FOR_CPLUS
1050     _main(largc,largv);
1051 #endif
1052 #endif
1053
1054 #if CMK_MPI_INIT_THREAD
1055 #if CMK_SMP
1056     thread_level = MPI_THREAD_MULTIPLE;
1057 #else
1058     thread_level = MPI_THREAD_SINGLE;
1059 #endif
1060     MPI_Init_thread(argc, argv, thread_level, &provided);
1061     _thread_provided = provided;
1062 #else
1063     MPI_Init(argc, argv);
1064     thread_level = 0;
1065     provided = -1;
1066 #endif
1067     largc = *argc;
1068     largv = *argv;
1069     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1070     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1071
1072     myNID = *myNodeID;
1073
1074     MPI_Get_version(&ver, &subver);
1075     if (myNID == 0) {
1076         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1077     }
1078
1079     {
1080         int debug = CmiGetArgFlag(largv,"++debug");
1081         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1082         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1083 #if CMK_HAS_GETPID
1084             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1085             fflush(stdout);
1086             if (!debug_no_pause)
1087                 sleep(15);
1088 #else
1089             printf("++debug ignored.\n");
1090 #endif
1091         }
1092     }
1093
1094
1095 #if CMK_MEM_CHECKPOINT
1096     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1097        CmiAssert(num_workpes <= *numNodes);
1098        total_pes = *numNodes;
1099        *numNodes = num_workpes;
1100     }
1101     else
1102        num_workpes = *numNodes;
1103     petorank = (int *)malloc(sizeof(int) * num_workpes);
1104     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1105     nextrank = num_workpes;
1106
1107     if (*myNodeID >= num_workpes) {
1108       MPI_Status sts;
1109       int vals[2];
1110       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1111       int newpe = vals[0];
1112       CpvAccess(_curRestartPhase) = vals[1];
1113
1114       if (newpe == -1) {
1115           MPI_Barrier(MPI_COMM_WORLD);
1116           MPI_Finalize();
1117           exit(0);
1118       }
1119
1120         /* update petorank */
1121       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,MPI_COMM_WORLD, &sts);
1122       nextrank = *myNodeID + 1;
1123       *myNodeID = newpe;
1124       myNID = newpe;
1125
1126       // set identity
1127       // add +restartaftercrash to argv
1128       char *phase_str;
1129       char **restart_argv;
1130       int i=0;
1131       while(largv[i]!= NULL) i++;
1132       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1133       i=0;
1134       while(largv[i]!= NULL){
1135                 restart_argv[i] = largv[i];
1136                 i++;
1137       }
1138       restart_argv[i] = "+restartaftercrash";
1139       phase_str = (char*)malloc(10);
1140       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1141       restart_argv[i+1]=phase_str;
1142       restart_argv[i+2]=NULL;
1143       *argv = restart_argv;
1144       *argc = i+2;
1145       largc = *argc;
1146       largv = *argv;
1147       /* i=0; while(largv[i]!= NULL) { printf("%s\n", largv[i]); i++; }  */
1148     }
1149 #endif
1150
1151     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1152     if (idleblock && _Cmi_mynode == 0) {
1153         printf("Charm++: Running in idle blocking mode.\n");
1154     }
1155
1156 #if CMK_CHARMDEBUG
1157     /* setup signal handlers */
1158     signal(SIGSEGV, KillOnAllSigs);
1159     signal(SIGFPE, KillOnAllSigs);
1160     signal(SIGILL, KillOnAllSigs);
1161     signal_int = signal(SIGINT, KillOnAllSigs);
1162     signal(SIGTERM, KillOnAllSigs);
1163     signal(SIGABRT, KillOnAllSigs);
1164 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1165     signal(SIGQUIT, KillOnAllSigs);
1166     signal(SIGBUS, KillOnAllSigs);
1167 #   endif /*UNIX*/
1168 #endif
1169
1170 #if CMK_NO_OUTSTANDING_SENDS
1171     no_outstanding_sends=1;
1172 #endif
1173     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1174         no_outstanding_sends = 1;
1175         if (myNID == 0)
1176             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1177                    no_outstanding_sends?"":" not");
1178     }
1179
1180     request_max=MAX_QLEN;
1181     CmiGetArgInt(largv,"+requestmax",&request_max);
1182     /*printf("request max=%d\n", request_max);*/
1183
1184 #if MPI_POST_RECV
1185     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1186     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1187     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1188     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1189     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1190     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1191     if (myNID==0) {
1192         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1193                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1194     }
1195 #endif
1196
1197 #if CMI_DYNAMIC_EXERT_CAP
1198     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1199     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1200     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1201     if (myNID==0) {
1202         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1203                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1204     }
1205 #endif
1206
1207     /* checksum flag */
1208     if (CmiGetArgFlag(largv,"+checksum")) {
1209 #if CMK_ERROR_CHECKING
1210         checksum_flag = 1;
1211         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1212 #else
1213         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1214 #endif
1215     }
1216
1217     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1218     for (i=0; i<_Cmi_mynodesize+1; i++) {
1219 #if MULTI_SENDQUEUE
1220         procState[i].sendMsgBuf = PCQueueCreate();
1221 #endif
1222         procState[i].recvLock = CmiCreateLock();
1223     }
1224 #if CMK_SMP
1225 #if !MULTI_SENDQUEUE
1226     sendMsgBuf = PCQueueCreate();
1227     sendMsgBufLock = CmiCreateLock();
1228 #endif
1229 #endif
1230 }
1231
1232 static void MachinePreCommonInitForMPI(int everReturn) {
1233
1234 #if MPI_POST_RECV
1235     int doInit = 1;
1236     int i;
1237
1238 #if CMK_SMP
1239     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1240 #endif
1241
1242     /* Currently, in mpi smp, the main thread will be the comm thread, so
1243      *  only the comm thread should post recvs. Cpvs, however, need to be
1244      * created on rank 0 (the ptrs to the actual cpv memory), while
1245      * other ranks are busy waiting for this to finish. So cpv initialize
1246      * routines have to be called on every ranks, although they are only
1247      * useful on comm thread (whose rank is not zero) -Chao Mei
1248      */
1249     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1250     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1251     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1252     CpvInitialize(char*,CmiPostedRecvBuffers);
1253
1254     if (doInit) {
1255         /* Post some extra recvs to help out with incoming messages */
1256         /* On some MPIs the messages are unexpected and thus slow */
1257
1258         /* An array of request handles for posted recvs */
1259         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1260
1261         /* An array of buffers for posted recvs */
1262         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1263
1264         /* Post Recvs */
1265         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1266             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1267                                            MPI_POST_RECV_SIZE,
1268                                            MPI_BYTE,
1269                                            MPI_ANY_SOURCE,
1270                                            POST_RECV_TAG,
1271                                            MPI_COMM_WORLD,
1272                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1273                 CmiAbort("MPI_Irecv failed\n");
1274         }
1275     }
1276 #endif
1277
1278 }
1279
1280 static void MachinePostCommonInitForMPI(int everReturn) {
1281
1282     CmiIdleState *s=CmiNotifyGetState();
1283
1284     CpvInitialize(SMSG_LIST *, sent_msgs);
1285     CpvInitialize(SMSG_LIST *, end_sent);
1286     CpvInitialize(int, MsgQueueLen);
1287     CpvAccess(sent_msgs) = NULL;
1288     CpvAccess(end_sent) = NULL;
1289     CpvAccess(MsgQueueLen) = 0;
1290
1291     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1292
1293 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1294     CpvInitialize(double, projTraceStart);
1295     /* only PE 0 needs to care about registration (to generate sts file). */
1296     if (CmiMyPe() == 0) {
1297         registerMachineUserEventsFunction(&registerMPITraceEvents);
1298     }
1299 #endif
1300
1301 #if CMK_SMP
1302     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1303     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1304     if (_thread_provided == MPI_THREAD_MULTIPLE)
1305       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1306 #else
1307     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1308 #endif
1309
1310 #if MACHINE_DEBUG_LOG
1311     if (CmiMyRank() == 0) {
1312         char ln[200];
1313         sprintf(ln,"debugLog.%d",CmiMyNode());
1314         debugLog=fopen(ln,"w");
1315     }
1316 #endif
1317 }
1318 /* ######End of functions related with starting programs###### */
1319
1320 /***********************************************************************
1321  *
1322  * Abort function:
1323  *
1324  ************************************************************************/
1325
1326 void CmiAbort(const char *message) {
1327     char *m;
1328     /* if CharmDebug is attached simply try to send a message to it */
1329 #if CMK_CCS_AVAILABLE
1330     if (CpvAccess(cmiArgDebugFlag)) {
1331         CpdNotify(CPD_ABORT, message);
1332         CpdFreeze();
1333     }
1334 #endif
1335     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1336              "Reason: %s\n",CmiMyPe(),message);
1337     /*  CmiError(message); */
1338     CmiPrintStackTrace(0);
1339     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1340     CmiSetHandler(m, machine_exit_idx);
1341     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1342     machine_exit(m);
1343     /* Program never reaches here */
1344     MPI_Abort(MPI_COMM_WORLD, 1);
1345 }
1346
1347 /**************************  TIMER FUNCTIONS **************************/
1348 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1349
1350 /* MPI calls are not threadsafe, even the timer on some machines */
1351 static CmiNodeLock  timerLock = 0;
1352                                 static int _absoluteTime = 0;
1353                                                            static double starttimer = 0;
1354                                                                                       static int _is_global = 0;
1355
1356 int CmiTimerIsSynchronized() {
1357     int  flag;
1358     void *v;
1359
1360     /*  check if it using synchronized timer */
1361     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1362         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1363     if (flag) {
1364         _is_global = *(int*)v;
1365         if (_is_global && CmiMyPe() == 0)
1366             printf("Charm++> MPI timer is synchronized\n");
1367     }
1368     return _is_global;
1369 }
1370
1371 int CmiTimerAbsolute() {
1372     return _absoluteTime;
1373 }
1374
1375 double CmiStartTimer() {
1376     return 0.0;
1377 }
1378
1379 double CmiInitTime() {
1380     return starttimer;
1381 }
1382
1383 void CmiTimerInit(char **argv) {
1384     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1385     if (_absoluteTime && CmiMyPe() == 0)
1386         printf("Charm++> absolute MPI timer is used\n");
1387
1388     _is_global = CmiTimerIsSynchronized();
1389
1390     if (_is_global) {
1391         if (CmiMyRank() == 0) {
1392             double minTimer;
1393 #if CMK_TIMER_USE_XT3_DCLOCK
1394             starttimer = dclock();
1395 #else
1396             starttimer = MPI_Wtime();
1397 #endif
1398
1399             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1400                           MPI_COMM_WORLD );
1401             starttimer = minTimer;
1402         }
1403     } else { /* we don't have a synchronous timer, set our own start time */
1404 #if ! CMK_MEM_CHECKPOINT
1405         CmiBarrier();
1406         CmiBarrier();
1407         CmiBarrier();
1408 #endif
1409 #if CMK_TIMER_USE_XT3_DCLOCK
1410         starttimer = dclock();
1411 #else
1412         starttimer = MPI_Wtime();
1413 #endif
1414     }
1415
1416 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1417     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1418         timerLock = CmiCreateLock();
1419 #endif
1420     CmiNodeAllBarrier();          /* for smp */
1421 }
1422
1423 /**
1424  * Since the timerLock is never created, and is
1425  * always NULL, then all the if-condition inside
1426  * the timer functions could be disabled right
1427  * now in the case of SMP. --Chao Mei
1428  */
1429 double CmiTimer(void) {
1430     double t;
1431 #if 0 && CMK_SMP
1432     if (timerLock) CmiLock(timerLock);
1433 #endif
1434
1435 #if CMK_TIMER_USE_XT3_DCLOCK
1436     t = dclock();
1437 #else
1438     t = MPI_Wtime();
1439 #endif
1440
1441 #if 0 && CMK_SMP
1442     if (timerLock) CmiUnlock(timerLock);
1443 #endif
1444
1445     return _absoluteTime?t: (t-starttimer);
1446 }
1447
1448 double CmiWallTimer(void) {
1449     double t;
1450 #if 0 && CMK_SMP
1451     if (timerLock) CmiLock(timerLock);
1452 #endif
1453
1454 #if CMK_TIMER_USE_XT3_DCLOCK
1455     t = dclock();
1456 #else
1457     t = MPI_Wtime();
1458 #endif
1459
1460 #if 0 && CMK_SMP
1461     if (timerLock) CmiUnlock(timerLock);
1462 #endif
1463
1464     return _absoluteTime? t: (t-starttimer);
1465 }
1466
1467 double CmiCpuTimer(void) {
1468     double t;
1469 #if 0 && CMK_SMP
1470     if (timerLock) CmiLock(timerLock);
1471 #endif
1472 #if CMK_TIMER_USE_XT3_DCLOCK
1473     t = dclock() - starttimer;
1474 #else
1475     t = MPI_Wtime() - starttimer;
1476 #endif
1477 #if 0 && CMK_SMP
1478     if (timerLock) CmiUnlock(timerLock);
1479 #endif
1480     return t;
1481 }
1482
1483 #endif     /* CMK_TIMER_USE_SPECIAL */
1484
1485 /************Barrier Related Functions****************/
1486 /* must be called on all ranks including comm thread in SMP */
1487 int CmiBarrier() {
1488 #if CMK_SMP
1489     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1490     CmiNodeAllBarrier();
1491     if (CmiMyRank() == CmiMyNodeSize())
1492 #else
1493     if (CmiMyRank() == 0)
1494 #endif
1495     {
1496         /**
1497          *  The call of CmiBarrier is usually before the initialization
1498          *  of trace module of Charm++, therefore, the START_EVENT
1499          *  and END_EVENT are disabled here. -Chao Mei
1500          */
1501         /*START_EVENT();*/
1502
1503         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1504             CmiAbort("Timernit: MPI_Barrier failed!\n");
1505
1506         /*END_EVENT(10);*/
1507     }
1508     CmiNodeAllBarrier();
1509     return 0;
1510 }
1511
1512 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1513 int CmiBarrierZero() {
1514     int i;
1515 #if CMK_SMP
1516     if (CmiMyRank() == CmiMyNodeSize())
1517 #else
1518     if (CmiMyRank() == 0)
1519 #endif
1520     {
1521         char msg[1];
1522         MPI_Status sts;
1523         if (CmiMyNode() == 0)  {
1524             for (i=0; i<CmiNumNodes()-1; i++) {
1525                 START_EVENT();
1526
1527                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1528                     CmiPrintf("MPI_Recv failed!\n");
1529
1530                 END_EVENT(30);
1531             }
1532         } else {
1533             START_EVENT();
1534
1535             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1536                 printf("MPI_Send failed!\n");
1537
1538             END_EVENT(20);
1539         }
1540     }
1541     CmiNodeAllBarrier();
1542     return 0;
1543 }
1544
1545
1546 #if CMK_MEM_CHECKPOINT
1547
1548 void mpi_restart_crashed(int pe, int rank)
1549 {
1550     int vals[2];
1551     vals[0] = pe;
1552     vals[1] = CpvAccess(_curRestartPhase)+1;
1553     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1554     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1555 }
1556
1557 /* notify spare processors to exit */
1558 void mpi_end_spare()
1559 {
1560     int i;
1561     for (i=nextrank; i<total_pes; i++) {
1562         int vals[2] = {-1,-1};
1563         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,MPI_COMM_WORLD);
1564     }
1565 }
1566
1567 int find_spare_mpirank(int pe)
1568 {
1569     if (nextrank == total_pes) {
1570       CmiAbort("Charm++> ran out of spare processors");
1571     }
1572     petorank[pe] = nextrank;
1573     nextrank++;
1574     return nextrank-1;
1575 }
1576
1577 void CkDieNow()
1578 {
1579     CmiPrintf("[%d] die now.\n", CmiMyPe());
1580
1581       /* release old messages */
1582     while (!CmiAllAsyncMsgsSent()) {
1583         PumpMsgs();
1584         CmiReleaseSentMessages();
1585     }
1586     MPI_Barrier(MPI_COMM_WORLD);
1587     MPI_Finalize();
1588     exit(0);
1589 }
1590
1591 #endif
1592
1593 /*@}*/
1594