replacing MPI_COMM_WORLD with charmComm on mpi layer
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59 static MPI_Comm charmComm;
60
61 #if CMI_EXERT_SEND_CAP
62 #define SEND_CAP 3
63 #endif
64
65 #if CMI_EXERT_RECV_CAP
66 #define RECV_CAP 2
67 #endif
68 /* ###End of flow control related macros ### */
69
70 /* ###Beginning of machine-layer-tracing related macros ### */
71 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
72 #define CMI_MPI_TRACE_MOREDETAILED 0
73 #undef CMI_MPI_TRACE_USEREVENTS
74 #define CMI_MPI_TRACE_USEREVENTS 1
75 #else
76 #undef CMK_SMP_TRACE_COMMTHREAD
77 #define CMK_SMP_TRACE_COMMTHREAD 0
78 #endif
79
80 #define CMK_TRACE_COMMOVERHEAD 0
81 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_TRACE_COMMOVERHEAD
86 #define CMK_TRACE_COMMOVERHEAD 0
87 #endif
88
89 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
90 CpvStaticDeclare(double, projTraceStart);
91 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
92 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
93 #else
94 #define  START_EVENT()
95 #define  END_EVENT(x)
96 #endif
97
98 #if CMK_SMP_TRACE_COMMTHREAD
99 #define START_TRACE_SENDCOMM(msg)  \
100                         int isTraceEligible = traceBeginCommOp(msg); \
101                         if(isTraceEligible) traceSendMsgComm(msg);
102 #define END_TRACE_SENDCOMM(msg) if(isTraceEligible) traceEndCommOp(msg);
103 #define START_TRACE_RECVCOMM(msg) CpvAccess(projTraceStart) = CmiWallTimer();
104 #define END_TRACE_RECVCOMM(msg) \
105                         if(traceBeginCommOp(msg)){ \
106                             traceChangeLastTimestamp(CpvAccess(projTraceStart)); \
107                             traceSendMsgComm(msg); \
108                             traceEndCommOp(msg); \
109                         }
110 #define CONDITIONAL_TRACE_USER_EVENT(x) \
111                         do{ \
112                             double etime = CmiWallTimer(); \
113                             if(etime - CpvAccess(projTraceStart) > 5*1e-6){ \
114                                 traceUserBracketEvent(x, CpvAccess(projTraceStart), etime); \
115                             }\
116                         }while(0);
117 #else
118 #define START_TRACE_SENDCOMM(msg)
119 #define END_TRACE_SENDCOMM(msg)
120 #define START_TRACE_RECVCOMM(msg)
121 #define END_TRACE_RECVCOMM(msg)
122 #define CONDITIONAL_TRACE_USER_EVENT(x)
123 #endif
124
125 /* ###End of machine-layer-tracing related macros ### */
126
127 /* ###Beginning of POST_RECV related macros ### */
128 /*
129  * If MPI_POST_RECV is defined, we provide default values for
130  * size and number of posted recieves. If MPI_POST_RECV_COUNT
131  * is set then a default value for MPI_POST_RECV_SIZE is used
132  * if not specified by the user.
133  */
134 #define MPI_POST_RECV 0
135
136 /* Making those parameters configurable for testing them easily */
137
138 #if MPI_POST_RECV
139 #define MPI_DYNAMIC_POST_RECV 0
140
141 /* Note the tag offset of a msg is determined by
142  * (its size - MPI_RECV_LOWERSIZE)/MPI_POST_RECV_INC.
143  * based on POST_RECV_TAG.
144  */
145 static int MPI_POST_RECV_COUNT=10;
146
147 /* The range of msgs to be tracked for histogramming */
148 static int MPI_POST_RECV_LOWERSIZE=8000;
149 static int MPI_POST_RECV_UPPERSIZE=64000;
150
151 /* The increment of msg size to be tracked, i.e. the histogram bucket size */
152 static int MPI_POST_RECV_INC = 1000;
153
154 /* The unit increment of msg cnt for increase #buf for a post recved msg */
155 static int MPI_POST_RECV_MSG_INC = 400;
156
157 /* If the #msg exceeds this value, post recv is created for such msg */
158 static int MPI_POST_RECV_MSG_CNT_THRESHOLD = 200;
159
160 /* The frequency of checking the existing posted recv buffers in the unit of #msgs */
161 static int MPI_POST_RECV_FREQ = 1000;
162
163 static int MPI_POST_RECV_SIZE;
164
165 typedef struct mpiPostRecvList {
166     /* POST_RECV_TAG + msgSizeIdx is the recv tag;
167      * Based on this value, this buf corresponds to msg size ranging
168      * [msgSizeIdx*MPI_POST_RECV_INC, (msgSizeIdx+1)*MPI_POST_RECV_INC)
169      */
170     int msgSizeIdx;
171     int bufCnt;
172     MPI_Request *postedRecvReqs;
173     char **postedRecvBufs;
174     struct mpiPostRecvList *next;
175 } MPIPostRecvList;
176 CpvDeclare(MPIPostRecvList *, postRecvListHdr);
177 CpvDeclare(MPIPostRecvList *, curPostRecvPtr);
178 CpvDeclare(int, msgRecvCnt);
179
180 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
181 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
182 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
183 CpvDeclare(char**,CmiPostedRecvBuffers);
184
185 /* Note: currently MPI doesn't provide a function whether a request is in progress.
186  * For example, a irecv has been filled partially. Then a call to MPI_Test still returns
187  * indicating it has not been finished. If only relying on this result, then calling
188  * MPI_Cancel will result in a loss of this msg. The dynamic post recv mechanism
189  * can only be safely used in a synchronized point such as load balancing.
190  */
191 #if MPI_DYNAMIC_POST_RECV
192 static int MSG_HISTOGRAM_BINSIZE;
193 static int MAX_HISTOGRAM_BUCKETS; /* only cares msg size less 2 MB */
194 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
195 static void recordMsgHistogramInfo(int size);
196 static void reportMsgHistogramInfo();
197 #endif /* end of MPI_DYNAMIC_POST_RECV defined */
198
199 #endif /* end of MPI_POST_RECV defined */
200
201 /* Defining this macro will use MPI_Irecv instead of MPI_Recv for
202  * large messages. This could save synchronization overhead caused by
203  * the rzv protocol used by MPI
204  */
205 #define USE_ASYNC_RECV_FUNC 0
206
207 #ifdef USE_ASYNC_RECV_FUNC
208 static int IRECV_MSG_THRESHOLD = 8000;
209 typedef struct IRecvListEntry{
210     MPI_Request req;
211     char *msg;
212     int size;
213     struct IRecvListEntry *next;
214 }*IRecvList;
215
216 static IRecvList freedIrecvList = NULL; /* used to recycle the entries */
217 static IRecvList waitIrecvListHead = NULL; /* points to the guardian entry, i.e., the next of it points to the first entry */
218 static IRecvList waitIrecvListTail = NULL; /* points to the last entry */
219
220 static IRecvList irecvListEntryAllocate(){
221     IRecvList ret;
222     if(freedIrecvList == NULL) {
223         ret = (IRecvList)malloc(sizeof(struct IRecvListEntry));        
224         return ret;
225     } else {
226         ret = freedIrecvList;
227         freedIrecvList = freedIrecvList->next;
228         return ret;
229     }
230 }
231 static void irecvListEntryFree(IRecvList used){
232     used->next = freedIrecvList;
233     freedIrecvList = used;
234 }
235
236 #endif /* end of USE_ASYNC_RECV_FUNC */
237
238 /* Providing functions for external usage to set up the dynamic recv buffer
239  * when the user is aware that it's safe to call such function
240  */
241 void CmiSetupMachineRecvBuffers();
242
243 #define CAPTURE_MSG_HISTOGRAM 0
244 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
245 static int MSG_HISTOGRAM_BINSIZE=1000;
246 static int MAX_HISTOGRAM_BUCKETS=2000; /* only cares msg size less 2 MB */
247 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
248 static void recordMsgHistogramInfo(int size);
249 static void reportMsgHistogramInfo();
250 #endif
251
252 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
253 #define TAG     1375
254 #if MPI_POST_RECV
255 #define POST_RECV_TAG       (TAG+1)
256 #define BARRIER_ZERO_TAG  TAG
257 #else
258 #define BARRIER_ZERO_TAG   (TAG-1)
259 #endif
260 /* ###End of POST_RECV related related macros ### */
261
262 #if CMK_BLUEGENEL
263 #define MAX_QLEN 8
264 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
265 #else
266 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
267 #define MAX_QLEN 200
268 #endif
269 /* =======End of Definitions of Performance-Specific Macros =======*/
270
271
272 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
273 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
274 #define CHARM_MAGIC_NUMBER               126
275
276 #if CMK_ERROR_CHECKING
277 extern unsigned char computeCheckSum(unsigned char *data, int len);
278 static int checksum_flag = 0;
279 #define CMI_SET_CHECKSUM(msg, len)      \
280         if (checksum_flag)  {   \
281           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
282           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
283         }
284 #define CMI_CHECK_CHECKSUM(msg, len)    \
285         if (checksum_flag)      \
286           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
287             CmiAbort("Fatal error: checksum doesn't agree!\n");
288 #else
289 #define CMI_SET_CHECKSUM(msg, len)
290 #define CMI_CHECK_CHECKSUM(msg, len)
291 #endif
292 /* =====End of Definitions of Message-Corruption Related Macros=====*/
293
294 /* =====Beginning of Declarations of Machine Specific Variables===== */
295 #include <signal.h>
296 void (*signal_int)(int);
297
298 static int _thread_provided = -1; /* Indicating MPI thread level */
299 static int idleblock = 0;
300
301 /* A simple list for msgs that have been sent by MPI_Isend */
302 typedef struct msg_list {
303     char *msg;
304     struct msg_list *next;
305     int size, destpe, mode;
306     MPI_Request req;
307 } SMSG_LIST;
308
309 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
310 CpvStaticDeclare(SMSG_LIST *, end_sent);
311
312 CpvStaticDeclare(int, MsgQueueLen);
313 static int request_max;
314 /*FLAG: consume outstanding Isends in scheduler loop*/
315 static int no_outstanding_sends=0;
316
317 #if NODE_0_IS_CONVHOST
318 int inside_comm = 0;
319 #endif
320
321 typedef struct ProcState {
322 #if MULTI_SENDQUEUE
323     PCQueue      sendMsgBuf;       /* per processor message sending queue */
324 #endif
325     CmiNodeLock  recvLock;                  /* for cs->recv */
326 } ProcState;
327 static ProcState  *procState;
328
329 #if CMK_SMP && !MULTI_SENDQUEUE
330 static PCQueue sendMsgBuf;
331 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
332 #endif
333 /* =====End of Declarations of Machine Specific Variables===== */
334
335 #if CMK_MEM_CHECKPOINT
336 #define FAIL_TAG   1200
337 int num_workpes, total_pes;
338 int *petorank = NULL;
339 int  nextrank;
340 void mpi_end_spare();
341 #endif
342
343 /* =====Beginning of Declarations of Machine Specific Functions===== */
344 /* Utility functions */
345 #if CMK_BLUEGENEL
346 extern void MPID_Progress_test();
347 #endif
348 static size_t CmiAllAsyncMsgsSent(void);
349 static void CmiReleaseSentMessages(void);
350 static int PumpMsgs(void);
351 static void PumpMsgsBlocking(void);
352
353 #if CMK_SMP
354 static int MsgQueueEmpty();
355 static int RecvQueueEmpty();
356 static int SendMsgBuf();
357 static  void EnqueueMsg(void *m, int size, int node, int mode);
358 #endif
359
360 /* The machine-specific send function */
361 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
362 #define LrtsSendFunc MachineSpecificSendForMPI
363
364 /* ### Beginning of Machine-startup Related Functions ### */
365 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID);
366 #define LrtsInit MachineInitForMPI
367
368 static void MachinePreCommonInitForMPI(int everReturn);
369 static void MachinePostCommonInitForMPI(int everReturn);
370 #define LrtsPreCommonInit MachinePreCommonInitForMPI
371 #define LrtsPostCommonInit MachinePostCommonInitForMPI
372 /* ### End of Machine-startup Related Functions ### */
373
374 /* ### Beginning of Machine-running Related Functions ### */
375 static void AdvanceCommunicationForMPI(int whenidle);
376 #define LrtsAdvanceCommunication AdvanceCommunicationForMPI
377
378 static void DrainResourcesForMPI(); /* used when exit */
379 #define LrtsDrainResources DrainResourcesForMPI
380
381 static void MachineExitForMPI();
382 #define LrtsExit MachineExitForMPI
383 /* ### End of Machine-running Related Functions ### */
384
385 /* ### Beginning of Idle-state Related Functions ### */
386 void CmiNotifyIdleForMPI(void);
387 /* ### End of Idle-state Related Functions ### */
388
389 static void MachinePostNonLocalForMPI();
390 #define LrtsPostNonLocal MachinePostNonLocalForMPI
391
392 /* =====End of Declarations of Machine Specific Functions===== */
393
394 /**
395  *  Macros that overwrites the common codes, such as
396  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
397  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
398  *  CMK_OFFLOAD_BCAST_PROCESS etc.
399  */
400 #define CMK_HAS_SIZE_IN_MSGHDR 0
401 #include "machine-lrts.h"
402 #include "machine-common-core.c"
403
404 /* The machine specific msg-sending function */
405
406 #if CMK_SMP
407 static void EnqueueMsg(void *m, int size, int node, int mode) {
408     /*SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
409     SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
410     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
411     msg_tmp->msg = m;
412     msg_tmp->size = size;
413     msg_tmp->destpe = node;
414     msg_tmp->next = 0;
415     msg_tmp->mode = mode;
416
417 #if MULTI_SENDQUEUE
418     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
419 #else
420     /*CmiLock(sendMsgBufLock);*/
421     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
422     /*CmiUnlock(sendMsgBufLock);*/
423 #endif
424
425     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
426 }
427 #endif
428
429 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
430 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
431     int node = smsg->destpe;
432     int size = smsg->size;
433     char *msg = smsg->msg;
434     int mode = smsg->mode;
435     int dstrank;
436
437     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
438 #if CMK_ERROR_CHECKING
439     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
440     CMI_SET_CHECKSUM(msg, size);
441 #endif
442
443 #if MPI_POST_RECV
444     if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
445 #if MPI_DYNAMIC_POST_RECV
446         int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
447         START_TRACE_SENDCOMM(msg);
448         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,charmComm,&(smsg->req)))
449             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
450         END_TRACE_SENDCOMM(msg);
451 #else
452         START_TRACE_SENDCOMM(msg);
453         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,charmComm,&(smsg->req)))
454             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
455         END_TRACE_SENDCOMM(msg);
456 #endif
457     } else {
458         START_TRACE_SENDCOMM(msg);
459             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,charmComm,&(smsg->req)))
460             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
461         END_TRACE_SENDCOMM(msg);
462     }
463 #else
464 /* branch not using MPI_POST_RECV */
465
466 #if CMK_MEM_CHECKPOINT
467         dstrank = petorank[node];
468 #else
469         dstrank=node;
470 #endif
471     START_TRACE_SENDCOMM(msg)
472     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,charmComm,&(smsg->req)))
473         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
474     END_TRACE_SENDCOMM(msg)
475 #endif /* end of #if MPI_POST_RECV */
476
477     MACHSTATE(3,"}MPI_Isend end");
478     CpvAccess(MsgQueueLen)++;
479     if (CpvAccess(sent_msgs)==0)
480         CpvAccess(sent_msgs) = smsg;
481     else
482         CpvAccess(end_sent)->next = smsg;
483     CpvAccess(end_sent) = smsg;
484
485 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
486     if (mode == P2P_SYNC || mode == P2P_ASYNC)
487     {
488     while (CpvAccess(MsgQueueLen) > request_max) {
489         CmiReleaseSentMessages();
490         PumpMsgs();
491     }
492     }
493 #endif
494
495     return (CmiCommHandle) &(smsg->req);
496 }
497
498 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
499     /* Ignoring the mode for MPI layer */
500
501     CmiState cs = CmiGetState();
502     SMSG_LIST *msg_tmp;
503     int  rank;
504
505     CmiAssert(destNode != CmiMyNode());
506 #if CMK_SMP
507     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
508       EnqueueMsg(msg, size, destNode, mode);
509       return 0;
510     }
511 #endif
512     /* non smp */
513     /*msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
514     msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
515     msg_tmp->msg = msg;
516     msg_tmp->destpe = destNode;
517     msg_tmp->size = size;
518     msg_tmp->next = 0;
519     msg_tmp->mode = mode;
520     return MPISendOneMsg(msg_tmp);
521 }
522
523 static size_t CmiAllAsyncMsgsSent(void) {
524     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
525     MPI_Status sts;
526     int done;
527
528     while (msg_tmp!=0) {
529         done = 0;
530         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
531             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
532         if (!done)
533             return 0;
534         msg_tmp = msg_tmp->next;
535         /*    MsgQueueLen--; ????? */
536     }
537     return 1;
538 }
539
540 int CmiAsyncMsgSent(CmiCommHandle c) {
541
542     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
543     int done;
544     MPI_Status sts;
545
546     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
547         msg_tmp = msg_tmp->next;
548     if (msg_tmp) {
549         done = 0;
550         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
551             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
552         return ((done)?1:0);
553     } else {
554         return 1;
555     }
556 }
557
558 void CmiReleaseCommHandle(CmiCommHandle c) {
559     return;
560 }
561
562 /* ######Beginning of functions related with communication progress ###### */
563 static void CmiReleaseSentMessages(void) {
564     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
565     SMSG_LIST *prev=0;
566     SMSG_LIST *temp;
567     int done;
568     MPI_Status sts;
569
570 #if CMK_BLUEGENEL
571     MPID_Progress_test();
572 #endif
573
574     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
575     while (msg_tmp!=0) {
576         done =0;
577 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
578         double startT = CmiWallTimer();
579 #endif
580         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
581             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
582         if (done) {
583             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
584             CpvAccess(MsgQueueLen)--;
585             /* Release the message */
586             temp = msg_tmp->next;
587             if (prev==0) /* first message */
588                 CpvAccess(sent_msgs) = temp;
589             else
590                 prev->next = temp;
591             CmiFree(msg_tmp->msg);
592             /* CmiFree(msg_tmp); */
593             free(msg_tmp);
594             msg_tmp = temp;
595         } else {
596             prev = msg_tmp;
597             msg_tmp = msg_tmp->next;
598         }
599 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
600         {
601             double endT = CmiWallTimer();
602             /* only record the event if it takes more than 1ms */
603             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
604         }
605 #endif
606     }
607     CpvAccess(end_sent) = prev;
608     MACHSTATE(2,"} CmiReleaseSentMessages end");
609 }
610
611 static int PumpMsgs(void) {
612     int nbytes, flg, res;
613     char *msg;
614     MPI_Status sts;
615     int recd=0;
616
617 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
618     int recvCnt=0;
619 #endif
620
621 #if CMK_BLUEGENEL
622     MPID_Progress_test();
623 #endif
624
625     MACHSTATE(2,"PumpMsgs begin {");
626
627 #if CMI_DYNAMIC_EXERT_CAP
628     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
629 #endif
630
631     while (1) {
632         int doSyncRecv = 1;
633 #if CMI_EXERT_RECV_CAP
634         if (recvCnt==RECV_CAP) break;
635 #elif CMI_DYNAMIC_EXERT_CAP
636         if (recvCnt >= dynamicRecvCap) break;
637 #endif
638
639         START_TRACE_RECVCOMM(NULL);
640
641         /* First check posted recvs then do  probe unmatched outstanding messages */
642 #if MPI_POST_RECV
643         MPIPostRecvList *postedOne = NULL;
644         int completed_index = -1;
645         flg = 0;
646 #if MPI_DYNAMIC_POST_RECV
647         MPIPostRecvList *oldPostRecvPtr = CpvAccess(curPostRecvPtr);
648         if (oldPostRecvPtr) {
649             /* post recv buf inited */
650             do {
651                 /* round-robin iteration over the list */
652                 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
653                 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
654                     CmiAbort("PumpMsgs: MPI_Testany failed!\n");
655
656                 if (flg) {
657                     postedOne = cur;
658                     break;
659                 }
660                 CpvAccess(curPostRecvPtr) = CpvAccess(curPostRecvPtr)->next;
661             } while (CpvAccess(curPostRecvPtr) != oldPostRecvPtr);
662         }
663 #else
664         MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
665         if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
666             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
667 #endif
668         CONDITIONAL_TRACE_USER_EVENT(60); /* MPI_Test related user event */
669         if (flg) {
670             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
671                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
672
673             recd = 1;
674 #if !MPI_DYNAMIC_POST_RECV
675             postedOne = CpvAccess(curPostRecvPtr);
676 #endif
677             msg = (postedOne->postedRecvBufs)[completed_index];
678             (postedOne->postedRecvBufs)[completed_index] = NULL;
679
680             CpvAccess(Cmi_posted_recv_total)++;
681         } else {
682             START_EVENT();
683             res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, charmComm, &flg, &sts);
684             if (res != MPI_SUCCESS)
685                 CmiAbort("MPI_Iprobe failed\n");
686             if (!flg) break;
687             
688             CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
689             recd = 1;
690             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
691             msg = (char *) CmiAlloc(nbytes);
692
693 #if USE_ASYNC_RECV_FUNC
694             if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
695 #endif            
696             if(doSyncRecv){
697                 START_EVENT();
698                 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
699                     CmiAbort("PumpMsgs: MPI_Recv failed!\n");                
700             }
701 #if USE_ASYNC_RECV_FUNC        
702             else {
703                 START_EVENT();
704                 IRecvList one = irecvListEntryAllocate();
705                 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req));
706                     CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
707                 one->msg = msg;
708                 one->size = nbytes;
709                 one->next = NULL;
710                 waitIrecvListTail->next = one;
711                 CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
712             }
713 #endif
714             CpvAccess(Cmi_unposted_recv_total)++;
715         }
716 #else
717         /* Original version */
718         START_EVENT();
719         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, charmComm, &flg, &sts);
720         if (res != MPI_SUCCESS)
721             CmiAbort("MPI_Iprobe failed\n");
722
723         if (!flg) break;
724         CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
725         
726         recd = 1;
727         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
728         msg = (char *) CmiAlloc(nbytes);
729
730 #if USE_ASYNC_RECV_FUNC
731         if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
732 #endif        
733         if(doSyncRecv){
734             START_EVENT();
735             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
736                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");            
737         }
738 #if USE_ASYNC_RECV_FUNC        
739         else {
740             START_EVENT();
741             IRecvList one = irecvListEntryAllocate();
742             if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
743                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
744             one->msg = msg;
745             one->size = nbytes;
746             one->next = NULL;
747             waitIrecvListTail->next = one;
748             waitIrecvListTail = one;
749             /*printf("PE[%d]: MPI_Irecv msg=%p, size=%d, entry=%p\n", CmiMyPe(), msg, nbytes, one);*/
750             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
751         }
752 #endif
753
754 #endif /*end of not MPI_POST_RECV */
755
756         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
757         CMI_CHECK_CHECKSUM(msg, nbytes);
758 #if CMK_ERROR_CHECKING
759         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
760             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
761             CmiFree(msg);
762             CmiAbort("Abort!\n");
763             continue;
764         }
765 #endif
766
767         if(doSyncRecv){
768             END_TRACE_RECVCOMM(msg);
769             handleOneRecvedMsg(nbytes, msg);
770         }
771         
772 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
773         recordMsgHistogramInfo(nbytes);
774 #endif
775
776 #if  MPI_POST_RECV
777 #if MPI_DYNAMIC_POST_RECV
778         if (postedOne) {
779             //printf("[%d]: get one posted recv\n", CmiMyPe());
780             /* Get the upper size of this buffer */
781             int postRecvBufSize = postedOne->msgSizeIdx*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
782             int postRecvTag = POST_RECV_TAG + postedOne->msgSizeIdx;
783             /* Has to re-allocate the buffer for the message */
784             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(postRecvBufSize);
785
786             /* and repost the recv */
787             START_EVENT();
788
789             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
790                                          postRecvBufSize,
791                                          MPI_BYTE,
792                                          MPI_ANY_SOURCE,
793                                          postRecvTag,
794                                          charmComm,
795                                          &((postedOne->postedRecvReqs)[completed_index])  ))
796                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
797             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
798         }
799 #else
800         if (postedOne) {
801             /* Has to re-allocate the buffer for the message */
802             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
803
804             /* and repost the recv */
805             START_EVENT();
806             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
807                                          MPI_POST_RECV_SIZE,
808                                          MPI_BYTE,
809                                          MPI_ANY_SOURCE,
810                                          POST_RECV_TAG,
811                                          charmComm,
812                                          &((postedOne->postedRecvReqs)[completed_index])  ))
813                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
814             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
815         }
816 #endif /* not MPI_DYNAMIC_POST_RECV */
817 #endif
818
819 #if CMI_EXERT_RECV_CAP
820         recvCnt++;
821 #elif CMI_DYNAMIC_EXERT_CAP
822         recvCnt++;
823 #if CMK_SMP
824         /* check sendMsgBuf to get the  number of messages that have not been sent
825              * which is only available in SMP mode
826          * MsgQueueLen indicates the number of messages that have not been released
827              * by MPI
828              */
829         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
830                 || CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
831             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
832         }
833 #else
834         /* MsgQueueLen indicates the number of messages that have not been released
835              * by MPI
836              */
837         if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
838             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
839         }
840 #endif
841
842 #endif
843
844     }
845
846 #if USE_ASYNC_RECV_FUNC
847 /* Another loop to check the irecved msgs list */
848 {
849     IRecvList irecvEnt;
850     int irecvDone = 0;
851     MPI_Status sts;
852     while(waitIrecvListHead->next) {
853         IRecvList irecvEnt = waitIrecvListHead->next;
854
855         START_EVENT();
856                 
857         /*printf("PE[%d]: check irecv entry=%p\n", CmiMyPe(), irecvEnt);*/
858         if(MPI_SUCCESS != MPI_Test(&(irecvEnt->req), &irecvDone, &sts))
859             CmiAbort("PumpMsgs: MPI_Test failed!\n");
860         if(!irecvDone) break; /* in-order recv */
861
862         END_TRACE_RECVCOMM((irecvEnt->msg));
863         /*printf("PE[%d]: irecv entry=%p finished with size=%d, msg=%p\n", CmiMyPe(), irecvEnt, irecvEnt->size, irecvEnt->msg);*/
864         
865         handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);
866         waitIrecvListHead->next = irecvEnt->next;
867         irecvListEntryFree(irecvEnt);
868         recd = 1;        
869     }
870     if(waitIrecvListHead->next == NULL)
871         waitIrecvListTail = waitIrecvListHead;
872 }
873 #endif
874
875
876     MACHSTATE(2,"} PumpMsgs end ");
877     return recd;
878 }
879
880 /* blocking version */
881 static void PumpMsgsBlocking(void) {
882     static int maxbytes = 20000000;
883     static char *buf = NULL;
884     int nbytes, flg;
885     MPI_Status sts;
886     char *msg;
887     int recd=0;
888
889     if (!PCQueueEmpty(CmiGetState()->recv)) return;
890     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
891     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
892     if (CpvAccess(sent_msgs))  return;
893
894 #if 0
895     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
896 #endif
897
898     if (buf == NULL) {
899         buf = (char *) CmiAlloc(maxbytes);
900         _MEMCHECK(buf);
901     }
902
903
904 #if MPI_POST_RECV
905 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
906     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
907 #endif
908
909     START_TRACE_RECVCOMM(NULL);
910     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, charmComm,&sts))
911         CmiAbort("PumpMsgs: PMP_Recv failed!\n");    
912
913     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
914     msg = (char *) CmiAlloc(nbytes);
915     memcpy(msg, buf, nbytes);
916     END_TRACE_RECVCOMM(msg);
917
918 #if CMK_SMP_TRACE_COMMTHREAD && CMI_MPI_TRACE_MOREDETAILED
919     char tmp[32];
920     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
921     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
922 #endif
923
924     handleOneRecvedMsg(nbytes, msg);
925 }
926
927
928 #if CMK_SMP
929
930 /* called by communication thread in SMP */
931 static int SendMsgBuf() {
932     SMSG_LIST *msg_tmp;
933     char *msg;
934     int node, rank, size;
935     int i;
936     int sent = 0;
937
938 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
939     int sentCnt = 0;
940 #endif
941
942 #if CMI_DYNAMIC_EXERT_CAP
943     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
944 #endif
945
946     MACHSTATE(2,"SendMsgBuf begin {");
947 #if MULTI_SENDQUEUE
948     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
949         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
950             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
951 #else
952     /* single message sending queue */
953     /* CmiLock(sendMsgBufLock); */
954     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
955     /* CmiUnlock(sendMsgBufLock); */
956     while (NULL != msg_tmp) {
957 #endif
958             MPISendOneMsg(msg_tmp);
959             sent=1;
960
961 #if CMI_EXERT_SEND_CAP
962             if (++sentCnt == SEND_CAP) break;
963 #elif CMI_DYNAMIC_EXERT_CAP
964             if (++sentCnt >= dynamicSendCap) break;
965             if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD)
966                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
967 #endif
968
969 #if ! MULTI_SENDQUEUE
970             /* CmiLock(sendMsgBufLock); */
971             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
972             /* CmiUnlock(sendMsgBufLock); */
973 #endif
974         }
975 #if MULTI_SENDQUEUE
976     }
977 #endif
978     MACHSTATE(2,"}SendMsgBuf end ");
979     return sent;
980 }
981
982 static int MsgQueueEmpty() {
983     int i;
984 #if MULTI_SENDQUEUE
985     for (i=0; i<_Cmi_mynodesize; i++)
986         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
987 #else
988     return PCQueueEmpty(sendMsgBuf);
989 #endif
990     return 1;
991 }
992
993 /* test if all processors recv queues are empty */
994 static int RecvQueueEmpty() {
995     int i;
996     for (i=0; i<_Cmi_mynodesize; i++) {
997         CmiState cs=CmiGetStateN(i);
998         if (!PCQueueEmpty(cs->recv)) return 0;
999     }
1000     return 1;
1001 }
1002
1003
1004 #define REPORT_COMM_METRICS 0
1005 #if REPORT_COMM_METRICS
1006 static double pumptime = 0.0;
1007 static double releasetime = 0.0;
1008 static double sendtime = 0.0;
1009 #endif
1010
1011 #endif //end of CMK_SMP
1012
1013 static void AdvanceCommunicationForMPI(int whenidle) {
1014 #if REPORT_COMM_METRICS
1015     double t1, t2, t3, t4;
1016     t1 = CmiWallTimer();
1017 #endif
1018
1019 #if CMK_SMP
1020     PumpMsgs();
1021
1022 #if REPORT_COMM_METRICS
1023     t2 = CmiWallTimer();
1024 #endif
1025
1026     CmiReleaseSentMessages();
1027 #if REPORT_COMM_METRICS
1028     t3 = CmiWallTimer();
1029 #endif
1030
1031     SendMsgBuf();
1032
1033 #if REPORT_COMM_METRICS
1034     t4 = CmiWallTimer();
1035     pumptime += (t2-t1);
1036     releasetime += (t3-t2);
1037     sendtime += (t4-t3);
1038 #endif
1039
1040 #else /* non-SMP case */
1041     CmiReleaseSentMessages();
1042
1043 #if REPORT_COMM_METRICS
1044     t2 = CmiWallTimer();
1045 #endif
1046     PumpMsgs();
1047
1048 #if REPORT_COMM_METRICS
1049     t3 = CmiWallTimer();
1050     pumptime += (t3-t2);
1051     releasetime += (t2-t1);
1052 #endif
1053
1054 #endif /* end of #if CMK_SMP */
1055 }
1056 /* ######End of functions related with communication progress ###### */
1057
1058 static void MachinePostNonLocalForMPI() {
1059 #if !CMK_SMP
1060     if (no_outstanding_sends) {
1061         while (CpvAccess(MsgQueueLen)>0) {
1062             AdvanceCommunicationForMPI(0);
1063         }
1064     }
1065
1066     /* FIXME: I don't think the following codes are needed because
1067      * it repeats the same job of the next call of CmiGetNonLocal
1068      */
1069 #if 0
1070     if (!msg) {
1071         CmiReleaseSentMessages();
1072         if (PumpMsgs())
1073             return  PCQueuePop(cs->recv);
1074         else
1075             return 0;
1076     }
1077 #endif
1078 #else
1079   if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1080         CmiReleaseSentMessages();       
1081         /* ??? SendMsgBuf is a not a thread-safe function. If it is put
1082          * here and this function will be called in CmiNotifyStillIdle,
1083          * then a data-race problem occurs */
1084         /*SendMsgBuf();*/
1085   }
1086 #endif
1087 }
1088
1089 /* Idle-state related functions: called in non-smp mode */
1090 void CmiNotifyIdleForMPI(void) {
1091     CmiReleaseSentMessages();
1092     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1093 }
1094
1095 /* Network progress function is used to poll the network when for
1096    messages. This flushes receive buffers on some  implementations*/
1097 #if CMK_MACHINE_PROGRESS_DEFINED
1098 void CmiMachineProgressImpl() {
1099 #if !CMK_SMP
1100     PumpMsgs();
1101 #if CMK_IMMEDIATE_MSG
1102     CmiHandleImmediate();
1103 #endif
1104 #else
1105     /*Not implemented yet. Communication server does not seem to be
1106       thread safe, so only communication thread call it */
1107     if (CmiMyRank() == CmiMyNodeSize())
1108         CommunicationServerThread(0);
1109 #endif
1110 }
1111 #endif
1112
1113 /* ######Beginning of functions related with exiting programs###### */
1114 void DrainResourcesForMPI() {
1115 #if !CMK_SMP
1116     while (!CmiAllAsyncMsgsSent()) {
1117         PumpMsgs();
1118         CmiReleaseSentMessages();
1119     }
1120 #else
1121     if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
1122         while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1123             CmiReleaseSentMessages();
1124             SendMsgBuf();
1125             PumpMsgs();
1126         }
1127     }else if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1128         while(!CmiAllAsyncMsgsSent()) {
1129             CmiReleaseSentMessages();
1130         }
1131     }
1132 #endif
1133 #if CMK_MEM_CHECKPOINT
1134     if (CmiMyPe() == 0) mpi_end_spare();
1135 #endif
1136     MACHSTATE(2, "Machine exit barrier begin {");
1137     START_EVENT();
1138     if (MPI_SUCCESS != MPI_Barrier(charmComm))
1139         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
1140     END_EVENT(10);
1141     MACHSTATE(2, "} Machine exit barrier end");
1142 }
1143
1144 void MachineExitForMPI() {
1145     int i;
1146 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1147     int doPrint = 0;
1148     if (CmiMyNode()==0) doPrint = 1;
1149
1150     if (doPrint /*|| CmiMyNode()%11==0 */) {
1151 #if MPI_POST_RECV
1152         CmiPrintf("node[%d]: %llu posted receives,  %llu unposted receives\n", CmiMyNode(), CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1153 #endif
1154     }
1155 #endif
1156
1157 #if MPI_POST_RECV
1158     {
1159         MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
1160         if (ptr) {
1161             do {
1162                 for (i=0; i<ptr->bufCnt; i++) MPI_Cancel(ptr->postedRecvReqs+i);
1163                 ptr = ptr->next;
1164             } while (ptr!=CpvAccess(postRecvListHdr));
1165         }
1166     }
1167 #endif
1168
1169 #if REPORT_COMM_METRICS
1170 #if CMK_SMP
1171     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
1172               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
1173               pumptime, releasetime, sendtime);
1174 #else
1175     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
1176               CmiMyPe(), pumptime, releasetime, sendtime);
1177 #endif
1178 #endif
1179
1180    if(!CharmLibInterOperate) {
1181 #if ! CMK_AUTOBUILD
1182       signal(SIGINT, signal_int);
1183       MPI_Finalize();
1184 #endif
1185       exit(0);
1186     }
1187 }
1188
1189 static int machine_exit_idx;
1190 static void machine_exit(char *m) {
1191     EmergencyExit();
1192     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1193     fflush(stdout);
1194     CmiNodeBarrier();
1195     if (CmiMyRank() == 0) {
1196         MPI_Barrier(charmComm);
1197         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1198         MPI_Abort(charmComm, 1);
1199     } else {
1200         while (1) CmiYield();
1201     }
1202 }
1203
1204 static void KillOnAllSigs(int sigNo) {
1205     static int already_in_signal_handler = 0;
1206     char *m;
1207     if (already_in_signal_handler) return;   /* MPI_Abort(charmComm,1); */
1208     already_in_signal_handler = 1;
1209 #if CMK_CCS_AVAILABLE
1210     if (CpvAccess(cmiArgDebugFlag)) {
1211         CpdNotify(CPD_SIGNAL, sigNo);
1212         CpdFreeze();
1213     }
1214 #endif
1215     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1216              "Signal: %d\n",CmiMyPe(),sigNo);
1217     CmiPrintStackTrace(1);
1218
1219     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1220     CmiSetHandler(m, machine_exit_idx);
1221     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1222     machine_exit(m);
1223 }
1224 /* ######End of functions related with exiting programs###### */
1225
1226
1227 /* ######Beginning of functions related with starting programs###### */
1228 static void registerMPITraceEvents() {
1229 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1230     traceRegisterUserEvent("MPI_Barrier", 10);
1231     traceRegisterUserEvent("MPI_Send", 20);
1232     traceRegisterUserEvent("MPI_Recv", 30);
1233     traceRegisterUserEvent("MPI_Isend", 40);
1234     traceRegisterUserEvent("MPI_Irecv", 50);
1235     traceRegisterUserEvent("MPI_Test[any]", 60);
1236     traceRegisterUserEvent("MPI_Iprobe", 70);
1237 #endif
1238 }
1239
1240 #if MACHINE_DEBUG_LOG
1241 FILE *debugLog = NULL;
1242 #endif
1243
1244 static char *thread_level_tostring(int thread_level) {
1245 #if CMK_MPI_INIT_THREAD
1246     switch (thread_level) {
1247     case MPI_THREAD_SINGLE:
1248         return "MPI_THREAD_SINGLE";
1249     case MPI_THREAD_FUNNELED:
1250         return "MPI_THREAD_FUNNELED";
1251     case MPI_THREAD_SERIALIZED:
1252         return "MPI_THREAD_SERIALIZED";
1253     case MPI_THREAD_MULTIPLE :
1254         return "MPI_THREAD_MULTIPLE";
1255     default: {
1256         char *str = (char*)malloc(5);
1257         sprintf(str,"%d", thread_level);
1258         return str;
1259     }
1260     }
1261     return  "unknown";
1262 #else
1263     char *str = (char*)malloc(5);
1264     sprintf(str,"%d", thread_level);
1265     return str;
1266 #endif
1267 }
1268
1269 /**
1270  *  Obtain the number of nodes, my node id, and consuming machine layer
1271  *  specific arguments
1272  */
1273 static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1274     int n,i;
1275     int ver, subver;
1276     int provided;
1277     int thread_level;
1278     int myNID;
1279     int largc=*argc;
1280     char** largv=*argv;
1281
1282 #if MACHINE_DEBUG
1283     debugLog=NULL;
1284 #endif
1285 #if CMK_USE_HP_MAIN_FIX
1286 #if FOR_CPLUS
1287     _main(largc,largv);
1288 #endif
1289 #endif
1290
1291     if (CmiGetArgFlag(largv, "+comm_thread_only_recv")) {
1292 #if CMK_SMP
1293       Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
1294 #else
1295       CmiAbort("+comm_thread_only_recv option can only be used with SMP version of Charm++");
1296 #endif
1297     }
1298
1299     if(!CharmLibInterOperate) {
1300 #if CMK_MPI_INIT_THREAD
1301 #if CMK_SMP
1302     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV)
1303         thread_level = MPI_THREAD_FUNNELED;
1304       else
1305         thread_level = MPI_THREAD_MULTIPLE;
1306 #else
1307       thread_level = MPI_THREAD_SINGLE;
1308 #endif
1309       MPI_Init_thread(argc, argv, thread_level, &provided);
1310       _thread_provided = provided;
1311 #else
1312       MPI_Init(argc, argv);
1313       thread_level = 0;
1314       _thread_provided = -1;
1315 #endif
1316     }
1317
1318     largc = *argc;
1319     largv = *argv;
1320     if(!CharmLibInterOperate) {
1321                         MPI_Comm_dup(MPI_COMM_WORLD,&charmComm);
1322       MPI_Comm_size(charmComm, numNodes);
1323                         MPI_Comm_rank(charmComm, myNodeID);
1324     }
1325
1326     myNID = *myNodeID;
1327
1328     MPI_Get_version(&ver, &subver);
1329     if(!CharmLibInterOperate) {
1330       if (myNID == 0) {
1331         printf("Charm++> Running on MPI version: %d.%d\n", ver, subver);
1332         printf("Charm++> level of thread support used: %s (desired: %s)\n", thread_level_tostring(_thread_provided), thread_level_tostring(thread_level));
1333       }
1334     }
1335
1336 #if CMK_SMP
1337     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV && _thread_provided != MPI_THREAD_MULTIPLE) {
1338         Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV; 
1339         if (myNID == 0) {
1340           printf("Charm++> +comm_thread_only_recv disabled\n");
1341         }
1342     }
1343 #endif
1344
1345     {
1346         int debug = CmiGetArgFlag(largv,"++debug");
1347         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1348         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1349 #if CMK_HAS_GETPID
1350             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1351             fflush(stdout);
1352             if (!debug_no_pause)
1353                 sleep(15);
1354 #else
1355             printf("++debug ignored.\n");
1356 #endif
1357         }
1358     }
1359
1360
1361 #if CMK_MEM_CHECKPOINT
1362     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1363        CmiAssert(num_workpes <= *numNodes);
1364        total_pes = *numNodes;
1365        *numNodes = num_workpes;
1366     }
1367     else
1368        total_pes = num_workpes = *numNodes;
1369     if (*myNodeID == 0)
1370        CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1371     petorank = (int *)malloc(sizeof(int) * num_workpes);
1372     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1373     nextrank = num_workpes;
1374
1375     if (*myNodeID >= num_workpes) {    /* is spare processor */
1376       MPI_Status sts;
1377       int vals[2];
1378       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
1379       int newpe = vals[0];
1380       CpvAccess(_curRestartPhase) = vals[1];
1381
1382       if (newpe == -1) {
1383           MPI_Barrier(charmComm);
1384           MPI_Finalize();
1385           exit(0);
1386       }
1387
1388       CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
1389         /* update petorank */
1390       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
1391       nextrank = *myNodeID + 1;
1392       *myNodeID = newpe;
1393       myNID = newpe;
1394
1395        /* add +restartaftercrash to argv */
1396       char *phase_str;
1397       char **restart_argv;
1398       int i=0;
1399       while(largv[i]!= NULL) i++;
1400       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1401       i=0;
1402       while(largv[i]!= NULL){
1403                 restart_argv[i] = largv[i];
1404                 i++;
1405       }
1406       restart_argv[i] = "+restartaftercrash";
1407       phase_str = (char*)malloc(10);
1408       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1409       restart_argv[i+1]=phase_str;
1410       restart_argv[i+2]=NULL;
1411       *argv = restart_argv;
1412       *argc = i+2;
1413       largc = *argc;
1414       largv = *argv;
1415     }
1416 #endif
1417
1418     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1419     if (idleblock && _Cmi_mynode == 0) {
1420         printf("Charm++: Running in idle blocking mode.\n");
1421     }
1422
1423 #if CMK_CHARMDEBUG
1424     /* setup signal handlers */
1425     signal(SIGSEGV, KillOnAllSigs);
1426     signal(SIGFPE, KillOnAllSigs);
1427     signal(SIGILL, KillOnAllSigs);
1428     signal_int = signal(SIGINT, KillOnAllSigs);
1429     signal(SIGTERM, KillOnAllSigs);
1430     signal(SIGABRT, KillOnAllSigs);
1431 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1432     signal(SIGQUIT, KillOnAllSigs);
1433     signal(SIGBUS, KillOnAllSigs);
1434 #   endif /*UNIX*/
1435 #endif
1436
1437 #if CMK_NO_OUTSTANDING_SENDS
1438     no_outstanding_sends=1;
1439 #endif
1440     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1441         no_outstanding_sends = 1;
1442         if (myNID == 0)
1443             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1444                    no_outstanding_sends?"":" not");
1445     }
1446
1447     request_max=MAX_QLEN;
1448     CmiGetArgInt(largv,"+requestmax",&request_max);
1449     /*printf("request max=%d\n", request_max);*/
1450
1451 #if MPI_POST_RECV
1452     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1453     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1454     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1455     CmiGetArgInt(largv, "+postRecvThreshold", &MPI_POST_RECV_MSG_CNT_THRESHOLD);
1456     CmiGetArgInt(largv, "+postRecvBucketSize", &MPI_POST_RECV_INC);
1457     CmiGetArgInt(largv, "+postRecvMsgInc", &MPI_POST_RECV_MSG_INC);
1458     CmiGetArgInt(largv, "+postRecvCheckFreq", &MPI_POST_RECV_FREQ);
1459     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1460     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1461     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1462     if (myNID==0) {
1463         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes) with msg count threshold %d and msg histogram bucket size %d, #buf increment every %d msgs. The buffers are checked every %d msgs\n",
1464                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE,
1465                MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
1466     }
1467 #endif
1468
1469 #if CMI_DYNAMIC_EXERT_CAP
1470     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1471     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1472     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1473     if (myNID==0) {
1474         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1475                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1476     }
1477 #endif
1478
1479 #if USE_ASYNC_RECV_FUNC
1480     CmiGetArgInt(largv, "+irecvMsgThreshold", &IRECV_MSG_THRESHOLD);
1481     if(myNID==0) {
1482         printf("Charm++: for msg size larger than %d, MPI_Irecv is going to be used.\n", IRECV_MSG_THRESHOLD);
1483     }
1484 #endif
1485
1486     /* checksum flag */
1487     if (CmiGetArgFlag(largv,"+checksum")) {
1488 #if CMK_ERROR_CHECKING
1489         checksum_flag = 1;
1490         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1491 #else
1492         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1493 #endif
1494     }
1495
1496     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1497     for (i=0; i<_Cmi_mynodesize+1; i++) {
1498 #if MULTI_SENDQUEUE
1499         procState[i].sendMsgBuf = PCQueueCreate();
1500 #endif
1501         procState[i].recvLock = CmiCreateLock();
1502     }
1503 #if CMK_SMP
1504 #if !MULTI_SENDQUEUE
1505     sendMsgBuf = PCQueueCreate();
1506     sendMsgBufLock = CmiCreateLock();
1507 #endif
1508 #endif
1509 }
1510
1511 static void MachinePreCommonInitForMPI(int everReturn) {
1512
1513 #if MPI_POST_RECV
1514     int doInit = 1;
1515     int i;
1516
1517 #if CMK_SMP
1518     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1519 #endif
1520
1521     /* Currently, in mpi smp, the main thread will be the comm thread, so
1522      *  only the comm thread should post recvs. Cpvs, however, need to be
1523      * created on rank 0 (the ptrs to the actual cpv memory), while
1524      * other ranks are busy waiting for this to finish. So cpv initialize
1525      * routines have to be called on every ranks, although they are only
1526      * useful on comm thread (whose rank is not zero) -Chao Mei
1527      */
1528     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1529     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1530     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1531     CpvInitialize(char **, CmiPostedRecvBuffers);
1532
1533     CpvAccess(CmiPostedRecvRequests) = NULL;
1534     CpvAccess(CmiPostedRecvBuffers) = NULL;
1535
1536     CpvInitialize(MPIPostRecvList *, postRecvListHdr);
1537     CpvInitialize(MPIPostRecvList *, curPostRecvPtr);
1538     CpvInitialize(int, msgRecvCnt);
1539
1540     CpvAccess(postRecvListHdr) = NULL;
1541     CpvAccess(curPostRecvPtr) = NULL;
1542     CpvAccess(msgRecvCnt) = 0;
1543
1544 #if MPI_DYNAMIC_POST_RECV
1545     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1546 #endif
1547
1548     if (doInit) {
1549 #if MPI_DYNAMIC_POST_RECV
1550         MSG_HISTOGRAM_BINSIZE = MPI_POST_RECV_INC;
1551         /* including two more buckets that are out of the range [LOWERSIZE, UPPERSIZE] */
1552         MAX_HISTOGRAM_BUCKETS = (MPI_POST_RECV_UPPERSIZE - MPI_POST_RECV_LOWERSIZE)/MSG_HISTOGRAM_BINSIZE+2;
1553         CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1554         memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1555 #else
1556         /* Post some extra recvs to help out with incoming messages */
1557         /* On some MPIs the messages are unexpected and thus slow */
1558
1559         CpvAccess(postRecvListHdr) = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
1560
1561         /* An array of request handles for posted recvs */
1562         CpvAccess(postRecvListHdr)->msgSizeIdx = -1;
1563         CpvAccess(postRecvListHdr)->bufCnt = MPI_POST_RECV_COUNT;
1564         CpvAccess(postRecvListHdr)->postedRecvReqs = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1565         /* An array of buffers for posted recvs */
1566         CpvAccess(postRecvListHdr)->postedRecvBufs = (char**)malloc(MPI_POST_RECV_COUNT*sizeof(char *));
1567         CpvAccess(postRecvListHdr)->next = CpvAccess(postRecvListHdr);
1568         CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr);
1569
1570         /* Post Recvs */
1571         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1572             char *tmpbuf = (char *)CmiAlloc(MPI_POST_RECV_SIZE); /* Note: could be aligned allocation?? */
1573             CpvAccess(postRecvListHdr)->postedRecvBufs[i] = tmpbuf;
1574             if (MPI_SUCCESS != MPI_Irecv(tmpbuf,
1575                                          MPI_POST_RECV_SIZE,
1576                                          MPI_BYTE,
1577                                          MPI_ANY_SOURCE,
1578                                          POST_RECV_TAG,
1579                                          charmComm,
1580                                          CpvAccess(postRecvListHdr)->postedRecvReqs+i  ))
1581                 CmiAbort("MPI_Irecv failed\n");
1582         }
1583 #endif
1584     }
1585 #endif /* end of MPI_POST_RECV */
1586
1587 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
1588     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1589     CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1590     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1591 #endif
1592
1593 #if USE_ASYNC_RECV_FUNC
1594 #if CMK_SMP
1595     /* allocate the guardian entry only on comm thread considering NUMA */
1596     if(CmiMyRank() == CmiMyNodeSize()) {
1597         waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
1598         waitIrecvListHead->next = NULL;
1599     }
1600 #else    
1601     waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
1602     waitIrecvListHead->next = NULL;
1603 #endif
1604 #endif
1605 }
1606
1607 static void MachinePostCommonInitForMPI(int everReturn) {
1608
1609     CmiIdleState *s=CmiNotifyGetState();
1610
1611     CpvInitialize(SMSG_LIST *, sent_msgs);
1612     CpvInitialize(SMSG_LIST *, end_sent);
1613     CpvInitialize(int, MsgQueueLen);
1614     CpvAccess(sent_msgs) = NULL;
1615     CpvAccess(end_sent) = NULL;
1616     CpvAccess(MsgQueueLen) = 0;
1617
1618     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1619
1620 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1621     CpvInitialize(double, projTraceStart);
1622     /* only PE 0 needs to care about registration (to generate sts file). */
1623     if (CmiMyPe() == 0) {
1624         registerMachineUserEventsFunction(&registerMPITraceEvents);
1625     }
1626 #endif
1627
1628 #if CMK_SMP
1629     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1630     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1631     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV)
1632       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1633 #else
1634     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1635 #endif
1636
1637 #if MACHINE_DEBUG_LOG
1638     if (CmiMyRank() == 0) {
1639         char ln[200];
1640         sprintf(ln,"debugLog.%d",CmiMyNode());
1641         debugLog=fopen(ln,"w");
1642     }
1643 #endif
1644 }
1645 /* ######End of functions related with starting programs###### */
1646
1647 /***********************************************************************
1648  *
1649  * Abort function:
1650  *
1651  ************************************************************************/
1652
1653 void LrtsAbort(const char *message) {
1654     char *m;
1655     /* if CharmDebug is attached simply try to send a message to it */
1656 #if CMK_CCS_AVAILABLE
1657     if (CpvAccess(cmiArgDebugFlag)) {
1658         CpdNotify(CPD_ABORT, message);
1659         CpdFreeze();
1660     }
1661 #endif
1662     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1663              "Reason: %s\n",CmiMyPe(),message);
1664     /*  CmiError(message); */
1665     CmiPrintStackTrace(0);
1666     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1667     CmiSetHandler(m, machine_exit_idx);
1668     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1669     machine_exit(m);
1670     /* Program never reaches here */
1671     MPI_Abort(charmComm, 1);
1672 }
1673
1674 /**************************  TIMER FUNCTIONS **************************/
1675 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1676
1677 /* MPI calls are not threadsafe, even the timer on some machines */
1678 static CmiNodeLock  timerLock = 0;
1679                                 static int _absoluteTime = 0;
1680                                                            static double starttimer = 0;
1681                                                                                       static int _is_global = 0;
1682
1683 int CmiTimerIsSynchronized() {
1684     int  flag;
1685     void *v;
1686
1687     /*  check if it using synchronized timer */
1688     if (MPI_SUCCESS != MPI_Attr_get(charmComm, MPI_WTIME_IS_GLOBAL, &v, &flag))
1689         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1690     if (flag) {
1691         _is_global = *(int*)v;
1692         if (_is_global && CmiMyPe() == 0)
1693             printf("Charm++> MPI timer is synchronized\n");
1694     }
1695     return _is_global;
1696 }
1697
1698 int CmiTimerAbsolute() {
1699     return _absoluteTime;
1700 }
1701
1702 double CmiStartTimer() {
1703     return 0.0;
1704 }
1705
1706 double CmiInitTime() {
1707     return starttimer;
1708 }
1709
1710 void CmiTimerInit(char **argv) {
1711     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1712     if (_absoluteTime && CmiMyPe() == 0)
1713         printf("Charm++> absolute MPI timer is used\n");
1714
1715 #if ! CMK_MEM_CHECKPOINT
1716     _is_global = CmiTimerIsSynchronized();
1717 #else
1718     _is_global = 0;
1719 #endif
1720
1721     if (_is_global) {
1722         if (CmiMyRank() == 0) {
1723             double minTimer;
1724 #if CMK_TIMER_USE_XT3_DCLOCK
1725             starttimer = dclock();
1726 #else
1727             starttimer = MPI_Wtime();
1728 #endif
1729
1730             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1731                           charmComm );
1732             starttimer = minTimer;
1733         }
1734     } else { /* we don't have a synchronous timer, set our own start time */
1735 #if ! CMK_MEM_CHECKPOINT
1736         CmiBarrier();
1737         CmiBarrier();
1738         CmiBarrier();
1739 #endif
1740 #if CMK_TIMER_USE_XT3_DCLOCK
1741         starttimer = dclock();
1742 #else
1743         starttimer = MPI_Wtime();
1744 #endif
1745     }
1746
1747 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1748     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1749         timerLock = CmiCreateLock();
1750 #endif
1751     CmiNodeAllBarrier();          /* for smp */
1752 }
1753
1754 /**
1755  * Since the timerLock is never created, and is
1756  * always NULL, then all the if-condition inside
1757  * the timer functions could be disabled right
1758  * now in the case of SMP. --Chao Mei
1759  */
1760 double CmiTimer(void) {
1761     double t;
1762 #if 0 && CMK_SMP
1763     if (timerLock) CmiLock(timerLock);
1764 #endif
1765
1766 #if CMK_TIMER_USE_XT3_DCLOCK
1767     t = dclock();
1768 #else
1769     t = MPI_Wtime();
1770 #endif
1771
1772 #if 0 && CMK_SMP
1773     if (timerLock) CmiUnlock(timerLock);
1774 #endif
1775
1776     return _absoluteTime?t: (t-starttimer);
1777 }
1778
1779 double CmiWallTimer(void) {
1780     double t;
1781 #if 0 && CMK_SMP
1782     if (timerLock) CmiLock(timerLock);
1783 #endif
1784
1785 #if CMK_TIMER_USE_XT3_DCLOCK
1786     t = dclock();
1787 #else
1788     t = MPI_Wtime();
1789 #endif
1790
1791 #if 0 && CMK_SMP
1792     if (timerLock) CmiUnlock(timerLock);
1793 #endif
1794
1795     return _absoluteTime? t: (t-starttimer);
1796 }
1797
1798 double CmiCpuTimer(void) {
1799     double t;
1800 #if 0 && CMK_SMP
1801     if (timerLock) CmiLock(timerLock);
1802 #endif
1803 #if CMK_TIMER_USE_XT3_DCLOCK
1804     t = dclock() - starttimer;
1805 #else
1806     t = MPI_Wtime() - starttimer;
1807 #endif
1808 #if 0 && CMK_SMP
1809     if (timerLock) CmiUnlock(timerLock);
1810 #endif
1811     return t;
1812 }
1813
1814 #endif     /* CMK_TIMER_USE_SPECIAL */
1815
1816 /************Barrier Related Functions****************/
1817 /* must be called on all ranks including comm thread in SMP */
1818 int CmiBarrier() {
1819 #if CMK_SMP
1820     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1821     CmiNodeAllBarrier();
1822     if (CmiMyRank() == CmiMyNodeSize())
1823 #else
1824     if (CmiMyRank() == 0)
1825 #endif
1826     {
1827         /**
1828          *  The call of CmiBarrier is usually before the initialization
1829          *  of trace module of Charm++, therefore, the START_EVENT
1830          *  and END_EVENT are disabled here. -Chao Mei
1831          */
1832         /*START_EVENT();*/
1833
1834         if (MPI_SUCCESS != MPI_Barrier(charmComm))
1835             CmiAbort("Timernit: MPI_Barrier failed!\n");
1836
1837         /*END_EVENT(10);*/
1838     }
1839     CmiNodeAllBarrier();
1840     return 0;
1841 }
1842
1843 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1844 int CmiBarrierZero() {
1845     int i;
1846 #if CMK_SMP
1847     if (CmiMyRank() == CmiMyNodeSize())
1848 #else
1849     if (CmiMyRank() == 0)
1850 #endif
1851     {
1852         char msg[1];
1853         MPI_Status sts;
1854         if (CmiMyNode() == 0)  {
1855             for (i=0; i<CmiNumNodes()-1; i++) {
1856                 START_EVENT();
1857
1858                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, charmComm,&sts))
1859                     CmiPrintf("MPI_Recv failed!\n");
1860
1861                 END_EVENT(30);
1862             }
1863         } else {
1864             START_EVENT();
1865
1866             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,charmComm))
1867                 printf("MPI_Send failed!\n");
1868
1869             END_EVENT(20);
1870         }
1871     }
1872     CmiNodeAllBarrier();
1873     return 0;
1874 }
1875
1876
1877 #if CMK_MEM_CHECKPOINT
1878
1879 void mpi_restart_crashed(int pe, int rank)
1880 {
1881     int vals[2];
1882     vals[0] = pe;
1883     vals[1] = CpvAccess(_curRestartPhase)+1;
1884     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,charmComm);
1885     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,charmComm);
1886 }
1887
1888 /* notify spare processors to exit */
1889 void mpi_end_spare()
1890 {
1891     int i;
1892     for (i=nextrank; i<total_pes; i++) {
1893         int vals[2] = {-1,-1};
1894         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,charmComm);
1895     }
1896 }
1897
1898 int find_spare_mpirank(int pe)
1899 {
1900     if (nextrank == total_pes) {
1901       CmiAbort("Charm++> No spare processor available.");
1902     }
1903     petorank[pe] = nextrank;
1904     nextrank++;
1905     return nextrank-1;
1906 }
1907
1908 void CkDieNow()
1909 {
1910     CmiPrintf("[%d] die now.\n", CmiMyPe());
1911
1912       /* release old messages */
1913     while (!CmiAllAsyncMsgsSent()) {
1914         PumpMsgs();
1915         CmiReleaseSentMessages();
1916     }
1917     MPI_Barrier(charmComm);
1918     MPI_Finalize();
1919     exit(0);
1920 }
1921
1922 #endif
1923
1924 /*======Beginning of Msg Histogram or Dynamic Post-Recv Related Funcs=====*/
1925 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
1926 /* Functions related with capturing msg histogram */
1927
1928 #if MPI_DYNAMIC_POST_RECV
1929 /* Consume all messages in the request buffers */
1930 static void consumeAllMsgs()
1931 {
1932     MPIPostRecvList *ptr = CpvAccess(curPostRecvPtr);
1933     if (ptr) {
1934         do {
1935             int i;
1936             for (i=0; i<ptr->bufCnt; i++) {
1937                 int done = 0;
1938                 MPI_Status sts;
1939
1940                 /* Indicating this entry has been tested before */
1941                 if (ptr->postedRecvBufs[i] == NULL) continue;
1942
1943                 START_TRACE_RECVCOMM(NULL);
1944                 if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
1945                     CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
1946                 if (done) {
1947                     int nbytes;
1948                     char *msg;                    
1949                     
1950                     if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
1951                         CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
1952                     /* ready to handle this msg */
1953                     msg = (ptr->postedRecvBufs)[i];
1954                     (ptr->postedRecvBufs)[i] = NULL;
1955                     
1956                     END_TRACE_RECVCOMM(msg);
1957                     handleOneRecvedMsg(nbytes, msg);
1958                 } else {
1959                     if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))
1960                         CmiAbort("consumeAllMsgs failed in MPI_Cancel!\n");
1961                 }
1962             }
1963             ptr = ptr->next;
1964         } while (ptr != CpvAccess(curPostRecvPtr));
1965     }
1966 }
1967
1968 static void recordMsgHistogramInfo(int size)
1969 {
1970     int idx = 0;
1971     size -= MPI_POST_RECV_LOWERSIZE;
1972     if (size > 0)
1973         idx = (size/MSG_HISTOGRAM_BINSIZE + 1);
1974
1975     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
1976     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
1977 }
1978
1979 #define POST_RECV_USE_STATIC_PARAM 0
1980 #define POST_RECV_REPORT_STS 0
1981
1982 #if POST_RECV_REPORT_STS
1983 static int buildDynCallCnt = 0;
1984 #endif
1985
1986 static void buildDynamicRecvBuffers()
1987 {
1988     int i;
1989
1990     int local_MSG_CNT_THRESHOLD;
1991     int local_MSG_INC;
1992
1993 #if POST_RECV_REPORT_STS
1994     buildDynCallCnt++;
1995 #endif
1996
1997     /* For debugging usage */
1998     reportMsgHistogramInfo();
1999
2000     CpvAccess(msgRecvCnt) = 0;
2001     /* consume all outstanding msgs */
2002     consumeAllMsgs();
2003
2004 #if POST_RECV_USE_STATIC_PARAM
2005     local_MSG_CNT_THRESHOLD = MPI_POST_RECV_MSG_CNT_THRESHOLD;
2006     local_MSG_INC = MPI_POST_RECV_MSG_INC;
2007 #else
2008     {
2009         int total = 0;
2010         int count = 0;
2011         for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2012             int tmp = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2013             /* avg is temporarily used for counting how many buckets are non-zero */
2014             if (tmp > 0)  {
2015                 total += tmp;
2016                 count++;
2017             }
2018         }
2019         if (count == 1) local_MSG_CNT_THRESHOLD = 1; /* Just filter out those zero-count msgs */
2020         else local_MSG_CNT_THRESHOLD = total / count /3; /* Catch >50% msgs NEED-BETTER-SCHEME HERE!!*/
2021         local_MSG_INC = total/count; /* Not having a good heuristic right now */
2022 #if POST_RECV_REPORT_STS
2023         printf("sel_histo[%d]: critia_threshold=%d, critia_msginc=%d\n", CmiMyPe(), local_MSG_CNT_THRESHOLD, local_MSG_INC);
2024 #endif
2025     }
2026 #endif
2027
2028     /* First continue to find the first msg range that requires post recv */
2029     /* Ignore the fist and the last one because they are not tracked */
2030     MPIPostRecvList *newHdr = NULL;
2031     MPIPostRecvList *newListPtr = newHdr;
2032     MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
2033     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2034         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2035         if (count >= local_MSG_CNT_THRESHOLD) {
2036
2037 #if POST_RECV_REPORT_STS
2038             /* Report histogram results */
2039             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
2040             int high = low + MSG_HISTOGRAM_BINSIZE;
2041             int reportCnt;
2042             if (count == local_MSG_CNT_THRESHOLD) reportCnt = 1;
2043             else reportCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2044             printf("sel_histo[%d]-%d: msg size [%.2f, %.2f) with count=%d (%d)\n", CmiMyPe(), buildDynCallCnt, low/1000.0, high/1000.0, count, reportCnt);
2045 #endif
2046             /* find if this msg idx exists, the "i" is the msgSizeIdx, in the current list */
2047             int notFound = 1;
2048             MPIPostRecvList *newEntry = NULL;
2049             while (ptr) {
2050                 if (ptr->msgSizeIdx < i) {
2051                     /* free the buffer for this range of msg size */
2052                     MPIPostRecvList *nextptr = ptr->next;
2053
2054                     free(ptr->postedRecvReqs);
2055                     int j;
2056                     for (j=0; j<ptr->bufCnt; j++) {
2057                         if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2058                     }
2059                     free(ptr->postedRecvBufs);
2060                     ptr = nextptr;
2061                 } else if (ptr->msgSizeIdx == i) {
2062                     int newBufCnt, j;
2063                     int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
2064                     newEntry = ptr;
2065                     /* Do some adjustment according to the current statistics */
2066                     if (count == local_MSG_CNT_THRESHOLD) newBufCnt = 1;
2067                     else newBufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2068                     if (newBufCnt != ptr->bufCnt) {
2069                         /* free old buffers, and allocate new buffers */
2070                         free(ptr->postedRecvReqs);
2071                         ptr->postedRecvReqs = (MPI_Request *)malloc(newBufCnt * sizeof(MPI_Request));
2072                         for (j=0; j<ptr->bufCnt; j++) {
2073                             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2074                         }
2075                         free(ptr->postedRecvBufs);
2076                         ptr->postedRecvBufs = (char **)malloc(newBufCnt * sizeof(char *));
2077                     }
2078
2079                     /* re-post those buffers */
2080                     ptr->bufCnt = newBufCnt;
2081                     for (j=0; j<ptr->bufCnt; j++) {
2082                         ptr->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
2083                         if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
2084                                                      MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
2085                                                      charmComm, ptr->postedRecvReqs+j))
2086                             CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
2087                     }
2088
2089                     /* We already posted bufs for this range of msg size */
2090                     ptr = ptr->next;
2091                     /* Need to set ptr to NULL as the buf list comes to an end and the while loop exits */
2092                     if (ptr == CpvAccess(postRecvListHdr)) ptr = NULL;
2093                     notFound = 0;
2094                     break;
2095                 } else {
2096                     /* The msgSizeIdx is larger than i */
2097                     break;
2098                 }
2099                 if (ptr == CpvAccess(postRecvListHdr)) {
2100                     ptr = NULL;
2101                     break;
2102                 }
2103             } /* end while(ptr): iterating the posted recv buffer list */
2104
2105             if (notFound) {
2106                 /* the current range of msg size is not found in the list */
2107                 int j;
2108                 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
2109                 newEntry = malloc(sizeof(MPIPostRecvList));
2110                 MPIPostRecvList *one = newEntry;
2111                 one->msgSizeIdx = i;
2112                 if (count == local_MSG_CNT_THRESHOLD) one->bufCnt = 1;
2113                 else one->bufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2114                 one->postedRecvReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*one->bufCnt);
2115                 one->postedRecvBufs = (char **)malloc(one->bufCnt * sizeof(char *));
2116                 for (j=0; j<one->bufCnt; j++) {
2117                     one->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
2118                     if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
2119                                                  MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
2120                                                  charmComm, one->postedRecvReqs+j))
2121                         CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
2122                 }
2123             } /* end if notFound */
2124
2125             /* Update the new list with the newEntry */
2126             CmiAssert(newEntry != NULL);
2127             if (newHdr == NULL) {
2128                 newHdr = newEntry;
2129                 newListPtr = newEntry;
2130                 newHdr->next = newHdr;
2131             } else {
2132                 newListPtr->next = newEntry;
2133                 newListPtr = newEntry;
2134                 newListPtr->next = newHdr;
2135             }
2136         } /* end if the count of this msg size range exceeds the threshold */
2137     } /* end for loop over the histogram buckets */
2138
2139     /* Free remaining entries in the list */
2140     while (ptr) {
2141         /* free the buffer for this range of msg size */
2142         MPIPostRecvList *nextptr = ptr->next;
2143
2144         free(ptr->postedRecvReqs);
2145         int j;
2146         for (j=0; j<ptr->bufCnt; j++) {
2147             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2148         }
2149         free(ptr->postedRecvBufs);
2150         ptr = nextptr;
2151         if (ptr == CpvAccess(postRecvListHdr)) break;
2152     }
2153
2154     CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr) = newHdr;
2155     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
2156 } /* end of function buildDynamicRecvBuffers */
2157
2158 static void examineMsgHistogramInfo(int size)
2159 {
2160     int total = CpvAccess(msgRecvCnt)++;
2161     if (total < MPI_POST_RECV_FREQ) {
2162         recordMsgHistogramInfo(size);
2163     } else {
2164         buildDynamicRecvBuffers();
2165     }
2166 }
2167 #else
2168 /* case when CAPTURE_MSG_HISTOGRAM is defined */
2169 static void recordMsgHistogramInfo(int size)
2170 {
2171     int idx = size/MSG_HISTOGRAM_BINSIZE;
2172     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
2173     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
2174 }
2175 #endif /* end of MPI_DYNAMIC_POST_RECV */
2176
2177 static void reportMsgHistogramInfo()
2178 {
2179 #if MPI_DYNAMIC_POST_RECV
2180     int i, count;
2181     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[0];
2182     if (count > 0) {
2183         printf("msg_histo[%d]: %d for msg [0, %.2fK)\n", CmiMyNode(), count, MPI_POST_RECV_LOWERSIZE/1000.0);
2184     }
2185     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2186         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2187         if (count > 0) {
2188             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
2189             int high = low + MSG_HISTOGRAM_BINSIZE;
2190             printf("msg_histo[%d]: %d for msg [%.2fK, %.2fK)\n", CmiMyNode(), count, low/1000.0, high/1000.0);
2191         }
2192     }
2193     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[MAX_HISTOGRAM_BUCKETS-1];
2194     if (count > 0) {
2195         printf("msg_histo[%d]: %d for msg [%.2fK, +inf)\n", CmiMyNode(), count, MPI_POST_RECV_UPPERSIZE/1000.0);
2196     }
2197 #else
2198     int i;
2199     for (i=0; i<MAX_HISTOGRAM_BUCKETS; i++) {
2200         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2201         if (count > 0) {
2202             int low = i*MSG_HISTOGRAM_BINSIZE;
2203             int high = low + MSG_HISTOGRAM_BINSIZE;
2204             printf("msg_histo[%d]: %d for msg [%dK, %dK)\n", CmiMyNode(), count, low/1000, high/1000);
2205         }
2206     }
2207 #endif
2208 }
2209 #endif /* end of CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV */
2210
2211 void CmiSetupMachineRecvBuffersUser()
2212 {
2213 #if MPI_DYNAMIC_POST_RECV
2214     buildDynamicRecvBuffers();
2215 #endif
2216 }
2217 /*=======End of Msg Histogram or Dynamic Post-Recv Related Funcs======*/
2218
2219
2220 /*@}*/
2221