demonstrate inmem checkpoint restart on MPI layer:
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207 #if CMK_MEM_CHECKPOINT
208 #define FAIL_TAG   1200
209 int num_workpes, total_pes;
210 int *petorank = NULL;
211 int *ranktope = NULL;
212 int  nextrank;
213 void mpi_end_crashed();
214 #endif
215
216 /* =====Beginning of Declarations of Machine Specific Functions===== */
217 /* Utility functions */
218 #if CMK_BLUEGENEL
219 extern void MPID_Progress_test();
220 #endif
221 static size_t CmiAllAsyncMsgsSent(void);
222 static void CmiReleaseSentMessages(void);
223 static int PumpMsgs(void);
224 static void PumpMsgsBlocking(void);
225
226 #if CMK_SMP
227 static int MsgQueueEmpty();
228 static int RecvQueueEmpty();
229 static int SendMsgBuf();
230 static  void EnqueueMsg(void *m, int size, int node, int mode);
231 #endif
232
233 /* The machine-specific send function */
234 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
235 #define LrtsSendFunc MachineSpecificSendForMPI
236
237 /* ### Beginning of Machine-startup Related Functions ### */
238 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
239 #define LrtsInit MachineInitForMPI
240
241 static void MachinePreCommonInitForMPI(int everReturn);
242 static void MachinePostCommonInitForMPI(int everReturn);
243 #define LrtsPreCommonInit MachinePreCommonInitForMPI
244 #define LrtsPostCommonInit MachinePostCommonInitForMPI
245 /* ### End of Machine-startup Related Functions ### */
246
247 /* ### Beginning of Machine-running Related Functions ### */
248 static void AdvanceCommunicationForMPI();
249 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
250
251 static void DrainResourcesForMPI(); /* used when exit */
252 #define LrtsDrainResources DrainResourcesForMPI
253
254 static void MachineExitForMPI();
255 #define LrtsExit MachineExitForMPI
256 /* ### End of Machine-running Related Functions ### */
257
258 /* ### Beginning of Idle-state Related Functions ### */
259 void CmiNotifyIdleForMPI(void);
260 /* ### End of Idle-state Related Functions ### */
261
262 static void MachinePostNonLocalForMPI();
263 #define LrtsPostNonLocal MachinePostNonLocalForMPI
264
265 /* =====End of Declarations of Machine Specific Functions===== */
266
267 /**
268  *  Macros that overwrites the common codes, such as
269  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
270  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
271  *  CMK_OFFLOAD_BCAST_PROCESS etc.
272  */
273 #define CMK_HAS_SIZE_IN_MSGHDR 0
274 #include "machine-lrts.h"
275 #include "machine-common-core.c"
276
277 /* The machine specific msg-sending function */
278
279 #if CMK_SMP
280 static void EnqueueMsg(void *m, int size, int node, int mode) {
281     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
282     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
283     msg_tmp->msg = m;
284     msg_tmp->size = size;
285     msg_tmp->destpe = node;
286     msg_tmp->next = 0;
287     msg_tmp->mode = mode;
288
289 #if CMK_SMP_TRACE_COMMTHREAD
290     msg_tmp->srcpe = CmiMyPe();
291 #endif
292
293 #if MULTI_SENDQUEUE
294     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
295 #else
296     /*CmiLock(sendMsgBufLock);*/
297     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
298     /*CmiUnlock(sendMsgBufLock);*/
299 #endif
300
301     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
302 }
303 #endif
304
305 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
306 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
307     int node = smsg->destpe;
308     int size = smsg->size;
309     char *msg = smsg->msg;
310     int mode = smsg->mode;
311
312     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
313 #if CMK_ERROR_CHECKING
314     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
315     CMI_SET_CHECKSUM(msg, size);
316 #endif
317
318 #if MPI_POST_RECV
319     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
320         START_EVENT();
321         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
322             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
323         /*END_EVENT(40);*/
324     } else {
325         START_EVENT();
326         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
327             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
328         /*END_EVENT(40);*/
329     }
330 #else
331     START_EVENT();
332 #if CMK_MEM_CHECKPOINT
333     int dstrank = petorank[node];
334 #else
335     int dstrank = node;
336 #endif
337     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
338         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
339     /*END_EVENT(40);*/
340 #endif
341
342 #if CMK_SMP_TRACE_COMMTHREAD
343     traceBeginCommOp(msg);
344     traceChangeLastTimestamp(CpvAccess(projTraceStart));
345     /* traceSendMsgComm must execute after traceBeginCommOp because
346          * we pretend we execute an entry method, and inside this we
347          * pretend we will send another message. Otherwise how could
348          * a message creation just before an entry method invocation?
349          * If such logic is broken, the projections will not trace
350          * messages correctly! -Chao Mei
351          */
352     traceSendMsgComm(msg);
353     traceEndCommOp(msg);
354 #if CMI_MPI_TRACE_MOREDETAILED
355     char tmp[64];
356     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
357     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
358 #endif
359 #endif
360
361     MACHSTATE(3,"}MPI_send end");
362     CpvAccess(MsgQueueLen)++;
363     if (CpvAccess(sent_msgs)==0)
364         CpvAccess(sent_msgs) = smsg;
365     else
366         CpvAccess(end_sent)->next = smsg;
367     CpvAccess(end_sent) = smsg;
368
369 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
370     if (mode == P2P_SYNC || mode == P2P_ASYNC)
371     {
372     while (CpvAccess(MsgQueueLen) > request_max) {
373         CmiReleaseSentMessages();
374         PumpMsgs();
375     }
376     }
377 #endif
378
379     return (CmiCommHandle) &(smsg->req);
380 }
381
382 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
383     /* Ignoring the mode for MPI layer */
384
385     CmiState cs = CmiGetState();
386     SMSG_LIST *msg_tmp;
387     int  rank;
388
389     CmiAssert(destNode != CmiMyNode());
390 #if CMK_SMP
391     if (_thread_provided != MPI_THREAD_MULTIPLE) {
392       EnqueueMsg(msg, size, destNode, mode);
393       return 0;
394     }
395 #endif
396     /* non smp */
397     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
398     msg_tmp->msg = msg;
399     msg_tmp->destpe = destNode;
400     msg_tmp->size = size;
401     msg_tmp->next = 0;
402     msg_tmp->mode = mode;
403     return MPISendOneMsg(msg_tmp);
404 }
405
406 static size_t CmiAllAsyncMsgsSent(void) {
407     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
408     MPI_Status sts;
409     int done;
410
411     while (msg_tmp!=0) {
412         done = 0;
413         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
414             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
415         if (!done)
416             return 0;
417         msg_tmp = msg_tmp->next;
418         /*    MsgQueueLen--; ????? */
419     }
420     return 1;
421 }
422
423 int CmiAsyncMsgSent(CmiCommHandle c) {
424
425     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
426     int done;
427     MPI_Status sts;
428
429     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
430         msg_tmp = msg_tmp->next;
431     if (msg_tmp) {
432         done = 0;
433         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
434             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
435         return ((done)?1:0);
436     } else {
437         return 1;
438     }
439 }
440
441 void CmiReleaseCommHandle(CmiCommHandle c) {
442     return;
443 }
444
445 /* ######Beginning of functions related with communication progress ###### */
446 static void CmiReleaseSentMessages(void) {
447     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
448     SMSG_LIST *prev=0;
449     SMSG_LIST *temp;
450     int done;
451     MPI_Status sts;
452
453 #if CMK_BLUEGENEL
454     MPID_Progress_test();
455 #endif
456
457     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
458     while (msg_tmp!=0) {
459         done =0;
460 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
461         double startT = CmiWallTimer();
462 #endif
463         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
464             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
465         if (done) {
466             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
467             CpvAccess(MsgQueueLen)--;
468             /* Release the message */
469             temp = msg_tmp->next;
470             if (prev==0) /* first message */
471                 CpvAccess(sent_msgs) = temp;
472             else
473                 prev->next = temp;
474             CmiFree(msg_tmp->msg);
475             CmiFree(msg_tmp);
476             msg_tmp = temp;
477         } else {
478             prev = msg_tmp;
479             msg_tmp = msg_tmp->next;
480         }
481 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
482         {
483             double endT = CmiWallTimer();
484             /* only record the event if it takes more than 1ms */
485             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
486         }
487 #endif
488     }
489     CpvAccess(end_sent) = prev;
490     MACHSTATE(2,"} CmiReleaseSentMessages end");
491 }
492
493 static int PumpMsgs(void) {
494     int nbytes, flg, res;
495     char *msg;
496     MPI_Status sts;
497     int recd=0;
498
499 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
500     int recvCnt=0;
501 #endif
502
503 #if CMK_BLUEGENEL
504     MPID_Progress_test();
505 #endif
506
507     MACHSTATE(2,"PumpMsgs begin {");
508
509 #if CMI_DYNAMIC_EXERT_CAP
510     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
511 #endif
512
513     while (1) {
514 #if CMI_EXERT_RECV_CAP
515         if (recvCnt==RECV_CAP) break;
516 #elif CMI_DYNAMIC_EXERT_CAP
517         if (recvCnt >= dynamicRecvCap) break;
518 #endif
519
520         /* First check posted recvs then do  probe unmatched outstanding messages */
521 #if MPI_POST_RECV
522         int completed_index=-1;
523         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
524             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
525         if (flg) {
526             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
527                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
528
529             recd = 1;
530             msg = (char *) CmiAlloc(nbytes);
531             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
532             /* and repost the recv */
533
534             START_EVENT();
535
536             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
537                                            MPI_POST_RECV_SIZE,
538                                            MPI_BYTE,
539                                            MPI_ANY_SOURCE,
540                                            POST_RECV_TAG,
541                                            MPI_COMM_WORLD,
542                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
543                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
544
545             END_EVENT(50);
546
547             CpvAccess(Cmi_posted_recv_total)++;
548         } else {
549             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
550             if (res != MPI_SUCCESS)
551                 CmiAbort("MPI_Iprobe failed\n");
552             if (!flg) break;
553             recd = 1;
554             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
555             msg = (char *) CmiAlloc(nbytes);
556
557             START_EVENT();
558
559             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
560                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
561
562             END_EVENT(30);
563
564             CpvAccess(Cmi_unposted_recv_total)++;
565         }
566 #else
567         /* Original version */
568 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
569         double startT = CmiWallTimer();
570 #endif
571         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
572         if (res != MPI_SUCCESS)
573             CmiAbort("MPI_Iprobe failed\n");
574
575         if (!flg) break;
576 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
577         {
578             double endT = CmiWallTimer();
579             /* only trace the probe that last longer than 1ms */
580             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
581         }
582 #endif
583
584         recd = 1;
585         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
586         msg = (char *) CmiAlloc(nbytes);
587
588         START_EVENT();
589
590         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
591             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
592
593         /*END_EVENT(30);*/
594
595 #endif
596
597 #if CMK_SMP_TRACE_COMMTHREAD
598         traceBeginCommOp(msg);
599         traceChangeLastTimestamp(CpvAccess(projTraceStart));
600         traceEndCommOp(msg);
601 #if CMI_MPI_TRACE_MOREDETAILED
602         char tmp[32];
603         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
604         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
605 #endif
606 #elif CMK_TRACE_COMMOVERHEAD
607         char tmp[32];
608         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
609         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
610 #endif
611
612
613         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
614         CMI_CHECK_CHECKSUM(msg, nbytes);
615 #if CMK_ERROR_CHECKING
616         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
617             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
618             CmiFree(msg);
619             CmiAbort("Abort!\n");
620             continue;
621         }
622 #endif
623
624         handleOneRecvedMsg(nbytes, msg);
625
626 #if CMI_EXERT_RECV_CAP
627         recvCnt++;
628 #elif CMI_DYNAMIC_EXERT_CAP
629         recvCnt++;
630 #if CMK_SMP
631         /* check sendMsgBuf to get the  number of messages that have not been sent
632              * which is only available in SMP mode
633          * MsgQueueLen indicates the number of messages that have not been released
634              * by MPI
635              */
636         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
637                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
638             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
639         }
640 #else
641         /* MsgQueueLen indicates the number of messages that have not been released
642              * by MPI
643              */
644         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
645             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
646         }
647 #endif
648
649 #endif
650
651     }
652
653     MACHSTATE(2,"} PumpMsgs end ");
654     return recd;
655 }
656
657 /* blocking version */
658 static void PumpMsgsBlocking(void) {
659     static int maxbytes = 20000000;
660     static char *buf = NULL;
661     int nbytes, flg;
662     MPI_Status sts;
663     char *msg;
664     int recd=0;
665
666     if (!PCQueueEmpty(CmiGetState()->recv)) return;
667     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
668     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
669     if (CpvAccess(sent_msgs))  return;
670
671 #if 0
672     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
673 #endif
674
675     if (buf == NULL) {
676         buf = (char *) CmiAlloc(maxbytes);
677         _MEMCHECK(buf);
678     }
679
680
681 #if MPI_POST_RECV
682 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
683     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
684 #endif
685
686     START_EVENT();
687
688     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
689         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
690
691     /*END_EVENT(30);*/
692
693     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
694     msg = (char *) CmiAlloc(nbytes);
695     memcpy(msg, buf, nbytes);
696
697 #if CMK_SMP_TRACE_COMMTHREAD
698     traceBeginCommOp(msg);
699     traceChangeLastTimestamp(CpvAccess(projTraceStart));
700     traceEndCommOp(msg);
701 #if CMI_MPI_TRACE_MOREDETAILED
702     char tmp[32];
703     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
704     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
705 #endif
706 #endif
707
708     handleOneRecvedMsg(nbytes, msg);
709 }
710
711
712 #if CMK_SMP
713
714 /* called by communication thread in SMP */
715 static int SendMsgBuf() {
716     SMSG_LIST *msg_tmp;
717     char *msg;
718     int node, rank, size;
719     int i;
720     int sent = 0;
721
722 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
723     int sentCnt = 0;
724 #endif
725
726 #if CMI_DYNAMIC_EXERT_CAP
727     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
728 #endif
729
730     MACHSTATE(2,"SendMsgBuf begin {");
731 #if MULTI_SENDQUEUE
732     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
733         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
734             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
735 #else
736     /* single message sending queue */
737     /* CmiLock(sendMsgBufLock); */
738     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
739     /* CmiUnlock(sendMsgBufLock); */
740     while (NULL != msg_tmp) {
741 #endif
742             MPISendOneMsg(msg_tmp);
743             sent=1;
744
745 #if CMI_EXERT_SEND_CAP
746             if (++sentCnt == SEND_CAP) break;
747 #elif CMI_DYNAMIC_EXERT_CAP
748             if (++sentCnt >= dynamicSendCap) break;
749             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
750                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
751 #endif
752
753 #if ! MULTI_SENDQUEUE
754             /* CmiLock(sendMsgBufLock); */
755             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
756             /* CmiUnlock(sendMsgBufLock); */
757 #endif
758         }
759 #if MULTI_SENDQUEUE
760     }
761 #endif
762     MACHSTATE(2,"}SendMsgBuf end ");
763     return sent;
764 }
765
766 static int MsgQueueEmpty() {
767     int i;
768 #if MULTI_SENDQUEUE
769     for (i=0; i<_Cmi_mynodesize; i++)
770         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
771 #else
772     return PCQueueEmpty(sendMsgBuf);
773 #endif
774     return 1;
775 }
776
777 /* test if all processors recv queues are empty */
778 static int RecvQueueEmpty() {
779     int i;
780     for (i=0; i<_Cmi_mynodesize; i++) {
781         CmiState cs=CmiGetStateN(i);
782         if (!PCQueueEmpty(cs->recv)) return 0;
783     }
784     return 1;
785 }
786
787
788 #define REPORT_COMM_METRICS 0
789 #if REPORT_COMM_METRICS
790 static double pumptime = 0.0;
791                          static double releasetime = 0.0;
792                                                      static double sendtime = 0.0;
793 #endif
794
795 #endif //end of CMK_SMP
796
797 static void AdvanceCommunicationForMPI() {
798 #if REPORT_COMM_METRICS
799     double t1, t2, t3, t4;
800     t1 = CmiWallTimer();
801 #endif
802
803 #if CMK_SMP
804     PumpMsgs();
805
806 #if REPORT_COMM_METRICS
807     t2 = CmiWallTimer();
808 #endif
809
810     CmiReleaseSentMessages();
811 #if REPORT_COMM_METRICS
812     t3 = CmiWallTimer();
813 #endif
814
815     SendMsgBuf();
816
817 #if REPORT_COMM_METRICS
818     t4 = CmiWallTimer();
819     pumptime += (t2-t1);
820     releasetime += (t3-t2);
821     sendtime += (t4-t3);
822 #endif
823
824 #else /* non-SMP case */
825     CmiReleaseSentMessages();
826
827 #if REPORT_COMM_METRICS
828     t2 = CmiWallTimer();
829 #endif
830     PumpMsgs();
831
832 #if REPORT_COMM_METRICS
833     t3 = CmiWallTimer();
834     pumptime += (t3-t2);
835     releasetime += (t2-t1);
836 #endif
837
838 #endif /* end of #if CMK_SMP */
839 }
840 /* ######End of functions related with communication progress ###### */
841
842 static void MachinePostNonLocalForMPI() {
843 #if !CMK_SMP
844     if (no_outstanding_sends) {
845         while (CpvAccess(MsgQueueLen)>0) {
846             AdvanceCommunicationForMPI();
847         }
848     }
849
850     /* FIXME: I don't think the following codes are needed because
851      * it repeats the same job of the next call of CmiGetNonLocal
852      */
853 #if 0
854     if (!msg) {
855         CmiReleaseSentMessages();
856         if (PumpMsgs())
857             return  PCQueuePop(cs->recv);
858         else
859             return 0;
860     }
861 #endif
862 #else
863   if (_thread_provided == MPI_THREAD_MULTIPLE) {
864         CmiReleaseSentMessages();
865         SendMsgBuf();
866   }
867 #endif
868 }
869
870 /* Idle-state related functions: called in non-smp mode */
871 void CmiNotifyIdleForMPI(void) {
872     CmiReleaseSentMessages();
873     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
874 }
875
876 /* Network progress function is used to poll the network when for
877    messages. This flushes receive buffers on some  implementations*/
878 #if CMK_MACHINE_PROGRESS_DEFINED
879 void CmiMachineProgressImpl() {
880 #if !CMK_SMP
881     PumpMsgs();
882 #if CMK_IMMEDIATE_MSG
883     CmiHandleImmediate();
884 #endif
885 #else
886     /*Not implemented yet. Communication server does not seem to be
887       thread safe, so only communication thread call it */
888     if (CmiMyRank() == CmiMyNodeSize())
889         CommunicationServerThread(0);
890 #endif
891 }
892 #endif
893
894 /* ######Beginning of functions related with exiting programs###### */
895 void DrainResourcesForMPI() {
896 #if !CMK_SMP
897     while (!CmiAllAsyncMsgsSent()) {
898         PumpMsgs();
899         CmiReleaseSentMessages();
900     }
901 #else
902     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
903         CmiReleaseSentMessages();
904         SendMsgBuf();
905         PumpMsgs();
906     }
907 #endif
908     MACHSTATE(2, "Machine exit barrier begin {");
909     START_EVENT();
910     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
911         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
912     END_EVENT(10);
913     MACHSTATE(2, "} Machine exit barrier end");
914 }
915
916 void LrtsExit() {
917 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
918     int doPrint = 0;
919 #if CMK_SMP
920     if (CmiMyNode()==0) doPrint = 1;
921 #else
922     if (CmiMyPe()==0) doPrint = 1;
923 #endif
924
925     if (doPrint) {
926 #if MPI_POST_RECV
927         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
928 #endif
929     }
930 #endif
931
932 #if REPORT_COMM_METRICS
933 #if CMK_SMP
934     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
935               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
936               pumptime, releasetime, sendtime);
937 #else
938     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
939               CmiMyPe(), pumptime, releasetime, sendtime);
940 #endif
941 #endif
942
943 #if ! CMK_AUTOBUILD
944     signal(SIGINT, signal_int);
945 #if CMK_MEM_CHECKPOINT
946     if (CmiMyPe() == 1) mpi_end_crashed();
947 #endif
948     MPI_Finalize();
949 #endif
950     exit(0);
951 }
952
953 static int machine_exit_idx;
954 static void machine_exit(char *m) {
955     EmergencyExit();
956     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
957     fflush(stdout);
958     CmiNodeBarrier();
959     if (CmiMyRank() == 0) {
960         MPI_Barrier(MPI_COMM_WORLD);
961         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
962         MPI_Abort(MPI_COMM_WORLD, 1);
963     } else {
964         while (1) CmiYield();
965     }
966 }
967
968 static void KillOnAllSigs(int sigNo) {
969     static int already_in_signal_handler = 0;
970     char *m;
971     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
972     already_in_signal_handler = 1;
973 #if CMK_CCS_AVAILABLE
974     if (CpvAccess(cmiArgDebugFlag)) {
975         CpdNotify(CPD_SIGNAL, sigNo);
976         CpdFreeze();
977     }
978 #endif
979     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
980              "Signal: %d\n",CmiMyPe(),sigNo);
981     CmiPrintStackTrace(1);
982
983     m = CmiAlloc(CmiMsgHeaderSizeBytes);
984     CmiSetHandler(m, machine_exit_idx);
985     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
986     machine_exit(m);
987 }
988 /* ######End of functions related with exiting programs###### */
989
990
991 /* ######Beginning of functions related with starting programs###### */
992 static void registerMPITraceEvents() {
993 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
994     traceRegisterUserEvent("MPI_Barrier", 10);
995     traceRegisterUserEvent("MPI_Send", 20);
996     traceRegisterUserEvent("MPI_Recv", 30);
997     traceRegisterUserEvent("MPI_Isend", 40);
998     traceRegisterUserEvent("MPI_Irecv", 50);
999     traceRegisterUserEvent("MPI_Test", 60);
1000     traceRegisterUserEvent("MPI_Iprobe", 70);
1001 #endif
1002 }
1003
1004 #if MACHINE_DEBUG_LOG
1005 FILE *debugLog = NULL;
1006 #endif
1007
1008 static char *thread_level_tostring(int thread_level) {
1009 #if CMK_MPI_INIT_THREAD
1010     switch (thread_level) {
1011     case MPI_THREAD_SINGLE:
1012         return "MPI_THREAD_SINGLE";
1013     case MPI_THREAD_FUNNELED:
1014         return "MPI_THREAD_FUNNELED";
1015     case MPI_THREAD_SERIALIZED:
1016         return "MPI_THREAD_SERIALIZED";
1017     case MPI_THREAD_MULTIPLE :
1018         return "MPI_THREAD_MULTIPLE ";
1019     default: {
1020         char *str = (char*)malloc(5);
1021         sprintf(str,"%d", thread_level);
1022         return str;
1023     }
1024     }
1025     return  "unknown";
1026 #else
1027     char *str = (char*)malloc(5);
1028     sprintf(str,"%d", thread_level);
1029     return str;
1030 #endif
1031 }
1032
1033 /**
1034  *  Obtain the number of nodes, my node id, and consuming machine layer
1035  *  specific arguments
1036  */
1037 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1038     int n,i;
1039     int ver, subver;
1040     int provided;
1041     int thread_level;
1042     int myNID;
1043     int largc=*argc;
1044     char** largv=*argv;
1045
1046 #if MACHINE_DEBUG
1047     debugLog=NULL;
1048 #endif
1049 #if CMK_USE_HP_MAIN_FIX
1050 #if FOR_CPLUS
1051     _main(largc,largv);
1052 #endif
1053 #endif
1054
1055 #if CMK_MPI_INIT_THREAD
1056 #if CMK_SMP
1057     thread_level = MPI_THREAD_MULTIPLE;
1058 #else
1059     thread_level = MPI_THREAD_SINGLE;
1060 #endif
1061     MPI_Init_thread(argc, argv, thread_level, &provided);
1062     _thread_provided = provided;
1063 #else
1064     MPI_Init(argc, argv);
1065     thread_level = 0;
1066     provided = -1;
1067 #endif
1068     largc = *argc;
1069     largv = *argv;
1070     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1071     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1072
1073     myNID = *myNodeID;
1074
1075     MPI_Get_version(&ver, &subver);
1076     if (myNID == 0) {
1077         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1078     }
1079
1080     {
1081         int debug = CmiGetArgFlag(largv,"++debug");
1082         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1083         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1084 #if CMK_HAS_GETPID
1085             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1086             fflush(stdout);
1087             if (!debug_no_pause)
1088                 sleep(15);
1089 #else
1090             printf("++debug ignored.\n");
1091 #endif
1092         }
1093     }
1094
1095
1096 #if CMK_MEM_CHECKPOINT
1097     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1098        CmiAssert(num_workpes <= *numNodes);
1099        total_pes = *numNodes;
1100        *numNodes = num_workpes;
1101     }
1102     else
1103        num_workpes = *numNodes;
1104     petorank = (int *)malloc(sizeof(int) * num_workpes);
1105     ranktope = (int *)malloc(sizeof(int) * num_workpes);
1106     for (i=0; i<num_workpes; i++)  petorank[i] = ranktope[i] = i;
1107     nextrank = num_workpes;
1108
1109     char msg[1];
1110     MPI_Status sts;
1111
1112     if (*myNodeID >= num_workpes) {
1113       int vals[2];
1114       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1115       int newpe = vals[0];
1116       CpvAccess(_curRestartPhase) = vals[1];
1117
1118       ranktope[*myNodeID] = newpe;
1119       petorank[newpe] = *myNodeID;
1120       *myNodeID = newpe;
1121       myNID = newpe;
1122
1123       // set identity
1124       // add +restartaftercrash to argv
1125       char *phase_str;
1126       char **restart_argv;
1127       int i=0;
1128       while(largv[i]!= NULL) i++;
1129       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1130       i=0;
1131       while(largv[i]!= NULL){
1132                 restart_argv[i] = largv[i];
1133                 i++;
1134       }
1135       restart_argv[i] = "+restartaftercrash";
1136       phase_str = (char*)malloc(10);
1137       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1138       restart_argv[i+1]=phase_str;
1139       restart_argv[i+2]=NULL;
1140       *argv = restart_argv;
1141       *argc = i+2;
1142       largc = *argc;
1143       largv = *argv;
1144 i=0;
1145       while(largv[i]!= NULL) { printf("%s\n", largv[i]); i++; }
1146     }
1147 #endif
1148
1149     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1150     if (idleblock && _Cmi_mynode == 0) {
1151         printf("Charm++: Running in idle blocking mode.\n");
1152     }
1153
1154 #if CMK_CHARMDEBUG
1155     /* setup signal handlers */
1156     signal(SIGSEGV, KillOnAllSigs);
1157     signal(SIGFPE, KillOnAllSigs);
1158     signal(SIGILL, KillOnAllSigs);
1159     signal_int = signal(SIGINT, KillOnAllSigs);
1160     signal(SIGTERM, KillOnAllSigs);
1161     signal(SIGABRT, KillOnAllSigs);
1162 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1163     signal(SIGQUIT, KillOnAllSigs);
1164     signal(SIGBUS, KillOnAllSigs);
1165 #   endif /*UNIX*/
1166 #endif
1167
1168 #if CMK_NO_OUTSTANDING_SENDS
1169     no_outstanding_sends=1;
1170 #endif
1171     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1172         no_outstanding_sends = 1;
1173         if (myNID == 0)
1174             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1175                    no_outstanding_sends?"":" not");
1176     }
1177
1178     request_max=MAX_QLEN;
1179     CmiGetArgInt(largv,"+requestmax",&request_max);
1180     /*printf("request max=%d\n", request_max);*/
1181
1182 #if MPI_POST_RECV
1183     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1184     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1185     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1186     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1187     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1188     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1189     if (myNID==0) {
1190         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1191                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1192     }
1193 #endif
1194
1195 #if CMI_DYNAMIC_EXERT_CAP
1196     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1197     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1198     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1199     if (myNID==0) {
1200         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1201                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1202     }
1203 #endif
1204
1205     /* checksum flag */
1206     if (CmiGetArgFlag(largv,"+checksum")) {
1207 #if CMK_ERROR_CHECKING
1208         checksum_flag = 1;
1209         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1210 #else
1211         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1212 #endif
1213     }
1214
1215     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1216     for (i=0; i<_Cmi_mynodesize+1; i++) {
1217 #if MULTI_SENDQUEUE
1218         procState[i].sendMsgBuf = PCQueueCreate();
1219 #endif
1220         procState[i].recvLock = CmiCreateLock();
1221     }
1222 #if CMK_SMP
1223 #if !MULTI_SENDQUEUE
1224     sendMsgBuf = PCQueueCreate();
1225     sendMsgBufLock = CmiCreateLock();
1226 #endif
1227 #endif
1228 }
1229
1230 static void MachinePreCommonInitForMPI(int everReturn) {
1231
1232 #if MPI_POST_RECV
1233     int doInit = 1;
1234     int i;
1235
1236 #if CMK_SMP
1237     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1238 #endif
1239
1240     /* Currently, in mpi smp, the main thread will be the comm thread, so
1241      *  only the comm thread should post recvs. Cpvs, however, need to be
1242      * created on rank 0 (the ptrs to the actual cpv memory), while
1243      * other ranks are busy waiting for this to finish. So cpv initialize
1244      * routines have to be called on every ranks, although they are only
1245      * useful on comm thread (whose rank is not zero) -Chao Mei
1246      */
1247     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1248     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1249     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1250     CpvInitialize(char*,CmiPostedRecvBuffers);
1251
1252     if (doInit) {
1253         /* Post some extra recvs to help out with incoming messages */
1254         /* On some MPIs the messages are unexpected and thus slow */
1255
1256         /* An array of request handles for posted recvs */
1257         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1258
1259         /* An array of buffers for posted recvs */
1260         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1261
1262         /* Post Recvs */
1263         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1264             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1265                                            MPI_POST_RECV_SIZE,
1266                                            MPI_BYTE,
1267                                            MPI_ANY_SOURCE,
1268                                            POST_RECV_TAG,
1269                                            MPI_COMM_WORLD,
1270                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1271                 CmiAbort("MPI_Irecv failed\n");
1272         }
1273     }
1274 #endif
1275
1276 }
1277
1278 static void MachinePostCommonInitForMPI(int everReturn) {
1279
1280     CmiIdleState *s=CmiNotifyGetState();
1281
1282     CpvInitialize(SMSG_LIST *, sent_msgs);
1283     CpvInitialize(SMSG_LIST *, end_sent);
1284     CpvInitialize(int, MsgQueueLen);
1285     CpvAccess(sent_msgs) = NULL;
1286     CpvAccess(end_sent) = NULL;
1287     CpvAccess(MsgQueueLen) = 0;
1288
1289     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1290
1291 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1292     CpvInitialize(double, projTraceStart);
1293     /* only PE 0 needs to care about registration (to generate sts file). */
1294     if (CmiMyPe() == 0) {
1295         registerMachineUserEventsFunction(&registerMPITraceEvents);
1296     }
1297 #endif
1298
1299 #if CMK_SMP
1300     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1301     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1302     if (_thread_provided == MPI_THREAD_MULTIPLE)
1303       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1304 #else
1305     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1306 #endif
1307
1308 #if MACHINE_DEBUG_LOG
1309     if (CmiMyRank() == 0) {
1310         char ln[200];
1311         sprintf(ln,"debugLog.%d",CmiMyNode());
1312         debugLog=fopen(ln,"w");
1313     }
1314 #endif
1315 }
1316 /* ######End of functions related with starting programs###### */
1317
1318 /***********************************************************************
1319  *
1320  * Abort function:
1321  *
1322  ************************************************************************/
1323
1324 void CmiAbort(const char *message) {
1325     char *m;
1326     /* if CharmDebug is attached simply try to send a message to it */
1327 #if CMK_CCS_AVAILABLE
1328     if (CpvAccess(cmiArgDebugFlag)) {
1329         CpdNotify(CPD_ABORT, message);
1330         CpdFreeze();
1331     }
1332 #endif
1333     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1334              "Reason: %s\n",CmiMyPe(),message);
1335     /*  CmiError(message); */
1336     CmiPrintStackTrace(0);
1337     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1338     CmiSetHandler(m, machine_exit_idx);
1339     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1340     machine_exit(m);
1341     /* Program never reaches here */
1342     MPI_Abort(MPI_COMM_WORLD, 1);
1343 }
1344
1345 /**************************  TIMER FUNCTIONS **************************/
1346 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1347
1348 /* MPI calls are not threadsafe, even the timer on some machines */
1349 static CmiNodeLock  timerLock = 0;
1350                                 static int _absoluteTime = 0;
1351                                                            static double starttimer = 0;
1352                                                                                       static int _is_global = 0;
1353
1354 int CmiTimerIsSynchronized() {
1355     int  flag;
1356     void *v;
1357
1358     /*  check if it using synchronized timer */
1359     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1360         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1361     if (flag) {
1362         _is_global = *(int*)v;
1363         if (_is_global && CmiMyPe() == 0)
1364             printf("Charm++> MPI timer is synchronized\n");
1365     }
1366     return _is_global;
1367 }
1368
1369 int CmiTimerAbsolute() {
1370     return _absoluteTime;
1371 }
1372
1373 double CmiStartTimer() {
1374     return 0.0;
1375 }
1376
1377 double CmiInitTime() {
1378     return starttimer;
1379 }
1380
1381 void CmiTimerInit(char **argv) {
1382     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1383     if (_absoluteTime && CmiMyPe() == 0)
1384         printf("Charm++> absolute MPI timer is used\n");
1385
1386     _is_global = CmiTimerIsSynchronized();
1387
1388     if (_is_global) {
1389         if (CmiMyRank() == 0) {
1390             double minTimer;
1391 #if CMK_TIMER_USE_XT3_DCLOCK
1392             starttimer = dclock();
1393 #else
1394             starttimer = MPI_Wtime();
1395 #endif
1396
1397             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1398                           MPI_COMM_WORLD );
1399             starttimer = minTimer;
1400         }
1401     } else { /* we don't have a synchronous timer, set our own start time */
1402 #if ! CMK_MEM_CHECKPOINT
1403         CmiBarrier();
1404         CmiBarrier();
1405         CmiBarrier();
1406 #endif
1407 #if CMK_TIMER_USE_XT3_DCLOCK
1408         starttimer = dclock();
1409 #else
1410         starttimer = MPI_Wtime();
1411 #endif
1412     }
1413
1414 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1415     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1416         timerLock = CmiCreateLock();
1417 #endif
1418     CmiNodeAllBarrier();          /* for smp */
1419 }
1420
1421 /**
1422  * Since the timerLock is never created, and is
1423  * always NULL, then all the if-condition inside
1424  * the timer functions could be disabled right
1425  * now in the case of SMP. --Chao Mei
1426  */
1427 double CmiTimer(void) {
1428     double t;
1429 #if 0 && CMK_SMP
1430     if (timerLock) CmiLock(timerLock);
1431 #endif
1432
1433 #if CMK_TIMER_USE_XT3_DCLOCK
1434     t = dclock();
1435 #else
1436     t = MPI_Wtime();
1437 #endif
1438
1439 #if 0 && CMK_SMP
1440     if (timerLock) CmiUnlock(timerLock);
1441 #endif
1442
1443     return _absoluteTime?t: (t-starttimer);
1444 }
1445
1446 double CmiWallTimer(void) {
1447     double t;
1448 #if 0 && CMK_SMP
1449     if (timerLock) CmiLock(timerLock);
1450 #endif
1451
1452 #if CMK_TIMER_USE_XT3_DCLOCK
1453     t = dclock();
1454 #else
1455     t = MPI_Wtime();
1456 #endif
1457
1458 #if 0 && CMK_SMP
1459     if (timerLock) CmiUnlock(timerLock);
1460 #endif
1461
1462     return _absoluteTime? t: (t-starttimer);
1463 }
1464
1465 double CmiCpuTimer(void) {
1466     double t;
1467 #if 0 && CMK_SMP
1468     if (timerLock) CmiLock(timerLock);
1469 #endif
1470 #if CMK_TIMER_USE_XT3_DCLOCK
1471     t = dclock() - starttimer;
1472 #else
1473     t = MPI_Wtime() - starttimer;
1474 #endif
1475 #if 0 && CMK_SMP
1476     if (timerLock) CmiUnlock(timerLock);
1477 #endif
1478     return t;
1479 }
1480
1481 #endif     /* CMK_TIMER_USE_SPECIAL */
1482
1483 /************Barrier Related Functions****************/
1484 /* must be called on all ranks including comm thread in SMP */
1485 int CmiBarrier() {
1486 #if CMK_SMP
1487     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1488     CmiNodeAllBarrier();
1489     if (CmiMyRank() == CmiMyNodeSize())
1490 #else
1491     if (CmiMyRank() == 0)
1492 #endif
1493     {
1494         /**
1495          *  The call of CmiBarrier is usually before the initialization
1496          *  of trace module of Charm++, therefore, the START_EVENT
1497          *  and END_EVENT are disabled here. -Chao Mei
1498          */
1499         /*START_EVENT();*/
1500
1501         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1502             CmiAbort("Timernit: MPI_Barrier failed!\n");
1503
1504         /*END_EVENT(10);*/
1505     }
1506     CmiNodeAllBarrier();
1507     return 0;
1508 }
1509
1510 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1511 int CmiBarrierZero() {
1512     int i;
1513 #if CMK_SMP
1514     if (CmiMyRank() == CmiMyNodeSize())
1515 #else
1516     if (CmiMyRank() == 0)
1517 #endif
1518     {
1519         char msg[1];
1520         MPI_Status sts;
1521         if (CmiMyNode() == 0)  {
1522             for (i=0; i<CmiNumNodes()-1; i++) {
1523                 START_EVENT();
1524
1525                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1526                     CmiPrintf("MPI_Recv failed!\n");
1527
1528                 END_EVENT(30);
1529             }
1530         } else {
1531             START_EVENT();
1532
1533             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1534                 printf("MPI_Send failed!\n");
1535
1536             END_EVENT(20);
1537         }
1538     }
1539     CmiNodeAllBarrier();
1540     return 0;
1541 }
1542
1543 #if CMK_MEM_CHECKPOINT
1544
1545 extern void mpi_restart(int argc, char **argv);
1546
1547 void mpi_restart_crashed(int pe, int rank)
1548 {
1549     int vals[2];
1550     vals[0] = pe;
1551     vals[1] = CpvAccess(_curRestartPhase)+1;
1552     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1553 }
1554
1555 void mpi_end_crashed()
1556 {
1557     int i;
1558     for (i=0; i<total_pes; i++) {
1559       if (ranktope[i] == -1) {
1560         char msg[1];
1561 printf("[%d] end crash: %d\n", CmiMyPe(), i);
1562         MPI_Send((void *)msg,1,MPI_BYTE,i,FAIL_TAG,MPI_COMM_WORLD);
1563       }
1564     }
1565 }
1566
1567 int find_spare_mpirank(int pe)
1568 {
1569     if (nextrank == total_pes) {
1570       CmiAbort("Charm++> ran out of spared processors");
1571     }
1572     ranktope[petorank[pe]] = -1;
1573     petorank[pe] = nextrank;
1574     ranktope[nextrank] = pe;
1575     nextrank++;
1576     return nextrank-1;
1577 }
1578
1579 void CkDieNow()
1580 {
1581     char msg[1];
1582     char phase_str[10];
1583     MPI_Status sts;
1584
1585     CmiPrintf("[%d] die now\n", CmiMyPe());
1586
1587     MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1588     MPI_Finalize();
1589
1590 #if 0
1591     CmiPrintf("[%d] Restarted\n", CmiMyPe());
1592
1593     char **restart_argv;
1594     int i=0;
1595     while(Cmi_argvcopy[i]!= NULL) i++;
1596     restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1597     i=0;
1598     while(Cmi_argvcopy[i]!= NULL){
1599                 restart_argv[i] = Cmi_argvcopy[i];
1600                 i++;
1601     }
1602     restart_argv[i] = "+restartaftercrash";
1603     sprintf(phase_str,"%d", ++CpvAccess(_curRestartPhase));
1604     restart_argv[i+1]=phase_str;
1605     restart_argv[i+2]=NULL;
1606
1607     mpi_restart(CmiGetArgc(restart_argv), restart_argv);
1608 #endif
1609 }
1610 #endif
1611
1612 /*@}*/
1613