Merge commit '30f31a362b23bc054c1e3244d8c6c98758dcfe57' into charm
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59
60 #if CMI_EXERT_SEND_CAP
61 #define SEND_CAP 3
62 #endif
63
64 #if CMI_EXERT_RECV_CAP
65 #define RECV_CAP 2
66 #endif
67 /* ###End of flow control related macros ### */
68
69 /* ###Beginning of machine-layer-tracing related macros ### */
70 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
71 #define CMI_MPI_TRACE_MOREDETAILED 0
72 #undef CMI_MPI_TRACE_USEREVENTS
73 #define CMI_MPI_TRACE_USEREVENTS 1
74 #else
75 #undef CMK_SMP_TRACE_COMMTHREAD
76 #define CMK_SMP_TRACE_COMMTHREAD 0
77 #endif
78
79 #define CMK_TRACE_COMMOVERHEAD 0
80 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_TRACE_COMMOVERHEAD
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #endif
87
88 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
89 CpvStaticDeclare(double, projTraceStart);
90 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
91 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
92 #else
93 #define  START_EVENT()
94 #define  END_EVENT(x)
95 #endif
96 /* ###End of machine-layer-tracing related macros ### */
97
98 /* ###Beginning of POST_RECV related macros ### */
99 /*
100  * If MPI_POST_RECV is defined, we provide default values for
101  * size and number of posted recieves. If MPI_POST_RECV_COUNT
102  * is set then a default value for MPI_POST_RECV_SIZE is used
103  * if not specified by the user.
104  */
105 #define MPI_POST_RECV 0
106
107 /* Making those parameters configurable for testing them easily */
108
109 #if MPI_POST_RECV
110 #define MPI_DYNAMIC_POST_RECV 0
111
112 /* Note the tag offset of a msg is determined by
113  * (its size - MPI_RECV_LOWERSIZE)/MPI_POST_RECV_INC.
114  * based on POST_RECV_TAG.
115  */
116 static int MPI_POST_RECV_COUNT=10;
117
118 /* The range of msgs to be tracked for histogramming */
119 static int MPI_POST_RECV_LOWERSIZE=8000;
120 static int MPI_POST_RECV_UPPERSIZE=64000;
121
122 /* The increment of msg size to be tracked, i.e. the histogram bucket size */
123 static int MPI_POST_RECV_INC = 1000;
124
125 /* The unit increment of msg cnt for increase #buf for a post recved msg */
126 static int MPI_POST_RECV_MSG_INC = 400;
127
128 /* If the #msg exceeds this value, post recv is created for such msg */
129 static int MPI_POST_RECV_MSG_CNT_THRESHOLD = 200;
130
131 /* The frequency of checking the existing posted recv buffers in the unit of #msgs */
132 static int MPI_POST_RECV_FREQ = 1000;
133
134 static int MPI_POST_RECV_SIZE;
135
136 typedef struct mpiPostRecvList {
137     /* POST_RECV_TAG + msgSizeIdx is the recv tag;
138      * Based on this value, this buf corresponds to msg size ranging
139      * [msgSizeIdx*MPI_POST_RECV_INC, (msgSizeIdx+1)*MPI_POST_RECV_INC)
140      */
141     int msgSizeIdx;
142     int bufCnt;
143     MPI_Request *postedRecvReqs;
144     char **postedRecvBufs;
145     struct mpiPostRecvList *next;
146 } MPIPostRecvList;
147 CpvDeclare(MPIPostRecvList *, postRecvListHdr);
148 CpvDeclare(MPIPostRecvList *, curPostRecvPtr);
149 CpvDeclare(int, msgRecvCnt);
150
151 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
152 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
153 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
154 CpvDeclare(char**,CmiPostedRecvBuffers);
155
156 /* Note: currently MPI doesn't provide a function whether a request is in progress.
157  * For example, a irecv has been filled partially. Then a call to MPI_Test still returns
158  * indicating it has not been finished. If only relying on this result, then calling
159  * MPI_Cancel will result in a loss of this msg. The dynamic post recv mechanism
160  * can only be safely used in a synchronized point such as load balancing.
161  */
162 #if MPI_DYNAMIC_POST_RECV
163 static int MSG_HISTOGRAM_BINSIZE;
164 static int MAX_HISTOGRAM_BUCKETS; /* only cares msg size less 2 MB */
165 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
166 static void recordMsgHistogramInfo(int size, char *msg);
167 static void reportMsgHistogramInfo();
168 #endif /* end of MPI_DYNAMIC_POST_RECV defined */
169
170 #endif /* end of MPI_POST_RECV defined */
171
172 /* Providing functions for external usage to set up the dynamic recv buffer
173  * when the user is aware that it's safe to call such function
174  */
175 void CmiSetupMachineRecvBuffers();
176
177 #define CAPTURE_MSG_HISTOGRAM 0
178 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
179 static int MSG_HISTOGRAM_BINSIZE=1000;
180 static int MAX_HISTOGRAM_BUCKETS=2000; /* only cares msg size less 2 MB */
181 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
182 static void recordMsgHistogramInfo(int size, char *msg);
183 static void reportMsgHistogramInfo();
184 #endif
185
186 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
187 #define TAG     1375
188 #if MPI_POST_RECV
189 #define POST_RECV_TAG       (TAG+1)
190 #define BARRIER_ZERO_TAG  TAG
191 #else
192 #define BARRIER_ZERO_TAG   (TAG-1)
193 #endif
194 /* ###End of POST_RECV related related macros ### */
195
196 #if CMK_BLUEGENEL
197 #define MAX_QLEN 8
198 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
199 #else
200 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
201 #define MAX_QLEN 200
202 #endif
203 /* =======End of Definitions of Performance-Specific Macros =======*/
204
205
206 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
207 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
208 #define CHARM_MAGIC_NUMBER               126
209
210 #if CMK_ERROR_CHECKING
211 extern unsigned char computeCheckSum(unsigned char *data, int len);
212 static int checksum_flag = 0;
213 #define CMI_SET_CHECKSUM(msg, len)      \
214         if (checksum_flag)  {   \
215           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
216           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
217         }
218 #define CMI_CHECK_CHECKSUM(msg, len)    \
219         if (checksum_flag)      \
220           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
221             CmiAbort("Fatal error: checksum doesn't agree!\n");
222 #else
223 #define CMI_SET_CHECKSUM(msg, len)
224 #define CMI_CHECK_CHECKSUM(msg, len)
225 #endif
226 /* =====End of Definitions of Message-Corruption Related Macros=====*/
227
228 /* =====Beginning of Declarations of Machine Specific Variables===== */
229 #include <signal.h>
230 void (*signal_int)(int);
231
232 static int _thread_provided = -1; /* Indicating MPI thread level */
233 static int idleblock = 0;
234
235 /* A simple list for msgs that have been sent by MPI_Isend */
236 typedef struct msg_list {
237     char *msg;
238     struct msg_list *next;
239     int size, destpe, mode;
240 #if CMK_SMP_TRACE_COMMTHREAD
241     int srcpe;
242 #endif
243     MPI_Request req;
244 } SMSG_LIST;
245
246 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
247 CpvStaticDeclare(SMSG_LIST *, end_sent);
248
249 CpvStaticDeclare(int, MsgQueueLen);
250 static int request_max;
251 /*FLAG: consume outstanding Isends in scheduler loop*/
252 static int no_outstanding_sends=0;
253
254 #if NODE_0_IS_CONVHOST
255 int inside_comm = 0;
256 #endif
257
258 typedef struct ProcState {
259 #if MULTI_SENDQUEUE
260     PCQueue      sendMsgBuf;       /* per processor message sending queue */
261 #endif
262     CmiNodeLock  recvLock;                  /* for cs->recv */
263 } ProcState;
264 static ProcState  *procState;
265
266 #if CMK_SMP && !MULTI_SENDQUEUE
267 static PCQueue sendMsgBuf;
268 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
269 #endif
270 /* =====End of Declarations of Machine Specific Variables===== */
271
272 #if CMK_MEM_CHECKPOINT
273 #define FAIL_TAG   1200
274 int num_workpes, total_pes;
275 int *petorank = NULL;
276 int  nextrank;
277 void mpi_end_spare();
278 #endif
279
280 /* =====Beginning of Declarations of Machine Specific Functions===== */
281 /* Utility functions */
282 #if CMK_BLUEGENEL
283 extern void MPID_Progress_test();
284 #endif
285 static size_t CmiAllAsyncMsgsSent(void);
286 static void CmiReleaseSentMessages(void);
287 static int PumpMsgs(void);
288 static void PumpMsgsBlocking(void);
289
290 #if CMK_SMP
291 static int MsgQueueEmpty();
292 static int RecvQueueEmpty();
293 static int SendMsgBuf();
294 static  void EnqueueMsg(void *m, int size, int node, int mode);
295 #endif
296
297 /* The machine-specific send function */
298 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
299 #define LrtsSendFunc MachineSpecificSendForMPI
300
301 /* ### Beginning of Machine-startup Related Functions ### */
302 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
303 #define LrtsInit MachineInitForMPI
304
305 static void MachinePreCommonInitForMPI(int everReturn);
306 static void MachinePostCommonInitForMPI(int everReturn);
307 #define LrtsPreCommonInit MachinePreCommonInitForMPI
308 #define LrtsPostCommonInit MachinePostCommonInitForMPI
309 /* ### End of Machine-startup Related Functions ### */
310
311 /* ### Beginning of Machine-running Related Functions ### */
312 static void AdvanceCommunicationForMPI(int whenidle);
313 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
314
315 static void DrainResourcesForMPI(); /* used when exit */
316 #define LrtsDrainResources DrainResourcesForMPI
317
318 static void MachineExitForMPI();
319 #define LrtsExit MachineExitForMPI
320 /* ### End of Machine-running Related Functions ### */
321
322 /* ### Beginning of Idle-state Related Functions ### */
323 void CmiNotifyIdleForMPI(void);
324 /* ### End of Idle-state Related Functions ### */
325
326 static void MachinePostNonLocalForMPI();
327 #define LrtsPostNonLocal MachinePostNonLocalForMPI
328
329 /* =====End of Declarations of Machine Specific Functions===== */
330
331 /**
332  *  Macros that overwrites the common codes, such as
333  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
334  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
335  *  CMK_OFFLOAD_BCAST_PROCESS etc.
336  */
337 #define CMK_HAS_SIZE_IN_MSGHDR 0
338 #include "machine-lrts.h"
339 #include "machine-common-core.c"
340
341 /* The machine specific msg-sending function */
342
343 #if CMK_SMP
344 static void EnqueueMsg(void *m, int size, int node, int mode) {
345     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
346     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
347     msg_tmp->msg = m;
348     msg_tmp->size = size;
349     msg_tmp->destpe = node;
350     msg_tmp->next = 0;
351     msg_tmp->mode = mode;
352
353 #if CMK_SMP_TRACE_COMMTHREAD
354     msg_tmp->srcpe = CmiMyPe();
355 #endif
356
357 #if MULTI_SENDQUEUE
358     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
359 #else
360     /*CmiLock(sendMsgBufLock);*/
361     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
362     /*CmiUnlock(sendMsgBufLock);*/
363 #endif
364
365     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
366 }
367 #endif
368
369 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
370 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
371     int node = smsg->destpe;
372     int size = smsg->size;
373     char *msg = smsg->msg;
374     int mode = smsg->mode;
375     int dstrank;
376
377     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
378 #if CMK_ERROR_CHECKING
379     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
380     CMI_SET_CHECKSUM(msg, size);
381 #endif
382
383 #if MPI_POST_RECV
384     if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
385 #if MPI_DYNAMIC_POST_RECV
386         int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
387         START_EVENT();
388         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,MPI_COMM_WORLD,&(smsg->req)))
389             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
390 #else
391         START_EVENT();
392         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
393             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
394 #endif
395         /*END_EVENT(40);*/
396     } else {
397         START_EVENT();
398         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
399             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
400         /*END_EVENT(40);*/
401     }
402 #else
403     START_EVENT();
404     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
405         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
406     /*END_EVENT(40);*/
407 #endif
408
409 #if CMK_SMP_TRACE_COMMTHREAD
410     traceBeginCommOp(msg);
411     traceChangeLastTimestamp(CpvAccess(projTraceStart));
412     /* traceSendMsgComm must execute after traceBeginCommOp because
413          * we pretend we execute an entry method, and inside this we
414          * pretend we will send another message. Otherwise how could
415          * a message creation just before an entry method invocation?
416          * If such logic is broken, the projections will not trace
417          * messages correctly! -Chao Mei
418          */
419     traceSendMsgComm(msg);
420     traceEndCommOp(msg);
421 #if CMI_MPI_TRACE_MOREDETAILED
422     char tmp[64];
423     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
424     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
425 #endif
426 #endif
427
428     MACHSTATE(3,"}MPI_Isend end");
429     CpvAccess(MsgQueueLen)++;
430     if (CpvAccess(sent_msgs)==0)
431         CpvAccess(sent_msgs) = smsg;
432     else
433         CpvAccess(end_sent)->next = smsg;
434     CpvAccess(end_sent) = smsg;
435
436 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
437     if (mode == P2P_SYNC || mode == P2P_ASYNC)
438     {
439     while (CpvAccess(MsgQueueLen) > request_max) {
440         CmiReleaseSentMessages();
441         PumpMsgs();
442     }
443     }
444 #endif
445
446     return (CmiCommHandle) &(smsg->req);
447 }
448
449 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
450     /* Ignoring the mode for MPI layer */
451
452     CmiState cs = CmiGetState();
453     SMSG_LIST *msg_tmp;
454     int  rank;
455
456     CmiAssert(destNode != CmiMyNode());
457 #if CMK_SMP
458     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
459       EnqueueMsg(msg, size, destNode, mode);
460       return 0;
461     }
462 #endif
463     /* non smp */
464     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
465     msg_tmp->msg = msg;
466     msg_tmp->destpe = destNode;
467     msg_tmp->size = size;
468     msg_tmp->next = 0;
469     msg_tmp->mode = mode;
470     return MPISendOneMsg(msg_tmp);
471 }
472
473 static size_t CmiAllAsyncMsgsSent(void) {
474     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
475     MPI_Status sts;
476     int done;
477
478     while (msg_tmp!=0) {
479         done = 0;
480         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
481             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
482         if (!done)
483             return 0;
484         msg_tmp = msg_tmp->next;
485         /*    MsgQueueLen--; ????? */
486     }
487     return 1;
488 }
489
490 int CmiAsyncMsgSent(CmiCommHandle c) {
491
492     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
493     int done;
494     MPI_Status sts;
495
496     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
497         msg_tmp = msg_tmp->next;
498     if (msg_tmp) {
499         done = 0;
500         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
501             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
502         return ((done)?1:0);
503     } else {
504         return 1;
505     }
506 }
507
508 void CmiReleaseCommHandle(CmiCommHandle c) {
509     return;
510 }
511
512 /* ######Beginning of functions related with communication progress ###### */
513 static void CmiReleaseSentMessages(void) {
514     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
515     SMSG_LIST *prev=0;
516     SMSG_LIST *temp;
517     int done;
518     MPI_Status sts;
519
520 #if CMK_BLUEGENEL
521     MPID_Progress_test();
522 #endif
523
524     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
525     while (msg_tmp!=0) {
526         done =0;
527 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
528         double startT = CmiWallTimer();
529 #endif
530         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
531             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
532         if (done) {
533             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
534             CpvAccess(MsgQueueLen)--;
535             /* Release the message */
536             temp = msg_tmp->next;
537             if (prev==0) /* first message */
538                 CpvAccess(sent_msgs) = temp;
539             else
540                 prev->next = temp;
541             CmiFree(msg_tmp->msg);
542             CmiFree(msg_tmp);
543             msg_tmp = temp;
544         } else {
545             prev = msg_tmp;
546             msg_tmp = msg_tmp->next;
547         }
548 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
549         {
550             double endT = CmiWallTimer();
551             /* only record the event if it takes more than 1ms */
552             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
553         }
554 #endif
555     }
556     CpvAccess(end_sent) = prev;
557     MACHSTATE(2,"} CmiReleaseSentMessages end");
558 }
559
560 static int PumpMsgs(void) {
561     int nbytes, flg, res;
562     char *msg;
563     MPI_Status sts;
564     int recd=0;
565
566 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
567     int recvCnt=0;
568 #endif
569
570 #if CMK_BLUEGENEL
571     MPID_Progress_test();
572 #endif
573
574     MACHSTATE(2,"PumpMsgs begin {");
575
576 #if CMI_DYNAMIC_EXERT_CAP
577     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
578 #endif
579
580     while (1) {
581 #if CMI_EXERT_RECV_CAP
582         if (recvCnt==RECV_CAP) break;
583 #elif CMI_DYNAMIC_EXERT_CAP
584         if (recvCnt >= dynamicRecvCap) break;
585 #endif
586
587 #if CMI_SMP_TRACE_COMMTHREAD
588         START_EVENT();
589 #endif
590
591         /* First check posted recvs then do  probe unmatched outstanding messages */
592 #if MPI_POST_RECV
593         MPIPostRecvList *postedOne = NULL;
594         int completed_index = -1;
595         flg = 0;
596 #if MPI_DYNAMIC_POST_RECV
597         MPIPostRecvList *oldPostRecvPtr = CpvAccess(curPostRecvPtr);
598         if (oldPostRecvPtr) {
599             /* post recv buf inited */
600             do {
601                 /* round-robin iteration over the list */
602                 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
603                 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
604                     CmiAbort("PumpMsgs: MPI_Testany failed!\n");
605
606                 if (flg) {
607                     postedOne = cur;
608                     break;
609                 }
610                 CpvAccess(curPostRecvPtr) = CpvAccess(curPostRecvPtr)->next;
611             } while (CpvAccess(curPostRecvPtr) != oldPostRecvPtr);
612         }
613 #else
614         MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
615         if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
616             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
617 #endif
618         if (flg) {
619             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
620                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
621
622             recd = 1;
623 #if !MPI_DYNAMIC_POST_RECV
624             postedOne = CpvAccess(curPostRecvPtr);
625 #endif
626             msg = (postedOne->postedRecvBufs)[completed_index];
627             (postedOne->postedRecvBufs)[completed_index] = NULL;
628
629             CpvAccess(Cmi_posted_recv_total)++;
630         } else {
631             res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flg, &sts);
632             if (res != MPI_SUCCESS)
633                 CmiAbort("MPI_Iprobe failed\n");
634             if (!flg) break;
635             recd = 1;
636             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
637             msg = (char *) CmiAlloc(nbytes);
638
639             START_EVENT();
640
641             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
642                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
643
644             END_EVENT(30);
645
646             CpvAccess(Cmi_unposted_recv_total)++;
647         }
648 #else
649         /* Original version */
650 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
651         double startT = CmiWallTimer();
652 #endif
653         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
654         if (res != MPI_SUCCESS)
655             CmiAbort("MPI_Iprobe failed\n");
656
657         if (!flg) break;
658 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
659         {
660             double endT = CmiWallTimer();
661             /* only trace the probe that last longer than 1ms */
662             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
663         }
664 #endif
665
666         recd = 1;
667         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
668         msg = (char *) CmiAlloc(nbytes);
669
670         START_EVENT();
671
672         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
673             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
674
675         /*END_EVENT(30);*/
676
677 #endif /*end of not MPI_POST_RECV */
678
679 #if CMK_SMP_TRACE_COMMTHREAD
680         traceBeginCommOp(msg);
681         traceChangeLastTimestamp(CpvAccess(projTraceStart));
682         traceEndCommOp(msg);
683 #if CMI_MPI_TRACE_MOREDETAILED
684         char tmp[32];
685         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
686         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
687 #endif
688 #elif CMK_TRACE_COMMOVERHEAD
689         char tmp[32];
690         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
691         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
692 #endif
693
694
695         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
696         CMI_CHECK_CHECKSUM(msg, nbytes);
697 #if CMK_ERROR_CHECKING
698         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
699             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
700             CmiFree(msg);
701             CmiAbort("Abort!\n");
702             continue;
703         }
704 #endif
705
706         handleOneRecvedMsg(nbytes, msg);
707         
708 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
709         recordMsgHistogramInfo(nbytes, msg);
710 #endif
711
712 #if  MPI_POST_RECV
713 #if MPI_DYNAMIC_POST_RECV
714         if (postedOne) {
715             //printf("[%d]: get one posted recv\n", CmiMyPe());
716             /* Get the upper size of this buffer */
717             int postRecvBufSize = postedOne->msgSizeIdx*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
718             int postRecvTag = POST_RECV_TAG + postedOne->msgSizeIdx;
719             /* Has to re-allocate the buffer for the message */
720             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(postRecvBufSize);
721
722             /* and repost the recv */
723             START_EVENT();
724
725             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
726                                          postRecvBufSize,
727                                          MPI_BYTE,
728                                          MPI_ANY_SOURCE,
729                                          postRecvTag,
730                                          MPI_COMM_WORLD,
731                                          &((postedOne->postedRecvReqs)[completed_index])  ))
732                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
733             END_EVENT(50);
734         }
735 #else
736         if (postedOne) {
737             /* Has to re-allocate the buffer for the message */
738             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
739
740             /* and repost the recv */
741             START_EVENT();
742
743             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
744                                          MPI_POST_RECV_SIZE,
745                                          MPI_BYTE,
746                                          MPI_ANY_SOURCE,
747                                          POST_RECV_TAG,
748                                          MPI_COMM_WORLD,
749                                          &((postedOne->postedRecvReqs)[completed_index])  ))
750                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
751             END_EVENT(50);
752         }
753 #endif /* not MPI_DYNAMIC_POST_RECV */
754 #endif
755
756 #if CMI_EXERT_RECV_CAP
757         recvCnt++;
758 #elif CMI_DYNAMIC_EXERT_CAP
759         recvCnt++;
760 #if CMK_SMP
761         /* check sendMsgBuf to get the  number of messages that have not been sent
762              * which is only available in SMP mode
763          * MsgQueueLen indicates the number of messages that have not been released
764              * by MPI
765              */
766         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
767                 || CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
768             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
769         }
770 #else
771         /* MsgQueueLen indicates the number of messages that have not been released
772              * by MPI
773              */
774         if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
775             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
776         }
777 #endif
778
779 #endif
780
781     }
782
783     MACHSTATE(2,"} PumpMsgs end ");
784     return recd;
785 }
786
787 /* blocking version */
788 static void PumpMsgsBlocking(void) {
789     static int maxbytes = 20000000;
790     static char *buf = NULL;
791     int nbytes, flg;
792     MPI_Status sts;
793     char *msg;
794     int recd=0;
795
796     if (!PCQueueEmpty(CmiGetState()->recv)) return;
797     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
798     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
799     if (CpvAccess(sent_msgs))  return;
800
801 #if 0
802     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
803 #endif
804
805     if (buf == NULL) {
806         buf = (char *) CmiAlloc(maxbytes);
807         _MEMCHECK(buf);
808     }
809
810
811 #if MPI_POST_RECV
812 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
813     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
814 #endif
815
816     START_EVENT();
817
818     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
819         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
820
821     /*END_EVENT(30);*/
822
823     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
824     msg = (char *) CmiAlloc(nbytes);
825     memcpy(msg, buf, nbytes);
826
827 #if CMK_SMP_TRACE_COMMTHREAD
828     traceBeginCommOp(msg);
829     traceChangeLastTimestamp(CpvAccess(projTraceStart));
830     traceEndCommOp(msg);
831 #if CMI_MPI_TRACE_MOREDETAILED
832     char tmp[32];
833     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
834     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
835 #endif
836 #endif
837
838     handleOneRecvedMsg(nbytes, msg);
839 }
840
841
842 #if CMK_SMP
843
844 /* called by communication thread in SMP */
845 static int SendMsgBuf() {
846     SMSG_LIST *msg_tmp;
847     char *msg;
848     int node, rank, size;
849     int i;
850     int sent = 0;
851
852 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
853     int sentCnt = 0;
854 #endif
855
856 #if CMI_DYNAMIC_EXERT_CAP
857     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
858 #endif
859
860     MACHSTATE(2,"SendMsgBuf begin {");
861 #if MULTI_SENDQUEUE
862     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
863         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
864             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
865 #else
866     /* single message sending queue */
867     /* CmiLock(sendMsgBufLock); */
868     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
869     /* CmiUnlock(sendMsgBufLock); */
870     while (NULL != msg_tmp) {
871 #endif
872             MPISendOneMsg(msg_tmp);
873             sent=1;
874
875 #if CMI_EXERT_SEND_CAP
876             if (++sentCnt == SEND_CAP) break;
877 #elif CMI_DYNAMIC_EXERT_CAP
878             if (++sentCnt >= dynamicSendCap) break;
879             if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD)
880                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
881 #endif
882
883 #if ! MULTI_SENDQUEUE
884             /* CmiLock(sendMsgBufLock); */
885             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
886             /* CmiUnlock(sendMsgBufLock); */
887 #endif
888         }
889 #if MULTI_SENDQUEUE
890     }
891 #endif
892     MACHSTATE(2,"}SendMsgBuf end ");
893     return sent;
894 }
895
896 static int MsgQueueEmpty() {
897     int i;
898 #if MULTI_SENDQUEUE
899     for (i=0; i<_Cmi_mynodesize; i++)
900         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
901 #else
902     return PCQueueEmpty(sendMsgBuf);
903 #endif
904     return 1;
905 }
906
907 /* test if all processors recv queues are empty */
908 static int RecvQueueEmpty() {
909     int i;
910     for (i=0; i<_Cmi_mynodesize; i++) {
911         CmiState cs=CmiGetStateN(i);
912         if (!PCQueueEmpty(cs->recv)) return 0;
913     }
914     return 1;
915 }
916
917
918 #define REPORT_COMM_METRICS 0
919 #if REPORT_COMM_METRICS
920 static double pumptime = 0.0;
921 static double releasetime = 0.0;
922 static double sendtime = 0.0;
923 #endif
924
925 #endif //end of CMK_SMP
926
927 static void AdvanceCommunicationForMPI(int whenidle) {
928 #if REPORT_COMM_METRICS
929     double t1, t2, t3, t4;
930     t1 = CmiWallTimer();
931 #endif
932
933 #if CMK_SMP
934     PumpMsgs();
935
936 #if REPORT_COMM_METRICS
937     t2 = CmiWallTimer();
938 #endif
939
940     CmiReleaseSentMessages();
941 #if REPORT_COMM_METRICS
942     t3 = CmiWallTimer();
943 #endif
944
945     SendMsgBuf();
946
947 #if REPORT_COMM_METRICS
948     t4 = CmiWallTimer();
949     pumptime += (t2-t1);
950     releasetime += (t3-t2);
951     sendtime += (t4-t3);
952 #endif
953
954 #else /* non-SMP case */
955     CmiReleaseSentMessages();
956
957 #if REPORT_COMM_METRICS
958     t2 = CmiWallTimer();
959 #endif
960     PumpMsgs();
961
962 #if REPORT_COMM_METRICS
963     t3 = CmiWallTimer();
964     pumptime += (t3-t2);
965     releasetime += (t2-t1);
966 #endif
967
968 #endif /* end of #if CMK_SMP */
969 }
970 /* ######End of functions related with communication progress ###### */
971
972 static void MachinePostNonLocalForMPI() {
973 #if !CMK_SMP
974     if (no_outstanding_sends) {
975         while (CpvAccess(MsgQueueLen)>0) {
976             AdvanceCommunicationForMPI(0);
977         }
978     }
979
980     /* FIXME: I don't think the following codes are needed because
981      * it repeats the same job of the next call of CmiGetNonLocal
982      */
983 #if 0
984     if (!msg) {
985         CmiReleaseSentMessages();
986         if (PumpMsgs())
987             return  PCQueuePop(cs->recv);
988         else
989             return 0;
990     }
991 #endif
992 #else
993   if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
994         CmiReleaseSentMessages();       
995         /* ??? SendMsgBuf is a not a thread-safe function. If it is put
996          * here and this function will be called in CmiNotifyStillIdle,
997          * then a data-race problem occurs */
998         /*SendMsgBuf();*/
999   }
1000 #endif
1001 }
1002
1003 /* Idle-state related functions: called in non-smp mode */
1004 void CmiNotifyIdleForMPI(void) {
1005     CmiReleaseSentMessages();
1006     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1007 }
1008
1009 /* Network progress function is used to poll the network when for
1010    messages. This flushes receive buffers on some  implementations*/
1011 #if CMK_MACHINE_PROGRESS_DEFINED
1012 void CmiMachineProgressImpl() {
1013 #if !CMK_SMP
1014     PumpMsgs();
1015 #if CMK_IMMEDIATE_MSG
1016     CmiHandleImmediate();
1017 #endif
1018 #else
1019     /*Not implemented yet. Communication server does not seem to be
1020       thread safe, so only communication thread call it */
1021     if (CmiMyRank() == CmiMyNodeSize())
1022         CommunicationServerThread(0);
1023 #endif
1024 }
1025 #endif
1026
1027 /* ######Beginning of functions related with exiting programs###### */
1028 void DrainResourcesForMPI() {
1029 #if !CMK_SMP
1030     while (!CmiAllAsyncMsgsSent()) {
1031         PumpMsgs();
1032         CmiReleaseSentMessages();
1033     }
1034 #else
1035     if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
1036         while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1037             CmiReleaseSentMessages();
1038             SendMsgBuf();
1039             PumpMsgs();
1040         }
1041     }else if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1042         while(!CmiAllAsyncMsgsSent()) {
1043             CmiReleaseSentMessages();
1044         }
1045     }
1046 #endif
1047 #if CMK_MEM_CHECKPOINT
1048     if (CmiMyPe() == 0) mpi_end_spare();
1049 #endif
1050     MACHSTATE(2, "Machine exit barrier begin {");
1051     START_EVENT();
1052     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1053         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
1054     END_EVENT(10);
1055     MACHSTATE(2, "} Machine exit barrier end");
1056 }
1057
1058 void MachineExitForMPI() {
1059     int i;
1060 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1061     int doPrint = 0;
1062     if (CmiMyNode()==0) doPrint = 1;
1063
1064     if (doPrint /*|| CmiMyNode()%11==0 */) {
1065 #if MPI_POST_RECV
1066         CmiPrintf("node[%d]: %llu posted receives,  %llu unposted receives\n", CmiMyNode(), CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1067 #endif
1068     }
1069 #endif
1070
1071 #if MPI_POST_RECV
1072     {
1073         MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
1074         if (ptr) {
1075             do {
1076                 for (i=0; i<ptr->bufCnt; i++) MPI_Cancel(ptr->postedRecvReqs+i);
1077                 ptr = ptr->next;
1078             } while (ptr!=CpvAccess(postRecvListHdr));
1079         }
1080     }
1081 #endif
1082
1083 #if REPORT_COMM_METRICS
1084 #if CMK_SMP
1085     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
1086               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
1087               pumptime, releasetime, sendtime);
1088 #else
1089     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
1090               CmiMyPe(), pumptime, releasetime, sendtime);
1091 #endif
1092 #endif
1093
1094 #if ! CMK_AUTOBUILD
1095     signal(SIGINT, signal_int);
1096     MPI_Finalize();
1097 #endif
1098     exit(0);
1099 }
1100
1101 static int machine_exit_idx;
1102 static void machine_exit(char *m) {
1103     EmergencyExit();
1104     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1105     fflush(stdout);
1106     CmiNodeBarrier();
1107     if (CmiMyRank() == 0) {
1108         MPI_Barrier(MPI_COMM_WORLD);
1109         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1110         MPI_Abort(MPI_COMM_WORLD, 1);
1111     } else {
1112         while (1) CmiYield();
1113     }
1114 }
1115
1116 static void KillOnAllSigs(int sigNo) {
1117     static int already_in_signal_handler = 0;
1118     char *m;
1119     if (already_in_signal_handler) return;   /* MPI_Abort(MPI_COMM_WORLD,1); */
1120     already_in_signal_handler = 1;
1121 #if CMK_CCS_AVAILABLE
1122     if (CpvAccess(cmiArgDebugFlag)) {
1123         CpdNotify(CPD_SIGNAL, sigNo);
1124         CpdFreeze();
1125     }
1126 #endif
1127     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1128              "Signal: %d\n",CmiMyPe(),sigNo);
1129     CmiPrintStackTrace(1);
1130
1131     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1132     CmiSetHandler(m, machine_exit_idx);
1133     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1134     machine_exit(m);
1135 }
1136 /* ######End of functions related with exiting programs###### */
1137
1138
1139 /* ######Beginning of functions related with starting programs###### */
1140 static void registerMPITraceEvents() {
1141 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1142     traceRegisterUserEvent("MPI_Barrier", 10);
1143     traceRegisterUserEvent("MPI_Send", 20);
1144     traceRegisterUserEvent("MPI_Recv", 30);
1145     traceRegisterUserEvent("MPI_Isend", 40);
1146     traceRegisterUserEvent("MPI_Irecv", 50);
1147     traceRegisterUserEvent("MPI_Test", 60);
1148     traceRegisterUserEvent("MPI_Iprobe", 70);
1149 #endif
1150 }
1151
1152 #if MACHINE_DEBUG_LOG
1153 FILE *debugLog = NULL;
1154 #endif
1155
1156 static char *thread_level_tostring(int thread_level) {
1157 #if CMK_MPI_INIT_THREAD
1158     switch (thread_level) {
1159     case MPI_THREAD_SINGLE:
1160         return "MPI_THREAD_SINGLE";
1161     case MPI_THREAD_FUNNELED:
1162         return "MPI_THREAD_FUNNELED";
1163     case MPI_THREAD_SERIALIZED:
1164         return "MPI_THREAD_SERIALIZED";
1165     case MPI_THREAD_MULTIPLE :
1166         return "MPI_THREAD_MULTIPLE";
1167     default: {
1168         char *str = (char*)malloc(5);
1169         sprintf(str,"%d", thread_level);
1170         return str;
1171     }
1172     }
1173     return  "unknown";
1174 #else
1175     char *str = (char*)malloc(5);
1176     sprintf(str,"%d", thread_level);
1177     return str;
1178 #endif
1179 }
1180
1181 /**
1182  *  Obtain the number of nodes, my node id, and consuming machine layer
1183  *  specific arguments
1184  */
1185 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1186     int n,i;
1187     int ver, subver;
1188     int provided;
1189     int thread_level;
1190     int myNID;
1191     int largc=*argc;
1192     char** largv=*argv;
1193
1194 #if MACHINE_DEBUG
1195     debugLog=NULL;
1196 #endif
1197 #if CMK_USE_HP_MAIN_FIX
1198 #if FOR_CPLUS
1199     _main(largc,largv);
1200 #endif
1201 #endif
1202
1203 #if CMK_SMP
1204     if (CmiGetArgFlag(largv, "+comm_thread_only_recv")) {
1205       Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
1206     }
1207 #endif
1208
1209 #if CMK_MPI_INIT_THREAD
1210 #if CMK_SMP
1211     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV)
1212       thread_level = MPI_THREAD_FUNNELED;
1213     else
1214       thread_level = MPI_THREAD_MULTIPLE;
1215 #else
1216     thread_level = MPI_THREAD_SINGLE;
1217 #endif
1218     MPI_Init_thread(argc, argv, thread_level, &provided);
1219     _thread_provided = provided;
1220 #else
1221     MPI_Init(argc, argv);
1222     thread_level = 0;
1223     _thread_provided = -1;
1224 #endif
1225     largc = *argc;
1226     largv = *argv;
1227     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1228     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1229
1230     myNID = *myNodeID;
1231
1232 #if CMK_SMP
1233     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV && _thread_provided != MPI_THREAD_MULTIPLE) {
1234         Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV; 
1235     }
1236 #endif
1237
1238     MPI_Get_version(&ver, &subver);
1239     if (myNID == 0) {
1240         printf("Charm++> Running on MPI version: %d.%d\n", ver, subver);
1241         printf("Charm++> level of thread support used: %s (desired: %s)\n", thread_level_tostring(_thread_provided), thread_level_tostring(thread_level));
1242     }
1243
1244     {
1245         int debug = CmiGetArgFlag(largv,"++debug");
1246         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1247         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1248 #if CMK_HAS_GETPID
1249             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1250             fflush(stdout);
1251             if (!debug_no_pause)
1252                 sleep(15);
1253 #else
1254             printf("++debug ignored.\n");
1255 #endif
1256         }
1257     }
1258
1259
1260 #if CMK_MEM_CHECKPOINT
1261     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1262        CmiAssert(num_workpes <= *numNodes);
1263        total_pes = *numNodes;
1264        *numNodes = num_workpes;
1265     }
1266     else
1267        total_pes = num_workpes = *numNodes;
1268     if (*myNodeID == 0)
1269        CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1270     petorank = (int *)malloc(sizeof(int) * num_workpes);
1271     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1272     nextrank = num_workpes;
1273
1274     if (*myNodeID >= num_workpes) {    /* is spare processor */
1275       MPI_Status sts;
1276       int vals[2];
1277       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
1278       int newpe = vals[0];
1279       CpvAccess(_curRestartPhase) = vals[1];
1280
1281       if (newpe == -1) {
1282           MPI_Barrier(MPI_COMM_WORLD);
1283           MPI_Finalize();
1284           exit(0);
1285       }
1286
1287       CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
1288         /* update petorank */
1289       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,MPI_COMM_WORLD, &sts);
1290       nextrank = *myNodeID + 1;
1291       *myNodeID = newpe;
1292       myNID = newpe;
1293
1294        /* add +restartaftercrash to argv */
1295       char *phase_str;
1296       char **restart_argv;
1297       int i=0;
1298       while(largv[i]!= NULL) i++;
1299       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1300       i=0;
1301       while(largv[i]!= NULL){
1302                 restart_argv[i] = largv[i];
1303                 i++;
1304       }
1305       restart_argv[i] = "+restartaftercrash";
1306       phase_str = (char*)malloc(10);
1307       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1308       restart_argv[i+1]=phase_str;
1309       restart_argv[i+2]=NULL;
1310       *argv = restart_argv;
1311       *argc = i+2;
1312       largc = *argc;
1313       largv = *argv;
1314     }
1315 #endif
1316
1317     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1318     if (idleblock && _Cmi_mynode == 0) {
1319         printf("Charm++: Running in idle blocking mode.\n");
1320     }
1321
1322 #if CMK_CHARMDEBUG
1323     /* setup signal handlers */
1324     signal(SIGSEGV, KillOnAllSigs);
1325     signal(SIGFPE, KillOnAllSigs);
1326     signal(SIGILL, KillOnAllSigs);
1327     signal_int = signal(SIGINT, KillOnAllSigs);
1328     signal(SIGTERM, KillOnAllSigs);
1329     signal(SIGABRT, KillOnAllSigs);
1330 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1331     signal(SIGQUIT, KillOnAllSigs);
1332     signal(SIGBUS, KillOnAllSigs);
1333 #   endif /*UNIX*/
1334 #endif
1335
1336 #if CMK_NO_OUTSTANDING_SENDS
1337     no_outstanding_sends=1;
1338 #endif
1339     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1340         no_outstanding_sends = 1;
1341         if (myNID == 0)
1342             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1343                    no_outstanding_sends?"":" not");
1344     }
1345
1346     request_max=MAX_QLEN;
1347     CmiGetArgInt(largv,"+requestmax",&request_max);
1348     /*printf("request max=%d\n", request_max);*/
1349
1350 #if MPI_POST_RECV
1351     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1352     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1353     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1354     CmiGetArgInt(largv, "+postRecvThreshold", &MPI_POST_RECV_MSG_CNT_THRESHOLD);
1355     CmiGetArgInt(largv, "+postRecvBucketSize", &MPI_POST_RECV_INC);
1356     CmiGetArgInt(largv, "+postRecvMsgInc", &MPI_POST_RECV_MSG_INC);
1357     CmiGetArgInt(largv, "+postRecvCheckFreq", &MPI_POST_RECV_FREQ);
1358     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1359     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1360     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1361     if (myNID==0) {
1362         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes) with msg count threshold %d and msg histogram bucket size %d, #buf increment every %d msgs. The buffers are checked every %d msgs\n",
1363                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE,
1364                MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
1365     }
1366 #endif
1367
1368 #if CMI_DYNAMIC_EXERT_CAP
1369     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1370     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1371     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1372     if (myNID==0) {
1373         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1374                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1375     }
1376 #endif
1377
1378     /* checksum flag */
1379     if (CmiGetArgFlag(largv,"+checksum")) {
1380 #if CMK_ERROR_CHECKING
1381         checksum_flag = 1;
1382         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1383 #else
1384         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1385 #endif
1386     }
1387
1388     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1389     for (i=0; i<_Cmi_mynodesize+1; i++) {
1390 #if MULTI_SENDQUEUE
1391         procState[i].sendMsgBuf = PCQueueCreate();
1392 #endif
1393         procState[i].recvLock = CmiCreateLock();
1394     }
1395 #if CMK_SMP
1396 #if !MULTI_SENDQUEUE
1397     sendMsgBuf = PCQueueCreate();
1398     sendMsgBufLock = CmiCreateLock();
1399 #endif
1400 #endif
1401 }
1402
1403 static void MachinePreCommonInitForMPI(int everReturn) {
1404
1405 #if MPI_POST_RECV
1406     int doInit = 1;
1407     int i;
1408
1409 #if CMK_SMP
1410     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1411 #endif
1412
1413     /* Currently, in mpi smp, the main thread will be the comm thread, so
1414      *  only the comm thread should post recvs. Cpvs, however, need to be
1415      * created on rank 0 (the ptrs to the actual cpv memory), while
1416      * other ranks are busy waiting for this to finish. So cpv initialize
1417      * routines have to be called on every ranks, although they are only
1418      * useful on comm thread (whose rank is not zero) -Chao Mei
1419      */
1420     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1421     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1422     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1423     CpvInitialize(char **, CmiPostedRecvBuffers);
1424
1425     CpvAccess(CmiPostedRecvRequests) = NULL;
1426     CpvAccess(CmiPostedRecvBuffers) = NULL;
1427
1428     CpvInitialize(MPIPostRecvList *, postRecvListHdr);
1429     CpvInitialize(MPIPostRecvList *, curPostRecvPtr);
1430     CpvInitialize(int, msgRecvCnt);
1431
1432     CpvAccess(postRecvListHdr) = NULL;
1433     CpvAccess(curPostRecvPtr) = NULL;
1434     CpvAccess(msgRecvCnt) = 0;
1435
1436 #if MPI_DYNAMIC_POST_RECV
1437     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1438 #endif
1439
1440     if (doInit) {
1441 #if MPI_DYNAMIC_POST_RECV
1442         MSG_HISTOGRAM_BINSIZE = MPI_POST_RECV_INC;
1443         /* including two more buckets that are out of the range [LOWERSIZE, UPPERSIZE] */
1444         MAX_HISTOGRAM_BUCKETS = (MPI_POST_RECV_UPPERSIZE - MPI_POST_RECV_LOWERSIZE)/MSG_HISTOGRAM_BINSIZE+2;
1445         CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1446         memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1447 #else
1448         /* Post some extra recvs to help out with incoming messages */
1449         /* On some MPIs the messages are unexpected and thus slow */
1450
1451         CpvAccess(postRecvListHdr) = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
1452
1453         /* An array of request handles for posted recvs */
1454         CpvAccess(postRecvListHdr)->msgSizeIdx = -1;
1455         CpvAccess(postRecvListHdr)->bufCnt = MPI_POST_RECV_COUNT;
1456         CpvAccess(postRecvListHdr)->postedRecvReqs = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1457         /* An array of buffers for posted recvs */
1458         CpvAccess(postRecvListHdr)->postedRecvBufs = (char**)malloc(MPI_POST_RECV_COUNT*sizeof(char *));
1459         CpvAccess(postRecvListHdr)->next = CpvAccess(postRecvListHdr);
1460         CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr);
1461
1462         /* Post Recvs */
1463         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1464             char *tmpbuf = (char *)CmiAlloc(MPI_POST_RECV_SIZE); /* Note: could be aligned allocation?? */
1465             CpvAccess(postRecvListHdr)->postedRecvBufs[i] = tmpbuf;
1466             if (MPI_SUCCESS != MPI_Irecv(tmpbuf,
1467                                          MPI_POST_RECV_SIZE,
1468                                          MPI_BYTE,
1469                                          MPI_ANY_SOURCE,
1470                                          POST_RECV_TAG,
1471                                          MPI_COMM_WORLD,
1472                                          CpvAccess(postRecvListHdr)->postedRecvReqs+i  ))
1473                 CmiAbort("MPI_Irecv failed\n");
1474         }
1475 #endif
1476     }
1477 #endif /* end of MPI_POST_RECV */
1478
1479 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
1480     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1481     CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1482     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1483 #endif
1484 }
1485
1486 static void MachinePostCommonInitForMPI(int everReturn) {
1487
1488     CmiIdleState *s=CmiNotifyGetState();
1489
1490     CpvInitialize(SMSG_LIST *, sent_msgs);
1491     CpvInitialize(SMSG_LIST *, end_sent);
1492     CpvInitialize(int, MsgQueueLen);
1493     CpvAccess(sent_msgs) = NULL;
1494     CpvAccess(end_sent) = NULL;
1495     CpvAccess(MsgQueueLen) = 0;
1496
1497     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1498
1499 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1500     CpvInitialize(double, projTraceStart);
1501     /* only PE 0 needs to care about registration (to generate sts file). */
1502     if (CmiMyPe() == 0) {
1503         registerMachineUserEventsFunction(&registerMPITraceEvents);
1504     }
1505 #endif
1506
1507 #if CMK_SMP
1508     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1509     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1510     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV)
1511       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1512 #else
1513     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1514 #endif
1515
1516 #if MACHINE_DEBUG_LOG
1517     if (CmiMyRank() == 0) {
1518         char ln[200];
1519         sprintf(ln,"debugLog.%d",CmiMyNode());
1520         debugLog=fopen(ln,"w");
1521     }
1522 #endif
1523 }
1524 /* ######End of functions related with starting programs###### */
1525
1526 /***********************************************************************
1527  *
1528  * Abort function:
1529  *
1530  ************************************************************************/
1531
1532 void CmiAbort(const char *message) {
1533     char *m;
1534     /* if CharmDebug is attached simply try to send a message to it */
1535 #if CMK_CCS_AVAILABLE
1536     if (CpvAccess(cmiArgDebugFlag)) {
1537         CpdNotify(CPD_ABORT, message);
1538         CpdFreeze();
1539     }
1540 #endif
1541     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1542              "Reason: %s\n",CmiMyPe(),message);
1543     /*  CmiError(message); */
1544     CmiPrintStackTrace(0);
1545     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1546     CmiSetHandler(m, machine_exit_idx);
1547     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1548     machine_exit(m);
1549     /* Program never reaches here */
1550     MPI_Abort(MPI_COMM_WORLD, 1);
1551 }
1552
1553 /**************************  TIMER FUNCTIONS **************************/
1554 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1555
1556 /* MPI calls are not threadsafe, even the timer on some machines */
1557 static CmiNodeLock  timerLock = 0;
1558                                 static int _absoluteTime = 0;
1559                                                            static double starttimer = 0;
1560                                                                                       static int _is_global = 0;
1561
1562 int CmiTimerIsSynchronized() {
1563     int  flag;
1564     void *v;
1565
1566     /*  check if it using synchronized timer */
1567     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1568         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1569     if (flag) {
1570         _is_global = *(int*)v;
1571         if (_is_global && CmiMyPe() == 0)
1572             printf("Charm++> MPI timer is synchronized\n");
1573     }
1574     return _is_global;
1575 }
1576
1577 int CmiTimerAbsolute() {
1578     return _absoluteTime;
1579 }
1580
1581 double CmiStartTimer() {
1582     return 0.0;
1583 }
1584
1585 double CmiInitTime() {
1586     return starttimer;
1587 }
1588
1589 void CmiTimerInit(char **argv) {
1590     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1591     if (_absoluteTime && CmiMyPe() == 0)
1592         printf("Charm++> absolute MPI timer is used\n");
1593
1594 #if ! CMK_MEM_CHECKPOINT
1595     _is_global = CmiTimerIsSynchronized();
1596 #else
1597     _is_global = 0;
1598 #endif
1599
1600     if (_is_global) {
1601         if (CmiMyRank() == 0) {
1602             double minTimer;
1603 #if CMK_TIMER_USE_XT3_DCLOCK
1604             starttimer = dclock();
1605 #else
1606             starttimer = MPI_Wtime();
1607 #endif
1608
1609             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1610                           MPI_COMM_WORLD );
1611             starttimer = minTimer;
1612         }
1613     } else { /* we don't have a synchronous timer, set our own start time */
1614 #if ! CMK_MEM_CHECKPOINT
1615         CmiBarrier();
1616         CmiBarrier();
1617         CmiBarrier();
1618 #endif
1619 #if CMK_TIMER_USE_XT3_DCLOCK
1620         starttimer = dclock();
1621 #else
1622         starttimer = MPI_Wtime();
1623 #endif
1624     }
1625
1626 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1627     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1628         timerLock = CmiCreateLock();
1629 #endif
1630     CmiNodeAllBarrier();          /* for smp */
1631 }
1632
1633 /**
1634  * Since the timerLock is never created, and is
1635  * always NULL, then all the if-condition inside
1636  * the timer functions could be disabled right
1637  * now in the case of SMP. --Chao Mei
1638  */
1639 double CmiTimer(void) {
1640     double t;
1641 #if 0 && CMK_SMP
1642     if (timerLock) CmiLock(timerLock);
1643 #endif
1644
1645 #if CMK_TIMER_USE_XT3_DCLOCK
1646     t = dclock();
1647 #else
1648     t = MPI_Wtime();
1649 #endif
1650
1651 #if 0 && CMK_SMP
1652     if (timerLock) CmiUnlock(timerLock);
1653 #endif
1654
1655     return _absoluteTime?t: (t-starttimer);
1656 }
1657
1658 double CmiWallTimer(void) {
1659     double t;
1660 #if 0 && CMK_SMP
1661     if (timerLock) CmiLock(timerLock);
1662 #endif
1663
1664 #if CMK_TIMER_USE_XT3_DCLOCK
1665     t = dclock();
1666 #else
1667     t = MPI_Wtime();
1668 #endif
1669
1670 #if 0 && CMK_SMP
1671     if (timerLock) CmiUnlock(timerLock);
1672 #endif
1673
1674     return _absoluteTime? t: (t-starttimer);
1675 }
1676
1677 double CmiCpuTimer(void) {
1678     double t;
1679 #if 0 && CMK_SMP
1680     if (timerLock) CmiLock(timerLock);
1681 #endif
1682 #if CMK_TIMER_USE_XT3_DCLOCK
1683     t = dclock() - starttimer;
1684 #else
1685     t = MPI_Wtime() - starttimer;
1686 #endif
1687 #if 0 && CMK_SMP
1688     if (timerLock) CmiUnlock(timerLock);
1689 #endif
1690     return t;
1691 }
1692
1693 #endif     /* CMK_TIMER_USE_SPECIAL */
1694
1695 /************Barrier Related Functions****************/
1696 /* must be called on all ranks including comm thread in SMP */
1697 int CmiBarrier() {
1698 #if CMK_SMP
1699     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1700     CmiNodeAllBarrier();
1701     if (CmiMyRank() == CmiMyNodeSize())
1702 #else
1703     if (CmiMyRank() == 0)
1704 #endif
1705     {
1706         /**
1707          *  The call of CmiBarrier is usually before the initialization
1708          *  of trace module of Charm++, therefore, the START_EVENT
1709          *  and END_EVENT are disabled here. -Chao Mei
1710          */
1711         /*START_EVENT();*/
1712
1713         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1714             CmiAbort("Timernit: MPI_Barrier failed!\n");
1715
1716         /*END_EVENT(10);*/
1717     }
1718     CmiNodeAllBarrier();
1719     return 0;
1720 }
1721
1722 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1723 int CmiBarrierZero() {
1724     int i;
1725 #if CMK_SMP
1726     if (CmiMyRank() == CmiMyNodeSize())
1727 #else
1728     if (CmiMyRank() == 0)
1729 #endif
1730     {
1731         char msg[1];
1732         MPI_Status sts;
1733         if (CmiMyNode() == 0)  {
1734             for (i=0; i<CmiNumNodes()-1; i++) {
1735                 START_EVENT();
1736
1737                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1738                     CmiPrintf("MPI_Recv failed!\n");
1739
1740                 END_EVENT(30);
1741             }
1742         } else {
1743             START_EVENT();
1744
1745             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1746                 printf("MPI_Send failed!\n");
1747
1748             END_EVENT(20);
1749         }
1750     }
1751     CmiNodeAllBarrier();
1752     return 0;
1753 }
1754
1755
1756 #if CMK_MEM_CHECKPOINT
1757
1758 void mpi_restart_crashed(int pe, int rank)
1759 {
1760     int vals[2];
1761     vals[0] = pe;
1762     vals[1] = CpvAccess(_curRestartPhase)+1;
1763     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1764     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
1765 }
1766
1767 /* notify spare processors to exit */
1768 void mpi_end_spare()
1769 {
1770     int i;
1771     for (i=nextrank; i<total_pes; i++) {
1772         int vals[2] = {-1,-1};
1773         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,MPI_COMM_WORLD);
1774     }
1775 }
1776
1777 int find_spare_mpirank(int pe)
1778 {
1779     if (nextrank == total_pes) {
1780       CmiAbort("Charm++> No spare processor available.");
1781     }
1782     petorank[pe] = nextrank;
1783     nextrank++;
1784     return nextrank-1;
1785 }
1786
1787 void CkDieNow()
1788 {
1789     CmiPrintf("[%d] die now.\n", CmiMyPe());
1790
1791       /* release old messages */
1792     while (!CmiAllAsyncMsgsSent()) {
1793         PumpMsgs();
1794         CmiReleaseSentMessages();
1795     }
1796     MPI_Barrier(MPI_COMM_WORLD);
1797     MPI_Finalize();
1798     exit(0);
1799 }
1800
1801 #endif
1802
1803 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
1804 /* Functions related with capturing msg histogram */
1805
1806 #if MPI_DYNAMIC_POST_RECV
1807 /* Consume all messages in the request buffers */
1808 static void consumeAllMsgs()
1809 {
1810     MPIPostRecvList *ptr = CpvAccess(curPostRecvPtr);
1811     if (ptr) {
1812         do {
1813             int i;
1814             for (i=0; i<ptr->bufCnt; i++) {
1815                 int done = 0;
1816                 MPI_Status sts;
1817
1818                 /* Indicating this entry has been tested before */
1819                 if (ptr->postedRecvBufs[i] == NULL) continue;
1820
1821                 if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
1822                     CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
1823                 if (done) {
1824                     int nbytes;
1825                     char *msg;
1826                     if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
1827                         CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
1828                     /* ready to handle this msg */
1829                     msg = (ptr->postedRecvBufs)[i];
1830                     (ptr->postedRecvBufs)[i] = NULL;
1831
1832                     handleOneRecvedMsg(nbytes, msg);
1833                 } else {
1834                     if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))
1835                         CmiAbort("consumeAllMsgs failed in MPI_Cancel!\n");
1836                 }
1837             }
1838             ptr = ptr->next;
1839         } while (ptr != CpvAccess(curPostRecvPtr));
1840     }
1841 }
1842
1843 static void recordMsgHistogramInfo(int size, char *msg)
1844 {
1845     int idx = 0;
1846     size -= MPI_POST_RECV_LOWERSIZE;
1847     if (size > 0)
1848         idx = (size/MSG_HISTOGRAM_BINSIZE + 1);
1849
1850     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
1851     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
1852 }
1853
1854 #define POST_RECV_USE_STATIC_PARAM 0
1855 #define POST_RECV_REPORT_STS 0
1856
1857 #if POST_RECV_REPORT_STS
1858 static int buildDynCallCnt = 0;
1859 #endif
1860
1861 static void buildDynamicRecvBuffers()
1862 {
1863     int i;
1864
1865     int local_MSG_CNT_THRESHOLD;
1866     int local_MSG_INC;
1867
1868 #if POST_RECV_REPORT_STS
1869     buildDynCallCnt++;
1870 #endif
1871
1872     /* For debugging usage */
1873     reportMsgHistogramInfo();
1874
1875     CpvAccess(msgRecvCnt) = 0;
1876     /* consume all outstanding msgs */
1877     consumeAllMsgs();
1878
1879 #if POST_RECV_USE_STATIC_PARAM
1880     local_MSG_CNT_THRESHOLD = MPI_POST_RECV_MSG_CNT_THRESHOLD;
1881     local_MSG_INC = MPI_POST_RECV_MSG_INC;
1882 #else
1883     {
1884         int total = 0;
1885         int count = 0;
1886         for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
1887             int tmp = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
1888             /* avg is temporarily used for counting how many buckets are non-zero */
1889             if (tmp > 0)  {
1890                 total += tmp;
1891                 count++;
1892             }
1893         }
1894         if (count == 1) local_MSG_CNT_THRESHOLD = 1; /* Just filter out those zero-count msgs */
1895         else local_MSG_CNT_THRESHOLD = total / count /3; /* Catch >50% msgs NEED-BETTER-SCHEME HERE!!*/
1896         local_MSG_INC = total/count; /* Not having a good heuristic right now */
1897 #if POST_RECV_REPORT_STS
1898         printf("sel_histo[%d]: critia_threshold=%d, critia_msginc=%d\n", CmiMyPe(), local_MSG_CNT_THRESHOLD, local_MSG_INC);
1899 #endif
1900     }
1901 #endif
1902
1903     /* First continue to find the first msg range that requires post recv */
1904     /* Ignore the fist and the last one because they are not tracked */
1905     MPIPostRecvList *newHdr = NULL;
1906     MPIPostRecvList *newListPtr = newHdr;
1907     MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
1908     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
1909         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
1910         if (count >= local_MSG_CNT_THRESHOLD) {
1911
1912 #if POST_RECV_REPORT_STS
1913             /* Report histogram results */
1914             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
1915             int high = low + MSG_HISTOGRAM_BINSIZE;
1916             int reportCnt;
1917             if (count == local_MSG_CNT_THRESHOLD) reportCnt = 1;
1918             else reportCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
1919             printf("sel_histo[%d]-%d: msg size [%.2f, %.2f) with count=%d (%d)\n", CmiMyPe(), buildDynCallCnt, low/1000.0, high/1000.0, count, reportCnt);
1920 #endif
1921             /* find if this msg idx exists, the "i" is the msgSizeIdx, in the current list */
1922             int notFound = 1;
1923             MPIPostRecvList *newEntry = NULL;
1924             while (ptr) {
1925                 if (ptr->msgSizeIdx < i) {
1926                     /* free the buffer for this range of msg size */
1927                     MPIPostRecvList *nextptr = ptr->next;
1928
1929                     free(ptr->postedRecvReqs);
1930                     int j;
1931                     for (j=0; j<ptr->bufCnt; j++) {
1932                         if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
1933                     }
1934                     free(ptr->postedRecvBufs);
1935                     ptr = nextptr;
1936                 } else if (ptr->msgSizeIdx == i) {
1937                     int newBufCnt, j;
1938                     int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
1939                     newEntry = ptr;
1940                     /* Do some adjustment according to the current statistics */
1941                     if (count == local_MSG_CNT_THRESHOLD) newBufCnt = 1;
1942                     else newBufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
1943                     if (newBufCnt != ptr->bufCnt) {
1944                         /* free old buffers, and allocate new buffers */
1945                         free(ptr->postedRecvReqs);
1946                         ptr->postedRecvReqs = (MPI_Request *)malloc(newBufCnt * sizeof(MPI_Request));
1947                         for (j=0; j<ptr->bufCnt; j++) {
1948                             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
1949                         }
1950                         free(ptr->postedRecvBufs);
1951                         ptr->postedRecvBufs = (char **)malloc(newBufCnt * sizeof(char *));
1952                     }
1953
1954                     /* re-post those buffers */
1955                     ptr->bufCnt = newBufCnt;
1956                     for (j=0; j<ptr->bufCnt; j++) {
1957                         ptr->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
1958                         if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
1959                                                      MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
1960                                                      MPI_COMM_WORLD, ptr->postedRecvReqs+j))
1961                             CmiAbort("MPI_Irecv failed in recordMsgHistogramInfo!\n");
1962                     }
1963
1964                     /* We already posted bufs for this range of msg size */
1965                     ptr = ptr->next;
1966                     /* Need to set ptr to NULL as the buf list comes to an end and the while loop exits */
1967                     if (ptr == CpvAccess(postRecvListHdr)) ptr = NULL;
1968                     notFound = 0;
1969                     break;
1970                 } else {
1971                     /* The msgSizeIdx is larger than i */
1972                     break;
1973                 }
1974                 if (ptr == CpvAccess(postRecvListHdr)) {
1975                     ptr = NULL;
1976                     break;
1977                 }
1978             } /* end while(ptr): iterating the posted recv buffer list */
1979
1980             if (notFound) {
1981                 /* the current range of msg size is not found in the list */
1982                 int j;
1983                 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
1984                 newEntry = malloc(sizeof(MPIPostRecvList));
1985                 MPIPostRecvList *one = newEntry;
1986                 one->msgSizeIdx = i;
1987                 if (count == local_MSG_CNT_THRESHOLD) one->bufCnt = 1;
1988                 else one->bufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
1989                 one->postedRecvReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*one->bufCnt);
1990                 one->postedRecvBufs = (char **)malloc(one->bufCnt * sizeof(char *));
1991                 for (j=0; j<one->bufCnt; j++) {
1992                     one->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
1993                     if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
1994                                                  MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
1995                                                  MPI_COMM_WORLD, one->postedRecvReqs+j))
1996                         CmiAbort("MPI_Irecv failed in recordMsgHistogramInfo!\n");
1997                 }
1998             } /* end if notFound */
1999
2000             /* Update the new list with the newEntry */
2001             CmiAssert(newEntry != NULL);
2002             if (newHdr == NULL) {
2003                 newHdr = newEntry;
2004                 newListPtr = newEntry;
2005                 newHdr->next = newHdr;
2006             } else {
2007                 newListPtr->next = newEntry;
2008                 newListPtr = newEntry;
2009                 newListPtr->next = newHdr;
2010             }
2011         } /* end if the count of this msg size range exceeds the threshold */
2012     } /* end for loop over the histogram buckets */
2013
2014     /* Free remaining entries in the list */
2015     while (ptr) {
2016         /* free the buffer for this range of msg size */
2017         MPIPostRecvList *nextptr = ptr->next;
2018
2019         free(ptr->postedRecvReqs);
2020         int j;
2021         for (j=0; j<ptr->bufCnt; j++) {
2022             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2023         }
2024         free(ptr->postedRecvBufs);
2025         ptr = nextptr;
2026         if (ptr == CpvAccess(postRecvListHdr)) break;
2027     }
2028
2029     CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr) = newHdr;
2030     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
2031 } /* end of function buildDynamicRecvBuffers */
2032
2033 static void examineMsgHistogramInfo(int size, char *msg)
2034 {
2035     int total = CpvAccess(msgRecvCnt)++;
2036     if (total < MPI_POST_RECV_FREQ) {
2037         recordMsgHistogramInfo(size, msg);
2038     } else {
2039         buildDynamicRecvBuffers();
2040     }
2041 }
2042 #else
2043 /* case when CAPTURE_MSG_HISTOGRAM is defined */
2044 static void recordMsgHistogramInfo(int size, char *msg)
2045 {
2046     int idx = size/MSG_HISTOGRAM_BINSIZE;
2047     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
2048     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
2049 }
2050 #endif /* end of MPI_DYNAMIC_POST_RECV */
2051
2052 static void reportMsgHistogramInfo()
2053 {
2054 #if MPI_DYNAMIC_POST_RECV
2055     int i, count;
2056     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[0];
2057     if (count > 0) {
2058         printf("msg_histo[%d]: %d for msg [0, %.2fK)\n", CmiMyNode(), count, MPI_POST_RECV_LOWERSIZE/1000.0);
2059     }
2060     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2061         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2062         if (count > 0) {
2063             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
2064             int high = low + MSG_HISTOGRAM_BINSIZE;
2065             printf("msg_histo[%d]: %d for msg [%.2fK, %.2fK)\n", CmiMyNode(), count, low/1000.0, high/1000.0);
2066         }
2067     }
2068     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[MAX_HISTOGRAM_BUCKETS-1];
2069     if (count > 0) {
2070         printf("msg_histo[%d]: %d for msg [%.2fK, +inf)\n", CmiMyNode(), count, MPI_POST_RECV_UPPERSIZE/1000.0);
2071     }
2072 #else
2073     int i;
2074     for (i=0; i<MAX_HISTOGRAM_BUCKETS; i++) {
2075         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2076         if (count > 0) {
2077             int low = i*MSG_HISTOGRAM_BINSIZE;
2078             int high = low + MSG_HISTOGRAM_BINSIZE;
2079             printf("msg_histo[%d]: %d for msg [%dK, %dK)\n", CmiMyNode(), count, low/1000, high/1000);
2080         }
2081     }
2082 #endif
2083 }
2084 #endif /* end of CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV */
2085
2086 void CmiSetupMachineRecvBuffersUser()
2087 {
2088 #if MPI_DYNAMIC_POST_RECV
2089     buildDynamicRecvBuffers();
2090 #endif
2091 }
2092
2093
2094 /*@}*/
2095