should not even ask for thread level of support of MULTIPLE if the smp mode is comm_s...
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 static int MPI_POST_RECV_COUNT=10;
111 static int MPI_POST_RECV_LOWERSIZE=2000;
112 static int MPI_POST_RECV_UPPERSIZE=4000;
113 static int MPI_POST_RECV_SIZE;
114
115 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
116 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
117 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
118 CpvDeclare(char*,CmiPostedRecvBuffers);
119 #endif
120
121 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
122 #define TAG     1375
123 #if MPI_POST_RECV
124 #define POST_RECV_TAG       (TAG+1)
125 #define BARRIER_ZERO_TAG  TAG
126 #else
127 #define BARRIER_ZERO_TAG   (TAG-1)
128 #endif
129 /* ###End of POST_RECV related related macros ### */
130
131 #if CMK_BLUEGENEL
132 #define MAX_QLEN 8
133 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
134 #else
135 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
136 #define MAX_QLEN 200
137 #endif
138 /* =======End of Definitions of Performance-Specific Macros =======*/
139
140
141 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
142 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
143 #define CHARM_MAGIC_NUMBER               126
144
145 #if CMK_ERROR_CHECKING
146 extern unsigned char computeCheckSum(unsigned char *data, int len);
147 static int checksum_flag = 0;
148 #define CMI_SET_CHECKSUM(msg, len)      \
149         if (checksum_flag)  {   \
150           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
151           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
152         }
153 #define CMI_CHECK_CHECKSUM(msg, len)    \
154         if (checksum_flag)      \
155           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
156             CmiAbort("Fatal error: checksum doesn't agree!\n");
157 #else
158 #define CMI_SET_CHECKSUM(msg, len)
159 #define CMI_CHECK_CHECKSUM(msg, len)
160 #endif
161 /* =====End of Definitions of Message-Corruption Related Macros=====*/
162
163 /* =====Beginning of Declarations of Machine Specific Variables===== */
164 #include <signal.h>
165 void (*signal_int)(int);
166
167 static int _thread_provided = -1; /* Indicating MPI thread level */
168 static int idleblock = 0;
169
170 /* A simple list for msgs that have been sent by MPI_Isend */
171 typedef struct msg_list {
172     char *msg;
173     struct msg_list *next;
174     int size, destpe, mode;
175 #if CMK_SMP_TRACE_COMMTHREAD
176     int srcpe;
177 #endif
178     MPI_Request req;
179 } SMSG_LIST;
180
181 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
182 CpvStaticDeclare(SMSG_LIST *, end_sent);
183
184 CpvStaticDeclare(int, MsgQueueLen);
185 static int request_max;
186 /*FLAG: consume outstanding Isends in scheduler loop*/
187 static int no_outstanding_sends=0;
188
189 #if NODE_0_IS_CONVHOST
190 int inside_comm = 0;
191 #endif
192
193 typedef struct ProcState {
194 #if MULTI_SENDQUEUE
195     PCQueue      sendMsgBuf;       /* per processor message sending queue */
196 #endif
197     CmiNodeLock  recvLock;                  /* for cs->recv */
198 } ProcState;
199 static ProcState  *procState;
200
201 #if CMK_SMP && !MULTI_SENDQUEUE
202 static PCQueue sendMsgBuf;
203 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
204 #endif
205 /* =====End of Declarations of Machine Specific Variables===== */
206
207 #if CMK_MEM_CHECKPOINT
208 #define FAIL_TAG   1200
209 int num_workpes, total_pes;
210 int *petorank = NULL;
211 int  nextrank;
212 void mpi_end_spare();
213 #endif
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node, int mode);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define LrtsSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
238 #define LrtsInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define LrtsPreCommonInit MachinePreCommonInitForMPI
243 #define LrtsPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI(int whenidle);
248 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define LrtsDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define LrtsExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 static void MachinePostNonLocalForMPI();
262 #define LrtsPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-lrts.h"
274 #include "machine-common-core.c"
275
276 /* The machine specific msg-sending function */
277
278 #if CMK_SMP
279 static void EnqueueMsg(void *m, int size, int node, int mode) {
280     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
281     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
282     msg_tmp->msg = m;
283     msg_tmp->size = size;
284     msg_tmp->destpe = node;
285     msg_tmp->next = 0;
286     msg_tmp->mode = mode;
287
288 #if CMK_SMP_TRACE_COMMTHREAD
289     msg_tmp->srcpe = CmiMyPe();
290 #endif
291
292 #if MULTI_SENDQUEUE
293     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
294 #else
295     /*CmiLock(sendMsgBufLock);*/
296     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
297     /*CmiUnlock(sendMsgBufLock);*/
298 #endif
299
300     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
301 }
302 #endif
303
304 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
305 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
306     int node = smsg->destpe;
307     int size = smsg->size;
308     char *msg = smsg->msg;
309     int mode = smsg->mode;
310     int dstrank;
311
312     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
313 #if CMK_ERROR_CHECKING
314     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
315     CMI_SET_CHECKSUM(msg, size);
316 #endif
317
318 #if MPI_POST_RECV
319     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
320         START_EVENT();
321         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
322             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
323         /*END_EVENT(40);*/
324     } else {
325         START_EVENT();
326         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
327             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
328         /*END_EVENT(40);*/
329     }
330 #else
331     START_EVENT();
332 #if CMK_MEM_CHECKPOINT
333     dstrank = petorank[node];
334 #else
335     dstrank = node;
336 #endif
337     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
338         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
339     /*END_EVENT(40);*/
340 #endif
341
342 #if CMK_SMP_TRACE_COMMTHREAD
343     traceBeginCommOp(msg);
344     traceChangeLastTimestamp(CpvAccess(projTraceStart));
345     /* traceSendMsgComm must execute after traceBeginCommOp because
346          * we pretend we execute an entry method, and inside this we
347          * pretend we will send another message. Otherwise how could
348          * a message creation just before an entry method invocation?
349          * If such logic is broken, the projections will not trace
350          * messages correctly! -Chao Mei
351          */
352     traceSendMsgComm(msg);
353     traceEndCommOp(msg);
354 #if CMI_MPI_TRACE_MOREDETAILED
355     char tmp[64];
356     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
357     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
358 #endif
359 #endif
360
361     MACHSTATE(3,"}MPI_send end");
362     CpvAccess(MsgQueueLen)++;
363     if (CpvAccess(sent_msgs)==0)
364         CpvAccess(sent_msgs) = smsg;
365     else
366         CpvAccess(end_sent)->next = smsg;
367     CpvAccess(end_sent) = smsg;
368
369 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
370     if (mode == P2P_SYNC || mode == P2P_ASYNC)
371     {
372     while (CpvAccess(MsgQueueLen) > request_max) {
373         CmiReleaseSentMessages();
374         PumpMsgs();
375     }
376     }
377 #endif
378
379     return (CmiCommHandle) &(smsg->req);
380 }
381
382 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
383     /* Ignoring the mode for MPI layer */
384
385     CmiState cs = CmiGetState();
386     SMSG_LIST *msg_tmp;
387     int  rank;
388
389     CmiAssert(destNode != CmiMyNode());
390 #if CMK_SMP
391     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
392       EnqueueMsg(msg, size, destNode, mode);
393       return 0;
394     }
395 #endif
396     /* non smp */
397     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
398     msg_tmp->msg = msg;
399     msg_tmp->destpe = destNode;
400     msg_tmp->size = size;
401     msg_tmp->next = 0;
402     msg_tmp->mode = mode;
403     return MPISendOneMsg(msg_tmp);
404 }
405
406 static size_t CmiAllAsyncMsgsSent(void) {
407     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
408     MPI_Status sts;
409     int done;
410
411     while (msg_tmp!=0) {
412         done = 0;
413         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
414             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
415         if (!done)
416             return 0;
417         msg_tmp = msg_tmp->next;
418         /*    MsgQueueLen--; ????? */
419     }
420     return 1;
421 }
422
423 int CmiAsyncMsgSent(CmiCommHandle c) {
424
425     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
426     int done;
427     MPI_Status sts;
428
429     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
430         msg_tmp = msg_tmp->next;
431     if (msg_tmp) {
432         done = 0;
433         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
434             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
435         return ((done)?1:0);
436     } else {
437         return 1;
438     }
439 }
440
441 void CmiReleaseCommHandle(CmiCommHandle c) {
442     return;
443 }
444
445 /* ######Beginning of functions related with communication progress ###### */
446 static void CmiReleaseSentMessages(void) {
447     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
448     SMSG_LIST *prev=0;
449     SMSG_LIST *temp;
450     int done;
451     MPI_Status sts;
452
453 #if CMK_BLUEGENEL
454     MPID_Progress_test();
455 #endif
456
457     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
458     while (msg_tmp!=0) {
459         done =0;
460 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
461         double startT = CmiWallTimer();
462 #endif
463         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
464             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
465         if (done) {
466             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
467             CpvAccess(MsgQueueLen)--;
468             /* Release the message */
469             temp = msg_tmp->next;
470             if (prev==0) /* first message */
471                 CpvAccess(sent_msgs) = temp;
472             else
473                 prev->next = temp;
474             CmiFree(msg_tmp->msg);
475             CmiFree(msg_tmp);
476             msg_tmp = temp;
477         } else {
478             prev = msg_tmp;
479             msg_tmp = msg_tmp->next;
480         }
481 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
482         {
483             double endT = CmiWallTimer();
484             /* only record the event if it takes more than 1ms */
485             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
486         }
487 #endif
488     }
489     CpvAccess(end_sent) = prev;
490     MACHSTATE(2,"} CmiReleaseSentMessages end");
491 }
492
493 static int PumpMsgs(void) {
494     int nbytes, flg, res;
495     char *msg;
496     MPI_Status sts;
497     int recd=0;
498
499 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
500     int recvCnt=0;
501 #endif
502
503 #if CMK_BLUEGENEL
504     MPID_Progress_test();
505 #endif
506
507     MACHSTATE(2,"PumpMsgs begin {");
508
509 #if CMI_DYNAMIC_EXERT_CAP
510     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
511 #endif
512
513     while (1) {
514 #if CMI_EXERT_RECV_CAP
515         if (recvCnt==RECV_CAP) break;
516 #elif CMI_DYNAMIC_EXERT_CAP
517         if (recvCnt >= dynamicRecvCap) break;
518 #endif
519
520         /* First check posted recvs then do  probe unmatched outstanding messages */
521 #if MPI_POST_RECV
522         int completed_index=-1;
523         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
524             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
525         if (flg) {
526             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
527                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
528
529             recd = 1;
530             msg = (char *) CmiAlloc(nbytes);
531             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
532             /* and repost the recv */
533
534             START_EVENT();
535
536             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
537                                            MPI_POST_RECV_SIZE,
538                                            MPI_BYTE,
539                                            MPI_ANY_SOURCE,
540                                            POST_RECV_TAG,
541                                            MPI_COMM_WORLD,
542                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
543                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
544
545             END_EVENT(50);
546
547             CpvAccess(Cmi_posted_recv_total)++;
548         } else {
549             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
550             if (res != MPI_SUCCESS)
551                 CmiAbort("MPI_Iprobe failed\n");
552             if (!flg) break;
553             recd = 1;
554             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
555             msg = (char *) CmiAlloc(nbytes);
556
557             START_EVENT();
558
559             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
560                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
561
562             END_EVENT(30);
563
564             CpvAccess(Cmi_unposted_recv_total)++;
565         }
566 #else
567         /* Original version */
568 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
569         double startT = CmiWallTimer();
570 #endif
571         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
572         if (res != MPI_SUCCESS)
573             CmiAbort("MPI_Iprobe failed\n");
574
575         if (!flg) break;
576 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
577         {
578             double endT = CmiWallTimer();
579             /* only trace the probe that last longer than 1ms */
580             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
581         }
582 #endif
583
584         recd = 1;
585         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
586         msg = (char *) CmiAlloc(nbytes);
587
588         START_EVENT();
589
590         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
591             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
592
593         /*END_EVENT(30);*/
594
595 #endif
596
597 #if CMK_SMP_TRACE_COMMTHREAD
598         traceBeginCommOp(msg);
599         traceChangeLastTimestamp(CpvAccess(projTraceStart));
600         traceEndCommOp(msg);
601 #if CMI_MPI_TRACE_MOREDETAILED
602         char tmp[32];
603         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
604         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
605 #endif
606 #elif CMK_TRACE_COMMOVERHEAD
607         char tmp[32];
608         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
609         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
610 #endif
611
612
613         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
614         CMI_CHECK_CHECKSUM(msg, nbytes);
615 #if CMK_ERROR_CHECKING
616         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
617             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
618             CmiFree(msg);
619             CmiAbort("Abort!\n");
620             continue;
621         }
622 #endif
623
624         handleOneRecvedMsg(nbytes, msg);
625
626 #if CMI_EXERT_RECV_CAP
627         recvCnt++;
628 #elif CMI_DYNAMIC_EXERT_CAP
629         recvCnt++;
630 #if CMK_SMP
631         /* check sendMsgBuf to get the  number of messages that have not been sent
632              * which is only available in SMP mode
633          * MsgQueueLen indicates the number of messages that have not been released
634              * by MPI
635              */
636         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
637                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
638             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
639         }
640 #else
641         /* MsgQueueLen indicates the number of messages that have not been released
642              * by MPI
643              */
644         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
645             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
646         }
647 #endif
648
649 #endif
650
651     }
652
653     MACHSTATE(2,"} PumpMsgs end ");
654     return recd;
655 }
656
657 /* blocking version */
658 static void PumpMsgsBlocking(void) {
659     static int maxbytes = 20000000;
660     static char *buf = NULL;
661     int nbytes, flg;
662     MPI_Status sts;
663     char *msg;
664     int recd=0;
665
666     if (!PCQueueEmpty(CmiGetState()->recv)) return;
667     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
668     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
669     if (CpvAccess(sent_msgs))  return;
670
671 #if 0
672     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
673 #endif
674
675     if (buf == NULL) {
676         buf = (char *) CmiAlloc(maxbytes);
677         _MEMCHECK(buf);
678     }
679
680
681 #if MPI_POST_RECV
682 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
683     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
684 #endif
685
686     START_EVENT();
687
688     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
689         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
690
691     /*END_EVENT(30);*/
692
693     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
694     msg = (char *) CmiAlloc(nbytes);
695     memcpy(msg, buf, nbytes);
696
697 #if CMK_SMP_TRACE_COMMTHREAD
698     traceBeginCommOp(msg);
699     traceChangeLastTimestamp(CpvAccess(projTraceStart));
700     traceEndCommOp(msg);
701 #if CMI_MPI_TRACE_MOREDETAILED
702     char tmp[32];
703     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
704     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
705 #endif
706 #endif
707
708     handleOneRecvedMsg(nbytes, msg);
709 }
710
711
712 #if CMK_SMP
713
714 /* called by communication thread in SMP */
715 static int SendMsgBuf() {
716     SMSG_LIST *msg_tmp;
717     char *msg;
718     int node, rank, size;
719     int i;
720     int sent = 0;
721
722 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
723     int sentCnt = 0;
724 #endif
725
726 #if CMI_DYNAMIC_EXERT_CAP
727     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
728 #endif
729
730     MACHSTATE(2,"SendMsgBuf begin {");
731 #if MULTI_SENDQUEUE
732     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
733         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
734             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
735 #else
736     /* single message sending queue */
737     /* CmiLock(sendMsgBufLock); */
738     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
739     /* CmiUnlock(sendMsgBufLock); */
740     while (NULL != msg_tmp) {
741 #endif
742             MPISendOneMsg(msg_tmp);
743             sent=1;
744
745 #if CMI_EXERT_SEND_CAP
746             if (++sentCnt == SEND_CAP) break;
747 #elif CMI_DYNAMIC_EXERT_CAP
748             if (++sentCnt >= dynamicSendCap) break;
749             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
750                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
751 #endif
752
753 #if ! MULTI_SENDQUEUE
754             /* CmiLock(sendMsgBufLock); */
755             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
756             /* CmiUnlock(sendMsgBufLock); */
757 #endif
758         }
759 #if MULTI_SENDQUEUE
760     }
761 #endif
762     MACHSTATE(2,"}SendMsgBuf end ");
763     return sent;
764 }
765
766 static int MsgQueueEmpty() {
767     int i;
768 #if MULTI_SENDQUEUE
769     for (i=0; i<_Cmi_mynodesize; i++)
770         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
771 #else
772     return PCQueueEmpty(sendMsgBuf);
773 #endif
774     return 1;
775 }
776
777 /* test if all processors recv queues are empty */
778 static int RecvQueueEmpty() {
779     int i;
780     for (i=0; i<_Cmi_mynodesize; i++) {
781         CmiState cs=CmiGetStateN(i);
782         if (!PCQueueEmpty(cs->recv)) return 0;
783     }
784     return 1;
785 }
786
787
788 #define REPORT_COMM_METRICS 0
789 #if REPORT_COMM_METRICS
790 static double pumptime = 0.0;
791 static double releasetime = 0.0;
792 static double sendtime = 0.0;
793 #endif
794
795 #endif //end of CMK_SMP
796
797 static void AdvanceCommunicationForMPI(int whenidle) {
798 #if REPORT_COMM_METRICS
799     double t1, t2, t3, t4;
800     t1 = CmiWallTimer();
801 #endif
802
803 #if CMK_SMP
804     PumpMsgs();
805
806 #if REPORT_COMM_METRICS
807     t2 = CmiWallTimer();
808 #endif
809
810     CmiReleaseSentMessages();
811 #if REPORT_COMM_METRICS
812     t3 = CmiWallTimer();
813 #endif
814
815     SendMsgBuf();
816
817 #if REPORT_COMM_METRICS
818     t4 = CmiWallTimer();
819     pumptime += (t2-t1);
820     releasetime += (t3-t2);
821     sendtime += (t4-t3);
822 #endif
823
824 #else /* non-SMP case */
825     CmiReleaseSentMessages();
826
827 #if REPORT_COMM_METRICS
828     t2 = CmiWallTimer();
829 #endif
830     PumpMsgs();
831
832 #if REPORT_COMM_METRICS
833     t3 = CmiWallTimer();
834     pumptime += (t3-t2);
835     releasetime += (t2-t1);
836 #endif
837
838 #endif /* end of #if CMK_SMP */
839 }
840 /* ######End of functions related with communication progress ###### */
841
842 static void MachinePostNonLocalForMPI() {
843 #if !CMK_SMP
844     if (no_outstanding_sends) {
845         while (CpvAccess(MsgQueueLen)>0) {
846             AdvanceCommunicationForMPI(0);
847         }
848     }
849
850     /* FIXME: I don't think the following codes are needed because
851      * it repeats the same job of the next call of CmiGetNonLocal
852      */
853 #if 0
854     if (!msg) {
855         CmiReleaseSentMessages();
856         if (PumpMsgs())
857             return  PCQueuePop(cs->recv);
858         else
859             return 0;
860     }
861 #endif
862 #else
863   if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
864         CmiReleaseSentMessages();       
865         /* ??? SendMsgBuf is a not a thread-safe function. If it is put
866          * here and this function will be called in CmiNotifyStillIdle,
867          * then a data-race problem occurs */
868         /*SendMsgBuf();*/
869   }
870 #endif
871 }
872
873 /* Idle-state related functions: called in non-smp mode */
874 void CmiNotifyIdleForMPI(void) {
875     CmiReleaseSentMessages();
876     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
877 }
878
879 /* Network progress function is used to poll the network when for
880    messages. This flushes receive buffers on some  implementations*/
881 #if CMK_MACHINE_PROGRESS_DEFINED
882 void CmiMachineProgressImpl() {
883 #if !CMK_SMP
884     PumpMsgs();
885 #if CMK_IMMEDIATE_MSG
886     CmiHandleImmediate();
887 #endif
888 #else
889     /*Not implemented yet. Communication server does not seem to be
890       thread safe, so only communication thread call it */
891     if (CmiMyRank() == CmiMyNodeSize())
892         CommunicationServerThread(0);
893 #endif
894 }
895 #endif
896
897 /* ######Beginning of functions related with exiting programs###### */
898 void DrainResourcesForMPI() {
899 #if !CMK_SMP
900     while (!CmiAllAsyncMsgsSent()) {
901         PumpMsgs();
902         CmiReleaseSentMessages();
903     }
904 #else
905     if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
906         while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
907             CmiReleaseSentMessages();
908             SendMsgBuf();
909             PumpMsgs();
910         }
911     }else if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
912         while(!CmiAllAsyncMsgsSent()) {
913             CmiReleaseSentMessages();
914         }
915     }
916 #endif
917 #if CMK_MEM_CHECKPOINT
918     if (CmiMyPe() == 0) mpi_end_spare();
919 #endif
920     MACHSTATE(2, "Machine exit barrier begin {");
921     START_EVENT();
922     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
923         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
924     END_EVENT(10);
925     MACHSTATE(2, "} Machine exit barrier end");
926 }
927
928 void MachineExitForMPI() {
929 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
930     int doPrint = 0;
931 #if CMK_SMP
932     if (CmiMyNode()==0) doPrint = 1;
933 #else
934     if (CmiMyPe()==0) doPrint = 1;
935 #endif
936
937     if (doPrint) {
938 #if MPI_POST_RECV
939         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
940 #endif
941     }
942 #endif
943
944 #if REPORT_COMM_METRICS
945 #if CMK_SMP
946     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
947               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
948               pumptime, releasetime, sendtime);
949 #else
950     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
951               CmiMyPe(), pumptime, releasetime, sendtime);
952 #endif
953 #endif
954
955 #if ! CMK_AUTOBUILD
956     signal(SIGINT, signal_int);
957     MPI_Finalize();
958 #endif
959     exit(0);
960 }
961
962 static int machine_exit_idx;
963 static void machine_exit(char *m) {
964     EmergencyExit();
965     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
966     fflush(stdout);
967     CmiNodeBarrier();
968     if (CmiMyRank() == 0) {
969         MPI_Barrier(MPI_COMM_WORLD);
970         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
971         MPI_Abort(MPI_COMM_WORLD, 1);
972     } else {
973         while (1) CmiYield();
974     }
975 }
976
977 static void KillOnAllSigs(int sigNo) {
978     static int already_in_signal_handler = 0;
979     char *m;
980     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
981     already_in_signal_handler = 1;
982 #if CMK_CCS_AVAILABLE
983     if (CpvAccess(cmiArgDebugFlag)) {
984         CpdNotify(CPD_SIGNAL, sigNo);
985         CpdFreeze();
986     }
987 #endif
988     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
989              "Signal: %d\n",CmiMyPe(),sigNo);
990     CmiPrintStackTrace(1);
991
992     m = CmiAlloc(CmiMsgHeaderSizeBytes);
993     CmiSetHandler(m, machine_exit_idx);
994     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
995     machine_exit(m);
996 }
997 /* ######End of functions related with exiting programs###### */
998
999
1000 /* ######Beginning of functions related with starting programs###### */
1001 static void registerMPITraceEvents() {
1002 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1003     traceRegisterUserEvent("MPI_Barrier", 10);
1004     traceRegisterUserEvent("MPI_Send", 20);
1005     traceRegisterUserEvent("MPI_Recv", 30);
1006     traceRegisterUserEvent("MPI_Isend", 40);
1007     traceRegisterUserEvent("MPI_Irecv", 50);
1008     traceRegisterUserEvent("MPI_Test", 60);
1009     traceRegisterUserEvent("MPI_Iprobe", 70);
1010 #endif
1011 }
1012
1013 #if MACHINE_DEBUG_LOG
1014 FILE *debugLog = NULL;
1015 #endif
1016
1017 static char *thread_level_tostring(int thread_level) {
1018 #if CMK_MPI_INIT_THREAD
1019     switch (thread_level) {
1020     case MPI_THREAD_SINGLE:
1021         return "MPI_THREAD_SINGLE";
1022     case MPI_THREAD_FUNNELED:
1023         return "MPI_THREAD_FUNNELED";
1024     case MPI_THREAD_SERIALIZED:
1025         return "MPI_THREAD_SERIALIZED";
1026     case MPI_THREAD_MULTIPLE :
1027         return "MPI_THREAD_MULTIPLE";
1028     default: {
1029         char *str = (char*)malloc(5);
1030         sprintf(str,"%d", thread_level);
1031         return str;
1032     }
1033     }
1034     return  "unknown";
1035 #else
1036     char *str = (char*)malloc(5);
1037     sprintf(str,"%d", thread_level);
1038     return str;
1039 #endif
1040 }
1041
1042 /**
1043  *  Obtain the number of nodes, my node id, and consuming machine layer
1044  *  specific arguments
1045  */
1046 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1047     int n,i;
1048     int ver, subver;
1049     int provided;
1050     int thread_level;
1051     int myNID;
1052     int largc=*argc;
1053     char** largv=*argv;
1054
1055 #if MACHINE_DEBUG
1056     debugLog=NULL;
1057 #endif
1058 #if CMK_USE_HP_MAIN_FIX
1059 #if FOR_CPLUS
1060     _main(largc,largv);
1061 #endif
1062 #endif
1063
1064 #if CMK_SMP
1065     if (CmiGetArgFlag(largv, "+comm_thread_only_recv")) {
1066       Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
1067     }
1068 #endif
1069
1070 #if CMK_MPI_INIT_THREAD
1071 #if CMK_SMP
1072     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV)
1073       thread_level = MPI_THREAD_FUNNELED;
1074     else
1075       thread_level = MPI_THREAD_MULTIPLE;
1076 #else
1077     thread_level = MPI_THREAD_SINGLE;
1078 #endif
1079     MPI_Init_thread(argc, argv, thread_level, &provided);
1080     _thread_provided = provided;
1081 #else
1082     MPI_Init(argc, argv);
1083     thread_level = 0;
1084     _thread_provided = -1;
1085 #endif
1086     largc = *argc;
1087     largv = *argv;
1088     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1089     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1090
1091     myNID = *myNodeID;
1092
1093 #if CMK_SMP
1094     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV && _thread_provided != MPI_THREAD_MULTIPLE) {
1095         Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV; 
1096     }
1097 #endif
1098
1099     MPI_Get_version(&ver, &subver);
1100     if (myNID == 0) {
1101         printf("Charm++> Running on MPI version: %d.%d\n", ver, subver);
1102         printf("Charm++> level of thread support used: %s (desired: %s)\n", thread_level_tostring(_thread_provided), thread_level_tostring(thread_level));
1103     }
1104
1105     {
1106         int debug = CmiGetArgFlag(largv,"++debug");
1107         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1108         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1109 #if CMK_HAS_GETPID
1110             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1111             fflush(stdout);
1112             if (!debug_no_pause)
1113                 sleep(15);
1114 #else
1115             printf("++debug ignored.\n");
1116 #endif
1117         }
1118     }
1119
1120
1121 #if CMK_MEM_CHECKPOINT
1122     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1123        CmiAssert(num_workpes <= *numNodes);
1124        total_pes = *numNodes;
1125        *numNodes = num_workpes;
1126     }
1127     else
1128        total_pes = num_workpes = *numNodes;
1129     if (*myNodeID == 0)
1130        CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1131     petorank = (int *)malloc(sizeof(int) * num_workpes);
1132     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1133     nextrank = num_workpes;
1134
1135     if (*myNodeID >= num_workpes) {    /* is spare processor */
1136       MPI_Status sts;
1137       int vals[2];
1138       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1139       int newpe = vals[0];
1140       CpvAccess(_curRestartPhase) = vals[1];
1141
1142       if (newpe == -1) {
1143           MPI_Barrier(MPI_COMM_WORLD);
1144           MPI_Finalize();
1145           exit(0);
1146       }
1147
1148       CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
1149         /* update petorank */
1150       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,MPI_COMM_WORLD, &sts);
1151       nextrank = *myNodeID + 1;
1152       *myNodeID = newpe;
1153       myNID = newpe;
1154
1155        /* add +restartaftercrash to argv */
1156       char *phase_str;
1157       char **restart_argv;
1158       int i=0;
1159       while(largv[i]!= NULL) i++;
1160       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1161       i=0;
1162       while(largv[i]!= NULL){
1163                 restart_argv[i] = largv[i];
1164                 i++;
1165       }
1166       restart_argv[i] = "+restartaftercrash";
1167       phase_str = (char*)malloc(10);
1168       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1169       restart_argv[i+1]=phase_str;
1170       restart_argv[i+2]=NULL;
1171       *argv = restart_argv;
1172       *argc = i+2;
1173       largc = *argc;
1174       largv = *argv;
1175     }
1176 #endif
1177
1178     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1179     if (idleblock && _Cmi_mynode == 0) {
1180         printf("Charm++: Running in idle blocking mode.\n");
1181     }
1182
1183 #if CMK_CHARMDEBUG
1184     /* setup signal handlers */
1185     signal(SIGSEGV, KillOnAllSigs);
1186     signal(SIGFPE, KillOnAllSigs);
1187     signal(SIGILL, KillOnAllSigs);
1188     signal_int = signal(SIGINT, KillOnAllSigs);
1189     signal(SIGTERM, KillOnAllSigs);
1190     signal(SIGABRT, KillOnAllSigs);
1191 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1192     signal(SIGQUIT, KillOnAllSigs);
1193     signal(SIGBUS, KillOnAllSigs);
1194 #   endif /*UNIX*/
1195 #endif
1196
1197 #if CMK_NO_OUTSTANDING_SENDS
1198     no_outstanding_sends=1;
1199 #endif
1200     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1201         no_outstanding_sends = 1;
1202         if (myNID == 0)
1203             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1204                    no_outstanding_sends?"":" not");
1205     }
1206
1207     request_max=MAX_QLEN;
1208     CmiGetArgInt(largv,"+requestmax",&request_max);
1209     /*printf("request max=%d\n", request_max);*/
1210
1211 #if MPI_POST_RECV
1212     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1213     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1214     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1215     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1216     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1217     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1218     if (myNID==0) {
1219         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1220                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1221     }
1222 #endif
1223
1224 #if CMI_DYNAMIC_EXERT_CAP
1225     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1226     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1227     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1228     if (myNID==0) {
1229         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1230                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1231     }
1232 #endif
1233
1234     /* checksum flag */
1235     if (CmiGetArgFlag(largv,"+checksum")) {
1236 #if CMK_ERROR_CHECKING
1237         checksum_flag = 1;
1238         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1239 #else
1240         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1241 #endif
1242     }
1243
1244     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1245     for (i=0; i<_Cmi_mynodesize+1; i++) {
1246 #if MULTI_SENDQUEUE
1247         procState[i].sendMsgBuf = PCQueueCreate();
1248 #endif
1249         procState[i].recvLock = CmiCreateLock();
1250     }
1251 #if CMK_SMP
1252 #if !MULTI_SENDQUEUE
1253     sendMsgBuf = PCQueueCreate();
1254     sendMsgBufLock = CmiCreateLock();
1255 #endif
1256 #endif
1257 }
1258
1259 static void MachinePreCommonInitForMPI(int everReturn) {
1260
1261 #if MPI_POST_RECV
1262     int doInit = 1;
1263     int i;
1264
1265 #if CMK_SMP
1266     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1267 #endif
1268
1269     /* Currently, in mpi smp, the main thread will be the comm thread, so
1270      *  only the comm thread should post recvs. Cpvs, however, need to be
1271      * created on rank 0 (the ptrs to the actual cpv memory), while
1272      * other ranks are busy waiting for this to finish. So cpv initialize
1273      * routines have to be called on every ranks, although they are only
1274      * useful on comm thread (whose rank is not zero) -Chao Mei
1275      */
1276     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1277     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1278     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1279     CpvInitialize(char*,CmiPostedRecvBuffers);
1280
1281     if (doInit) {
1282         /* Post some extra recvs to help out with incoming messages */
1283         /* On some MPIs the messages are unexpected and thus slow */
1284
1285         /* An array of request handles for posted recvs */
1286         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1287
1288         /* An array of buffers for posted recvs */
1289         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1290
1291         /* Post Recvs */
1292         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1293             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1294                                            MPI_POST_RECV_SIZE,
1295                                            MPI_BYTE,
1296                                            MPI_ANY_SOURCE,
1297                                            POST_RECV_TAG,
1298                                            MPI_COMM_WORLD,
1299                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1300                 CmiAbort("MPI_Irecv failed\n");
1301         }
1302     }
1303 #endif
1304
1305 }
1306
1307 static void MachinePostCommonInitForMPI(int everReturn) {
1308
1309     CmiIdleState *s=CmiNotifyGetState();
1310
1311     CpvInitialize(SMSG_LIST *, sent_msgs);
1312     CpvInitialize(SMSG_LIST *, end_sent);
1313     CpvInitialize(int, MsgQueueLen);
1314     CpvAccess(sent_msgs) = NULL;
1315     CpvAccess(end_sent) = NULL;
1316     CpvAccess(MsgQueueLen) = 0;
1317
1318     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1319
1320 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1321     CpvInitialize(double, projTraceStart);
1322     /* only PE 0 needs to care about registration (to generate sts file). */
1323     if (CmiMyPe() == 0) {
1324         registerMachineUserEventsFunction(&registerMPITraceEvents);
1325     }
1326 #endif
1327
1328 #if CMK_SMP
1329     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1330     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1331     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV)
1332       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1333 #else
1334     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1335 #endif
1336
1337 #if MACHINE_DEBUG_LOG
1338     if (CmiMyRank() == 0) {
1339         char ln[200];
1340         sprintf(ln,"debugLog.%d",CmiMyNode());
1341         debugLog=fopen(ln,"w");
1342     }
1343 #endif
1344 }
1345 /* ######End of functions related with starting programs###### */
1346
1347 /***********************************************************************
1348  *
1349  * Abort function:
1350  *
1351  ************************************************************************/
1352
1353 void CmiAbort(const char *message) {
1354     char *m;
1355     /* if CharmDebug is attached simply try to send a message to it */
1356 #if CMK_CCS_AVAILABLE
1357     if (CpvAccess(cmiArgDebugFlag)) {
1358         CpdNotify(CPD_ABORT, message);
1359         CpdFreeze();
1360     }
1361 #endif
1362     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1363              "Reason: %s\n",CmiMyPe(),message);
1364     /*  CmiError(message); */
1365     CmiPrintStackTrace(0);
1366     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1367     CmiSetHandler(m, machine_exit_idx);
1368     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1369     machine_exit(m);
1370     /* Program never reaches here */
1371     MPI_Abort(MPI_COMM_WORLD, 1);
1372 }
1373
1374 /**************************  TIMER FUNCTIONS **************************/
1375 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1376
1377 /* MPI calls are not threadsafe, even the timer on some machines */
1378 static CmiNodeLock  timerLock = 0;
1379                                 static int _absoluteTime = 0;
1380                                                            static double starttimer = 0;
1381                                                                                       static int _is_global = 0;
1382
1383 int CmiTimerIsSynchronized() {
1384     int  flag;
1385     void *v;
1386
1387     /*  check if it using synchronized timer */
1388     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1389         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1390     if (flag) {
1391         _is_global = *(int*)v;
1392         if (_is_global && CmiMyPe() == 0)
1393             printf("Charm++> MPI timer is synchronized\n");
1394     }
1395     return _is_global;
1396 }
1397
1398 int CmiTimerAbsolute() {
1399     return _absoluteTime;
1400 }
1401
1402 double CmiStartTimer() {
1403     return 0.0;
1404 }
1405
1406 double CmiInitTime() {
1407     return starttimer;
1408 }
1409
1410 void CmiTimerInit(char **argv) {
1411     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1412     if (_absoluteTime && CmiMyPe() == 0)
1413         printf("Charm++> absolute MPI timer is used\n");
1414
1415 #if ! CMK_MEM_CHECKPOINT
1416     _is_global = CmiTimerIsSynchronized();
1417 #else
1418     _is_global = 0;
1419 #endif
1420
1421     if (_is_global) {
1422         if (CmiMyRank() == 0) {
1423             double minTimer;
1424 #if CMK_TIMER_USE_XT3_DCLOCK
1425             starttimer = dclock();
1426 #else
1427             starttimer = MPI_Wtime();
1428 #endif
1429
1430             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1431                           MPI_COMM_WORLD );
1432             starttimer = minTimer;
1433         }
1434     } else { /* we don't have a synchronous timer, set our own start time */
1435 #if ! CMK_MEM_CHECKPOINT
1436         CmiBarrier();
1437         CmiBarrier();
1438         CmiBarrier();
1439 #endif
1440 #if CMK_TIMER_USE_XT3_DCLOCK
1441         starttimer = dclock();
1442 #else
1443         starttimer = MPI_Wtime();
1444 #endif
1445     }
1446
1447 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1448     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1449         timerLock = CmiCreateLock();
1450 #endif
1451     CmiNodeAllBarrier();          /* for smp */
1452 }
1453
1454 /**
1455  * Since the timerLock is never created, and is
1456  * always NULL, then all the if-condition inside
1457  * the timer functions could be disabled right
1458  * now in the case of SMP. --Chao Mei
1459  */
1460 double CmiTimer(void) {
1461     double t;
1462 #if 0 && CMK_SMP
1463     if (timerLock) CmiLock(timerLock);
1464 #endif
1465
1466 #if CMK_TIMER_USE_XT3_DCLOCK
1467     t = dclock();
1468 #else
1469     t = MPI_Wtime();
1470 #endif
1471
1472 #if 0 && CMK_SMP
1473     if (timerLock) CmiUnlock(timerLock);
1474 #endif
1475
1476     return _absoluteTime?t: (t-starttimer);
1477 }
1478
1479 double CmiWallTimer(void) {
1480     double t;
1481 #if 0 && CMK_SMP
1482     if (timerLock) CmiLock(timerLock);
1483 #endif
1484
1485 #if CMK_TIMER_USE_XT3_DCLOCK
1486     t = dclock();
1487 #else
1488     t = MPI_Wtime();
1489 #endif
1490
1491 #if 0 && CMK_SMP
1492     if (timerLock) CmiUnlock(timerLock);
1493 #endif
1494
1495     return _absoluteTime? t: (t-starttimer);
1496 }
1497
1498 double CmiCpuTimer(void) {
1499     double t;
1500 #if 0 && CMK_SMP
1501     if (timerLock) CmiLock(timerLock);
1502 #endif
1503 #if CMK_TIMER_USE_XT3_DCLOCK
1504     t = dclock() - starttimer;
1505 #else
1506     t = MPI_Wtime() - starttimer;
1507 #endif
1508 #if 0 && CMK_SMP
1509     if (timerLock) CmiUnlock(timerLock);
1510 #endif
1511     return t;
1512 }
1513
1514 #endif     /* CMK_TIMER_USE_SPECIAL */
1515
1516 /************Barrier Related Functions****************/
1517 /* must be called on all ranks including comm thread in SMP */
1518 int CmiBarrier() {
1519 #if CMK_SMP
1520     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1521     CmiNodeAllBarrier();
1522     if (CmiMyRank() == CmiMyNodeSize())
1523 #else
1524     if (CmiMyRank() == 0)
1525 #endif
1526     {
1527         /**
1528          *  The call of CmiBarrier is usually before the initialization
1529          *  of trace module of Charm++, therefore, the START_EVENT
1530          *  and END_EVENT are disabled here. -Chao Mei
1531          */
1532         /*START_EVENT();*/
1533
1534         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1535             CmiAbort("Timernit: MPI_Barrier failed!\n");
1536
1537         /*END_EVENT(10);*/
1538     }
1539     CmiNodeAllBarrier();
1540     return 0;
1541 }
1542
1543 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1544 int CmiBarrierZero() {
1545     int i;
1546 #if CMK_SMP
1547     if (CmiMyRank() == CmiMyNodeSize())
1548 #else
1549     if (CmiMyRank() == 0)
1550 #endif
1551     {
1552         char msg[1];
1553         MPI_Status sts;
1554         if (CmiMyNode() == 0)  {
1555             for (i=0; i<CmiNumNodes()-1; i++) {
1556                 START_EVENT();
1557
1558                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1559                     CmiPrintf("MPI_Recv failed!\n");
1560
1561                 END_EVENT(30);
1562             }
1563         } else {
1564             START_EVENT();
1565
1566             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1567                 printf("MPI_Send failed!\n");
1568
1569             END_EVENT(20);
1570         }
1571     }
1572     CmiNodeAllBarrier();
1573     return 0;
1574 }
1575
1576
1577 #if CMK_MEM_CHECKPOINT
1578
1579 void mpi_restart_crashed(int pe, int rank)
1580 {
1581     int vals[2];
1582     vals[0] = pe;
1583     vals[1] = CpvAccess(_curRestartPhase)+1;
1584     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1585     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1586 }
1587
1588 /* notify spare processors to exit */
1589 void mpi_end_spare()
1590 {
1591     int i;
1592     for (i=nextrank; i<total_pes; i++) {
1593         int vals[2] = {-1,-1};
1594         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,MPI_COMM_WORLD);
1595     }
1596 }
1597
1598 int find_spare_mpirank(int pe)
1599 {
1600     if (nextrank == total_pes) {
1601       CmiAbort("Charm++> No spare processor available.");
1602     }
1603     petorank[pe] = nextrank;
1604     nextrank++;
1605     return nextrank-1;
1606 }
1607
1608 void CkDieNow()
1609 {
1610     CmiPrintf("[%d] die now.\n", CmiMyPe());
1611
1612       /* release old messages */
1613     while (!CmiAllAsyncMsgsSent()) {
1614         PumpMsgs();
1615         CmiReleaseSentMessages();
1616     }
1617     MPI_Barrier(MPI_COMM_WORLD);
1618     MPI_Finalize();
1619     exit(0);
1620 }
1621
1622 #endif
1623
1624 /*@}*/
1625