avoid TimerInit global synchronization under bgp for FT
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207 #if CMK_MEM_CHECKPOINT
208 #define FAIL_TAG   1200
209 int num_workpes, total_pes;
210 int *petorank = NULL;
211 int  nextrank;
212 void mpi_end_spare();
213 #endif
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node, int mode);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define LrtsSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
238 #define LrtsInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define LrtsPreCommonInit MachinePreCommonInitForMPI
243 #define LrtsPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI();
248 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define LrtsDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define LrtsExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 static void MachinePostNonLocalForMPI();
262 #define LrtsPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-lrts.h"
274 #include "machine-common-core.c"
275
276 /* The machine specific msg-sending function */
277
278 #if CMK_SMP
279 static void EnqueueMsg(void *m, int size, int node, int mode) {
280     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
281     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
282     msg_tmp->msg = m;
283     msg_tmp->size = size;
284     msg_tmp->destpe = node;
285     msg_tmp->next = 0;
286     msg_tmp->mode = mode;
287
288 #if CMK_SMP_TRACE_COMMTHREAD
289     msg_tmp->srcpe = CmiMyPe();
290 #endif
291
292 #if MULTI_SENDQUEUE
293     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
294 #else
295     /*CmiLock(sendMsgBufLock);*/
296     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
297     /*CmiUnlock(sendMsgBufLock);*/
298 #endif
299
300     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
301 }
302 #endif
303
304 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
305 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
306     int node = smsg->destpe;
307     int size = smsg->size;
308     char *msg = smsg->msg;
309     int mode = smsg->mode;
310
311     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
312 #if CMK_ERROR_CHECKING
313     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
314     CMI_SET_CHECKSUM(msg, size);
315 #endif
316
317 #if MPI_POST_RECV
318     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
319         START_EVENT();
320         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
321             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
322         /*END_EVENT(40);*/
323     } else {
324         START_EVENT();
325         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
326             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
327         /*END_EVENT(40);*/
328     }
329 #else
330     START_EVENT();
331 #if CMK_MEM_CHECKPOINT
332     int dstrank = petorank[node];
333 #else
334     int dstrank = node;
335 #endif
336     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
337         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
338     /*END_EVENT(40);*/
339 #endif
340
341 #if CMK_SMP_TRACE_COMMTHREAD
342     traceBeginCommOp(msg);
343     traceChangeLastTimestamp(CpvAccess(projTraceStart));
344     /* traceSendMsgComm must execute after traceBeginCommOp because
345          * we pretend we execute an entry method, and inside this we
346          * pretend we will send another message. Otherwise how could
347          * a message creation just before an entry method invocation?
348          * If such logic is broken, the projections will not trace
349          * messages correctly! -Chao Mei
350          */
351     traceSendMsgComm(msg);
352     traceEndCommOp(msg);
353 #if CMI_MPI_TRACE_MOREDETAILED
354     char tmp[64];
355     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
356     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
357 #endif
358 #endif
359
360     MACHSTATE(3,"}MPI_send end");
361     CpvAccess(MsgQueueLen)++;
362     if (CpvAccess(sent_msgs)==0)
363         CpvAccess(sent_msgs) = smsg;
364     else
365         CpvAccess(end_sent)->next = smsg;
366     CpvAccess(end_sent) = smsg;
367
368 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
369     if (mode == P2P_SYNC || mode == P2P_ASYNC)
370     {
371     while (CpvAccess(MsgQueueLen) > request_max) {
372         CmiReleaseSentMessages();
373         PumpMsgs();
374     }
375     }
376 #endif
377
378     return (CmiCommHandle) &(smsg->req);
379 }
380
381 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
382     /* Ignoring the mode for MPI layer */
383
384     CmiState cs = CmiGetState();
385     SMSG_LIST *msg_tmp;
386     int  rank;
387
388     CmiAssert(destNode != CmiMyNode());
389 #if CMK_SMP
390     if (_thread_provided != MPI_THREAD_MULTIPLE) {
391       EnqueueMsg(msg, size, destNode, mode);
392       return 0;
393     }
394 #endif
395     /* non smp */
396     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
397     msg_tmp->msg = msg;
398     msg_tmp->destpe = destNode;
399     msg_tmp->size = size;
400     msg_tmp->next = 0;
401     msg_tmp->mode = mode;
402     return MPISendOneMsg(msg_tmp);
403 }
404
405 static size_t CmiAllAsyncMsgsSent(void) {
406     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
407     MPI_Status sts;
408     int done;
409
410     while (msg_tmp!=0) {
411         done = 0;
412         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
413             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
414         if (!done)
415             return 0;
416         msg_tmp = msg_tmp->next;
417         /*    MsgQueueLen--; ????? */
418     }
419     return 1;
420 }
421
422 int CmiAsyncMsgSent(CmiCommHandle c) {
423
424     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
425     int done;
426     MPI_Status sts;
427
428     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
429         msg_tmp = msg_tmp->next;
430     if (msg_tmp) {
431         done = 0;
432         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
433             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
434         return ((done)?1:0);
435     } else {
436         return 1;
437     }
438 }
439
440 void CmiReleaseCommHandle(CmiCommHandle c) {
441     return;
442 }
443
444 /* ######Beginning of functions related with communication progress ###### */
445 static void CmiReleaseSentMessages(void) {
446     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
447     SMSG_LIST *prev=0;
448     SMSG_LIST *temp;
449     int done;
450     MPI_Status sts;
451
452 #if CMK_BLUEGENEL
453     MPID_Progress_test();
454 #endif
455
456     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
457     while (msg_tmp!=0) {
458         done =0;
459 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
460         double startT = CmiWallTimer();
461 #endif
462         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
463             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
464         if (done) {
465             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
466             CpvAccess(MsgQueueLen)--;
467             /* Release the message */
468             temp = msg_tmp->next;
469             if (prev==0) /* first message */
470                 CpvAccess(sent_msgs) = temp;
471             else
472                 prev->next = temp;
473             CmiFree(msg_tmp->msg);
474             CmiFree(msg_tmp);
475             msg_tmp = temp;
476         } else {
477             prev = msg_tmp;
478             msg_tmp = msg_tmp->next;
479         }
480 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
481         {
482             double endT = CmiWallTimer();
483             /* only record the event if it takes more than 1ms */
484             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
485         }
486 #endif
487     }
488     CpvAccess(end_sent) = prev;
489     MACHSTATE(2,"} CmiReleaseSentMessages end");
490 }
491
492 static int PumpMsgs(void) {
493     int nbytes, flg, res;
494     char *msg;
495     MPI_Status sts;
496     int recd=0;
497
498 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
499     int recvCnt=0;
500 #endif
501
502 #if CMK_BLUEGENEL
503     MPID_Progress_test();
504 #endif
505
506     MACHSTATE(2,"PumpMsgs begin {");
507
508 #if CMI_DYNAMIC_EXERT_CAP
509     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
510 #endif
511
512     while (1) {
513 #if CMI_EXERT_RECV_CAP
514         if (recvCnt==RECV_CAP) break;
515 #elif CMI_DYNAMIC_EXERT_CAP
516         if (recvCnt >= dynamicRecvCap) break;
517 #endif
518
519         /* First check posted recvs then do  probe unmatched outstanding messages */
520 #if MPI_POST_RECV
521         int completed_index=-1;
522         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
523             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
524         if (flg) {
525             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
526                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
527
528             recd = 1;
529             msg = (char *) CmiAlloc(nbytes);
530             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
531             /* and repost the recv */
532
533             START_EVENT();
534
535             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
536                                            MPI_POST_RECV_SIZE,
537                                            MPI_BYTE,
538                                            MPI_ANY_SOURCE,
539                                            POST_RECV_TAG,
540                                            MPI_COMM_WORLD,
541                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
542                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
543
544             END_EVENT(50);
545
546             CpvAccess(Cmi_posted_recv_total)++;
547         } else {
548             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
549             if (res != MPI_SUCCESS)
550                 CmiAbort("MPI_Iprobe failed\n");
551             if (!flg) break;
552             recd = 1;
553             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
554             msg = (char *) CmiAlloc(nbytes);
555
556             START_EVENT();
557
558             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
559                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
560
561             END_EVENT(30);
562
563             CpvAccess(Cmi_unposted_recv_total)++;
564         }
565 #else
566         /* Original version */
567 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
568         double startT = CmiWallTimer();
569 #endif
570         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
571         if (res != MPI_SUCCESS)
572             CmiAbort("MPI_Iprobe failed\n");
573
574         if (!flg) break;
575 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
576         {
577             double endT = CmiWallTimer();
578             /* only trace the probe that last longer than 1ms */
579             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
580         }
581 #endif
582
583         recd = 1;
584         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
585         msg = (char *) CmiAlloc(nbytes);
586
587         START_EVENT();
588
589         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
590             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
591
592         /*END_EVENT(30);*/
593
594 #endif
595
596 #if CMK_SMP_TRACE_COMMTHREAD
597         traceBeginCommOp(msg);
598         traceChangeLastTimestamp(CpvAccess(projTraceStart));
599         traceEndCommOp(msg);
600 #if CMI_MPI_TRACE_MOREDETAILED
601         char tmp[32];
602         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
603         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
604 #endif
605 #elif CMK_TRACE_COMMOVERHEAD
606         char tmp[32];
607         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
608         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
609 #endif
610
611
612         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
613         CMI_CHECK_CHECKSUM(msg, nbytes);
614 #if CMK_ERROR_CHECKING
615         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
616             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
617             CmiFree(msg);
618             CmiAbort("Abort!\n");
619             continue;
620         }
621 #endif
622
623         handleOneRecvedMsg(nbytes, msg);
624
625 #if CMI_EXERT_RECV_CAP
626         recvCnt++;
627 #elif CMI_DYNAMIC_EXERT_CAP
628         recvCnt++;
629 #if CMK_SMP
630         /* check sendMsgBuf to get the  number of messages that have not been sent
631              * which is only available in SMP mode
632          * MsgQueueLen indicates the number of messages that have not been released
633              * by MPI
634              */
635         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
636                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
637             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
638         }
639 #else
640         /* MsgQueueLen indicates the number of messages that have not been released
641              * by MPI
642              */
643         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
644             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
645         }
646 #endif
647
648 #endif
649
650     }
651
652     MACHSTATE(2,"} PumpMsgs end ");
653     return recd;
654 }
655
656 /* blocking version */
657 static void PumpMsgsBlocking(void) {
658     static int maxbytes = 20000000;
659     static char *buf = NULL;
660     int nbytes, flg;
661     MPI_Status sts;
662     char *msg;
663     int recd=0;
664
665     if (!PCQueueEmpty(CmiGetState()->recv)) return;
666     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
667     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
668     if (CpvAccess(sent_msgs))  return;
669
670 #if 0
671     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
672 #endif
673
674     if (buf == NULL) {
675         buf = (char *) CmiAlloc(maxbytes);
676         _MEMCHECK(buf);
677     }
678
679
680 #if MPI_POST_RECV
681 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
682     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
683 #endif
684
685     START_EVENT();
686
687     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
688         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
689
690     /*END_EVENT(30);*/
691
692     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
693     msg = (char *) CmiAlloc(nbytes);
694     memcpy(msg, buf, nbytes);
695
696 #if CMK_SMP_TRACE_COMMTHREAD
697     traceBeginCommOp(msg);
698     traceChangeLastTimestamp(CpvAccess(projTraceStart));
699     traceEndCommOp(msg);
700 #if CMI_MPI_TRACE_MOREDETAILED
701     char tmp[32];
702     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
703     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
704 #endif
705 #endif
706
707     handleOneRecvedMsg(nbytes, msg);
708 }
709
710
711 #if CMK_SMP
712
713 /* called by communication thread in SMP */
714 static int SendMsgBuf() {
715     SMSG_LIST *msg_tmp;
716     char *msg;
717     int node, rank, size;
718     int i;
719     int sent = 0;
720
721 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
722     int sentCnt = 0;
723 #endif
724
725 #if CMI_DYNAMIC_EXERT_CAP
726     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
727 #endif
728
729     MACHSTATE(2,"SendMsgBuf begin {");
730 #if MULTI_SENDQUEUE
731     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
732         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
733             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
734 #else
735     /* single message sending queue */
736     /* CmiLock(sendMsgBufLock); */
737     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
738     /* CmiUnlock(sendMsgBufLock); */
739     while (NULL != msg_tmp) {
740 #endif
741             MPISendOneMsg(msg_tmp);
742             sent=1;
743
744 #if CMI_EXERT_SEND_CAP
745             if (++sentCnt == SEND_CAP) break;
746 #elif CMI_DYNAMIC_EXERT_CAP
747             if (++sentCnt >= dynamicSendCap) break;
748             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
749                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
750 #endif
751
752 #if ! MULTI_SENDQUEUE
753             /* CmiLock(sendMsgBufLock); */
754             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
755             /* CmiUnlock(sendMsgBufLock); */
756 #endif
757         }
758 #if MULTI_SENDQUEUE
759     }
760 #endif
761     MACHSTATE(2,"}SendMsgBuf end ");
762     return sent;
763 }
764
765 static int MsgQueueEmpty() {
766     int i;
767 #if MULTI_SENDQUEUE
768     for (i=0; i<_Cmi_mynodesize; i++)
769         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
770 #else
771     return PCQueueEmpty(sendMsgBuf);
772 #endif
773     return 1;
774 }
775
776 /* test if all processors recv queues are empty */
777 static int RecvQueueEmpty() {
778     int i;
779     for (i=0; i<_Cmi_mynodesize; i++) {
780         CmiState cs=CmiGetStateN(i);
781         if (!PCQueueEmpty(cs->recv)) return 0;
782     }
783     return 1;
784 }
785
786
787 #define REPORT_COMM_METRICS 0
788 #if REPORT_COMM_METRICS
789 static double pumptime = 0.0;
790                          static double releasetime = 0.0;
791                                                      static double sendtime = 0.0;
792 #endif
793
794 #endif //end of CMK_SMP
795
796 static void AdvanceCommunicationForMPI() {
797 #if REPORT_COMM_METRICS
798     double t1, t2, t3, t4;
799     t1 = CmiWallTimer();
800 #endif
801
802 #if CMK_SMP
803     PumpMsgs();
804
805 #if REPORT_COMM_METRICS
806     t2 = CmiWallTimer();
807 #endif
808
809     CmiReleaseSentMessages();
810 #if REPORT_COMM_METRICS
811     t3 = CmiWallTimer();
812 #endif
813
814     SendMsgBuf();
815
816 #if REPORT_COMM_METRICS
817     t4 = CmiWallTimer();
818     pumptime += (t2-t1);
819     releasetime += (t3-t2);
820     sendtime += (t4-t3);
821 #endif
822
823 #else /* non-SMP case */
824     CmiReleaseSentMessages();
825
826 #if REPORT_COMM_METRICS
827     t2 = CmiWallTimer();
828 #endif
829     PumpMsgs();
830
831 #if REPORT_COMM_METRICS
832     t3 = CmiWallTimer();
833     pumptime += (t3-t2);
834     releasetime += (t2-t1);
835 #endif
836
837 #endif /* end of #if CMK_SMP */
838 }
839 /* ######End of functions related with communication progress ###### */
840
841 static void MachinePostNonLocalForMPI() {
842 #if !CMK_SMP
843     if (no_outstanding_sends) {
844         while (CpvAccess(MsgQueueLen)>0) {
845             AdvanceCommunicationForMPI();
846         }
847     }
848
849     /* FIXME: I don't think the following codes are needed because
850      * it repeats the same job of the next call of CmiGetNonLocal
851      */
852 #if 0
853     if (!msg) {
854         CmiReleaseSentMessages();
855         if (PumpMsgs())
856             return  PCQueuePop(cs->recv);
857         else
858             return 0;
859     }
860 #endif
861 #else
862   if (_thread_provided == MPI_THREAD_MULTIPLE) {
863         CmiReleaseSentMessages();
864         SendMsgBuf();
865   }
866 #endif
867 }
868
869 /* Idle-state related functions: called in non-smp mode */
870 void CmiNotifyIdleForMPI(void) {
871     CmiReleaseSentMessages();
872     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
873 }
874
875 /* Network progress function is used to poll the network when for
876    messages. This flushes receive buffers on some  implementations*/
877 #if CMK_MACHINE_PROGRESS_DEFINED
878 void CmiMachineProgressImpl() {
879 #if !CMK_SMP
880     PumpMsgs();
881 #if CMK_IMMEDIATE_MSG
882     CmiHandleImmediate();
883 #endif
884 #else
885     /*Not implemented yet. Communication server does not seem to be
886       thread safe, so only communication thread call it */
887     if (CmiMyRank() == CmiMyNodeSize())
888         CommunicationServerThread(0);
889 #endif
890 }
891 #endif
892
893 /* ######Beginning of functions related with exiting programs###### */
894 void DrainResourcesForMPI() {
895 #if !CMK_SMP
896     while (!CmiAllAsyncMsgsSent()) {
897         PumpMsgs();
898         CmiReleaseSentMessages();
899     }
900 #else
901     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
902         CmiReleaseSentMessages();
903         SendMsgBuf();
904         PumpMsgs();
905     }
906 #endif
907 #if CMK_MEM_CHECKPOINT
908     if (CmiMyPe() == 0) mpi_end_spare();
909 #endif
910     MACHSTATE(2, "Machine exit barrier begin {");
911     START_EVENT();
912     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
913         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
914     END_EVENT(10);
915     MACHSTATE(2, "} Machine exit barrier end");
916 }
917
918 void LrtsExit() {
919 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
920     int doPrint = 0;
921 #if CMK_SMP
922     if (CmiMyNode()==0) doPrint = 1;
923 #else
924     if (CmiMyPe()==0) doPrint = 1;
925 #endif
926
927     if (doPrint) {
928 #if MPI_POST_RECV
929         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
930 #endif
931     }
932 #endif
933
934 #if REPORT_COMM_METRICS
935 #if CMK_SMP
936     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
937               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
938               pumptime, releasetime, sendtime);
939 #else
940     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
941               CmiMyPe(), pumptime, releasetime, sendtime);
942 #endif
943 #endif
944
945 #if ! CMK_AUTOBUILD
946     signal(SIGINT, signal_int);
947     MPI_Finalize();
948 #endif
949     exit(0);
950 }
951
952 static int machine_exit_idx;
953 static void machine_exit(char *m) {
954     EmergencyExit();
955     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
956     fflush(stdout);
957     CmiNodeBarrier();
958     if (CmiMyRank() == 0) {
959         MPI_Barrier(MPI_COMM_WORLD);
960         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
961         MPI_Abort(MPI_COMM_WORLD, 1);
962     } else {
963         while (1) CmiYield();
964     }
965 }
966
967 static void KillOnAllSigs(int sigNo) {
968     static int already_in_signal_handler = 0;
969     char *m;
970     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
971     already_in_signal_handler = 1;
972 #if CMK_CCS_AVAILABLE
973     if (CpvAccess(cmiArgDebugFlag)) {
974         CpdNotify(CPD_SIGNAL, sigNo);
975         CpdFreeze();
976     }
977 #endif
978     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
979              "Signal: %d\n",CmiMyPe(),sigNo);
980     CmiPrintStackTrace(1);
981
982     m = CmiAlloc(CmiMsgHeaderSizeBytes);
983     CmiSetHandler(m, machine_exit_idx);
984     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
985     machine_exit(m);
986 }
987 /* ######End of functions related with exiting programs###### */
988
989
990 /* ######Beginning of functions related with starting programs###### */
991 static void registerMPITraceEvents() {
992 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
993     traceRegisterUserEvent("MPI_Barrier", 10);
994     traceRegisterUserEvent("MPI_Send", 20);
995     traceRegisterUserEvent("MPI_Recv", 30);
996     traceRegisterUserEvent("MPI_Isend", 40);
997     traceRegisterUserEvent("MPI_Irecv", 50);
998     traceRegisterUserEvent("MPI_Test", 60);
999     traceRegisterUserEvent("MPI_Iprobe", 70);
1000 #endif
1001 }
1002
1003 #if MACHINE_DEBUG_LOG
1004 FILE *debugLog = NULL;
1005 #endif
1006
1007 static char *thread_level_tostring(int thread_level) {
1008 #if CMK_MPI_INIT_THREAD
1009     switch (thread_level) {
1010     case MPI_THREAD_SINGLE:
1011         return "MPI_THREAD_SINGLE";
1012     case MPI_THREAD_FUNNELED:
1013         return "MPI_THREAD_FUNNELED";
1014     case MPI_THREAD_SERIALIZED:
1015         return "MPI_THREAD_SERIALIZED";
1016     case MPI_THREAD_MULTIPLE :
1017         return "MPI_THREAD_MULTIPLE ";
1018     default: {
1019         char *str = (char*)malloc(5);
1020         sprintf(str,"%d", thread_level);
1021         return str;
1022     }
1023     }
1024     return  "unknown";
1025 #else
1026     char *str = (char*)malloc(5);
1027     sprintf(str,"%d", thread_level);
1028     return str;
1029 #endif
1030 }
1031
1032 /**
1033  *  Obtain the number of nodes, my node id, and consuming machine layer
1034  *  specific arguments
1035  */
1036 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1037     int n,i;
1038     int ver, subver;
1039     int provided;
1040     int thread_level;
1041     int myNID;
1042     int largc=*argc;
1043     char** largv=*argv;
1044
1045 #if MACHINE_DEBUG
1046     debugLog=NULL;
1047 #endif
1048 #if CMK_USE_HP_MAIN_FIX
1049 #if FOR_CPLUS
1050     _main(largc,largv);
1051 #endif
1052 #endif
1053
1054 #if CMK_MPI_INIT_THREAD
1055 #if CMK_SMP
1056     thread_level = MPI_THREAD_MULTIPLE;
1057 #else
1058     thread_level = MPI_THREAD_SINGLE;
1059 #endif
1060     MPI_Init_thread(argc, argv, thread_level, &provided);
1061     _thread_provided = provided;
1062 #else
1063     MPI_Init(argc, argv);
1064     thread_level = 0;
1065     provided = -1;
1066 #endif
1067     largc = *argc;
1068     largv = *argv;
1069     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1070     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1071
1072     myNID = *myNodeID;
1073
1074     MPI_Get_version(&ver, &subver);
1075     if (myNID == 0) {
1076         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1077     }
1078
1079     {
1080         int debug = CmiGetArgFlag(largv,"++debug");
1081         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1082         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1083 #if CMK_HAS_GETPID
1084             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1085             fflush(stdout);
1086             if (!debug_no_pause)
1087                 sleep(15);
1088 #else
1089             printf("++debug ignored.\n");
1090 #endif
1091         }
1092     }
1093
1094
1095 #if CMK_MEM_CHECKPOINT
1096     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1097        CmiAssert(num_workpes <= *numNodes);
1098        total_pes = *numNodes;
1099        *numNodes = num_workpes;
1100        if (*myNodeID == 0)
1101            CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1102     }
1103     else
1104        num_workpes = *numNodes;
1105     petorank = (int *)malloc(sizeof(int) * num_workpes);
1106     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1107     nextrank = num_workpes;
1108
1109     if (*myNodeID >= num_workpes) {    /* is spare processor */
1110       MPI_Status sts;
1111       int vals[2];
1112       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1113       int newpe = vals[0];
1114       CpvAccess(_curRestartPhase) = vals[1];
1115
1116       if (newpe == -1) {
1117           MPI_Barrier(MPI_COMM_WORLD);
1118           MPI_Finalize();
1119           exit(0);
1120       }
1121
1122         /* update petorank */
1123       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,MPI_COMM_WORLD, &sts);
1124       nextrank = *myNodeID + 1;
1125       *myNodeID = newpe;
1126       myNID = newpe;
1127
1128        /* add +restartaftercrash to argv */
1129       char *phase_str;
1130       char **restart_argv;
1131       int i=0;
1132       while(largv[i]!= NULL) i++;
1133       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1134       i=0;
1135       while(largv[i]!= NULL){
1136                 restart_argv[i] = largv[i];
1137                 i++;
1138       }
1139       restart_argv[i] = "+restartaftercrash";
1140       phase_str = (char*)malloc(10);
1141       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1142       restart_argv[i+1]=phase_str;
1143       restart_argv[i+2]=NULL;
1144       *argv = restart_argv;
1145       *argc = i+2;
1146       largc = *argc;
1147       largv = *argv;
1148     }
1149 #endif
1150
1151     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1152     if (idleblock && _Cmi_mynode == 0) {
1153         printf("Charm++: Running in idle blocking mode.\n");
1154     }
1155
1156 #if CMK_CHARMDEBUG
1157     /* setup signal handlers */
1158     signal(SIGSEGV, KillOnAllSigs);
1159     signal(SIGFPE, KillOnAllSigs);
1160     signal(SIGILL, KillOnAllSigs);
1161     signal_int = signal(SIGINT, KillOnAllSigs);
1162     signal(SIGTERM, KillOnAllSigs);
1163     signal(SIGABRT, KillOnAllSigs);
1164 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1165     signal(SIGQUIT, KillOnAllSigs);
1166     signal(SIGBUS, KillOnAllSigs);
1167 #   endif /*UNIX*/
1168 #endif
1169
1170 #if CMK_NO_OUTSTANDING_SENDS
1171     no_outstanding_sends=1;
1172 #endif
1173     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1174         no_outstanding_sends = 1;
1175         if (myNID == 0)
1176             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1177                    no_outstanding_sends?"":" not");
1178     }
1179
1180     request_max=MAX_QLEN;
1181     CmiGetArgInt(largv,"+requestmax",&request_max);
1182     /*printf("request max=%d\n", request_max);*/
1183
1184 #if MPI_POST_RECV
1185     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1186     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1187     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1188     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1189     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1190     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1191     if (myNID==0) {
1192         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1193                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1194     }
1195 #endif
1196
1197 #if CMI_DYNAMIC_EXERT_CAP
1198     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1199     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1200     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1201     if (myNID==0) {
1202         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1203                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1204     }
1205 #endif
1206
1207     /* checksum flag */
1208     if (CmiGetArgFlag(largv,"+checksum")) {
1209 #if CMK_ERROR_CHECKING
1210         checksum_flag = 1;
1211         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1212 #else
1213         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1214 #endif
1215     }
1216
1217     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1218     for (i=0; i<_Cmi_mynodesize+1; i++) {
1219 #if MULTI_SENDQUEUE
1220         procState[i].sendMsgBuf = PCQueueCreate();
1221 #endif
1222         procState[i].recvLock = CmiCreateLock();
1223     }
1224 #if CMK_SMP
1225 #if !MULTI_SENDQUEUE
1226     sendMsgBuf = PCQueueCreate();
1227     sendMsgBufLock = CmiCreateLock();
1228 #endif
1229 #endif
1230 }
1231
1232 static void MachinePreCommonInitForMPI(int everReturn) {
1233
1234 #if MPI_POST_RECV
1235     int doInit = 1;
1236     int i;
1237
1238 #if CMK_SMP
1239     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1240 #endif
1241
1242     /* Currently, in mpi smp, the main thread will be the comm thread, so
1243      *  only the comm thread should post recvs. Cpvs, however, need to be
1244      * created on rank 0 (the ptrs to the actual cpv memory), while
1245      * other ranks are busy waiting for this to finish. So cpv initialize
1246      * routines have to be called on every ranks, although they are only
1247      * useful on comm thread (whose rank is not zero) -Chao Mei
1248      */
1249     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1250     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1251     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1252     CpvInitialize(char*,CmiPostedRecvBuffers);
1253
1254     if (doInit) {
1255         /* Post some extra recvs to help out with incoming messages */
1256         /* On some MPIs the messages are unexpected and thus slow */
1257
1258         /* An array of request handles for posted recvs */
1259         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1260
1261         /* An array of buffers for posted recvs */
1262         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1263
1264         /* Post Recvs */
1265         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1266             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1267                                            MPI_POST_RECV_SIZE,
1268                                            MPI_BYTE,
1269                                            MPI_ANY_SOURCE,
1270                                            POST_RECV_TAG,
1271                                            MPI_COMM_WORLD,
1272                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1273                 CmiAbort("MPI_Irecv failed\n");
1274         }
1275     }
1276 #endif
1277
1278 }
1279
1280 static void MachinePostCommonInitForMPI(int everReturn) {
1281
1282     CmiIdleState *s=CmiNotifyGetState();
1283
1284     CpvInitialize(SMSG_LIST *, sent_msgs);
1285     CpvInitialize(SMSG_LIST *, end_sent);
1286     CpvInitialize(int, MsgQueueLen);
1287     CpvAccess(sent_msgs) = NULL;
1288     CpvAccess(end_sent) = NULL;
1289     CpvAccess(MsgQueueLen) = 0;
1290
1291     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1292
1293 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1294     CpvInitialize(double, projTraceStart);
1295     /* only PE 0 needs to care about registration (to generate sts file). */
1296     if (CmiMyPe() == 0) {
1297         registerMachineUserEventsFunction(&registerMPITraceEvents);
1298     }
1299 #endif
1300
1301 #if CMK_SMP
1302     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1303     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1304     if (_thread_provided == MPI_THREAD_MULTIPLE)
1305       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1306 #else
1307     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1308 #endif
1309
1310 #if MACHINE_DEBUG_LOG
1311     if (CmiMyRank() == 0) {
1312         char ln[200];
1313         sprintf(ln,"debugLog.%d",CmiMyNode());
1314         debugLog=fopen(ln,"w");
1315     }
1316 #endif
1317 }
1318 /* ######End of functions related with starting programs###### */
1319
1320 /***********************************************************************
1321  *
1322  * Abort function:
1323  *
1324  ************************************************************************/
1325
1326 void CmiAbort(const char *message) {
1327     char *m;
1328     /* if CharmDebug is attached simply try to send a message to it */
1329 #if CMK_CCS_AVAILABLE
1330     if (CpvAccess(cmiArgDebugFlag)) {
1331         CpdNotify(CPD_ABORT, message);
1332         CpdFreeze();
1333     }
1334 #endif
1335     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1336              "Reason: %s\n",CmiMyPe(),message);
1337     /*  CmiError(message); */
1338     CmiPrintStackTrace(0);
1339     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1340     CmiSetHandler(m, machine_exit_idx);
1341     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1342     machine_exit(m);
1343     /* Program never reaches here */
1344     MPI_Abort(MPI_COMM_WORLD, 1);
1345 }
1346
1347 /**************************  TIMER FUNCTIONS **************************/
1348 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1349
1350 /* MPI calls are not threadsafe, even the timer on some machines */
1351 static CmiNodeLock  timerLock = 0;
1352                                 static int _absoluteTime = 0;
1353                                                            static double starttimer = 0;
1354                                                                                       static int _is_global = 0;
1355
1356 int CmiTimerIsSynchronized() {
1357     int  flag;
1358     void *v;
1359
1360     /*  check if it using synchronized timer */
1361     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1362         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1363     if (flag) {
1364         _is_global = *(int*)v;
1365         if (_is_global && CmiMyPe() == 0)
1366             printf("Charm++> MPI timer is synchronized\n");
1367     }
1368     return _is_global;
1369 }
1370
1371 int CmiTimerAbsolute() {
1372     return _absoluteTime;
1373 }
1374
1375 double CmiStartTimer() {
1376     return 0.0;
1377 }
1378
1379 double CmiInitTime() {
1380     return starttimer;
1381 }
1382
1383 void CmiTimerInit(char **argv) {
1384     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1385     if (_absoluteTime && CmiMyPe() == 0)
1386         printf("Charm++> absolute MPI timer is used\n");
1387
1388 #if ! CMK_MEM_CHECKPOINT
1389     _is_global = CmiTimerIsSynchronized();
1390 #else
1391     _is_global = 0;
1392 #endif
1393
1394     if (_is_global) {
1395         if (CmiMyRank() == 0) {
1396             double minTimer;
1397 #if CMK_TIMER_USE_XT3_DCLOCK
1398             starttimer = dclock();
1399 #else
1400             starttimer = MPI_Wtime();
1401 #endif
1402
1403             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1404                           MPI_COMM_WORLD );
1405             starttimer = minTimer;
1406         }
1407     } else { /* we don't have a synchronous timer, set our own start time */
1408 #if ! CMK_MEM_CHECKPOINT
1409         CmiBarrier();
1410         CmiBarrier();
1411         CmiBarrier();
1412 #endif
1413 #if CMK_TIMER_USE_XT3_DCLOCK
1414         starttimer = dclock();
1415 #else
1416         starttimer = MPI_Wtime();
1417 #endif
1418     }
1419
1420 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1421     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1422         timerLock = CmiCreateLock();
1423 #endif
1424     CmiNodeAllBarrier();          /* for smp */
1425 }
1426
1427 /**
1428  * Since the timerLock is never created, and is
1429  * always NULL, then all the if-condition inside
1430  * the timer functions could be disabled right
1431  * now in the case of SMP. --Chao Mei
1432  */
1433 double CmiTimer(void) {
1434     double t;
1435 #if 0 && CMK_SMP
1436     if (timerLock) CmiLock(timerLock);
1437 #endif
1438
1439 #if CMK_TIMER_USE_XT3_DCLOCK
1440     t = dclock();
1441 #else
1442     t = MPI_Wtime();
1443 #endif
1444
1445 #if 0 && CMK_SMP
1446     if (timerLock) CmiUnlock(timerLock);
1447 #endif
1448
1449     return _absoluteTime?t: (t-starttimer);
1450 }
1451
1452 double CmiWallTimer(void) {
1453     double t;
1454 #if 0 && CMK_SMP
1455     if (timerLock) CmiLock(timerLock);
1456 #endif
1457
1458 #if CMK_TIMER_USE_XT3_DCLOCK
1459     t = dclock();
1460 #else
1461     t = MPI_Wtime();
1462 #endif
1463
1464 #if 0 && CMK_SMP
1465     if (timerLock) CmiUnlock(timerLock);
1466 #endif
1467
1468     return _absoluteTime? t: (t-starttimer);
1469 }
1470
1471 double CmiCpuTimer(void) {
1472     double t;
1473 #if 0 && CMK_SMP
1474     if (timerLock) CmiLock(timerLock);
1475 #endif
1476 #if CMK_TIMER_USE_XT3_DCLOCK
1477     t = dclock() - starttimer;
1478 #else
1479     t = MPI_Wtime() - starttimer;
1480 #endif
1481 #if 0 && CMK_SMP
1482     if (timerLock) CmiUnlock(timerLock);
1483 #endif
1484     return t;
1485 }
1486
1487 #endif     /* CMK_TIMER_USE_SPECIAL */
1488
1489 /************Barrier Related Functions****************/
1490 /* must be called on all ranks including comm thread in SMP */
1491 int CmiBarrier() {
1492 #if CMK_SMP
1493     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1494     CmiNodeAllBarrier();
1495     if (CmiMyRank() == CmiMyNodeSize())
1496 #else
1497     if (CmiMyRank() == 0)
1498 #endif
1499     {
1500         /**
1501          *  The call of CmiBarrier is usually before the initialization
1502          *  of trace module of Charm++, therefore, the START_EVENT
1503          *  and END_EVENT are disabled here. -Chao Mei
1504          */
1505         /*START_EVENT();*/
1506
1507         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1508             CmiAbort("Timernit: MPI_Barrier failed!\n");
1509
1510         /*END_EVENT(10);*/
1511     }
1512     CmiNodeAllBarrier();
1513     return 0;
1514 }
1515
1516 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1517 int CmiBarrierZero() {
1518     int i;
1519 #if CMK_SMP
1520     if (CmiMyRank() == CmiMyNodeSize())
1521 #else
1522     if (CmiMyRank() == 0)
1523 #endif
1524     {
1525         char msg[1];
1526         MPI_Status sts;
1527         if (CmiMyNode() == 0)  {
1528             for (i=0; i<CmiNumNodes()-1; i++) {
1529                 START_EVENT();
1530
1531                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1532                     CmiPrintf("MPI_Recv failed!\n");
1533
1534                 END_EVENT(30);
1535             }
1536         } else {
1537             START_EVENT();
1538
1539             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1540                 printf("MPI_Send failed!\n");
1541
1542             END_EVENT(20);
1543         }
1544     }
1545     CmiNodeAllBarrier();
1546     return 0;
1547 }
1548
1549
1550 #if CMK_MEM_CHECKPOINT
1551
1552 void mpi_restart_crashed(int pe, int rank)
1553 {
1554     int vals[2];
1555     vals[0] = pe;
1556     vals[1] = CpvAccess(_curRestartPhase)+1;
1557     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1558     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1559 }
1560
1561 /* notify spare processors to exit */
1562 void mpi_end_spare()
1563 {
1564     int i;
1565     for (i=nextrank; i<total_pes; i++) {
1566         int vals[2] = {-1,-1};
1567         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,MPI_COMM_WORLD);
1568     }
1569 }
1570
1571 int find_spare_mpirank(int pe)
1572 {
1573     if (nextrank == total_pes) {
1574       CmiAbort("Charm++> ran out of spare processors");
1575     }
1576     petorank[pe] = nextrank;
1577     nextrank++;
1578     return nextrank-1;
1579 }
1580
1581 void CkDieNow()
1582 {
1583     CmiPrintf("[%d] die now.\n", CmiMyPe());
1584
1585       /* release old messages */
1586     while (!CmiAllAsyncMsgsSent()) {
1587         PumpMsgs();
1588         CmiReleaseSentMessages();
1589     }
1590     MPI_Barrier(MPI_COMM_WORLD);
1591     MPI_Finalize();
1592     exit(0);
1593 }
1594
1595 #endif
1596
1597 /*@}*/
1598