Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207 #if CMK_MEM_CHECKPOINT
208 #define FAIL_TAG   1200
209 int num_workpes, total_pes;
210 int *petorank = NULL;
211 int  nextrank;
212 void mpi_end_spare();
213 #endif
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node, int mode);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define LrtsSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
238 #define LrtsInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define LrtsPreCommonInit MachinePreCommonInitForMPI
243 #define LrtsPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI();
248 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define LrtsDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define LrtsExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 static void MachinePostNonLocalForMPI();
262 #define LrtsPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-lrts.h"
274 #include "machine-common-core.c"
275
276 /* The machine specific msg-sending function */
277
278 #if CMK_SMP
279 static void EnqueueMsg(void *m, int size, int node, int mode) {
280     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
281     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
282     msg_tmp->msg = m;
283     msg_tmp->size = size;
284     msg_tmp->destpe = node;
285     msg_tmp->next = 0;
286     msg_tmp->mode = mode;
287
288 #if CMK_SMP_TRACE_COMMTHREAD
289     msg_tmp->srcpe = CmiMyPe();
290 #endif
291
292 #if MULTI_SENDQUEUE
293     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
294 #else
295     /*CmiLock(sendMsgBufLock);*/
296     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
297     /*CmiUnlock(sendMsgBufLock);*/
298 #endif
299
300     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
301 }
302 #endif
303
304 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
305 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
306     int node = smsg->destpe;
307     int size = smsg->size;
308     char *msg = smsg->msg;
309     int mode = smsg->mode;
310     int dstrank;
311
312     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
313 #if CMK_ERROR_CHECKING
314     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
315     CMI_SET_CHECKSUM(msg, size);
316 #endif
317
318 #if MPI_POST_RECV
319     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
320         START_EVENT();
321         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
322             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
323         /*END_EVENT(40);*/
324     } else {
325         START_EVENT();
326         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
327             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
328         /*END_EVENT(40);*/
329     }
330 #else
331     START_EVENT();
332 #if CMK_MEM_CHECKPOINT
333     dstrank = petorank[node];
334 #else
335     dstrank = node;
336 #endif
337     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
338         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
339     /*END_EVENT(40);*/
340 #endif
341
342 #if CMK_SMP_TRACE_COMMTHREAD
343     traceBeginCommOp(msg);
344     traceChangeLastTimestamp(CpvAccess(projTraceStart));
345     /* traceSendMsgComm must execute after traceBeginCommOp because
346          * we pretend we execute an entry method, and inside this we
347          * pretend we will send another message. Otherwise how could
348          * a message creation just before an entry method invocation?
349          * If such logic is broken, the projections will not trace
350          * messages correctly! -Chao Mei
351          */
352     traceSendMsgComm(msg);
353     traceEndCommOp(msg);
354 #if CMI_MPI_TRACE_MOREDETAILED
355     char tmp[64];
356     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
357     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
358 #endif
359 #endif
360
361     MACHSTATE(3,"}MPI_send end");
362     CpvAccess(MsgQueueLen)++;
363     if (CpvAccess(sent_msgs)==0)
364         CpvAccess(sent_msgs) = smsg;
365     else
366         CpvAccess(end_sent)->next = smsg;
367     CpvAccess(end_sent) = smsg;
368
369 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
370     if (mode == P2P_SYNC || mode == P2P_ASYNC)
371     {
372     while (CpvAccess(MsgQueueLen) > request_max) {
373         CmiReleaseSentMessages();
374         PumpMsgs();
375     }
376     }
377 #endif
378
379     return (CmiCommHandle) &(smsg->req);
380 }
381
382 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
383     /* Ignoring the mode for MPI layer */
384
385     CmiState cs = CmiGetState();
386     SMSG_LIST *msg_tmp;
387     int  rank;
388
389     CmiAssert(destNode != CmiMyNode());
390 #if CMK_SMP
391     if (_thread_provided != MPI_THREAD_MULTIPLE) {
392       EnqueueMsg(msg, size, destNode, mode);
393       return 0;
394     }
395 #endif
396     /* non smp */
397     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
398     msg_tmp->msg = msg;
399     msg_tmp->destpe = destNode;
400     msg_tmp->size = size;
401     msg_tmp->next = 0;
402     msg_tmp->mode = mode;
403     return MPISendOneMsg(msg_tmp);
404 }
405
406 static size_t CmiAllAsyncMsgsSent(void) {
407     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
408     MPI_Status sts;
409     int done;
410
411     while (msg_tmp!=0) {
412         done = 0;
413         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
414             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
415         if (!done)
416             return 0;
417         msg_tmp = msg_tmp->next;
418         /*    MsgQueueLen--; ????? */
419     }
420     return 1;
421 }
422
423 int CmiAsyncMsgSent(CmiCommHandle c) {
424
425     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
426     int done;
427     MPI_Status sts;
428
429     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
430         msg_tmp = msg_tmp->next;
431     if (msg_tmp) {
432         done = 0;
433         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
434             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
435         return ((done)?1:0);
436     } else {
437         return 1;
438     }
439 }
440
441 void CmiReleaseCommHandle(CmiCommHandle c) {
442     return;
443 }
444
445 /* ######Beginning of functions related with communication progress ###### */
446 static void CmiReleaseSentMessages(void) {
447     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
448     SMSG_LIST *prev=0;
449     SMSG_LIST *temp;
450     int done;
451     MPI_Status sts;
452
453 #if CMK_BLUEGENEL
454     MPID_Progress_test();
455 #endif
456
457     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
458     while (msg_tmp!=0) {
459         done =0;
460 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
461         double startT = CmiWallTimer();
462 #endif
463         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
464             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
465         if (done) {
466             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
467             CpvAccess(MsgQueueLen)--;
468             /* Release the message */
469             temp = msg_tmp->next;
470             if (prev==0) /* first message */
471                 CpvAccess(sent_msgs) = temp;
472             else
473                 prev->next = temp;
474             CmiFree(msg_tmp->msg);
475             CmiFree(msg_tmp);
476             msg_tmp = temp;
477         } else {
478             prev = msg_tmp;
479             msg_tmp = msg_tmp->next;
480         }
481 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
482         {
483             double endT = CmiWallTimer();
484             /* only record the event if it takes more than 1ms */
485             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
486         }
487 #endif
488     }
489     CpvAccess(end_sent) = prev;
490     MACHSTATE(2,"} CmiReleaseSentMessages end");
491 }
492
493 static int PumpMsgs(void) {
494     int nbytes, flg, res;
495     char *msg;
496     MPI_Status sts;
497     int recd=0;
498
499 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
500     int recvCnt=0;
501 #endif
502
503 #if CMK_BLUEGENEL
504     MPID_Progress_test();
505 #endif
506
507     MACHSTATE(2,"PumpMsgs begin {");
508
509 #if CMI_DYNAMIC_EXERT_CAP
510     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
511 #endif
512
513     while (1) {
514 #if CMI_EXERT_RECV_CAP
515         if (recvCnt==RECV_CAP) break;
516 #elif CMI_DYNAMIC_EXERT_CAP
517         if (recvCnt >= dynamicRecvCap) break;
518 #endif
519
520         /* First check posted recvs then do  probe unmatched outstanding messages */
521 #if MPI_POST_RECV
522         int completed_index=-1;
523         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
524             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
525         if (flg) {
526             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
527                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
528
529             recd = 1;
530             msg = (char *) CmiAlloc(nbytes);
531             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
532             /* and repost the recv */
533
534             START_EVENT();
535
536             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
537                                            MPI_POST_RECV_SIZE,
538                                            MPI_BYTE,
539                                            MPI_ANY_SOURCE,
540                                            POST_RECV_TAG,
541                                            MPI_COMM_WORLD,
542                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
543                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
544
545             END_EVENT(50);
546
547             CpvAccess(Cmi_posted_recv_total)++;
548         } else {
549             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
550             if (res != MPI_SUCCESS)
551                 CmiAbort("MPI_Iprobe failed\n");
552             if (!flg) break;
553             recd = 1;
554             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
555             msg = (char *) CmiAlloc(nbytes);
556
557             START_EVENT();
558
559             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
560                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
561
562             END_EVENT(30);
563
564             CpvAccess(Cmi_unposted_recv_total)++;
565         }
566 #else
567         /* Original version */
568 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
569         double startT = CmiWallTimer();
570 #endif
571         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
572         if (res != MPI_SUCCESS)
573             CmiAbort("MPI_Iprobe failed\n");
574
575         if (!flg) break;
576 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
577         {
578             double endT = CmiWallTimer();
579             /* only trace the probe that last longer than 1ms */
580             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
581         }
582 #endif
583
584         recd = 1;
585         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
586         msg = (char *) CmiAlloc(nbytes);
587
588         START_EVENT();
589
590         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
591             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
592
593         /*END_EVENT(30);*/
594
595 #endif
596
597 #if CMK_SMP_TRACE_COMMTHREAD
598         traceBeginCommOp(msg);
599         traceChangeLastTimestamp(CpvAccess(projTraceStart));
600         traceEndCommOp(msg);
601 #if CMI_MPI_TRACE_MOREDETAILED
602         char tmp[32];
603         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
604         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
605 #endif
606 #elif CMK_TRACE_COMMOVERHEAD
607         char tmp[32];
608         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
609         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
610 #endif
611
612
613         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
614         CMI_CHECK_CHECKSUM(msg, nbytes);
615 #if CMK_ERROR_CHECKING
616         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
617             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
618             CmiFree(msg);
619             CmiAbort("Abort!\n");
620             continue;
621         }
622 #endif
623
624         handleOneRecvedMsg(nbytes, msg);
625
626 #if CMI_EXERT_RECV_CAP
627         recvCnt++;
628 #elif CMI_DYNAMIC_EXERT_CAP
629         recvCnt++;
630 #if CMK_SMP
631         /* check sendMsgBuf to get the  number of messages that have not been sent
632              * which is only available in SMP mode
633          * MsgQueueLen indicates the number of messages that have not been released
634              * by MPI
635              */
636         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
637                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
638             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
639         }
640 #else
641         /* MsgQueueLen indicates the number of messages that have not been released
642              * by MPI
643              */
644         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
645             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
646         }
647 #endif
648
649 #endif
650
651     }
652
653     MACHSTATE(2,"} PumpMsgs end ");
654     return recd;
655 }
656
657 /* blocking version */
658 static void PumpMsgsBlocking(void) {
659     static int maxbytes = 20000000;
660     static char *buf = NULL;
661     int nbytes, flg;
662     MPI_Status sts;
663     char *msg;
664     int recd=0;
665
666     if (!PCQueueEmpty(CmiGetState()->recv)) return;
667     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
668     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
669     if (CpvAccess(sent_msgs))  return;
670
671 #if 0
672     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
673 #endif
674
675     if (buf == NULL) {
676         buf = (char *) CmiAlloc(maxbytes);
677         _MEMCHECK(buf);
678     }
679
680
681 #if MPI_POST_RECV
682 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
683     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
684 #endif
685
686     START_EVENT();
687
688     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
689         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
690
691     /*END_EVENT(30);*/
692
693     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
694     msg = (char *) CmiAlloc(nbytes);
695     memcpy(msg, buf, nbytes);
696
697 #if CMK_SMP_TRACE_COMMTHREAD
698     traceBeginCommOp(msg);
699     traceChangeLastTimestamp(CpvAccess(projTraceStart));
700     traceEndCommOp(msg);
701 #if CMI_MPI_TRACE_MOREDETAILED
702     char tmp[32];
703     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
704     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
705 #endif
706 #endif
707
708     handleOneRecvedMsg(nbytes, msg);
709 }
710
711
712 #if CMK_SMP
713
714 /* called by communication thread in SMP */
715 static int SendMsgBuf() {
716     SMSG_LIST *msg_tmp;
717     char *msg;
718     int node, rank, size;
719     int i;
720     int sent = 0;
721
722 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
723     int sentCnt = 0;
724 #endif
725
726 #if CMI_DYNAMIC_EXERT_CAP
727     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
728 #endif
729
730     MACHSTATE(2,"SendMsgBuf begin {");
731 #if MULTI_SENDQUEUE
732     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
733         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
734             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
735 #else
736     /* single message sending queue */
737     /* CmiLock(sendMsgBufLock); */
738     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
739     /* CmiUnlock(sendMsgBufLock); */
740     while (NULL != msg_tmp) {
741 #endif
742             MPISendOneMsg(msg_tmp);
743             sent=1;
744
745 #if CMI_EXERT_SEND_CAP
746             if (++sentCnt == SEND_CAP) break;
747 #elif CMI_DYNAMIC_EXERT_CAP
748             if (++sentCnt >= dynamicSendCap) break;
749             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
750                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
751 #endif
752
753 #if ! MULTI_SENDQUEUE
754             /* CmiLock(sendMsgBufLock); */
755             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
756             /* CmiUnlock(sendMsgBufLock); */
757 #endif
758         }
759 #if MULTI_SENDQUEUE
760     }
761 #endif
762     MACHSTATE(2,"}SendMsgBuf end ");
763     return sent;
764 }
765
766 static int MsgQueueEmpty() {
767     int i;
768 #if MULTI_SENDQUEUE
769     for (i=0; i<_Cmi_mynodesize; i++)
770         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
771 #else
772     return PCQueueEmpty(sendMsgBuf);
773 #endif
774     return 1;
775 }
776
777 /* test if all processors recv queues are empty */
778 static int RecvQueueEmpty() {
779     int i;
780     for (i=0; i<_Cmi_mynodesize; i++) {
781         CmiState cs=CmiGetStateN(i);
782         if (!PCQueueEmpty(cs->recv)) return 0;
783     }
784     return 1;
785 }
786
787
788 #define REPORT_COMM_METRICS 0
789 #if REPORT_COMM_METRICS
790 static double pumptime = 0.0;
791                          static double releasetime = 0.0;
792                                                      static double sendtime = 0.0;
793 #endif
794
795 #endif //end of CMK_SMP
796
797 static void AdvanceCommunicationForMPI() {
798 #if REPORT_COMM_METRICS
799     double t1, t2, t3, t4;
800     t1 = CmiWallTimer();
801 #endif
802
803 #if CMK_SMP
804     PumpMsgs();
805
806 #if REPORT_COMM_METRICS
807     t2 = CmiWallTimer();
808 #endif
809
810     CmiReleaseSentMessages();
811 #if REPORT_COMM_METRICS
812     t3 = CmiWallTimer();
813 #endif
814
815     SendMsgBuf();
816
817 #if REPORT_COMM_METRICS
818     t4 = CmiWallTimer();
819     pumptime += (t2-t1);
820     releasetime += (t3-t2);
821     sendtime += (t4-t3);
822 #endif
823
824 #else /* non-SMP case */
825     CmiReleaseSentMessages();
826
827 #if REPORT_COMM_METRICS
828     t2 = CmiWallTimer();
829 #endif
830     PumpMsgs();
831
832 #if REPORT_COMM_METRICS
833     t3 = CmiWallTimer();
834     pumptime += (t3-t2);
835     releasetime += (t2-t1);
836 #endif
837
838 #endif /* end of #if CMK_SMP */
839 }
840 /* ######End of functions related with communication progress ###### */
841
842 static void MachinePostNonLocalForMPI() {
843 #if !CMK_SMP
844     if (no_outstanding_sends) {
845         while (CpvAccess(MsgQueueLen)>0) {
846             AdvanceCommunicationForMPI();
847         }
848     }
849
850     /* FIXME: I don't think the following codes are needed because
851      * it repeats the same job of the next call of CmiGetNonLocal
852      */
853 #if 0
854     if (!msg) {
855         CmiReleaseSentMessages();
856         if (PumpMsgs())
857             return  PCQueuePop(cs->recv);
858         else
859             return 0;
860     }
861 #endif
862 #else
863   if (_thread_provided == MPI_THREAD_MULTIPLE) {
864         CmiReleaseSentMessages();
865         SendMsgBuf();
866   }
867 #endif
868 }
869
870 /* Idle-state related functions: called in non-smp mode */
871 void CmiNotifyIdleForMPI(void) {
872     CmiReleaseSentMessages();
873     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
874 }
875
876 /* Network progress function is used to poll the network when for
877    messages. This flushes receive buffers on some  implementations*/
878 #if CMK_MACHINE_PROGRESS_DEFINED
879 void CmiMachineProgressImpl() {
880 #if !CMK_SMP
881     PumpMsgs();
882 #if CMK_IMMEDIATE_MSG
883     CmiHandleImmediate();
884 #endif
885 #else
886     /*Not implemented yet. Communication server does not seem to be
887       thread safe, so only communication thread call it */
888     if (CmiMyRank() == CmiMyNodeSize())
889         CommunicationServerThread(0);
890 #endif
891 }
892 #endif
893
894 /* ######Beginning of functions related with exiting programs###### */
895 void DrainResourcesForMPI() {
896 #if !CMK_SMP
897     while (!CmiAllAsyncMsgsSent()) {
898         PumpMsgs();
899         CmiReleaseSentMessages();
900     }
901 #else
902     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
903         CmiReleaseSentMessages();
904         SendMsgBuf();
905         PumpMsgs();
906     }
907 #endif
908 #if CMK_MEM_CHECKPOINT
909     if (CmiMyPe() == 0) mpi_end_spare();
910 #endif
911     MACHSTATE(2, "Machine exit barrier begin {");
912     START_EVENT();
913     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
914         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
915     END_EVENT(10);
916     MACHSTATE(2, "} Machine exit barrier end");
917 }
918
919 void LrtsExit() {
920 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
921     int doPrint = 0;
922 #if CMK_SMP
923     if (CmiMyNode()==0) doPrint = 1;
924 #else
925     if (CmiMyPe()==0) doPrint = 1;
926 #endif
927
928     if (doPrint) {
929 #if MPI_POST_RECV
930         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
931 #endif
932     }
933 #endif
934
935 #if REPORT_COMM_METRICS
936 #if CMK_SMP
937     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
938               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
939               pumptime, releasetime, sendtime);
940 #else
941     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
942               CmiMyPe(), pumptime, releasetime, sendtime);
943 #endif
944 #endif
945
946 #if ! CMK_AUTOBUILD
947     signal(SIGINT, signal_int);
948     MPI_Finalize();
949 #endif
950     exit(0);
951 }
952
953 static int machine_exit_idx;
954 static void machine_exit(char *m) {
955     EmergencyExit();
956     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
957     fflush(stdout);
958     CmiNodeBarrier();
959     if (CmiMyRank() == 0) {
960         MPI_Barrier(MPI_COMM_WORLD);
961         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
962         MPI_Abort(MPI_COMM_WORLD, 1);
963     } else {
964         while (1) CmiYield();
965     }
966 }
967
968 static void KillOnAllSigs(int sigNo) {
969     static int already_in_signal_handler = 0;
970     char *m;
971     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
972     already_in_signal_handler = 1;
973 #if CMK_CCS_AVAILABLE
974     if (CpvAccess(cmiArgDebugFlag)) {
975         CpdNotify(CPD_SIGNAL, sigNo);
976         CpdFreeze();
977     }
978 #endif
979     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
980              "Signal: %d\n",CmiMyPe(),sigNo);
981     CmiPrintStackTrace(1);
982
983     m = CmiAlloc(CmiMsgHeaderSizeBytes);
984     CmiSetHandler(m, machine_exit_idx);
985     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
986     machine_exit(m);
987 }
988 /* ######End of functions related with exiting programs###### */
989
990
991 /* ######Beginning of functions related with starting programs###### */
992 static void registerMPITraceEvents() {
993 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
994     traceRegisterUserEvent("MPI_Barrier", 10);
995     traceRegisterUserEvent("MPI_Send", 20);
996     traceRegisterUserEvent("MPI_Recv", 30);
997     traceRegisterUserEvent("MPI_Isend", 40);
998     traceRegisterUserEvent("MPI_Irecv", 50);
999     traceRegisterUserEvent("MPI_Test", 60);
1000     traceRegisterUserEvent("MPI_Iprobe", 70);
1001 #endif
1002 }
1003
1004 #if MACHINE_DEBUG_LOG
1005 FILE *debugLog = NULL;
1006 #endif
1007
1008 static char *thread_level_tostring(int thread_level) {
1009 #if CMK_MPI_INIT_THREAD
1010     switch (thread_level) {
1011     case MPI_THREAD_SINGLE:
1012         return "MPI_THREAD_SINGLE";
1013     case MPI_THREAD_FUNNELED:
1014         return "MPI_THREAD_FUNNELED";
1015     case MPI_THREAD_SERIALIZED:
1016         return "MPI_THREAD_SERIALIZED";
1017     case MPI_THREAD_MULTIPLE :
1018         return "MPI_THREAD_MULTIPLE ";
1019     default: {
1020         char *str = (char*)malloc(5);
1021         sprintf(str,"%d", thread_level);
1022         return str;
1023     }
1024     }
1025     return  "unknown";
1026 #else
1027     char *str = (char*)malloc(5);
1028     sprintf(str,"%d", thread_level);
1029     return str;
1030 #endif
1031 }
1032
1033 /**
1034  *  Obtain the number of nodes, my node id, and consuming machine layer
1035  *  specific arguments
1036  */
1037 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1038     int n,i;
1039     int ver, subver;
1040     int provided;
1041     int thread_level;
1042     int myNID;
1043     int largc=*argc;
1044     char** largv=*argv;
1045
1046 #if MACHINE_DEBUG
1047     debugLog=NULL;
1048 #endif
1049 #if CMK_USE_HP_MAIN_FIX
1050 #if FOR_CPLUS
1051     _main(largc,largv);
1052 #endif
1053 #endif
1054
1055 #if CMK_MPI_INIT_THREAD
1056 #if CMK_SMP
1057     thread_level = MPI_THREAD_MULTIPLE;
1058 #else
1059     thread_level = MPI_THREAD_SINGLE;
1060 #endif
1061     MPI_Init_thread(argc, argv, thread_level, &provided);
1062     _thread_provided = provided;
1063 #else
1064     MPI_Init(argc, argv);
1065     thread_level = 0;
1066     provided = -1;
1067 #endif
1068     largc = *argc;
1069     largv = *argv;
1070     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1071     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1072
1073     myNID = *myNodeID;
1074
1075     MPI_Get_version(&ver, &subver);
1076     if (myNID == 0) {
1077         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1078     }
1079
1080     {
1081         int debug = CmiGetArgFlag(largv,"++debug");
1082         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1083         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1084 #if CMK_HAS_GETPID
1085             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1086             fflush(stdout);
1087             if (!debug_no_pause)
1088                 sleep(15);
1089 #else
1090             printf("++debug ignored.\n");
1091 #endif
1092         }
1093     }
1094
1095
1096 #if CMK_MEM_CHECKPOINT
1097     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1098        CmiAssert(num_workpes <= *numNodes);
1099        total_pes = *numNodes;
1100        *numNodes = num_workpes;
1101     }
1102     else
1103        total_pes = num_workpes = *numNodes;
1104     if (*myNodeID == 0)
1105        CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1106     petorank = (int *)malloc(sizeof(int) * num_workpes);
1107     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1108     nextrank = num_workpes;
1109
1110     if (*myNodeID >= num_workpes) {    /* is spare processor */
1111       MPI_Status sts;
1112       int vals[2];
1113       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1114       int newpe = vals[0];
1115       CpvAccess(_curRestartPhase) = vals[1];
1116
1117       if (newpe == -1) {
1118           MPI_Barrier(MPI_COMM_WORLD);
1119           MPI_Finalize();
1120           exit(0);
1121       }
1122
1123       CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
1124         /* update petorank */
1125       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,MPI_COMM_WORLD, &sts);
1126       nextrank = *myNodeID + 1;
1127       *myNodeID = newpe;
1128       myNID = newpe;
1129
1130        /* add +restartaftercrash to argv */
1131       char *phase_str;
1132       char **restart_argv;
1133       int i=0;
1134       while(largv[i]!= NULL) i++;
1135       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1136       i=0;
1137       while(largv[i]!= NULL){
1138                 restart_argv[i] = largv[i];
1139                 i++;
1140       }
1141       restart_argv[i] = "+restartaftercrash";
1142       phase_str = (char*)malloc(10);
1143       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1144       restart_argv[i+1]=phase_str;
1145       restart_argv[i+2]=NULL;
1146       *argv = restart_argv;
1147       *argc = i+2;
1148       largc = *argc;
1149       largv = *argv;
1150     }
1151 #endif
1152
1153     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1154     if (idleblock && _Cmi_mynode == 0) {
1155         printf("Charm++: Running in idle blocking mode.\n");
1156     }
1157
1158 #if CMK_CHARMDEBUG
1159     /* setup signal handlers */
1160     signal(SIGSEGV, KillOnAllSigs);
1161     signal(SIGFPE, KillOnAllSigs);
1162     signal(SIGILL, KillOnAllSigs);
1163     signal_int = signal(SIGINT, KillOnAllSigs);
1164     signal(SIGTERM, KillOnAllSigs);
1165     signal(SIGABRT, KillOnAllSigs);
1166 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1167     signal(SIGQUIT, KillOnAllSigs);
1168     signal(SIGBUS, KillOnAllSigs);
1169 #   endif /*UNIX*/
1170 #endif
1171
1172 #if CMK_NO_OUTSTANDING_SENDS
1173     no_outstanding_sends=1;
1174 #endif
1175     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1176         no_outstanding_sends = 1;
1177         if (myNID == 0)
1178             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1179                    no_outstanding_sends?"":" not");
1180     }
1181
1182     request_max=MAX_QLEN;
1183     CmiGetArgInt(largv,"+requestmax",&request_max);
1184     /*printf("request max=%d\n", request_max);*/
1185
1186 #if MPI_POST_RECV
1187     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1188     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1189     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1190     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1191     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1192     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1193     if (myNID==0) {
1194         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1195                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1196     }
1197 #endif
1198
1199 #if CMI_DYNAMIC_EXERT_CAP
1200     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1201     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1202     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1203     if (myNID==0) {
1204         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1205                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1206     }
1207 #endif
1208
1209     /* checksum flag */
1210     if (CmiGetArgFlag(largv,"+checksum")) {
1211 #if CMK_ERROR_CHECKING
1212         checksum_flag = 1;
1213         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1214 #else
1215         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1216 #endif
1217     }
1218
1219     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1220     for (i=0; i<_Cmi_mynodesize+1; i++) {
1221 #if MULTI_SENDQUEUE
1222         procState[i].sendMsgBuf = PCQueueCreate();
1223 #endif
1224         procState[i].recvLock = CmiCreateLock();
1225     }
1226 #if CMK_SMP
1227 #if !MULTI_SENDQUEUE
1228     sendMsgBuf = PCQueueCreate();
1229     sendMsgBufLock = CmiCreateLock();
1230 #endif
1231 #endif
1232 }
1233
1234 static void MachinePreCommonInitForMPI(int everReturn) {
1235
1236 #if MPI_POST_RECV
1237     int doInit = 1;
1238     int i;
1239
1240 #if CMK_SMP
1241     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1242 #endif
1243
1244     /* Currently, in mpi smp, the main thread will be the comm thread, so
1245      *  only the comm thread should post recvs. Cpvs, however, need to be
1246      * created on rank 0 (the ptrs to the actual cpv memory), while
1247      * other ranks are busy waiting for this to finish. So cpv initialize
1248      * routines have to be called on every ranks, although they are only
1249      * useful on comm thread (whose rank is not zero) -Chao Mei
1250      */
1251     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1252     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1253     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1254     CpvInitialize(char*,CmiPostedRecvBuffers);
1255
1256     if (doInit) {
1257         /* Post some extra recvs to help out with incoming messages */
1258         /* On some MPIs the messages are unexpected and thus slow */
1259
1260         /* An array of request handles for posted recvs */
1261         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1262
1263         /* An array of buffers for posted recvs */
1264         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1265
1266         /* Post Recvs */
1267         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1268             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1269                                            MPI_POST_RECV_SIZE,
1270                                            MPI_BYTE,
1271                                            MPI_ANY_SOURCE,
1272                                            POST_RECV_TAG,
1273                                            MPI_COMM_WORLD,
1274                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1275                 CmiAbort("MPI_Irecv failed\n");
1276         }
1277     }
1278 #endif
1279
1280 }
1281
1282 static void MachinePostCommonInitForMPI(int everReturn) {
1283
1284     CmiIdleState *s=CmiNotifyGetState();
1285
1286     CpvInitialize(SMSG_LIST *, sent_msgs);
1287     CpvInitialize(SMSG_LIST *, end_sent);
1288     CpvInitialize(int, MsgQueueLen);
1289     CpvAccess(sent_msgs) = NULL;
1290     CpvAccess(end_sent) = NULL;
1291     CpvAccess(MsgQueueLen) = 0;
1292
1293     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1294
1295 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1296     CpvInitialize(double, projTraceStart);
1297     /* only PE 0 needs to care about registration (to generate sts file). */
1298     if (CmiMyPe() == 0) {
1299         registerMachineUserEventsFunction(&registerMPITraceEvents);
1300     }
1301 #endif
1302
1303 #if CMK_SMP
1304     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1305     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1306     if (_thread_provided == MPI_THREAD_MULTIPLE)
1307       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1308 #else
1309     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1310 #endif
1311
1312 #if MACHINE_DEBUG_LOG
1313     if (CmiMyRank() == 0) {
1314         char ln[200];
1315         sprintf(ln,"debugLog.%d",CmiMyNode());
1316         debugLog=fopen(ln,"w");
1317     }
1318 #endif
1319 }
1320 /* ######End of functions related with starting programs###### */
1321
1322 /***********************************************************************
1323  *
1324  * Abort function:
1325  *
1326  ************************************************************************/
1327
1328 void CmiAbort(const char *message) {
1329     char *m;
1330     /* if CharmDebug is attached simply try to send a message to it */
1331 #if CMK_CCS_AVAILABLE
1332     if (CpvAccess(cmiArgDebugFlag)) {
1333         CpdNotify(CPD_ABORT, message);
1334         CpdFreeze();
1335     }
1336 #endif
1337     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1338              "Reason: %s\n",CmiMyPe(),message);
1339     /*  CmiError(message); */
1340     CmiPrintStackTrace(0);
1341     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1342     CmiSetHandler(m, machine_exit_idx);
1343     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1344     machine_exit(m);
1345     /* Program never reaches here */
1346     MPI_Abort(MPI_COMM_WORLD, 1);
1347 }
1348
1349 /**************************  TIMER FUNCTIONS **************************/
1350 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1351
1352 /* MPI calls are not threadsafe, even the timer on some machines */
1353 static CmiNodeLock  timerLock = 0;
1354                                 static int _absoluteTime = 0;
1355                                                            static double starttimer = 0;
1356                                                                                       static int _is_global = 0;
1357
1358 int CmiTimerIsSynchronized() {
1359     int  flag;
1360     void *v;
1361
1362     /*  check if it using synchronized timer */
1363     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1364         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1365     if (flag) {
1366         _is_global = *(int*)v;
1367         if (_is_global && CmiMyPe() == 0)
1368             printf("Charm++> MPI timer is synchronized\n");
1369     }
1370     return _is_global;
1371 }
1372
1373 int CmiTimerAbsolute() {
1374     return _absoluteTime;
1375 }
1376
1377 double CmiStartTimer() {
1378     return 0.0;
1379 }
1380
1381 double CmiInitTime() {
1382     return starttimer;
1383 }
1384
1385 void CmiTimerInit(char **argv) {
1386     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1387     if (_absoluteTime && CmiMyPe() == 0)
1388         printf("Charm++> absolute MPI timer is used\n");
1389
1390 #if ! CMK_MEM_CHECKPOINT
1391     _is_global = CmiTimerIsSynchronized();
1392 #else
1393     _is_global = 0;
1394 #endif
1395
1396     if (_is_global) {
1397         if (CmiMyRank() == 0) {
1398             double minTimer;
1399 #if CMK_TIMER_USE_XT3_DCLOCK
1400             starttimer = dclock();
1401 #else
1402             starttimer = MPI_Wtime();
1403 #endif
1404
1405             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1406                           MPI_COMM_WORLD );
1407             starttimer = minTimer;
1408         }
1409     } else { /* we don't have a synchronous timer, set our own start time */
1410 #if ! CMK_MEM_CHECKPOINT
1411         CmiBarrier();
1412         CmiBarrier();
1413         CmiBarrier();
1414 #endif
1415 #if CMK_TIMER_USE_XT3_DCLOCK
1416         starttimer = dclock();
1417 #else
1418         starttimer = MPI_Wtime();
1419 #endif
1420     }
1421
1422 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1423     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1424         timerLock = CmiCreateLock();
1425 #endif
1426     CmiNodeAllBarrier();          /* for smp */
1427 }
1428
1429 /**
1430  * Since the timerLock is never created, and is
1431  * always NULL, then all the if-condition inside
1432  * the timer functions could be disabled right
1433  * now in the case of SMP. --Chao Mei
1434  */
1435 double CmiTimer(void) {
1436     double t;
1437 #if 0 && CMK_SMP
1438     if (timerLock) CmiLock(timerLock);
1439 #endif
1440
1441 #if CMK_TIMER_USE_XT3_DCLOCK
1442     t = dclock();
1443 #else
1444     t = MPI_Wtime();
1445 #endif
1446
1447 #if 0 && CMK_SMP
1448     if (timerLock) CmiUnlock(timerLock);
1449 #endif
1450
1451     return _absoluteTime?t: (t-starttimer);
1452 }
1453
1454 double CmiWallTimer(void) {
1455     double t;
1456 #if 0 && CMK_SMP
1457     if (timerLock) CmiLock(timerLock);
1458 #endif
1459
1460 #if CMK_TIMER_USE_XT3_DCLOCK
1461     t = dclock();
1462 #else
1463     t = MPI_Wtime();
1464 #endif
1465
1466 #if 0 && CMK_SMP
1467     if (timerLock) CmiUnlock(timerLock);
1468 #endif
1469
1470     return _absoluteTime? t: (t-starttimer);
1471 }
1472
1473 double CmiCpuTimer(void) {
1474     double t;
1475 #if 0 && CMK_SMP
1476     if (timerLock) CmiLock(timerLock);
1477 #endif
1478 #if CMK_TIMER_USE_XT3_DCLOCK
1479     t = dclock() - starttimer;
1480 #else
1481     t = MPI_Wtime() - starttimer;
1482 #endif
1483 #if 0 && CMK_SMP
1484     if (timerLock) CmiUnlock(timerLock);
1485 #endif
1486     return t;
1487 }
1488
1489 #endif     /* CMK_TIMER_USE_SPECIAL */
1490
1491 /************Barrier Related Functions****************/
1492 /* must be called on all ranks including comm thread in SMP */
1493 int CmiBarrier() {
1494 #if CMK_SMP
1495     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1496     CmiNodeAllBarrier();
1497     if (CmiMyRank() == CmiMyNodeSize())
1498 #else
1499     if (CmiMyRank() == 0)
1500 #endif
1501     {
1502         /**
1503          *  The call of CmiBarrier is usually before the initialization
1504          *  of trace module of Charm++, therefore, the START_EVENT
1505          *  and END_EVENT are disabled here. -Chao Mei
1506          */
1507         /*START_EVENT();*/
1508
1509         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1510             CmiAbort("Timernit: MPI_Barrier failed!\n");
1511
1512         /*END_EVENT(10);*/
1513     }
1514     CmiNodeAllBarrier();
1515     return 0;
1516 }
1517
1518 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1519 int CmiBarrierZero() {
1520     int i;
1521 #if CMK_SMP
1522     if (CmiMyRank() == CmiMyNodeSize())
1523 #else
1524     if (CmiMyRank() == 0)
1525 #endif
1526     {
1527         char msg[1];
1528         MPI_Status sts;
1529         if (CmiMyNode() == 0)  {
1530             for (i=0; i<CmiNumNodes()-1; i++) {
1531                 START_EVENT();
1532
1533                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1534                     CmiPrintf("MPI_Recv failed!\n");
1535
1536                 END_EVENT(30);
1537             }
1538         } else {
1539             START_EVENT();
1540
1541             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1542                 printf("MPI_Send failed!\n");
1543
1544             END_EVENT(20);
1545         }
1546     }
1547     CmiNodeAllBarrier();
1548     return 0;
1549 }
1550
1551
1552 #if CMK_MEM_CHECKPOINT
1553
1554 void mpi_restart_crashed(int pe, int rank)
1555 {
1556     int vals[2];
1557     vals[0] = pe;
1558     vals[1] = CpvAccess(_curRestartPhase)+1;
1559     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1560     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1561 }
1562
1563 /* notify spare processors to exit */
1564 void mpi_end_spare()
1565 {
1566     int i;
1567     for (i=nextrank; i<total_pes; i++) {
1568         int vals[2] = {-1,-1};
1569         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,MPI_COMM_WORLD);
1570     }
1571 }
1572
1573 int find_spare_mpirank(int pe)
1574 {
1575     if (nextrank == total_pes) {
1576       CmiAbort("Charm++> No spare processor available.");
1577     }
1578     petorank[pe] = nextrank;
1579     nextrank++;
1580     return nextrank-1;
1581 }
1582
1583 void CkDieNow()
1584 {
1585     CmiPrintf("[%d] die now.\n", CmiMyPe());
1586
1587       /* release old messages */
1588     while (!CmiAllAsyncMsgsSent()) {
1589         PumpMsgs();
1590         CmiReleaseSentMessages();
1591     }
1592     MPI_Barrier(MPI_COMM_WORLD);
1593     MPI_Finalize();
1594     exit(0);
1595 }
1596
1597 #endif
1598
1599 /*@}*/
1600