minor cleanup
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207 #if CMK_MEM_CHECKPOINT
208 #define FAIL_TAG   1200
209 int num_workpes, total_pes;
210 int *petorank = NULL;
211 int  nextrank;
212 void mpi_end_spare();
213 #endif
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node, int mode);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define LrtsSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
238 #define LrtsInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define LrtsPreCommonInit MachinePreCommonInitForMPI
243 #define LrtsPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI();
248 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define LrtsDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define LrtsExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 static void MachinePostNonLocalForMPI();
262 #define LrtsPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-lrts.h"
274 #include "machine-common-core.c"
275
276 /* The machine specific msg-sending function */
277
278 #if CMK_SMP
279 static void EnqueueMsg(void *m, int size, int node, int mode) {
280     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
281     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
282     msg_tmp->msg = m;
283     msg_tmp->size = size;
284     msg_tmp->destpe = node;
285     msg_tmp->next = 0;
286     msg_tmp->mode = mode;
287
288 #if CMK_SMP_TRACE_COMMTHREAD
289     msg_tmp->srcpe = CmiMyPe();
290 #endif
291
292 #if MULTI_SENDQUEUE
293     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
294 #else
295     /*CmiLock(sendMsgBufLock);*/
296     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
297     /*CmiUnlock(sendMsgBufLock);*/
298 #endif
299
300     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
301 }
302 #endif
303
304 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
305 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
306     int node = smsg->destpe;
307     int size = smsg->size;
308     char *msg = smsg->msg;
309     int mode = smsg->mode;
310
311     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
312 #if CMK_ERROR_CHECKING
313     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
314     CMI_SET_CHECKSUM(msg, size);
315 #endif
316
317 #if MPI_POST_RECV
318     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
319         START_EVENT();
320         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
321             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
322         /*END_EVENT(40);*/
323     } else {
324         START_EVENT();
325         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
326             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
327         /*END_EVENT(40);*/
328     }
329 #else
330     START_EVENT();
331 #if CMK_MEM_CHECKPOINT
332     int dstrank = petorank[node];
333 #else
334     int dstrank = node;
335 #endif
336     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
337         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
338     /*END_EVENT(40);*/
339 #endif
340
341 #if CMK_SMP_TRACE_COMMTHREAD
342     traceBeginCommOp(msg);
343     traceChangeLastTimestamp(CpvAccess(projTraceStart));
344     /* traceSendMsgComm must execute after traceBeginCommOp because
345          * we pretend we execute an entry method, and inside this we
346          * pretend we will send another message. Otherwise how could
347          * a message creation just before an entry method invocation?
348          * If such logic is broken, the projections will not trace
349          * messages correctly! -Chao Mei
350          */
351     traceSendMsgComm(msg);
352     traceEndCommOp(msg);
353 #if CMI_MPI_TRACE_MOREDETAILED
354     char tmp[64];
355     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
356     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
357 #endif
358 #endif
359
360     MACHSTATE(3,"}MPI_send end");
361     CpvAccess(MsgQueueLen)++;
362     if (CpvAccess(sent_msgs)==0)
363         CpvAccess(sent_msgs) = smsg;
364     else
365         CpvAccess(end_sent)->next = smsg;
366     CpvAccess(end_sent) = smsg;
367
368 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
369     if (mode == P2P_SYNC || mode == P2P_ASYNC)
370     {
371     while (CpvAccess(MsgQueueLen) > request_max) {
372         CmiReleaseSentMessages();
373         PumpMsgs();
374     }
375     }
376 #endif
377
378     return (CmiCommHandle) &(smsg->req);
379 }
380
381 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
382     /* Ignoring the mode for MPI layer */
383
384     CmiState cs = CmiGetState();
385     SMSG_LIST *msg_tmp;
386     int  rank;
387
388     CmiAssert(destNode != CmiMyNode());
389 #if CMK_SMP
390     if (_thread_provided != MPI_THREAD_MULTIPLE) {
391       EnqueueMsg(msg, size, destNode, mode);
392       return 0;
393     }
394 #endif
395     /* non smp */
396     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
397     msg_tmp->msg = msg;
398     msg_tmp->destpe = destNode;
399     msg_tmp->size = size;
400     msg_tmp->next = 0;
401     msg_tmp->mode = mode;
402     return MPISendOneMsg(msg_tmp);
403 }
404
405 static size_t CmiAllAsyncMsgsSent(void) {
406     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
407     MPI_Status sts;
408     int done;
409
410     while (msg_tmp!=0) {
411         done = 0;
412         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
413             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
414         if (!done)
415             return 0;
416         msg_tmp = msg_tmp->next;
417         /*    MsgQueueLen--; ????? */
418     }
419     return 1;
420 }
421
422 int CmiAsyncMsgSent(CmiCommHandle c) {
423
424     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
425     int done;
426     MPI_Status sts;
427
428     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
429         msg_tmp = msg_tmp->next;
430     if (msg_tmp) {
431         done = 0;
432         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
433             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
434         return ((done)?1:0);
435     } else {
436         return 1;
437     }
438 }
439
440 void CmiReleaseCommHandle(CmiCommHandle c) {
441     return;
442 }
443
444 /* ######Beginning of functions related with communication progress ###### */
445 static void CmiReleaseSentMessages(void) {
446     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
447     SMSG_LIST *prev=0;
448     SMSG_LIST *temp;
449     int done;
450     MPI_Status sts;
451
452 #if CMK_BLUEGENEL
453     MPID_Progress_test();
454 #endif
455
456     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
457     while (msg_tmp!=0) {
458         done =0;
459 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
460         double startT = CmiWallTimer();
461 #endif
462         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
463             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
464         if (done) {
465             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
466             CpvAccess(MsgQueueLen)--;
467             /* Release the message */
468             temp = msg_tmp->next;
469             if (prev==0) /* first message */
470                 CpvAccess(sent_msgs) = temp;
471             else
472                 prev->next = temp;
473             CmiFree(msg_tmp->msg);
474             CmiFree(msg_tmp);
475             msg_tmp = temp;
476         } else {
477             prev = msg_tmp;
478             msg_tmp = msg_tmp->next;
479         }
480 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
481         {
482             double endT = CmiWallTimer();
483             /* only record the event if it takes more than 1ms */
484             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
485         }
486 #endif
487     }
488     CpvAccess(end_sent) = prev;
489     MACHSTATE(2,"} CmiReleaseSentMessages end");
490 }
491
492 static int PumpMsgs(void) {
493     int nbytes, flg, res;
494     char *msg;
495     MPI_Status sts;
496     int recd=0;
497
498 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
499     int recvCnt=0;
500 #endif
501
502 #if CMK_BLUEGENEL
503     MPID_Progress_test();
504 #endif
505
506     MACHSTATE(2,"PumpMsgs begin {");
507
508 #if CMI_DYNAMIC_EXERT_CAP
509     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
510 #endif
511
512     while (1) {
513 #if CMI_EXERT_RECV_CAP
514         if (recvCnt==RECV_CAP) break;
515 #elif CMI_DYNAMIC_EXERT_CAP
516         if (recvCnt >= dynamicRecvCap) break;
517 #endif
518
519         /* First check posted recvs then do  probe unmatched outstanding messages */
520 #if MPI_POST_RECV
521         int completed_index=-1;
522         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
523             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
524         if (flg) {
525             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
526                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
527
528             recd = 1;
529             msg = (char *) CmiAlloc(nbytes);
530             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
531             /* and repost the recv */
532
533             START_EVENT();
534
535             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
536                                            MPI_POST_RECV_SIZE,
537                                            MPI_BYTE,
538                                            MPI_ANY_SOURCE,
539                                            POST_RECV_TAG,
540                                            MPI_COMM_WORLD,
541                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
542                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
543
544             END_EVENT(50);
545
546             CpvAccess(Cmi_posted_recv_total)++;
547         } else {
548             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
549             if (res != MPI_SUCCESS)
550                 CmiAbort("MPI_Iprobe failed\n");
551             if (!flg) break;
552             recd = 1;
553             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
554             msg = (char *) CmiAlloc(nbytes);
555
556             START_EVENT();
557
558             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
559                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
560
561             END_EVENT(30);
562
563             CpvAccess(Cmi_unposted_recv_total)++;
564         }
565 #else
566         /* Original version */
567 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
568         double startT = CmiWallTimer();
569 #endif
570         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
571         if (res != MPI_SUCCESS)
572             CmiAbort("MPI_Iprobe failed\n");
573
574         if (!flg) break;
575 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
576         {
577             double endT = CmiWallTimer();
578             /* only trace the probe that last longer than 1ms */
579             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
580         }
581 #endif
582
583         recd = 1;
584         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
585         msg = (char *) CmiAlloc(nbytes);
586
587         START_EVENT();
588
589         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
590             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
591
592         /*END_EVENT(30);*/
593
594 #endif
595
596 #if CMK_SMP_TRACE_COMMTHREAD
597         traceBeginCommOp(msg);
598         traceChangeLastTimestamp(CpvAccess(projTraceStart));
599         traceEndCommOp(msg);
600 #if CMI_MPI_TRACE_MOREDETAILED
601         char tmp[32];
602         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
603         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
604 #endif
605 #elif CMK_TRACE_COMMOVERHEAD
606         char tmp[32];
607         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
608         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
609 #endif
610
611
612         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
613         CMI_CHECK_CHECKSUM(msg, nbytes);
614 #if CMK_ERROR_CHECKING
615         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
616             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
617             CmiFree(msg);
618             CmiAbort("Abort!\n");
619             continue;
620         }
621 #endif
622
623         handleOneRecvedMsg(nbytes, msg);
624
625 #if CMI_EXERT_RECV_CAP
626         recvCnt++;
627 #elif CMI_DYNAMIC_EXERT_CAP
628         recvCnt++;
629 #if CMK_SMP
630         /* check sendMsgBuf to get the  number of messages that have not been sent
631              * which is only available in SMP mode
632          * MsgQueueLen indicates the number of messages that have not been released
633              * by MPI
634              */
635         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
636                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
637             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
638         }
639 #else
640         /* MsgQueueLen indicates the number of messages that have not been released
641              * by MPI
642              */
643         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
644             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
645         }
646 #endif
647
648 #endif
649
650     }
651
652     MACHSTATE(2,"} PumpMsgs end ");
653     return recd;
654 }
655
656 /* blocking version */
657 static void PumpMsgsBlocking(void) {
658     static int maxbytes = 20000000;
659     static char *buf = NULL;
660     int nbytes, flg;
661     MPI_Status sts;
662     char *msg;
663     int recd=0;
664
665     if (!PCQueueEmpty(CmiGetState()->recv)) return;
666     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
667     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
668     if (CpvAccess(sent_msgs))  return;
669
670 #if 0
671     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
672 #endif
673
674     if (buf == NULL) {
675         buf = (char *) CmiAlloc(maxbytes);
676         _MEMCHECK(buf);
677     }
678
679
680 #if MPI_POST_RECV
681 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
682     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
683 #endif
684
685     START_EVENT();
686
687     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
688         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
689
690     /*END_EVENT(30);*/
691
692     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
693     msg = (char *) CmiAlloc(nbytes);
694     memcpy(msg, buf, nbytes);
695
696 #if CMK_SMP_TRACE_COMMTHREAD
697     traceBeginCommOp(msg);
698     traceChangeLastTimestamp(CpvAccess(projTraceStart));
699     traceEndCommOp(msg);
700 #if CMI_MPI_TRACE_MOREDETAILED
701     char tmp[32];
702     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
703     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
704 #endif
705 #endif
706
707     handleOneRecvedMsg(nbytes, msg);
708 }
709
710
711 #if CMK_SMP
712
713 /* called by communication thread in SMP */
714 static int SendMsgBuf() {
715     SMSG_LIST *msg_tmp;
716     char *msg;
717     int node, rank, size;
718     int i;
719     int sent = 0;
720
721 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
722     int sentCnt = 0;
723 #endif
724
725 #if CMI_DYNAMIC_EXERT_CAP
726     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
727 #endif
728
729     MACHSTATE(2,"SendMsgBuf begin {");
730 #if MULTI_SENDQUEUE
731     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
732         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
733             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
734 #else
735     /* single message sending queue */
736     /* CmiLock(sendMsgBufLock); */
737     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
738     /* CmiUnlock(sendMsgBufLock); */
739     while (NULL != msg_tmp) {
740 #endif
741             MPISendOneMsg(msg_tmp);
742             sent=1;
743
744 #if CMI_EXERT_SEND_CAP
745             if (++sentCnt == SEND_CAP) break;
746 #elif CMI_DYNAMIC_EXERT_CAP
747             if (++sentCnt >= dynamicSendCap) break;
748             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
749                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
750 #endif
751
752 #if ! MULTI_SENDQUEUE
753             /* CmiLock(sendMsgBufLock); */
754             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
755             /* CmiUnlock(sendMsgBufLock); */
756 #endif
757         }
758 #if MULTI_SENDQUEUE
759     }
760 #endif
761     MACHSTATE(2,"}SendMsgBuf end ");
762     return sent;
763 }
764
765 static int MsgQueueEmpty() {
766     int i;
767 #if MULTI_SENDQUEUE
768     for (i=0; i<_Cmi_mynodesize; i++)
769         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
770 #else
771     return PCQueueEmpty(sendMsgBuf);
772 #endif
773     return 1;
774 }
775
776 /* test if all processors recv queues are empty */
777 static int RecvQueueEmpty() {
778     int i;
779     for (i=0; i<_Cmi_mynodesize; i++) {
780         CmiState cs=CmiGetStateN(i);
781         if (!PCQueueEmpty(cs->recv)) return 0;
782     }
783     return 1;
784 }
785
786
787 #define REPORT_COMM_METRICS 0
788 #if REPORT_COMM_METRICS
789 static double pumptime = 0.0;
790                          static double releasetime = 0.0;
791                                                      static double sendtime = 0.0;
792 #endif
793
794 #endif //end of CMK_SMP
795
796 static void AdvanceCommunicationForMPI() {
797 #if REPORT_COMM_METRICS
798     double t1, t2, t3, t4;
799     t1 = CmiWallTimer();
800 #endif
801
802 #if CMK_SMP
803     PumpMsgs();
804
805 #if REPORT_COMM_METRICS
806     t2 = CmiWallTimer();
807 #endif
808
809     CmiReleaseSentMessages();
810 #if REPORT_COMM_METRICS
811     t3 = CmiWallTimer();
812 #endif
813
814     SendMsgBuf();
815
816 #if REPORT_COMM_METRICS
817     t4 = CmiWallTimer();
818     pumptime += (t2-t1);
819     releasetime += (t3-t2);
820     sendtime += (t4-t3);
821 #endif
822
823 #else /* non-SMP case */
824     CmiReleaseSentMessages();
825
826 #if REPORT_COMM_METRICS
827     t2 = CmiWallTimer();
828 #endif
829     PumpMsgs();
830
831 #if REPORT_COMM_METRICS
832     t3 = CmiWallTimer();
833     pumptime += (t3-t2);
834     releasetime += (t2-t1);
835 #endif
836
837 #endif /* end of #if CMK_SMP */
838 }
839 /* ######End of functions related with communication progress ###### */
840
841 static void MachinePostNonLocalForMPI() {
842 #if !CMK_SMP
843     if (no_outstanding_sends) {
844         while (CpvAccess(MsgQueueLen)>0) {
845             AdvanceCommunicationForMPI();
846         }
847     }
848
849     /* FIXME: I don't think the following codes are needed because
850      * it repeats the same job of the next call of CmiGetNonLocal
851      */
852 #if 0
853     if (!msg) {
854         CmiReleaseSentMessages();
855         if (PumpMsgs())
856             return  PCQueuePop(cs->recv);
857         else
858             return 0;
859     }
860 #endif
861 #else
862   if (_thread_provided == MPI_THREAD_MULTIPLE) {
863         CmiReleaseSentMessages();
864         SendMsgBuf();
865   }
866 #endif
867 }
868
869 /* Idle-state related functions: called in non-smp mode */
870 void CmiNotifyIdleForMPI(void) {
871     CmiReleaseSentMessages();
872     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
873 }
874
875 /* Network progress function is used to poll the network when for
876    messages. This flushes receive buffers on some  implementations*/
877 #if CMK_MACHINE_PROGRESS_DEFINED
878 void CmiMachineProgressImpl() {
879 #if !CMK_SMP
880     PumpMsgs();
881 #if CMK_IMMEDIATE_MSG
882     CmiHandleImmediate();
883 #endif
884 #else
885     /*Not implemented yet. Communication server does not seem to be
886       thread safe, so only communication thread call it */
887     if (CmiMyRank() == CmiMyNodeSize())
888         CommunicationServerThread(0);
889 #endif
890 }
891 #endif
892
893 /* ######Beginning of functions related with exiting programs###### */
894 void DrainResourcesForMPI() {
895 #if !CMK_SMP
896     while (!CmiAllAsyncMsgsSent()) {
897         PumpMsgs();
898         CmiReleaseSentMessages();
899     }
900 #else
901     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
902         CmiReleaseSentMessages();
903         SendMsgBuf();
904         PumpMsgs();
905     }
906 #endif
907 #if CMK_MEM_CHECKPOINT
908     if (CmiMyPe() == 0) mpi_end_spare();
909 #endif
910     MACHSTATE(2, "Machine exit barrier begin {");
911     START_EVENT();
912     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
913         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
914     END_EVENT(10);
915     MACHSTATE(2, "} Machine exit barrier end");
916 }
917
918 void LrtsExit() {
919 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
920     int doPrint = 0;
921 #if CMK_SMP
922     if (CmiMyNode()==0) doPrint = 1;
923 #else
924     if (CmiMyPe()==0) doPrint = 1;
925 #endif
926
927     if (doPrint) {
928 #if MPI_POST_RECV
929         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
930 #endif
931     }
932 #endif
933
934 #if REPORT_COMM_METRICS
935 #if CMK_SMP
936     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
937               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
938               pumptime, releasetime, sendtime);
939 #else
940     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
941               CmiMyPe(), pumptime, releasetime, sendtime);
942 #endif
943 #endif
944
945 #if ! CMK_AUTOBUILD
946     signal(SIGINT, signal_int);
947     MPI_Finalize();
948 #endif
949     exit(0);
950 }
951
952 static int machine_exit_idx;
953 static void machine_exit(char *m) {
954     EmergencyExit();
955     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
956     fflush(stdout);
957     CmiNodeBarrier();
958     if (CmiMyRank() == 0) {
959         MPI_Barrier(MPI_COMM_WORLD);
960         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
961         MPI_Abort(MPI_COMM_WORLD, 1);
962     } else {
963         while (1) CmiYield();
964     }
965 }
966
967 static void KillOnAllSigs(int sigNo) {
968     static int already_in_signal_handler = 0;
969     char *m;
970     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
971     already_in_signal_handler = 1;
972 #if CMK_CCS_AVAILABLE
973     if (CpvAccess(cmiArgDebugFlag)) {
974         CpdNotify(CPD_SIGNAL, sigNo);
975         CpdFreeze();
976     }
977 #endif
978     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
979              "Signal: %d\n",CmiMyPe(),sigNo);
980     CmiPrintStackTrace(1);
981
982     m = CmiAlloc(CmiMsgHeaderSizeBytes);
983     CmiSetHandler(m, machine_exit_idx);
984     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
985     machine_exit(m);
986 }
987 /* ######End of functions related with exiting programs###### */
988
989
990 /* ######Beginning of functions related with starting programs###### */
991 static void registerMPITraceEvents() {
992 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
993     traceRegisterUserEvent("MPI_Barrier", 10);
994     traceRegisterUserEvent("MPI_Send", 20);
995     traceRegisterUserEvent("MPI_Recv", 30);
996     traceRegisterUserEvent("MPI_Isend", 40);
997     traceRegisterUserEvent("MPI_Irecv", 50);
998     traceRegisterUserEvent("MPI_Test", 60);
999     traceRegisterUserEvent("MPI_Iprobe", 70);
1000 #endif
1001 }
1002
1003 #if MACHINE_DEBUG_LOG
1004 FILE *debugLog = NULL;
1005 #endif
1006
1007 static char *thread_level_tostring(int thread_level) {
1008 #if CMK_MPI_INIT_THREAD
1009     switch (thread_level) {
1010     case MPI_THREAD_SINGLE:
1011         return "MPI_THREAD_SINGLE";
1012     case MPI_THREAD_FUNNELED:
1013         return "MPI_THREAD_FUNNELED";
1014     case MPI_THREAD_SERIALIZED:
1015         return "MPI_THREAD_SERIALIZED";
1016     case MPI_THREAD_MULTIPLE :
1017         return "MPI_THREAD_MULTIPLE ";
1018     default: {
1019         char *str = (char*)malloc(5);
1020         sprintf(str,"%d", thread_level);
1021         return str;
1022     }
1023     }
1024     return  "unknown";
1025 #else
1026     char *str = (char*)malloc(5);
1027     sprintf(str,"%d", thread_level);
1028     return str;
1029 #endif
1030 }
1031
1032 /**
1033  *  Obtain the number of nodes, my node id, and consuming machine layer
1034  *  specific arguments
1035  */
1036 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1037     int n,i;
1038     int ver, subver;
1039     int provided;
1040     int thread_level;
1041     int myNID;
1042     int largc=*argc;
1043     char** largv=*argv;
1044
1045 #if MACHINE_DEBUG
1046     debugLog=NULL;
1047 #endif
1048 #if CMK_USE_HP_MAIN_FIX
1049 #if FOR_CPLUS
1050     _main(largc,largv);
1051 #endif
1052 #endif
1053
1054 #if CMK_MPI_INIT_THREAD
1055 #if CMK_SMP
1056     thread_level = MPI_THREAD_MULTIPLE;
1057 #else
1058     thread_level = MPI_THREAD_SINGLE;
1059 #endif
1060     MPI_Init_thread(argc, argv, thread_level, &provided);
1061     _thread_provided = provided;
1062 #else
1063     MPI_Init(argc, argv);
1064     thread_level = 0;
1065     provided = -1;
1066 #endif
1067     largc = *argc;
1068     largv = *argv;
1069     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1070     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1071
1072     myNID = *myNodeID;
1073
1074     MPI_Get_version(&ver, &subver);
1075     if (myNID == 0) {
1076         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1077     }
1078
1079     {
1080         int debug = CmiGetArgFlag(largv,"++debug");
1081         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1082         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1083 #if CMK_HAS_GETPID
1084             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1085             fflush(stdout);
1086             if (!debug_no_pause)
1087                 sleep(15);
1088 #else
1089             printf("++debug ignored.\n");
1090 #endif
1091         }
1092     }
1093
1094
1095 #if CMK_MEM_CHECKPOINT
1096     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1097        CmiAssert(num_workpes <= *numNodes);
1098        total_pes = *numNodes;
1099        *numNodes = num_workpes;
1100     }
1101     else
1102        num_workpes = *numNodes;
1103     petorank = (int *)malloc(sizeof(int) * num_workpes);
1104     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1105     nextrank = num_workpes;
1106
1107     if (*myNodeID >= num_workpes) {    /* is spare processor */
1108       MPI_Status sts;
1109       int vals[2];
1110       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1111       int newpe = vals[0];
1112       CpvAccess(_curRestartPhase) = vals[1];
1113
1114       if (newpe == -1) {
1115           MPI_Barrier(MPI_COMM_WORLD);
1116           MPI_Finalize();
1117           exit(0);
1118       }
1119
1120         /* update petorank */
1121       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,MPI_COMM_WORLD, &sts);
1122       nextrank = *myNodeID + 1;
1123       *myNodeID = newpe;
1124       myNID = newpe;
1125
1126        /* add +restartaftercrash to argv */
1127       char *phase_str;
1128       char **restart_argv;
1129       int i=0;
1130       while(largv[i]!= NULL) i++;
1131       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1132       i=0;
1133       while(largv[i]!= NULL){
1134                 restart_argv[i] = largv[i];
1135                 i++;
1136       }
1137       restart_argv[i] = "+restartaftercrash";
1138       phase_str = (char*)malloc(10);
1139       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1140       restart_argv[i+1]=phase_str;
1141       restart_argv[i+2]=NULL;
1142       *argv = restart_argv;
1143       *argc = i+2;
1144       largc = *argc;
1145       largv = *argv;
1146     }
1147 #endif
1148
1149     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1150     if (idleblock && _Cmi_mynode == 0) {
1151         printf("Charm++: Running in idle blocking mode.\n");
1152     }
1153
1154 #if CMK_CHARMDEBUG
1155     /* setup signal handlers */
1156     signal(SIGSEGV, KillOnAllSigs);
1157     signal(SIGFPE, KillOnAllSigs);
1158     signal(SIGILL, KillOnAllSigs);
1159     signal_int = signal(SIGINT, KillOnAllSigs);
1160     signal(SIGTERM, KillOnAllSigs);
1161     signal(SIGABRT, KillOnAllSigs);
1162 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1163     signal(SIGQUIT, KillOnAllSigs);
1164     signal(SIGBUS, KillOnAllSigs);
1165 #   endif /*UNIX*/
1166 #endif
1167
1168 #if CMK_NO_OUTSTANDING_SENDS
1169     no_outstanding_sends=1;
1170 #endif
1171     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1172         no_outstanding_sends = 1;
1173         if (myNID == 0)
1174             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1175                    no_outstanding_sends?"":" not");
1176     }
1177
1178     request_max=MAX_QLEN;
1179     CmiGetArgInt(largv,"+requestmax",&request_max);
1180     /*printf("request max=%d\n", request_max);*/
1181
1182 #if MPI_POST_RECV
1183     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1184     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1185     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1186     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1187     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1188     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1189     if (myNID==0) {
1190         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1191                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1192     }
1193 #endif
1194
1195 #if CMI_DYNAMIC_EXERT_CAP
1196     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1197     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1198     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1199     if (myNID==0) {
1200         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1201                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1202     }
1203 #endif
1204
1205     /* checksum flag */
1206     if (CmiGetArgFlag(largv,"+checksum")) {
1207 #if CMK_ERROR_CHECKING
1208         checksum_flag = 1;
1209         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1210 #else
1211         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1212 #endif
1213     }
1214
1215     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1216     for (i=0; i<_Cmi_mynodesize+1; i++) {
1217 #if MULTI_SENDQUEUE
1218         procState[i].sendMsgBuf = PCQueueCreate();
1219 #endif
1220         procState[i].recvLock = CmiCreateLock();
1221     }
1222 #if CMK_SMP
1223 #if !MULTI_SENDQUEUE
1224     sendMsgBuf = PCQueueCreate();
1225     sendMsgBufLock = CmiCreateLock();
1226 #endif
1227 #endif
1228 }
1229
1230 static void MachinePreCommonInitForMPI(int everReturn) {
1231
1232 #if MPI_POST_RECV
1233     int doInit = 1;
1234     int i;
1235
1236 #if CMK_SMP
1237     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1238 #endif
1239
1240     /* Currently, in mpi smp, the main thread will be the comm thread, so
1241      *  only the comm thread should post recvs. Cpvs, however, need to be
1242      * created on rank 0 (the ptrs to the actual cpv memory), while
1243      * other ranks are busy waiting for this to finish. So cpv initialize
1244      * routines have to be called on every ranks, although they are only
1245      * useful on comm thread (whose rank is not zero) -Chao Mei
1246      */
1247     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1248     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1249     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1250     CpvInitialize(char*,CmiPostedRecvBuffers);
1251
1252     if (doInit) {
1253         /* Post some extra recvs to help out with incoming messages */
1254         /* On some MPIs the messages are unexpected and thus slow */
1255
1256         /* An array of request handles for posted recvs */
1257         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1258
1259         /* An array of buffers for posted recvs */
1260         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1261
1262         /* Post Recvs */
1263         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1264             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1265                                            MPI_POST_RECV_SIZE,
1266                                            MPI_BYTE,
1267                                            MPI_ANY_SOURCE,
1268                                            POST_RECV_TAG,
1269                                            MPI_COMM_WORLD,
1270                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1271                 CmiAbort("MPI_Irecv failed\n");
1272         }
1273     }
1274 #endif
1275
1276 }
1277
1278 static void MachinePostCommonInitForMPI(int everReturn) {
1279
1280     CmiIdleState *s=CmiNotifyGetState();
1281
1282     CpvInitialize(SMSG_LIST *, sent_msgs);
1283     CpvInitialize(SMSG_LIST *, end_sent);
1284     CpvInitialize(int, MsgQueueLen);
1285     CpvAccess(sent_msgs) = NULL;
1286     CpvAccess(end_sent) = NULL;
1287     CpvAccess(MsgQueueLen) = 0;
1288
1289     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1290
1291 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1292     CpvInitialize(double, projTraceStart);
1293     /* only PE 0 needs to care about registration (to generate sts file). */
1294     if (CmiMyPe() == 0) {
1295         registerMachineUserEventsFunction(&registerMPITraceEvents);
1296     }
1297 #endif
1298
1299 #if CMK_SMP
1300     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1301     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1302     if (_thread_provided == MPI_THREAD_MULTIPLE)
1303       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1304 #else
1305     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1306 #endif
1307
1308 #if MACHINE_DEBUG_LOG
1309     if (CmiMyRank() == 0) {
1310         char ln[200];
1311         sprintf(ln,"debugLog.%d",CmiMyNode());
1312         debugLog=fopen(ln,"w");
1313     }
1314 #endif
1315 }
1316 /* ######End of functions related with starting programs###### */
1317
1318 /***********************************************************************
1319  *
1320  * Abort function:
1321  *
1322  ************************************************************************/
1323
1324 void CmiAbort(const char *message) {
1325     char *m;
1326     /* if CharmDebug is attached simply try to send a message to it */
1327 #if CMK_CCS_AVAILABLE
1328     if (CpvAccess(cmiArgDebugFlag)) {
1329         CpdNotify(CPD_ABORT, message);
1330         CpdFreeze();
1331     }
1332 #endif
1333     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1334              "Reason: %s\n",CmiMyPe(),message);
1335     /*  CmiError(message); */
1336     CmiPrintStackTrace(0);
1337     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1338     CmiSetHandler(m, machine_exit_idx);
1339     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1340     machine_exit(m);
1341     /* Program never reaches here */
1342     MPI_Abort(MPI_COMM_WORLD, 1);
1343 }
1344
1345 /**************************  TIMER FUNCTIONS **************************/
1346 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1347
1348 /* MPI calls are not threadsafe, even the timer on some machines */
1349 static CmiNodeLock  timerLock = 0;
1350                                 static int _absoluteTime = 0;
1351                                                            static double starttimer = 0;
1352                                                                                       static int _is_global = 0;
1353
1354 int CmiTimerIsSynchronized() {
1355     int  flag;
1356     void *v;
1357
1358     /*  check if it using synchronized timer */
1359     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1360         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1361     if (flag) {
1362         _is_global = *(int*)v;
1363         if (_is_global && CmiMyPe() == 0)
1364             printf("Charm++> MPI timer is synchronized\n");
1365     }
1366     return _is_global;
1367 }
1368
1369 int CmiTimerAbsolute() {
1370     return _absoluteTime;
1371 }
1372
1373 double CmiStartTimer() {
1374     return 0.0;
1375 }
1376
1377 double CmiInitTime() {
1378     return starttimer;
1379 }
1380
1381 void CmiTimerInit(char **argv) {
1382     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1383     if (_absoluteTime && CmiMyPe() == 0)
1384         printf("Charm++> absolute MPI timer is used\n");
1385
1386     _is_global = CmiTimerIsSynchronized();
1387
1388     if (_is_global) {
1389         if (CmiMyRank() == 0) {
1390             double minTimer;
1391 #if CMK_TIMER_USE_XT3_DCLOCK
1392             starttimer = dclock();
1393 #else
1394             starttimer = MPI_Wtime();
1395 #endif
1396
1397             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1398                           MPI_COMM_WORLD );
1399             starttimer = minTimer;
1400         }
1401     } else { /* we don't have a synchronous timer, set our own start time */
1402 #if ! CMK_MEM_CHECKPOINT
1403         CmiBarrier();
1404         CmiBarrier();
1405         CmiBarrier();
1406 #endif
1407 #if CMK_TIMER_USE_XT3_DCLOCK
1408         starttimer = dclock();
1409 #else
1410         starttimer = MPI_Wtime();
1411 #endif
1412     }
1413
1414 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1415     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1416         timerLock = CmiCreateLock();
1417 #endif
1418     CmiNodeAllBarrier();          /* for smp */
1419 }
1420
1421 /**
1422  * Since the timerLock is never created, and is
1423  * always NULL, then all the if-condition inside
1424  * the timer functions could be disabled right
1425  * now in the case of SMP. --Chao Mei
1426  */
1427 double CmiTimer(void) {
1428     double t;
1429 #if 0 && CMK_SMP
1430     if (timerLock) CmiLock(timerLock);
1431 #endif
1432
1433 #if CMK_TIMER_USE_XT3_DCLOCK
1434     t = dclock();
1435 #else
1436     t = MPI_Wtime();
1437 #endif
1438
1439 #if 0 && CMK_SMP
1440     if (timerLock) CmiUnlock(timerLock);
1441 #endif
1442
1443     return _absoluteTime?t: (t-starttimer);
1444 }
1445
1446 double CmiWallTimer(void) {
1447     double t;
1448 #if 0 && CMK_SMP
1449     if (timerLock) CmiLock(timerLock);
1450 #endif
1451
1452 #if CMK_TIMER_USE_XT3_DCLOCK
1453     t = dclock();
1454 #else
1455     t = MPI_Wtime();
1456 #endif
1457
1458 #if 0 && CMK_SMP
1459     if (timerLock) CmiUnlock(timerLock);
1460 #endif
1461
1462     return _absoluteTime? t: (t-starttimer);
1463 }
1464
1465 double CmiCpuTimer(void) {
1466     double t;
1467 #if 0 && CMK_SMP
1468     if (timerLock) CmiLock(timerLock);
1469 #endif
1470 #if CMK_TIMER_USE_XT3_DCLOCK
1471     t = dclock() - starttimer;
1472 #else
1473     t = MPI_Wtime() - starttimer;
1474 #endif
1475 #if 0 && CMK_SMP
1476     if (timerLock) CmiUnlock(timerLock);
1477 #endif
1478     return t;
1479 }
1480
1481 #endif     /* CMK_TIMER_USE_SPECIAL */
1482
1483 /************Barrier Related Functions****************/
1484 /* must be called on all ranks including comm thread in SMP */
1485 int CmiBarrier() {
1486 #if CMK_SMP
1487     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1488     CmiNodeAllBarrier();
1489     if (CmiMyRank() == CmiMyNodeSize())
1490 #else
1491     if (CmiMyRank() == 0)
1492 #endif
1493     {
1494         /**
1495          *  The call of CmiBarrier is usually before the initialization
1496          *  of trace module of Charm++, therefore, the START_EVENT
1497          *  and END_EVENT are disabled here. -Chao Mei
1498          */
1499         /*START_EVENT();*/
1500
1501         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1502             CmiAbort("Timernit: MPI_Barrier failed!\n");
1503
1504         /*END_EVENT(10);*/
1505     }
1506     CmiNodeAllBarrier();
1507     return 0;
1508 }
1509
1510 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1511 int CmiBarrierZero() {
1512     int i;
1513 #if CMK_SMP
1514     if (CmiMyRank() == CmiMyNodeSize())
1515 #else
1516     if (CmiMyRank() == 0)
1517 #endif
1518     {
1519         char msg[1];
1520         MPI_Status sts;
1521         if (CmiMyNode() == 0)  {
1522             for (i=0; i<CmiNumNodes()-1; i++) {
1523                 START_EVENT();
1524
1525                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1526                     CmiPrintf("MPI_Recv failed!\n");
1527
1528                 END_EVENT(30);
1529             }
1530         } else {
1531             START_EVENT();
1532
1533             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1534                 printf("MPI_Send failed!\n");
1535
1536             END_EVENT(20);
1537         }
1538     }
1539     CmiNodeAllBarrier();
1540     return 0;
1541 }
1542
1543
1544 #if CMK_MEM_CHECKPOINT
1545
1546 void mpi_restart_crashed(int pe, int rank)
1547 {
1548     int vals[2];
1549     vals[0] = pe;
1550     vals[1] = CpvAccess(_curRestartPhase)+1;
1551     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1552     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1553 }
1554
1555 /* notify spare processors to exit */
1556 void mpi_end_spare()
1557 {
1558     int i;
1559     for (i=nextrank; i<total_pes; i++) {
1560         int vals[2] = {-1,-1};
1561         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,MPI_COMM_WORLD);
1562     }
1563 }
1564
1565 int find_spare_mpirank(int pe)
1566 {
1567     if (nextrank == total_pes) {
1568       CmiAbort("Charm++> ran out of spare processors");
1569     }
1570     petorank[pe] = nextrank;
1571     nextrank++;
1572     return nextrank-1;
1573 }
1574
1575 void CkDieNow()
1576 {
1577     CmiPrintf("[%d] die now.\n", CmiMyPe());
1578
1579       /* release old messages */
1580     while (!CmiAllAsyncMsgsSent()) {
1581         PumpMsgs();
1582         CmiReleaseSentMessages();
1583     }
1584     MPI_Barrier(MPI_COMM_WORLD);
1585     MPI_Finalize();
1586     exit(0);
1587 }
1588
1589 #endif
1590
1591 /*@}*/
1592