216bf9de7ad9d9d49363d810fe8ac9f8fcdffa9e
[charm.git] / src / arch / mpi / machine.C
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12
13 #ifdef AMPI
14 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
15 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
16 #  error "Can't build Charm++ using AMPI version of mpi.h header"
17 #endif
18
19 /*Support for ++debug: */
20 #if defined(_WIN32)
21 #ifndef WIN32_LEAN_AND_MEAN
22 #define WIN32_LEAN_AND_MEAN
23 #endif
24 #include <windows.h>
25 #include <wincon.h>
26 #include <sys/types.h>
27 #include <sys/timeb.h>
28 static char* strsignal(int sig) {
29   static char outbuf[32];
30   sprintf(outbuf, "%d", sig);
31   return outbuf;
32 }
33 #include <process.h>
34 #define getpid _getpid
35 #else
36 #include <unistd.h> /*For getpid()*/
37 #endif
38 #include <stdlib.h> /*For sleep()*/
39
40 #include "machine.h"
41 #include "pcqueue.h"
42
43 /* Msg types to have different actions taken for different message types
44  * REGULAR                     - Regular Charm++ message
45  * ONESIDED_BUFFER_SEND        - Nocopy Entry Method API Send buffer
46  * ONESIDED_BUFFER_RECV        - Nocopy Entry Method API Recv buffer
47  * ONESIDED_BUFFER_DIRECT_RECV - Nocopy Direct API Recv buffer
48  * ONESIDED_BUFFER_DIRECT_SEND - Nocopy Direct API Send buffer
49  * POST_DIRECT_RECV            - Metadata message with Direct Recv buffer information
50  * POST_DIRECT_SEND            - Metadata message with Direct Send buffer information
51  * */
52
53 #define CMI_MSGTYPE(msg)            ((CmiMsgHeaderBasic *)msg)->msgType
54 enum mpiMsgTypes { REGULAR, ONESIDED_BUFFER_SEND, ONESIDED_BUFFER_RECV, ONESIDED_BUFFER_DIRECT_RECV, ONESIDED_BUFFER_DIRECT_SEND, POST_DIRECT_RECV, POST_DIRECT_SEND};
55
56 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
57 /* Whether to use multiple send queue in SMP mode */
58 #define MULTI_SENDQUEUE    0
59
60 /* ###Beginning of flow control related macros ### */
61 #define CMI_EXERT_SEND_CAP 0
62 #define CMI_EXERT_RECV_CAP 0
63
64 #define CMI_DYNAMIC_EXERT_CAP 0
65 /* This macro defines the max number of msgs in the sender msg buffer
66  * that is allowed for recving operation to continue
67  */
68 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
69 #define CMI_DYNAMIC_MAXCAPSIZE 1000
70 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
71 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
72 /* initial values, -1 indiates there's no cap */
73 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
74 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
75 MPI_Comm charmComm;
76 int tagUb;
77
78 #if CMI_EXERT_SEND_CAP
79 static int SEND_CAP=3;
80 #endif
81
82 #if CMI_EXERT_RECV_CAP
83 static int RECV_CAP=2;
84 #endif
85 /* ###End of flow control related macros ### */
86
87 /* ###Beginning of machine-layer-tracing related macros ### */
88 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
89 #define CMI_MPI_TRACE_MOREDETAILED 0
90 #undef CMI_MACH_TRACE_USEREVENTS
91 #define CMI_MACH_TRACE_USEREVENTS 1
92 #else
93 #undef CMK_SMP_TRACE_COMMTHREAD
94 #define CMK_SMP_TRACE_COMMTHREAD 0
95 #endif
96
97 #define CMK_TRACE_COMMOVERHEAD 0
98 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
99 #undef CMI_MACH_TRACE_USEREVENTS
100 #define CMI_MACH_TRACE_USEREVENTS 1
101 #else
102 #undef CMK_TRACE_COMMOVERHEAD
103 #define CMK_TRACE_COMMOVERHEAD 0
104 #endif
105
106 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
107 CpvStaticDeclare(double, projTraceStart);
108 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
109 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
110 #else
111 #define  START_EVENT()
112 #define  END_EVENT(x)
113 #endif
114
115 #if CMK_SMP_TRACE_COMMTHREAD
116 #define START_TRACE_SENDCOMM(msg)  \
117                         int isTraceEligible = traceBeginCommOp(msg); \
118                         if(isTraceEligible) traceSendMsgComm(msg);
119 #define END_TRACE_SENDCOMM(msg) if(isTraceEligible) traceEndCommOp(msg);
120
121 #define CONDITIONAL_TRACE_USER_EVENT(x) \
122                         do{ \
123                             double etime = CmiWallTimer(); \
124                             if(etime - CpvAccess(projTraceStart) > 5*1e-6){ \
125                                 traceUserBracketEvent(x, CpvAccess(projTraceStart), etime); \
126                             }\
127                         }while(0);
128 #else
129 #define START_TRACE_SENDCOMM(msg)
130 #define END_TRACE_SENDCOMM(msg)
131 #define CONDITIONAL_TRACE_USER_EVENT(x)
132 #endif
133
134 /* ###End of machine-layer-tracing related macros ### */
135
136 /* ###Beginning of POST_RECV related macros ### */
137 /*
138  * If MPI_POST_RECV is defined, we provide default values for
139  * size and number of posted recieves. If MPI_POST_RECV_COUNT
140  * is set then a default value for MPI_POST_RECV_SIZE is used
141  * if not specified by the user.
142  */
143 #define MPI_POST_RECV 0
144
145 /* Making those parameters configurable for testing them easily */
146
147 #if MPI_POST_RECV
148 #define MPI_DYNAMIC_POST_RECV 0
149
150 /* Note the tag offset of a msg is determined by
151  * (its size - MPI_RECV_LOWERSIZE)/MPI_POST_RECV_INC.
152  * based on POST_RECV_TAG.
153  */
154 static int MPI_POST_RECV_COUNT=10;
155
156 /* The range of msgs to be tracked for histogramming */
157 static int MPI_POST_RECV_LOWERSIZE=8000;
158 static int MPI_POST_RECV_UPPERSIZE=64000;
159
160 /* The increment of msg size to be tracked, i.e. the histogram bucket size */
161 static int MPI_POST_RECV_INC = 1000;
162
163 /* The unit increment of msg cnt for increase #buf for a post recved msg */
164 static int MPI_POST_RECV_MSG_INC = 400;
165
166 /* If the #msg exceeds this value, post recv is created for such msg */
167 static int MPI_POST_RECV_MSG_CNT_THRESHOLD = 200;
168
169 /* The frequency of checking the existing posted recv buffers in the unit of #msgs */
170 static int MPI_POST_RECV_FREQ = 1000;
171
172 static int MPI_POST_RECV_SIZE;
173
174 typedef struct mpiPostRecvList {
175     /* POST_RECV_TAG + msgSizeIdx is the recv tag;
176      * Based on this value, this buf corresponds to msg size ranging
177      * [msgSizeIdx*MPI_POST_RECV_INC, (msgSizeIdx+1)*MPI_POST_RECV_INC)
178      */
179     int msgSizeIdx;
180     int bufCnt;
181     MPI_Request *postedRecvReqs;
182     char **postedRecvBufs;
183     struct mpiPostRecvList *next;
184 } MPIPostRecvList;
185 CpvDeclare(MPIPostRecvList *, postRecvListHdr);
186 CpvDeclare(MPIPostRecvList *, curPostRecvPtr);
187 CpvDeclare(int, msgRecvCnt);
188
189 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
190 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
191 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
192 CpvDeclare(char**,CmiPostedRecvBuffers);
193
194 /* Note: currently MPI doesn't provide a function whether a request is in progress.
195  * For example, a irecv has been filled partially. Then a call to MPI_Test still returns
196  * indicating it has not been finished. If only relying on this result, then calling
197  * MPI_Cancel will result in a loss of this msg. The dynamic post recv mechanism
198  * can only be safely used in a synchronized point such as load balancing.
199  */
200 #if MPI_DYNAMIC_POST_RECV
201 static int MSG_HISTOGRAM_BINSIZE;
202 static int MAX_HISTOGRAM_BUCKETS; /* only cares msg size less 2 MB */
203 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
204 static void recordMsgHistogramInfo(int size);
205 static void reportMsgHistogramInfo(void);
206 #endif /* end of MPI_DYNAMIC_POST_RECV defined */
207
208 #endif /* end of MPI_POST_RECV defined */
209
210 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
211 #define TAG     1375
212 #if MPI_POST_RECV
213 #define POST_RECV_TAG       (TAG+1)
214 #define BARRIER_ZERO_TAG  TAG
215 #else
216 #define BARRIER_ZERO_TAG   (TAG-1)
217 #endif
218
219 #define USE_MPI_CTRLMSG_SCHEME 0
220
221 /* Defining this macro will use MPI_Irecv instead of MPI_Recv for
222  * large messages. This could save synchronization overhead caused by
223  * the rzv protocol used by MPI
224  */
225 #define USE_ASYNC_RECV_FUNC 0
226
227 #if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
228 static int IRECV_MSG_THRESHOLD = 8000;
229 typedef struct IRecvListEntry{
230     MPI_Request req;
231     char *msg;
232     int size;
233     struct IRecvListEntry *next;
234 }*IRecvList;
235
236 static IRecvList freedIrecvList = NULL; /* used to recycle the entries */
237 static IRecvList waitIrecvListHead = NULL; /* points to the guardian entry, i.e., the next of it points to the first entry */
238 static IRecvList waitIrecvListTail = NULL; /* points to the last entry */
239
240 static IRecvList irecvListEntryAllocate(void){
241     IRecvList ret;
242     if(freedIrecvList == NULL) {
243         ret = (IRecvList)malloc(sizeof(struct IRecvListEntry));        
244         return ret;
245     } else {
246         ret = freedIrecvList;
247         freedIrecvList = freedIrecvList->next;
248         return ret;
249     }
250 }
251 static void irecvListEntryFree(IRecvList used){
252     used->next = freedIrecvList;
253     freedIrecvList = used;
254 }
255
256 #endif /* end of USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME */
257
258 /* Providing functions for external usage to set up the dynamic recv buffer
259  * when the user is aware that it's safe to call such function
260  */
261 void CmiSetupMachineRecvBuffers(void);
262
263 #define CAPTURE_MSG_HISTOGRAM 0
264 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
265 static int MSG_HISTOGRAM_BINSIZE=1000;
266 static int MAX_HISTOGRAM_BUCKETS=2000; /* only cares msg size less 2 MB */
267 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
268 static void recordMsgHistogramInfo(int size);
269 static void reportMsgHistogramInfo(void);
270 #endif
271
272 /* ###End of POST_RECV related related macros ### */
273
274
275 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
276 #define MAX_QLEN 200
277
278 /* =======End of Definitions of Performance-Specific Macros =======*/
279
280
281 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
282 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
283 #define CHARM_MAGIC_NUMBER               126
284
285 #if CMK_ERROR_CHECKING
286 CMI_EXTERNC
287 unsigned char computeCheckSum(unsigned char *data, int len);
288 static int checksum_flag = 0;
289 #define CMI_SET_CHECKSUM(msg, len)      \
290         if (checksum_flag)  {   \
291           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
292           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
293         }
294 #define CMI_CHECK_CHECKSUM(msg, len)    \
295         if (checksum_flag)      \
296           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
297             CmiAbort("Fatal error: checksum doesn't agree!\n");
298 #else
299 #define CMI_SET_CHECKSUM(msg, len)
300 #define CMI_CHECK_CHECKSUM(msg, len)
301 #endif
302 /* =====End of Definitions of Message-Corruption Related Macros=====*/
303
304 /* =====Beginning of Declarations of Machine Specific Variables===== */
305 #include <signal.h>
306 #if !defined(_WIN32)
307 struct sigaction signal_int;
308 #else
309 void (*signal_int)(int);
310 #endif
311 static int _thread_provided = -1; /* Indicating MPI thread level */
312 static int idleblock = 0;
313
314 #if __FAULT__ 
315 typedef struct crashedrank{
316   int rank;
317   struct crashedrank *next;
318 } crashedRankList;
319 CpvDeclare(crashedRankList *, crashedRankHdr);
320 CpvDeclare(crashedRankList *, crashedRankPtr);
321 int isRankDie(int rank);
322 #endif
323
324 #if CMK_ONESIDED_IMPL
325 int srcRank;
326 #if CMK_SMP
327 //Lock used for incrementing rdmaTag in SMP mode
328 static CmiNodeLock rdmaTagLock = 0;
329 #endif
330
331 #define RDMA_BASE_TAG     TAG+2
332 #define RDMA_ACK_TAG      TAG-2
333 int rdmaTag=RDMA_BASE_TAG;
334 #include "machine-rdma.h"
335 #include "machine-onesided.h"
336 #endif //end of CMK_ONESIDED_IMPL
337
338 /* A simple list for msgs that have been sent by MPI_Isend */
339 typedef struct msg_list {
340     char *msg;
341     struct msg_list *next;
342     int size, destpe, mode, type;
343     MPI_Request req;
344 #if CMK_ONESIDED_IMPL
345     void *ref;
346     // This field can store the pointer to any structure that might have to be accessed.
347     // For rdma messages, it stores the pointer to rdma buffer specific information (ack, tag)
348     //for rdma messages, tag is greater than RDMA_BASE_TAG; regular messages, it is 0
349 #endif
350 #if __FAULT__
351     int dstrank; //used in fault tolerance protocol, if the destination is the died rank, delete the msg
352 #endif
353 } SMSG_LIST;
354
355 CpvDeclare(SMSG_LIST *, sent_msgs);
356 CpvDeclare(SMSG_LIST *, end_sent);
357
358 CpvDeclare(int, MsgQueueLen);
359 static int request_max;
360 /*FLAG: consume outstanding Isends in scheduler loop*/
361 static int no_outstanding_sends=0;
362
363 #if NODE_0_IS_CONVHOST
364 int inside_comm = 0;
365 #endif
366
367 typedef struct ProcState {
368 #if MULTI_SENDQUEUE
369     PCQueue      postMsgBuf;       /* per processor message sending queue */
370 #endif
371     CmiNodeLock  recvLock;                  /* for cs->recv */
372 } ProcState;
373 static ProcState  *procState;
374
375 #if CMK_SMP && !MULTI_SENDQUEUE
376 PCQueue postMsgBuf;
377 static CmiNodeLock  postMsgBufLock = NULL;        /* for postMsgBuf */
378 #endif
379 /* =====End of Declarations of Machine Specific Variables===== */
380
381 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
382 #define FAIL_TAG   1200
383 int num_workpes, total_pes;
384 int *petorank = NULL;
385 int  nextrank;
386 void mpi_end_spare(void);
387 #endif
388
389 /* =====Beginning of Declarations of Machine Specific Functions===== */
390 /* Utility functions */
391
392 static size_t CheckAllAsyncMsgsSent(void);
393 static void ReleasePostedMessages(void);
394 static int PumpMsgs(void);
395 static void PumpMsgsBlocking(void);
396
397 #if CMK_SMP
398 static int MsgQueueEmpty(void);
399 static int RecvQueueEmpty(void);
400 static int SendMsgBuf(void);
401 static  void EnqueueMsg(void *m, int size, int node, int mode, int type, void *ref);
402 #endif
403
404 /* ### End of Machine-running Related Functions ### */
405
406 /* ### Beginning of Idle-state Related Functions ### */
407 void CmiNotifyIdleForMPI(void);
408 /* ### End of Idle-state Related Functions ### */
409
410 /* =====End of Declarations of Machine Specific Functions===== */
411
412 /**
413  *  Macros that overwrites the common codes, such as
414  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
415  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
416  *  CMK_OFFLOAD_BCAST_PROCESS etc.
417  */
418 #define CMK_HAS_SIZE_IN_MSGHDR 0
419 #include "machine-lrts.h"
420 #include "machine-common-core.C"
421
422 #if USE_MPI_CTRLMSG_SCHEME
423 #include "machine-ctrlmsg.c"
424 #endif
425
426 SMSG_LIST *allocateSmsgList(char *msg, int destNode, int size, int mode, int type, void *ref) {
427   SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
428   msg_tmp->msg = msg;
429   msg_tmp->destpe = destNode;
430   msg_tmp->size = size;
431   msg_tmp->next = 0;
432   msg_tmp->mode = mode;
433   msg_tmp->type = type;
434 #if CMK_ONESIDED_IMPL
435   msg_tmp->ref = ref;
436 #endif
437   return msg_tmp;
438 }
439
440 /* The machine specific msg-sending function */
441
442 #if CMK_SMP
443 static void EnqueueMsg(void *m, int size, int node, int mode, int type, void *ref) {
444     /*SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
445     SMSG_LIST *msg_tmp = allocateSmsgList((char *)m, node, size, mode, type, ref);
446     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
447     msg_tmp->msg = (char *)m;
448     msg_tmp->size = size;
449     msg_tmp->destpe = node;
450     msg_tmp->next = 0;
451     msg_tmp->mode = mode;
452 #if CMK_ONESIDED_IMPL
453     msg_tmp->ref = NULL;
454 #endif
455
456 #if MULTI_SENDQUEUE
457     PCQueuePush(procState[CmiMyRank()].postMsgBuf,(char *)msg_tmp);
458 #else
459     /*CmiLock(postMsgBufLock);*/
460     PCQueuePush(postMsgBuf,(char *)msg_tmp);
461     /*CmiUnlock(postMsgBufLock);*/
462 #endif
463
464     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, postMsgBuf, PCQueueLength(postMsgBuf));
465 }
466 #endif
467
468 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
469 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
470     int node = smsg->destpe;
471     int size = smsg->size;
472     char *msg = smsg->msg;
473     int mode = smsg->mode;
474     int dstrank;
475
476     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
477 #if CMK_ERROR_CHECKING
478     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
479     CMI_SET_CHECKSUM(msg, size);
480 #endif
481
482 #if MPI_POST_RECV
483     if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
484 #if MPI_DYNAMIC_POST_RECV
485         int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
486         START_TRACE_SENDCOMM(msg);
487         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,charmComm,&(smsg->req)))
488             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
489         END_TRACE_SENDCOMM(msg);
490 #else
491         START_TRACE_SENDCOMM(msg);
492         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,charmComm,&(smsg->req)))
493             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
494         END_TRACE_SENDCOMM(msg);
495 #endif
496     } else {
497         START_TRACE_SENDCOMM(msg);
498             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,charmComm,&(smsg->req)))
499             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
500         END_TRACE_SENDCOMM(msg);
501     }
502 #elif USE_MPI_CTRLMSG_SCHEME
503     sendViaCtrlMsg(node, size, msg, smsg);
504 #else
505 /* branch not using MPI_POST_RECV or USE_MPI_CTRLMSG_SCHEME */
506
507 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
508         dstrank = petorank[node];
509         smsg->dstrank = dstrank;
510 #else
511         dstrank=node;
512 #endif
513     START_TRACE_SENDCOMM(msg)
514     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,charmComm,&(smsg->req)))
515         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
516     END_TRACE_SENDCOMM(msg)
517 #endif /* end of #if MPI_POST_RECV */
518
519     MACHSTATE(3,"}MPI_Isend end");
520     CpvAccess(MsgQueueLen)++;
521     if (CpvAccess(sent_msgs)==0)
522         CpvAccess(sent_msgs) = smsg;
523     else {
524         CpvAccess(end_sent)->next = smsg;
525     }
526     CpvAccess(end_sent) = smsg;
527
528 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
529     if (mode == P2P_SYNC || mode == P2P_ASYNC)
530     {
531     while (CpvAccess(MsgQueueLen) > request_max) {
532         ReleasePostedMessages();
533         PumpMsgs();
534     }
535     }
536 #endif
537
538     return (CmiCommHandle) &(smsg->req);
539 }
540
541 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode) {
542     /* Ignoring the mode for MPI layer */
543
544     CmiState cs = CmiGetState();
545     SMSG_LIST *msg_tmp;
546
547     CmiAssert(destNode != CmiMyNodeGlobal());
548     // Mark the message type as REGULAR to indicate a regular charm message
549     CMI_MSGTYPE(msg) = REGULAR;
550 #if CMK_SMP
551     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
552       EnqueueMsg(msg, size, destNode, mode, REGULAR, NULL);
553       return 0;
554     }
555 #endif
556     /* non smp */
557     /*msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
558     msg_tmp = allocateSmsgList(msg, destNode, size, mode, REGULAR, NULL);
559     return MPISendOneMsg(msg_tmp);
560 }
561
562 static size_t CheckAllAsyncMsgsSent(void) {
563     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
564     MPI_Status sts;
565     int done;
566
567     while (msg_tmp!=0) {
568         done = 0;
569         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
570             CmiAbort("CheckAllAsyncMsgsSent: MPI_Test failed!\n");
571 #if __FAULT__ 
572         if(isRankDie(msg_tmp->dstrank)){
573           //CmiPrintf("[%d][%d] msg to crashed rank\n",CmiMyPartition(),CmiMyPe());
574           //CmiAbort("unexpected send");
575           done = 1;
576         }
577 #endif
578         if (!done)
579             return 0;
580         msg_tmp = msg_tmp->next;
581         /*    MsgQueueLen--; ????? */
582     }
583     return 1;
584 }
585
586 int CheckAsyncMsgSent(CmiCommHandle c) {
587
588     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
589     int done;
590     MPI_Status sts;
591
592     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
593         msg_tmp = msg_tmp->next;
594     if (msg_tmp) {
595         done = 0;
596         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
597             CmiAbort("CheckAsyncMsgSent: MPI_Test failed!\n");
598         return ((done)?1:0);
599     } else {
600         return 1;
601     }
602 }
603
604 #if CMK_ONESIDED_IMPL
605 #include "machine-onesided.c"
606 #endif
607
608 /* ######Beginning of functions related with communication progress ###### */
609 static void ReleasePostedMessages(void) {
610     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
611     SMSG_LIST *prev=0;
612     SMSG_LIST *temp;
613
614     int done;
615     MPI_Status sts;
616
617
618     MACHSTATE1(2,"ReleasePostedMessages begin on %d {", CmiMyPe());
619     while (msg_tmp!=0) {
620         done =0;
621 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
622         double startT = CmiWallTimer();
623 #endif
624         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
625             CmiAbort("ReleasePostedMessages: MPI_Test failed!\n");
626 #if __FAULT__ 
627         if (isRankDie(msg_tmp->dstrank)){
628           done = 1;
629         }
630 #endif
631         if (done) {
632             MACHSTATE2(3,"ReleasePostedMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
633             CpvAccess(MsgQueueLen)--;
634             /* Release the message */
635             temp = msg_tmp->next;
636             if (prev==0) /* first message */
637                 CpvAccess(sent_msgs) = temp;
638             else
639                 prev->next = temp;
640 #if CMK_ONESIDED_IMPL
641             // Update end_sent for consistent states during possible insertions
642             if(CpvAccess(end_sent) == msg_tmp) {
643               CpvAccess(end_sent) = prev;
644             }
645             if(msg_tmp->type == ONESIDED_BUFFER_DIRECT_RECV) {
646                 // MPI_Irecv posted as a part of the Direct API was completed
647                 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)(msg_tmp->ref);
648
649                 // Invoke the destination ack
650                 ncpyOpInfo->ackMode = CMK_DEST_ACK; // Only invoke destination ack
651
652                 // On the destination the NcpyOperationInfo is freed for the Direct API
653                 // but not freed for the Entry Method API as it a part of the parameter marshalled message
654                 // and is enentually freed by the RTS
655                 CmiInvokeNcpyAck(ncpyOpInfo);
656
657             } else if(msg_tmp->type == ONESIDED_BUFFER_DIRECT_SEND) {
658                 // MPI_Isend posted as a part of the Direct API was completed
659                 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)(msg_tmp->ref);
660                 // Invoke the source ack
661                 ncpyOpInfo->ackMode = CMK_SRC_ACK; // Only invoke the source ack
662
663                 // Free the NcpyOperationInfo on the source
664                 ncpyOpInfo->freeMe = CMK_FREE_NCPYOPINFO;
665
666                 CmiInvokeNcpyAck(ncpyOpInfo);
667             }
668             else if(msg_tmp->type == POST_DIRECT_SEND || msg_tmp->type == POST_DIRECT_RECV) {
669                 // do nothing as the received message is a NcpyOperationInfo object
670                 // which is freed in the above code (either ONESIDED_BUFFER_DIRECT_RECV or
671                 // ONESIDED_BUFFER_DIRECT_SEND)
672             }
673             else
674 #endif
675             {
676               CmiFree(msg_tmp->msg);
677             }
678             /* CmiFree(msg_tmp); */
679             free(msg_tmp);
680             msg_tmp = temp;
681         } else {
682             prev = msg_tmp;
683             msg_tmp = msg_tmp->next;
684         }
685 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
686         {
687             double endT = CmiWallTimer();
688             /* only record the event if it takes more than 1ms */
689             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
690         }
691 #endif
692     }
693     MACHSTATE(2,"} ReleasePostedMessages end");
694 }
695
696 static int PumpMsgs(void) {
697     int nbytes, flg, res;
698     char *msg;
699     MPI_Status sts;
700     int recd=0;
701
702 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
703     int recvCnt=0;
704 #endif
705
706     MACHSTATE(2,"PumpMsgs begin {");
707
708 #if CMI_DYNAMIC_EXERT_CAP
709     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
710 #endif
711
712     while (1) {
713         int doSyncRecv = 1;
714 #if CMI_EXERT_RECV_CAP
715         if (recvCnt==RECV_CAP) break;
716 #elif CMI_DYNAMIC_EXERT_CAP
717         if (recvCnt >= dynamicRecvCap) break;
718 #endif
719
720 #if USE_MPI_CTRLMSG_SCHEME
721         doSyncRecv = 0;
722         nbytes = recvViaCtrlMsg();
723   recd = 1;
724         if(nbytes == -1) break;
725 #elif MPI_POST_RECV
726                 /* First check posted recvs then do  probe unmatched outstanding messages */
727         MPIPostRecvList *postedOne = NULL;
728         int completed_index = -1;
729         flg = 0;
730 #if MPI_DYNAMIC_POST_RECV
731         MPIPostRecvList *oldPostRecvPtr = CpvAccess(curPostRecvPtr);
732         if (oldPostRecvPtr) {
733             /* post recv buf inited */
734             do {
735                 /* round-robin iteration over the list */
736                 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
737                 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
738                     CmiAbort("PumpMsgs: MPI_Testany failed!\n");
739
740                 if (flg) {
741                     postedOne = cur;
742                     break;
743                 }
744                 CpvAccess(curPostRecvPtr) = CpvAccess(curPostRecvPtr)->next;
745             } while (CpvAccess(curPostRecvPtr) != oldPostRecvPtr);
746         }
747 #else
748         MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
749         if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
750             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
751 #endif
752         CONDITIONAL_TRACE_USER_EVENT(60); /* MPI_Test related user event */
753         if (flg) {
754             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
755                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
756
757             recd = 1;
758 #if !MPI_DYNAMIC_POST_RECV
759             postedOne = CpvAccess(curPostRecvPtr);
760 #endif
761             msg = (postedOne->postedRecvBufs)[completed_index];
762             (postedOne->postedRecvBufs)[completed_index] = NULL;
763
764             CpvAccess(Cmi_posted_recv_total)++;
765         } else {
766             START_EVENT();
767             res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, charmComm, &flg, &sts);
768             if (res != MPI_SUCCESS)
769                 CmiAbort("MPI_Iprobe failed\n");
770             if (!flg) break;
771             
772             CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
773             recd = 1;
774             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
775             msg = (char *) CmiAlloc(nbytes);
776
777 #if USE_ASYNC_RECV_FUNC
778             if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
779 #endif            
780             if(doSyncRecv){
781                 START_EVENT();
782                 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
783                     CmiAbort("PumpMsgs: MPI_Recv failed!\n");                
784             }
785 #if USE_ASYNC_RECV_FUNC        
786             else {
787                 START_EVENT();
788                 IRecvList one = irecvListEntryAllocate();
789                 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
790                     CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
791                 /*printf("[%d]: irecv msg=%p, nbytes=%d, src=%d, tag=%d\n", CmiMyPe(), msg, nbytes, sts.MPI_SOURCE, sts.MPI_TAG);*/
792                 one->msg = msg;
793                 one->size = nbytes;
794                 one->next = NULL;
795                 waitIrecvListTail->next = one;
796                 waitIrecvListTail = one;
797                 CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
798             }
799 #endif
800             CpvAccess(Cmi_unposted_recv_total)++;
801         }
802 #else
803         /* Original version of not using MPI_POST_RECV and USE_MPI_CTRLMSG_SCHEME */
804         START_EVENT();
805         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, charmComm, &flg, &sts);
806         if (res != MPI_SUCCESS)
807             CmiAbort("MPI_Iprobe failed\n");
808
809         if (!flg) break;
810         CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
811         
812         recd = 1;
813         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
814         msg = (char *) CmiAlloc(nbytes);
815
816 #if USE_ASYNC_RECV_FUNC
817         if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
818 #endif        
819         if(doSyncRecv){
820             START_EVENT();
821             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
822                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");            
823         }
824 #if USE_ASYNC_RECV_FUNC        
825         else {
826             IRecvList one = irecvListEntryAllocate();
827             if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
828                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
829             one->msg = msg;
830             one->size = nbytes;
831             one->next = NULL;
832             waitIrecvListTail->next = one;
833             waitIrecvListTail = one;
834             /*printf("PE[%d]: MPI_Irecv msg=%p, size=%d, entry=%p\n", CmiMyPe(), msg, nbytes, one);*/
835             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
836         }
837 #endif
838
839 #endif /*end of !MPI_POST_RECV and !USE_MPI_CTRLMSG_SCHEME*/
840
841                 if(doSyncRecv){
842                         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
843                         CMI_CHECK_CHECKSUM(msg, nbytes);
844         #if CMK_ERROR_CHECKING
845                         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
846                                 CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
847                                 CmiFree(msg);
848                                 CmiAbort("Abort!\n");
849                                 continue;
850                         }
851         #endif
852             if(CMI_MSGTYPE(msg) == REGULAR) {
853               handleOneRecvedMsg(nbytes, msg);
854             } else if(CMI_MSGTYPE(msg) == POST_DIRECT_RECV || CMI_MSGTYPE(msg) == POST_DIRECT_SEND) {
855
856               NcpyOperationInfo *ncpyOpInfoMsg = (NcpyOperationInfo *)msg;
857               resetNcpyOpInfoPointers(ncpyOpInfoMsg);
858
859               int postMsgType, myPe, otherPe;
860               const void *myBuffer;
861               if(CMI_MSGTYPE(msg) == POST_DIRECT_RECV) {
862                 // Direct Buffer destination, post MPI_Irecv
863                 postMsgType = ONESIDED_BUFFER_DIRECT_RECV;
864                 myPe = ncpyOpInfoMsg->destPe;
865                 otherPe = ncpyOpInfoMsg->srcPe;
866                 myBuffer = ncpyOpInfoMsg->destPtr;
867               }
868               else {
869                 // Direct Buffer Source, post MPI_Isend
870                 postMsgType = ONESIDED_BUFFER_DIRECT_SEND;
871                 myPe = ncpyOpInfoMsg->srcPe;
872                 otherPe = ncpyOpInfoMsg->destPe;
873                 myBuffer = ncpyOpInfoMsg->srcPtr;
874               }
875
876               MPIPostOneBuffer(myBuffer,
877                                ncpyOpInfoMsg,
878                                ncpyOpInfoMsg->srcSize,
879                                otherPe,
880                                ncpyOpInfoMsg->tag,
881                                postMsgType);
882
883             } else {
884               CmiAbort("Invalid Type of message\n");
885             }
886         }
887         
888 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
889         recordMsgHistogramInfo(nbytes);
890 #endif
891
892 #if  MPI_POST_RECV
893 #if MPI_DYNAMIC_POST_RECV
894         if (postedOne) {
895             //printf("[%d]: get one posted recv\n", CmiMyPe());
896             /* Get the upper size of this buffer */
897             int postRecvBufSize = postedOne->msgSizeIdx*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
898             int postRecvTag = POST_RECV_TAG + postedOne->msgSizeIdx;
899             /* Has to re-allocate the buffer for the message */
900             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(postRecvBufSize);
901
902             /* and repost the recv */
903             START_EVENT();
904
905             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
906                                          postRecvBufSize,
907                                          MPI_BYTE,
908                                          MPI_ANY_SOURCE,
909                                          postRecvTag,
910                                          charmComm,
911                                          &((postedOne->postedRecvReqs)[completed_index])  ))
912                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
913             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
914         }
915 #else
916         if (postedOne) {
917             /* Has to re-allocate the buffer for the message */
918             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
919
920             /* and repost the recv */
921             START_EVENT();
922             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
923                                          MPI_POST_RECV_SIZE,
924                                          MPI_BYTE,
925                                          MPI_ANY_SOURCE,
926                                          POST_RECV_TAG,
927                                          charmComm,
928                                          &((postedOne->postedRecvReqs)[completed_index])  ))
929                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
930             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
931         }
932 #endif /* not MPI_DYNAMIC_POST_RECV */
933 #endif
934
935 #if CMI_EXERT_RECV_CAP
936         recvCnt++;
937 #elif CMI_DYNAMIC_EXERT_CAP
938         recvCnt++;
939 #if CMK_SMP
940         /* check postMsgBuf to get the  number of messages that have not been sent
941              * which is only available in SMP mode
942          * MsgQueueLen indicates the number of messages that have not been released
943              * by MPI
944              */
945         if (PCQueueLength(postMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
946                 || CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
947             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
948         }
949 #else
950         /* MsgQueueLen indicates the number of messages that have not been released
951              * by MPI
952              */
953         if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
954             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
955         }
956 #endif
957
958 #endif
959
960     }
961
962 #if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
963 /* Another loop to check the irecved msgs list */
964 {
965         /*TODO: msg cap (throttling) is not exerted here */
966     IRecvList irecvEnt;
967     int irecvDone = 0;
968     MPI_Status sts;
969     while(waitIrecvListHead->next) {
970         IRecvList irecvEnt = waitIrecvListHead->next;
971                 
972         /*printf("PE[%d]: check irecv entry=%p\n", CmiMyPe(), irecvEnt);*/
973         if(MPI_SUCCESS != MPI_Test(&(irecvEnt->req), &irecvDone, &sts))
974             CmiAbort("PumpMsgs: MPI_Test failed!\n");
975         if(!irecvDone) break; /* in-order recv */
976
977         /*printf("PE[%d]: irecv entry=%p finished with size=%d, msg=%p\n", CmiMyPe(), irecvEnt, irecvEnt->size, irecvEnt->msg);*/
978         
979         handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);
980         waitIrecvListHead->next = irecvEnt->next;
981         irecvListEntryFree(irecvEnt);
982         //recd = 1;        
983     }
984     if(waitIrecvListHead->next == NULL)
985         waitIrecvListTail = waitIrecvListHead;
986 }
987 #endif
988
989
990     MACHSTATE(2,"} PumpMsgs end ");
991     return recd;
992 }
993
994 /* blocking version */
995 static void PumpMsgsBlocking(void) {
996     static int maxbytes = 20000000;
997     static char *buf = NULL;
998     int nbytes, flg;
999     MPI_Status sts;
1000     char *msg;
1001     int recd=0;
1002
1003     if (!PCQueueEmpty(CmiGetState()->recv)) return;
1004     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
1005     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
1006     if (CpvAccess(sent_msgs))  return;
1007
1008 #if 0
1009     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
1010 #endif
1011
1012     if (buf == NULL) {
1013         buf = (char *) CmiAlloc(maxbytes);
1014         _MEMCHECK(buf);
1015     }
1016
1017
1018 #if MPI_POST_RECV
1019 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
1020     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
1021 #endif
1022
1023     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, charmComm,&sts))
1024         CmiAbort("PumpMsgs: PMP_Recv failed!\n");    
1025
1026     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
1027     msg = (char *) CmiAlloc(nbytes);
1028     memcpy(msg, buf, nbytes);
1029
1030 #if CMK_SMP_TRACE_COMMTHREAD && CMI_MPI_TRACE_MOREDETAILED
1031     char tmp[32];
1032     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
1033     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
1034 #endif
1035
1036     handleOneRecvedMsg(nbytes, msg);
1037 }
1038
1039
1040 #if CMK_SMP
1041
1042 /* called by communication thread in SMP */
1043 static int SendMsgBuf(void) {
1044     SMSG_LIST *msg_tmp;
1045     char *msg;
1046     int node, rank, size;
1047     int i;
1048     int sent = 0;
1049
1050 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1051     int sentCnt = 0;
1052 #endif
1053
1054 #if CMI_DYNAMIC_EXERT_CAP
1055     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
1056 #endif
1057
1058     MACHSTATE(2,"SendMsgBuf begin {");
1059 #if MULTI_SENDQUEUE
1060     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
1061         if (!PCQueueEmpty(procState[i].postMsgBuf)) {
1062             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].postMsgBuf);
1063 #else
1064     /* single message sending queue */
1065     /* CmiLock(postMsgBufLock); */
1066     msg_tmp = (SMSG_LIST *)PCQueuePop(postMsgBuf);
1067     /* CmiUnlock(postMsgBufLock); */
1068     while (NULL != msg_tmp) {
1069 #endif
1070
1071 #if CMK_ONESIDED_IMPL
1072             if(msg_tmp->type == ONESIDED_BUFFER_DIRECT_RECV || msg_tmp->type == ONESIDED_BUFFER_DIRECT_SEND) {
1073                 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)(msg_tmp->ref);
1074                 MPISendOrRecvOneBuffer(msg_tmp, ncpyOpInfo->tag);
1075             }
1076             else
1077 #endif
1078             {
1079                 MPISendOneMsg(msg_tmp);
1080             }
1081             sent=1;
1082
1083 #if CMI_EXERT_SEND_CAP
1084             if (++sentCnt == SEND_CAP) break;
1085 #elif CMI_DYNAMIC_EXERT_CAP
1086             if (++sentCnt >= dynamicSendCap) break;
1087             if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1088                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1089 #endif
1090
1091 #if ! MULTI_SENDQUEUE
1092             /* CmiLock(postMsgBufLock); */
1093             msg_tmp = (SMSG_LIST *)PCQueuePop(postMsgBuf);
1094             /* CmiUnlock(postMsgBufLock); */
1095 #endif
1096         }
1097 #if MULTI_SENDQUEUE
1098     }
1099 #endif
1100     MACHSTATE(2,"}SendMsgBuf end ");
1101     return sent;
1102 }
1103
1104 static int MsgQueueEmpty() {
1105     int i;
1106 #if MULTI_SENDQUEUE
1107     for (i=0; i<_Cmi_mynodesize; i++)
1108         if (!PCQueueEmpty(procState[i].postMsgBuf)) return 0;
1109 #else
1110     return PCQueueEmpty(postMsgBuf);
1111 #endif
1112     return 1;
1113 }
1114
1115 /* test if all processors recv queues are empty */
1116 static int RecvQueueEmpty() {
1117     int i;
1118     for (i=0; i<_Cmi_mynodesize; i++) {
1119         CmiState cs=CmiGetStateN(i);
1120         if (!PCQueueEmpty(cs->recv)) return 0;
1121     }
1122     return 1;
1123 }
1124
1125
1126 #define REPORT_COMM_METRICS 0
1127 #if REPORT_COMM_METRICS
1128 static double pumptime = 0.0;
1129 static double releasetime = 0.0;
1130 static double sendtime = 0.0;
1131 #endif
1132
1133 #endif //end of CMK_SMP
1134
1135 void LrtsAdvanceCommunication(int whenidle) {
1136 #if REPORT_COMM_METRICS
1137     double t1, t2, t3, t4;
1138     t1 = CmiWallTimer();
1139 #endif
1140
1141 #if CMK_SMP
1142     PumpMsgs();
1143
1144 #if REPORT_COMM_METRICS
1145     t2 = CmiWallTimer();
1146 #endif
1147
1148     ReleasePostedMessages();
1149 #if REPORT_COMM_METRICS
1150     t3 = CmiWallTimer();
1151 #endif
1152
1153     SendMsgBuf();
1154
1155 #if REPORT_COMM_METRICS
1156     t4 = CmiWallTimer();
1157     pumptime += (t2-t1);
1158     releasetime += (t3-t2);
1159     sendtime += (t4-t3);
1160 #endif
1161
1162 #else /* non-SMP case */
1163     ReleasePostedMessages();
1164
1165 #if REPORT_COMM_METRICS
1166     t2 = CmiWallTimer();
1167 #endif
1168     PumpMsgs();
1169
1170 #if REPORT_COMM_METRICS
1171     t3 = CmiWallTimer();
1172     pumptime += (t3-t2);
1173     releasetime += (t2-t1);
1174 #endif
1175
1176 #endif /* end of #if CMK_SMP */
1177 }
1178 /* ######End of functions related with communication progress ###### */
1179
1180 void LrtsPostNonLocal(void) {
1181 #if !CMK_SMP
1182     if (no_outstanding_sends) {
1183         while (CpvAccess(MsgQueueLen)>0) {
1184             LrtsAdvanceCommunication(0);
1185         }
1186     }
1187
1188     /* FIXME: I don't think the following codes are needed because
1189      * it repeats the same job of the next call of CmiGetNonLocal
1190      */
1191 #if 0
1192     if (!msg) {
1193         ReleasePostedMessages();
1194         if (PumpMsgs())
1195             return  PCQueuePop(cs->recv);
1196         else
1197             return 0;
1198     }
1199 #endif
1200 #else
1201   if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1202         ReleasePostedMessages();       
1203         /* ??? SendMsgBuf is a not a thread-safe function. If it is put
1204          * here and this function will be called in CmiNotifyStillIdle,
1205          * then a data-race problem occurs */
1206         /*SendMsgBuf();*/
1207   }
1208 #endif
1209 }
1210
1211 /* Idle-state related functions: called in non-smp mode */
1212 void CmiNotifyIdleForMPI(void) {
1213     ReleasePostedMessages();
1214     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1215 }
1216
1217 /* Network progress function is used to poll the network when for
1218    messages. This flushes receive buffers on some  implementations*/
1219 #if CMK_MACHINE_PROGRESS_DEFINED
1220 CMI_EXTERNC
1221 void CommunicationServerThread(int);
1222
1223 void CmiMachineProgressImpl(void) {
1224 #if !CMK_SMP
1225     PumpMsgs();
1226 #if CMK_IMMEDIATE_MSG
1227     CmiHandleImmediate();
1228 #endif
1229 #else
1230     /*Not implemented yet. Communication server does not seem to be
1231       thread safe, so only communication thread call it */
1232     if (CmiMyRank() == CmiMyNodeSize())
1233         CommunicationServerThread(0);
1234 #endif
1235 }
1236 #endif
1237
1238 /* ######Beginning of functions related with exiting programs###### */
1239 void LrtsDrainResources(void) {
1240 #if !CMK_SMP
1241     while (!CheckAllAsyncMsgsSent()) {
1242         PumpMsgs();
1243         ReleasePostedMessages();
1244     }
1245 #else
1246     if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
1247         while (!MsgQueueEmpty() || !CheckAllAsyncMsgsSent()) {
1248             ReleasePostedMessages();
1249             SendMsgBuf();
1250             PumpMsgs();
1251         }
1252     }else if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1253         while(!CheckAllAsyncMsgsSent()) {
1254             ReleasePostedMessages();
1255         }
1256     }
1257 #endif
1258 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
1259     if (CmiMyPe() == 0 && CmiMyPartition()==0)
1260     { 
1261       mpi_end_spare();
1262     }
1263 #endif
1264     MACHSTATE(2, "Machine exit barrier begin {");
1265     START_EVENT();
1266     if (MPI_SUCCESS != MPI_Barrier(charmComm))
1267         CmiAbort("LrtsDrainResources: MPI_Barrier failed!\n");
1268     END_EVENT(10);
1269     MACHSTATE(2, "} Machine exit barrier end");
1270 }
1271
1272 void LrtsExit(int exitcode) {
1273     int i;
1274 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1275     int doPrint = 0;
1276     if (CmiMyNode()==0) doPrint = 1;
1277
1278     if (doPrint /*|| CmiMyNode()%11==0 */) {
1279 #if MPI_POST_RECV
1280         CmiPrintf("node[%d]: %llu posted receives,  %llu unposted receives\n", CmiMyNode(), CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1281 #endif
1282     }
1283 #endif
1284
1285 #if MPI_POST_RECV
1286     {
1287         MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
1288         if (ptr) {
1289             do {
1290                 for (i=0; i<ptr->bufCnt; i++) MPI_Cancel(ptr->postedRecvReqs+i);
1291                 ptr = ptr->next;
1292             } while (ptr!=CpvAccess(postRecvListHdr));
1293         }
1294     }
1295 #endif
1296
1297 #if REPORT_COMM_METRICS
1298 #if CMK_SMP
1299     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
1300               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
1301               pumptime, releasetime, sendtime);
1302 #else
1303     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
1304               CmiMyPe(), pumptime, releasetime, sendtime);
1305 #endif
1306 #endif
1307     
1308    if(!CharmLibInterOperate || userDrivenMode) {
1309 #if ! CMK_AUTOBUILD
1310 #if !defined(_WIN32)
1311       sigaction(SIGINT, &signal_int, NULL);
1312 #else
1313       signal(SIGINT, signal_int);
1314 #endif
1315       MPI_Finalize();
1316 #endif
1317       exit(exitcode);
1318     }
1319 }
1320
1321 static void KillOnAllSigs(int sigNo) {
1322     static int already_in_signal_handler = 0;
1323     char *m;
1324     if (already_in_signal_handler) return;   /* MPI_Abort(charmComm,1); */
1325     already_in_signal_handler = 1;
1326
1327     CmiAbortHelper("Caught Signal", strsignal(sigNo), NULL, 1, 1);
1328 }
1329 /* ######End of functions related with exiting programs###### */
1330
1331
1332 /* ######Beginning of functions related with starting programs###### */
1333 static void registerMPITraceEvents(void) {
1334 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1335     traceRegisterUserEvent("MPI_Barrier", 10);
1336     traceRegisterUserEvent("MPI_Send", 20);
1337     traceRegisterUserEvent("MPI_Recv", 30);
1338     traceRegisterUserEvent("MPI_Isend", 40);
1339     traceRegisterUserEvent("MPI_Irecv", 50);
1340     traceRegisterUserEvent("MPI_Test[any]", 60);
1341     traceRegisterUserEvent("MPI_Iprobe", 70);
1342 #endif
1343 }
1344
1345 static const char *thread_level_tostring(int thread_level) {
1346 #if CMK_MPI_INIT_THREAD
1347     switch (thread_level) {
1348     case MPI_THREAD_SINGLE:
1349         return "MPI_THREAD_SINGLE";
1350     case MPI_THREAD_FUNNELED:
1351         return "MPI_THREAD_FUNNELED";
1352     case MPI_THREAD_SERIALIZED:
1353         return "MPI_THREAD_SERIALIZED";
1354     case MPI_THREAD_MULTIPLE :
1355         return "MPI_THREAD_MULTIPLE";
1356     default: {
1357         char *str = (char *)malloc(5); // XXX: leaked
1358         sprintf(str,"%d", thread_level);
1359         return str;
1360     }
1361     }
1362     return  "unknown";
1363 #else
1364     char *str = (char *)malloc(5); // XXX: leaked
1365     sprintf(str,"%d", thread_level);
1366     return str;
1367 #endif
1368 }
1369
1370 CMI_EXTERNC_VARIABLE int quietMode;
1371
1372 /**
1373  *  Obtain the number of nodes, my node id, and consuming machine layer
1374  *  specific arguments
1375  */
1376 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1377     int n,i;
1378     int ver, subver;
1379     int provided;
1380     int thread_level;
1381     int myNID;
1382     int largc=*argc;
1383     char** largv=*argv;
1384     int tagUbGetResult;
1385     void *tagUbVal;
1386
1387     if (CmiGetArgFlag(largv, "+comm_thread_only_recv")) {
1388 #if CMK_SMP
1389       Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
1390 #else
1391       CmiAbort("+comm_thread_only_recv option can only be used with SMP version of Charm++");
1392 #endif
1393     }
1394
1395     *argc = CmiGetArgc(largv);     /* update it in case it is out of sync */
1396
1397     if(!CharmLibInterOperate || userDrivenMode) {
1398 #if CMK_MPI_INIT_THREAD
1399 #if CMK_SMP
1400     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV)
1401         thread_level = MPI_THREAD_FUNNELED;
1402       else
1403         thread_level = MPI_THREAD_MULTIPLE;
1404 #else
1405       thread_level = MPI_THREAD_SINGLE;
1406 #endif
1407       MPI_Init_thread(argc, argv, thread_level, &provided);
1408       _thread_provided = provided;
1409 #else
1410       MPI_Init(argc, argv);
1411       thread_level = 0;
1412       _thread_provided = -1;
1413 #endif
1414     }
1415
1416     largc = *argc;
1417     largv = *argv;
1418     if(!CharmLibInterOperate || userDrivenMode) {
1419       MPI_Comm_dup(MPI_COMM_WORLD, &charmComm);
1420     }
1421     MPI_Comm_size(charmComm, numNodes);
1422     MPI_Comm_rank(charmComm, myNodeID);
1423
1424 #if CMK_ONESIDED_IMPL
1425     /* srcRank stores the rank of the sender MPI process
1426      * and is a global variable used for rdma md messages */
1427     srcRank = *myNodeID;
1428     MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &tagUbVal, &tagUbGetResult);
1429     CmiAssert(tagUbGetResult);
1430     tagUb = *(int *)tagUbVal;
1431 #endif
1432
1433     MPI_Bcast(&_Cmi_mynodesize, 1, MPI_INT, 0, charmComm);
1434
1435     myNID = *myNodeID;
1436
1437     MPI_Get_version(&ver, &subver);
1438     if(!CharmLibInterOperate) {
1439       if ((myNID == 0) && (!quietMode)) {
1440         printf("Charm++> Running on MPI version: %d.%d\n", ver, subver);
1441         printf("Charm++> level of thread support used: %s (desired: %s)\n", thread_level_tostring(_thread_provided), thread_level_tostring(thread_level));
1442       }
1443     }
1444
1445 #if CMK_SMP
1446     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV && _thread_provided != MPI_THREAD_MULTIPLE) {
1447         Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV; 
1448         if ((myNID == 0) && (!quietMode)) {
1449           printf("Charm++> +comm_thread_only_recv disabled\n");
1450         }
1451     }
1452 #endif
1453
1454     {
1455         int debug = CmiGetArgFlag(largv,"++debug");
1456         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1457         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1458 #if CMK_HAS_GETPID
1459             if (!quietMode) printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1460             fflush(stdout);
1461             if (!debug_no_pause)
1462                 sleep(15);
1463 #else
1464             if (!quietMode) printf("++debug ignored.\n");
1465 #endif
1466         }
1467     }
1468
1469
1470 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
1471     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1472        CmiAssert(num_workpes <= *numNodes);
1473        total_pes = *numNodes;
1474        *numNodes = num_workpes;
1475     }
1476     else
1477        total_pes = num_workpes = *numNodes;
1478     if (*myNodeID == 0)
1479        CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1480     petorank = (int *)malloc(sizeof(int) * num_workpes);
1481     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1482     nextrank = num_workpes;
1483
1484     if (*myNodeID >= num_workpes) {    /* is spare processor */
1485       if(CmiGetArgFlag(largv,"+isomalloc_sync")){
1486           MPI_Barrier(charmComm);
1487           MPI_Barrier(charmComm);
1488           MPI_Barrier(charmComm);
1489           MPI_Barrier(charmComm);
1490       }
1491       MPI_Status sts;
1492       int vals[2];
1493       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
1494       int newpe = vals[0];
1495       CpvAccess(_curRestartPhase) = vals[1];
1496       
1497       CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
1498
1499       if (newpe == -1) {
1500           MPI_Barrier(charmComm);
1501           //MPI_Barrier(charmComm);
1502           MPI_Finalize();
1503           exit(0);
1504       }
1505
1506         /* update petorank */
1507       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
1508       nextrank = *myNodeID + 1;
1509       *myNodeID = newpe;
1510       myNID = newpe;
1511
1512        /* add +restartaftercrash to argv */
1513       char *phase_str;
1514       char **restart_argv;
1515       int i=0;
1516       while(largv[i]!= NULL) i++;
1517       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1518       i=0;
1519       while(largv[i]!= NULL){
1520                 restart_argv[i] = largv[i];
1521                 i++;
1522       }
1523       static char s_restartaftercrash[] = "+restartaftercrash";
1524       restart_argv[i] = s_restartaftercrash;
1525       phase_str = (char*)malloc(10);
1526       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1527       restart_argv[i+1]=phase_str;
1528       restart_argv[i+2]=NULL;
1529       *argv = restart_argv;
1530       *argc = i+2;
1531       largc = *argc;
1532       largv = *argv;
1533     }
1534 #endif
1535
1536     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1537     if (idleblock && _Cmi_mynode == 0 && !quietMode) {
1538         printf("Charm++: Running in idle blocking mode.\n");
1539     }
1540
1541 #if CMK_CHARMDEBUG
1542     /* setup signal handlers */
1543 #if !defined(_WIN32)
1544     struct sigaction sa;
1545     sa.sa_handler = KillOnAllSigs;
1546     sigemptyset(&sa.sa_mask);
1547     sa.sa_flags = SA_RESTART;
1548
1549     sigaction(SIGSEGV, &sa, NULL);
1550     sigaction(SIGFPE, &sa, NULL);
1551     sigaction(SIGILL, &sa, NULL);
1552     sigaction(SIGINT, &sa, &signal_int);
1553     sigaction(SIGTERM, &sa, NULL);
1554     sigaction(SIGABRT, &sa, NULL);
1555 #else
1556     signal(SIGSEGV, KillOnAllSigs);
1557     signal(SIGFPE, KillOnAllSigs);
1558     signal(SIGILL, KillOnAllSigs);
1559     signal_int = signal(SIGINT, KillOnAllSigs);
1560     signal(SIGTERM, KillOnAllSigs);
1561     signal(SIGABRT, KillOnAllSigs);
1562 #endif
1563 #if !defined(_WIN32) /*UNIX-only signals*/
1564     sigaction(SIGQUIT, &sa, NULL);
1565     sigaction(SIGBUS, &sa, NULL);
1566 #endif /*UNIX*/
1567 #endif
1568
1569 #if CMK_NO_OUTSTANDING_SENDS
1570     no_outstanding_sends=1;
1571 #endif
1572     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1573         no_outstanding_sends = 1;
1574         if ((myNID == 0) && (!quietMode))
1575             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1576                    no_outstanding_sends?"":" not");
1577     }
1578
1579     request_max=MAX_QLEN;
1580     CmiGetArgInt(largv,"+requestmax",&request_max);
1581     /*printf("request max=%d\n", request_max);*/
1582
1583 #if MPI_POST_RECV
1584     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1585     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1586     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1587     CmiGetArgInt(largv, "+postRecvThreshold", &MPI_POST_RECV_MSG_CNT_THRESHOLD);
1588     CmiGetArgInt(largv, "+postRecvBucketSize", &MPI_POST_RECV_INC);
1589     CmiGetArgInt(largv, "+postRecvMsgInc", &MPI_POST_RECV_MSG_INC);
1590     CmiGetArgInt(largv, "+postRecvCheckFreq", &MPI_POST_RECV_FREQ);
1591     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1592     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1593     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1594     if ((myNID==0) && (!quietMode)) {
1595         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes) with msg count threshold %d and msg histogram bucket size %d, #buf increment every %d msgs. The buffers are checked every %d msgs\n",
1596                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE,
1597                MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
1598     }
1599 #endif
1600         
1601 #if USE_MPI_CTRLMSG_SCHEME
1602         CmiGetArgInt(largv, "+ctrlMsgCnt", &MPI_CTRL_MSG_CNT);
1603         if((myNID == 0) && (!quietMode)){
1604                 printf("Charm++: using the alternative ctrl msg scheme with %d pre-posted ctrl msgs\n", MPI_CTRL_MSG_CNT);
1605         }
1606 #endif
1607
1608 #if CMI_EXERT_SEND_CAP
1609     CmiGetArgInt(largv, "+dynCapSend", &SEND_CAP);
1610     if ((myNID==0) && (!quietMode)) {
1611         printf("Charm++: using static send cap %d\n", SEND_CAP);
1612     }
1613 #endif
1614 #if CMI_EXERT_RECV_CAP
1615     CmiGetArgInt(largv, "+dynCapRecv", &RECV_CAP);
1616     if ((myNID==0) && (!quietMode)) {
1617         printf("Charm++: using static recv cap %d\n", RECV_CAP);
1618     }
1619 #endif
1620 #if CMI_DYNAMIC_EXERT_CAP 
1621     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1622     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1623     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1624     if ((myNID==0) && (!quietMode)) {
1625         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1626                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1627     }
1628 #endif
1629
1630 #if USE_ASYNC_RECV_FUNC
1631     CmiGetArgInt(largv, "+irecvMsgThreshold", &IRECV_MSG_THRESHOLD);
1632     if((myNID==0) && (!quietMode)) {
1633         printf("Charm++: for msg size larger than %d, MPI_Irecv is going to be used.\n", IRECV_MSG_THRESHOLD);
1634     }
1635 #endif
1636
1637     /* checksum flag */
1638     if (CmiGetArgFlag(largv,"+checksum")) {
1639 #if CMK_ERROR_CHECKING
1640         checksum_flag = 1;
1641         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1642 #else
1643         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1644 #endif
1645     }
1646
1647     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1648     for (i=0; i<_Cmi_mynodesize+1; i++) {
1649 #if MULTI_SENDQUEUE
1650         procState[i].postMsgBuf = PCQueueCreate();
1651 #endif
1652         procState[i].recvLock = CmiCreateLock();
1653     }
1654 #if CMK_SMP
1655 #if !MULTI_SENDQUEUE
1656     postMsgBuf = PCQueueCreate();
1657     postMsgBufLock = CmiCreateLock();
1658 #endif
1659
1660 #if CMK_ONESIDED_IMPL && CMK_SMP
1661     rdmaTagLock = CmiCreateLock();
1662 #endif
1663 #endif
1664 }
1665
1666 INLINE_KEYWORD void LrtsNotifyIdle(void) {}
1667
1668 INLINE_KEYWORD void LrtsBeginIdle(void) {}
1669
1670 INLINE_KEYWORD void LrtsStillIdle(void) {}
1671
1672 void LrtsPreCommonInit(int everReturn) {
1673
1674 #if USE_MPI_CTRLMSG_SCHEME
1675         #if CMK_SMP
1676                 if(CmiMyRank() == CmiMyNodeSize()) createCtrlMsgIrecvBufs();
1677         #else
1678                 createCtrlMsgIrecvBufs();
1679         #endif
1680 #elif MPI_POST_RECV
1681     int doInit = 1;
1682     int i;
1683
1684 #if CMK_SMP
1685     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1686 #endif
1687
1688     /* Currently, in mpi smp, the main thread will be the comm thread, so
1689      *  only the comm thread should post recvs. Cpvs, however, need to be
1690      * created on rank 0 (the ptrs to the actual cpv memory), while
1691      * other ranks are busy waiting for this to finish. So cpv initialize
1692      * routines have to be called on every ranks, although they are only
1693      * useful on comm thread (whose rank is not zero) -Chao Mei
1694      */
1695     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1696     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1697     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1698     CpvInitialize(char **, CmiPostedRecvBuffers);
1699
1700     CpvAccess(CmiPostedRecvRequests) = NULL;
1701     CpvAccess(CmiPostedRecvBuffers) = NULL;
1702
1703     CpvInitialize(MPIPostRecvList *, postRecvListHdr);
1704     CpvInitialize(MPIPostRecvList *, curPostRecvPtr);
1705     CpvInitialize(int, msgRecvCnt);
1706
1707     CpvAccess(postRecvListHdr) = NULL;
1708     CpvAccess(curPostRecvPtr) = NULL;
1709     CpvAccess(msgRecvCnt) = 0;
1710
1711 #if MPI_DYNAMIC_POST_RECV
1712     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1713 #endif
1714
1715     if (doInit) {
1716 #if MPI_DYNAMIC_POST_RECV
1717         MSG_HISTOGRAM_BINSIZE = MPI_POST_RECV_INC;
1718         /* including two more buckets that are out of the range [LOWERSIZE, UPPERSIZE] */
1719         MAX_HISTOGRAM_BUCKETS = (MPI_POST_RECV_UPPERSIZE - MPI_POST_RECV_LOWERSIZE)/MSG_HISTOGRAM_BINSIZE+2;
1720         CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1721         memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1722 #else
1723         /* Post some extra recvs to help out with incoming messages */
1724         /* On some MPIs the messages are unexpected and thus slow */
1725
1726         CpvAccess(postRecvListHdr) = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
1727
1728         /* An array of request handles for posted recvs */
1729         CpvAccess(postRecvListHdr)->msgSizeIdx = -1;
1730         CpvAccess(postRecvListHdr)->bufCnt = MPI_POST_RECV_COUNT;
1731         CpvAccess(postRecvListHdr)->postedRecvReqs = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1732         /* An array of buffers for posted recvs */
1733         CpvAccess(postRecvListHdr)->postedRecvBufs = (char**)malloc(MPI_POST_RECV_COUNT*sizeof(char *));
1734         CpvAccess(postRecvListHdr)->next = CpvAccess(postRecvListHdr);
1735         CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr);
1736
1737         /* Post Recvs */
1738         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1739             char *tmpbuf = (char *)CmiAlloc(MPI_POST_RECV_SIZE); /* Note: could be aligned allocation?? */
1740             CpvAccess(postRecvListHdr)->postedRecvBufs[i] = tmpbuf;
1741             if (MPI_SUCCESS != MPI_Irecv(tmpbuf,
1742                                          MPI_POST_RECV_SIZE,
1743                                          MPI_BYTE,
1744                                          MPI_ANY_SOURCE,
1745                                          POST_RECV_TAG,
1746                                          charmComm,
1747                                          CpvAccess(postRecvListHdr)->postedRecvReqs+i  ))
1748                 CmiAbort("MPI_Irecv failed\n");
1749         }
1750 #endif
1751     }
1752 #endif /* end of MPI_POST_RECV  and USE_MPI_CTRLMSG_SCHEME */
1753         
1754 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
1755     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1756     CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1757     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1758 #endif
1759
1760 #if USE_ASYNC_RECV_FUNC || USE_MPI_CTRLMSG_SCHEME
1761 #if CMK_SMP
1762     /* allocate the guardian entry only on comm thread considering NUMA */
1763     if(CmiMyRank() == CmiMyNodeSize()) {
1764         waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
1765         waitIrecvListHead->next = NULL;
1766     }
1767 #else    
1768     waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
1769     waitIrecvListHead->next = NULL;
1770 #endif
1771 #endif
1772 #if __FAULT__ 
1773     CpvInitialize(crashedRankList *, crashedRankHdr);
1774     CpvInitialize(crashedRankList *, crashedRankPtr);
1775     CpvAccess(crashedRankHdr) = NULL;
1776     CpvAccess(crashedRankPtr) = NULL;
1777 #endif
1778 }
1779
1780 void LrtsPostCommonInit(int everReturn) {
1781
1782
1783     CpvInitialize(SMSG_LIST *, sent_msgs);
1784     CpvInitialize(SMSG_LIST *, end_sent);
1785     CpvInitialize(int, MsgQueueLen);
1786     CpvAccess(sent_msgs) = NULL;
1787     CpvAccess(end_sent) = NULL;
1788     CpvAccess(MsgQueueLen) = 0;
1789
1790 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1791     CpvInitialize(double, projTraceStart);
1792     /* only PE 0 needs to care about registration (to generate sts file). */
1793     if (CmiMyPe() == 0) {
1794         registerMachineUserEventsFunction(&registerMPITraceEvents);
1795     }
1796 #endif
1797
1798 }
1799 /* ######End of functions related with starting programs###### */
1800
1801 /***********************************************************************
1802  *
1803  * Abort function:
1804  *
1805  ************************************************************************/
1806
1807 void LrtsAbort(const char *message) {
1808     MPI_Abort(charmComm, 1);
1809     CMI_NORETURN_FUNCTION_END
1810 }
1811
1812 /**************************  TIMER FUNCTIONS **************************/
1813 #if CMK_TIMER_USE_SPECIAL
1814
1815 /* MPI calls are not threadsafe, even the timer on some machines */
1816 static CmiNodeLock  timerLock = 0;
1817 static int _absoluteTime = 0;
1818 static double starttimer = 0; 
1819 static int _is_global =0;
1820
1821 int CmiTimerIsSynchronized(void) {
1822     int  flag;
1823     void *v;
1824
1825     /*  check if it using synchronized timer */
1826 #if MPI_VERSION >= 2
1827     if (MPI_SUCCESS != MPI_Comm_get_attr(charmComm, MPI_WTIME_IS_GLOBAL, &v, &flag))
1828 #else
1829     if (MPI_SUCCESS != MPI_Attr_get(charmComm, MPI_WTIME_IS_GLOBAL, &v, &flag))
1830 #endif
1831         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1832     if (flag) {
1833         _is_global = *(int*)v;
1834         if (_is_global && CmiMyPe() == 0)
1835             printf("Charm++> MPI timer is synchronized\n");
1836     }
1837     return _is_global;
1838 }
1839
1840 int CmiTimerAbsolute(void) {
1841     return _absoluteTime;
1842 }
1843
1844 double CmiStartTimer(void) {
1845     return 0.0;
1846 }
1847
1848 double CmiInitTime(void) {
1849     return starttimer;
1850 }
1851
1852 void CmiTimerInit(char **argv) {
1853     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1854     if (_absoluteTime && CmiMyPe() == 0)
1855         printf("Charm++> absolute MPI timer is used\n");
1856
1857 #if ! CMK_MEM_CHECKPOINT && ! CMK_MESSAGE_LOGGING
1858     _is_global = CmiTimerIsSynchronized();
1859 #else
1860     _is_global = 0;
1861 #endif
1862
1863     if (_is_global) {
1864         if (CmiMyRank() == 0) {
1865             double minTimer;
1866             starttimer = MPI_Wtime();
1867
1868             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1869                           charmComm );
1870             starttimer = minTimer;
1871         }
1872     } else { /* we don't have a synchronous timer, set our own start time */
1873 #if ! CMK_MEM_CHECKPOINT && ! CMK_MESSAGE_LOGGING
1874         CmiBarrier();
1875         CmiBarrier();
1876         CmiBarrier();
1877 #endif
1878         starttimer = MPI_Wtime();
1879     }
1880
1881 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1882     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1883         timerLock = CmiCreateLock();
1884 #endif
1885     CmiNodeAllBarrier();          /* for smp */
1886 }
1887
1888 /**
1889  * Since the timerLock is never created, and is
1890  * always NULL, then all the if-condition inside
1891  * the timer functions could be disabled right
1892  * now in the case of SMP. --Chao Mei
1893  */
1894 double CmiTimer(void) {
1895     double t;
1896 #if 0 && CMK_SMP
1897     if (timerLock) CmiLock(timerLock);
1898 #endif
1899
1900     t = MPI_Wtime();
1901
1902 #if 0 && CMK_SMP
1903     if (timerLock) CmiUnlock(timerLock);
1904 #endif
1905
1906     return _absoluteTime?t: (t-starttimer);
1907 }
1908
1909 double CmiWallTimer(void) {
1910     double t;
1911 #if 0 && CMK_SMP
1912     if (timerLock) CmiLock(timerLock);
1913 #endif
1914
1915     t = MPI_Wtime();
1916
1917 #if 0 && CMK_SMP
1918     if (timerLock) CmiUnlock(timerLock);
1919 #endif
1920
1921     return _absoluteTime? t: (t-starttimer);
1922 }
1923
1924 double CmiCpuTimer(void) {
1925     double t;
1926 #if 0 && CMK_SMP
1927     if (timerLock) CmiLock(timerLock);
1928 #endif
1929     t = MPI_Wtime() - starttimer;
1930 #if 0 && CMK_SMP
1931     if (timerLock) CmiUnlock(timerLock);
1932 #endif
1933     return t;
1934 }
1935
1936 #endif     /* CMK_TIMER_USE_SPECIAL */
1937
1938 void LrtsBarrier(void)
1939 {
1940     if (MPI_SUCCESS != MPI_Barrier(charmComm))
1941         CmiAbort("Timernit: MPI_Barrier failed!\n");
1942 }
1943
1944 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1945 int CmiBarrierZero(void) {
1946     int i;
1947     if (CmiMyRank() == 0)
1948     {
1949         char msg[1];
1950         MPI_Status sts;
1951         if (CmiMyNode() == 0)  {
1952             for (i=0; i<CmiNumNodes()-1; i++) {
1953                 START_EVENT();
1954                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, charmComm,&sts))
1955                     CmiPrintf("MPI_Recv failed!\n");
1956
1957                 END_EVENT(30);
1958             }
1959         } else {
1960             START_EVENT();
1961
1962             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,charmComm))
1963                 printf("MPI_Send failed!\n");
1964
1965             END_EVENT(20);
1966         }
1967     }
1968     CmiNodeAllBarrier();
1969     return 0;
1970 }
1971
1972
1973 #if CMK_MEM_CHECKPOINT || CMK_MESSAGE_LOGGING
1974
1975 void mpi_restart_crashed(int pe, int rank)
1976 {
1977     int vals[2];
1978     vals[0] = CmiGetPeGlobal(pe,CmiMyPartition());
1979     vals[1] = CpvAccess(_curRestartPhase)+1;
1980     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,charmComm);
1981     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,charmComm);
1982 }
1983
1984 /* notify spare processors to exit */
1985 void mpi_end_spare(void)
1986 {
1987     int i;
1988     for (i=nextrank; i<total_pes; i++) {
1989         int vals[2] = {-1,-1};
1990         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,charmComm);
1991     }
1992 }
1993
1994 int find_spare_mpirank(int _pe,int partition)
1995 {
1996     if (nextrank == total_pes) {
1997       CmiAbort("Charm++> No spare processor available.");
1998     }
1999     int pe = CmiGetPeGlobal(_pe,partition);
2000     crashedRankList * crashedRank= (crashedRankList *)(malloc(sizeof(crashedRankList)));
2001     crashedRank->rank = petorank[pe];
2002     crashedRank->next=NULL;
2003     if(CpvAccess(crashedRankHdr)==NULL){
2004       CpvAccess(crashedRankHdr) = crashedRank;
2005       CpvAccess(crashedRankPtr) = CpvAccess(crashedRankHdr);
2006     }else{
2007       CpvAccess(crashedRankPtr)->next = crashedRank;
2008       CpvAccess(crashedRankPtr) = crashedRank;
2009     }
2010     petorank[pe] = nextrank;
2011     nextrank++;
2012     return nextrank-1;
2013 }
2014
2015 int isRankDie(int rank){
2016   crashedRankList * cur = CpvAccess(crashedRankHdr);
2017   crashedRankList * head = CpvAccess(crashedRankHdr);
2018   while(cur!=NULL){
2019     if(rank == cur->rank){
2020       CpvAccess(crashedRankHdr) = head;
2021       return 1;
2022     }
2023     cur = cur->next;
2024   }
2025   CpvAccess(crashedRankHdr) = head;
2026   return 0;
2027 }
2028
2029 void CkDieNow(void)
2030 {
2031 #if __FAULT__
2032     CmiPrintf("[%d] die now.\n", CmiMyPe());
2033
2034       /* release old messages */
2035     while (!CheckAllAsyncMsgsSent()) {
2036         PumpMsgs();
2037         ReleasePostedMessages();
2038     }
2039     MPI_Barrier(charmComm);
2040  //   MPI_Barrier(charmComm);
2041     MPI_Finalize();
2042     exit(0);
2043 #endif
2044 }
2045
2046 #endif
2047
2048 /*======Beginning of Msg Histogram or Dynamic Post-Recv Related Funcs=====*/
2049 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
2050 /* Functions related with capturing msg histogram */
2051
2052 #if MPI_DYNAMIC_POST_RECV
2053 /* Consume all messages in the request buffers */
2054 static void consumeAllMsgs(void)
2055 {
2056     MPIPostRecvList *ptr = CpvAccess(curPostRecvPtr);
2057     if (ptr) {
2058         do {
2059             int i;
2060             for (i=0; i<ptr->bufCnt; i++) {
2061                 int done = 0;
2062                 MPI_Status sts;
2063
2064                 /* Indicating this entry has been tested before */
2065                 if (ptr->postedRecvBufs[i] == NULL) continue;
2066
2067                 if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
2068                     CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
2069                 if (done) {
2070                     int nbytes;
2071                     char *msg;                    
2072                     
2073                     if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
2074                         CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
2075                     /* ready to handle this msg */
2076                     msg = (ptr->postedRecvBufs)[i];
2077                     (ptr->postedRecvBufs)[i] = NULL;
2078                     
2079                     handleOneRecvedMsg(nbytes, msg);
2080                 } else {
2081                     if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))
2082                         CmiAbort("consumeAllMsgs failed in MPI_Cancel!\n");
2083                 }
2084             }
2085             ptr = ptr->next;
2086         } while (ptr != CpvAccess(curPostRecvPtr));
2087     }
2088 }
2089
2090 static void recordMsgHistogramInfo(int size)
2091 {
2092     int idx = 0;
2093     size -= MPI_POST_RECV_LOWERSIZE;
2094     if (size > 0)
2095         idx = (size/MSG_HISTOGRAM_BINSIZE + 1);
2096
2097     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
2098     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
2099 }
2100
2101 #define POST_RECV_USE_STATIC_PARAM 0
2102 #define POST_RECV_REPORT_STS 0
2103
2104 #if POST_RECV_REPORT_STS
2105 static int buildDynCallCnt = 0;
2106 #endif
2107
2108 static void buildDynamicRecvBuffers(void)
2109 {
2110     int i;
2111
2112     int local_MSG_CNT_THRESHOLD;
2113     int local_MSG_INC;
2114
2115 #if POST_RECV_REPORT_STS
2116     buildDynCallCnt++;
2117 #endif
2118
2119     /* For debugging usage */
2120     reportMsgHistogramInfo();
2121
2122     CpvAccess(msgRecvCnt) = 0;
2123     /* consume all outstanding msgs */
2124     consumeAllMsgs();
2125
2126 #if POST_RECV_USE_STATIC_PARAM
2127     local_MSG_CNT_THRESHOLD = MPI_POST_RECV_MSG_CNT_THRESHOLD;
2128     local_MSG_INC = MPI_POST_RECV_MSG_INC;
2129 #else
2130     {
2131         int total = 0;
2132         int count = 0;
2133         for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2134             int tmp = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2135             /* avg is temporarily used for counting how many buckets are non-zero */
2136             if (tmp > 0)  {
2137                 total += tmp;
2138                 count++;
2139             }
2140         }
2141         if (count == 1) local_MSG_CNT_THRESHOLD = 1; /* Just filter out those zero-count msgs */
2142         else local_MSG_CNT_THRESHOLD = total / count /3; /* Catch >50% msgs NEED-BETTER-SCHEME HERE!!*/
2143         local_MSG_INC = total/count; /* Not having a good heuristic right now */
2144 #if POST_RECV_REPORT_STS
2145         printf("sel_histo[%d]: critia_threshold=%d, critia_msginc=%d\n", CmiMyPe(), local_MSG_CNT_THRESHOLD, local_MSG_INC);
2146 #endif
2147     }
2148 #endif
2149
2150     /* First continue to find the first msg range that requires post recv */
2151     /* Ignore the fist and the last one because they are not tracked */
2152     MPIPostRecvList *newHdr = NULL;
2153     MPIPostRecvList *newListPtr = newHdr;
2154     MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
2155     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2156         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2157         if (count >= local_MSG_CNT_THRESHOLD) {
2158
2159 #if POST_RECV_REPORT_STS
2160             /* Report histogram results */
2161             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
2162             int high = low + MSG_HISTOGRAM_BINSIZE;
2163             int reportCnt;
2164             if (count == local_MSG_CNT_THRESHOLD) reportCnt = 1;
2165             else reportCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2166             printf("sel_histo[%d]-%d: msg size [%.2f, %.2f) with count=%d (%d)\n", CmiMyPe(), buildDynCallCnt, low/1000.0, high/1000.0, count, reportCnt);
2167 #endif
2168             /* find if this msg idx exists, the "i" is the msgSizeIdx, in the current list */
2169             int notFound = 1;
2170             MPIPostRecvList *newEntry = NULL;
2171             while (ptr) {
2172                 if (ptr->msgSizeIdx < i) {
2173                     /* free the buffer for this range of msg size */
2174                     MPIPostRecvList *nextptr = ptr->next;
2175
2176                     free(ptr->postedRecvReqs);
2177                     int j;
2178                     for (j=0; j<ptr->bufCnt; j++) {
2179                         if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2180                     }
2181                     free(ptr->postedRecvBufs);
2182                     ptr = nextptr;
2183                 } else if (ptr->msgSizeIdx == i) {
2184                     int newBufCnt, j;
2185                     int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
2186                     newEntry = ptr;
2187                     /* Do some adjustment according to the current statistics */
2188                     if (count == local_MSG_CNT_THRESHOLD) newBufCnt = 1;
2189                     else newBufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2190                     if (newBufCnt != ptr->bufCnt) {
2191                         /* free old buffers, and allocate new buffers */
2192                         free(ptr->postedRecvReqs);
2193                         ptr->postedRecvReqs = (MPI_Request *)malloc(newBufCnt * sizeof(MPI_Request));
2194                         for (j=0; j<ptr->bufCnt; j++) {
2195                             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2196                         }
2197                         free(ptr->postedRecvBufs);
2198                         ptr->postedRecvBufs = (char **)malloc(newBufCnt * sizeof(char *));
2199                     }
2200
2201                     /* re-post those buffers */
2202                     ptr->bufCnt = newBufCnt;
2203                     for (j=0; j<ptr->bufCnt; j++) {
2204                         ptr->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
2205                         if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
2206                                                      MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
2207                                                      charmComm, ptr->postedRecvReqs+j))
2208                             CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
2209                     }
2210
2211                     /* We already posted bufs for this range of msg size */
2212                     ptr = ptr->next;
2213                     /* Need to set ptr to NULL as the buf list comes to an end and the while loop exits */
2214                     if (ptr == CpvAccess(postRecvListHdr)) ptr = NULL;
2215                     notFound = 0;
2216                     break;
2217                 } else {
2218                     /* The msgSizeIdx is larger than i */
2219                     break;
2220                 }
2221                 if (ptr == CpvAccess(postRecvListHdr)) {
2222                     ptr = NULL;
2223                     break;
2224                 }
2225             } /* end while(ptr): iterating the posted recv buffer list */
2226
2227             if (notFound) {
2228                 /* the current range of msg size is not found in the list */
2229                 int j;
2230                 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
2231                 newEntry = malloc(sizeof(MPIPostRecvList));
2232                 MPIPostRecvList *one = newEntry;
2233                 one->msgSizeIdx = i;
2234                 if (count == local_MSG_CNT_THRESHOLD) one->bufCnt = 1;
2235                 else one->bufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2236                 one->postedRecvReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*one->bufCnt);
2237                 one->postedRecvBufs = (char **)malloc(one->bufCnt * sizeof(char *));
2238                 for (j=0; j<one->bufCnt; j++) {
2239                     one->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
2240                     if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
2241                                                  MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
2242                                                  charmComm, one->postedRecvReqs+j))
2243                         CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
2244                 }
2245             } /* end if notFound */
2246
2247             /* Update the new list with the newEntry */
2248             CmiAssert(newEntry != NULL);
2249             if (newHdr == NULL) {
2250                 newHdr = newEntry;
2251                 newListPtr = newEntry;
2252                 newHdr->next = newHdr;
2253             } else {
2254                 newListPtr->next = newEntry;
2255                 newListPtr = newEntry;
2256                 newListPtr->next = newHdr;
2257             }
2258         } /* end if the count of this msg size range exceeds the threshold */
2259     } /* end for loop over the histogram buckets */
2260
2261     /* Free remaining entries in the list */
2262     while (ptr) {
2263         /* free the buffer for this range of msg size */
2264         MPIPostRecvList *nextptr = ptr->next;
2265
2266         free(ptr->postedRecvReqs);
2267         int j;
2268         for (j=0; j<ptr->bufCnt; j++) {
2269             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2270         }
2271         free(ptr->postedRecvBufs);
2272         ptr = nextptr;
2273         if (ptr == CpvAccess(postRecvListHdr)) break;
2274     }
2275
2276     CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr) = newHdr;
2277     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
2278 } /* end of function buildDynamicRecvBuffers */
2279
2280 static void examineMsgHistogramInfo(int size)
2281 {
2282     int total = CpvAccess(msgRecvCnt)++;
2283     if (total < MPI_POST_RECV_FREQ) {
2284         recordMsgHistogramInfo(size);
2285     } else {
2286         buildDynamicRecvBuffers();
2287     }
2288 }
2289 #else
2290 /* case when CAPTURE_MSG_HISTOGRAM is defined */
2291 static void recordMsgHistogramInfo(int size)
2292 {
2293     int idx = size/MSG_HISTOGRAM_BINSIZE;
2294     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
2295     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
2296 }
2297 #endif /* end of MPI_DYNAMIC_POST_RECV */
2298
2299 void reportMsgHistogramInfo()
2300 {
2301 #if MPI_DYNAMIC_POST_RECV
2302     int i, count;
2303     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[0];
2304     if (count > 0) {
2305         printf("msg_histo[%d]: %d for msg [0, %.2fK)\n", CmiMyNode(), count, MPI_POST_RECV_LOWERSIZE/1000.0);
2306     }
2307     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2308         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2309         if (count > 0) {
2310             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
2311             int high = low + MSG_HISTOGRAM_BINSIZE;
2312             printf("msg_histo[%d]: %d for msg [%.2fK, %.2fK)\n", CmiMyNode(), count, low/1000.0, high/1000.0);
2313         }
2314     }
2315     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[MAX_HISTOGRAM_BUCKETS-1];
2316     if (count > 0) {
2317         printf("msg_histo[%d]: %d for msg [%.2fK, +inf)\n", CmiMyNode(), count, MPI_POST_RECV_UPPERSIZE/1000.0);
2318     }
2319 #else
2320     int i;
2321     for (i=0; i<MAX_HISTOGRAM_BUCKETS; i++) {
2322         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2323         if (count > 0) {
2324             int low = i*MSG_HISTOGRAM_BINSIZE;
2325             int high = low + MSG_HISTOGRAM_BINSIZE;
2326             printf("msg_histo[%d]: %d for msg [%dK, %dK)\n", CmiMyNode(), count, low/1000, high/1000);
2327         }
2328     }
2329 #endif
2330 }
2331 #endif /* end of CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV */
2332
2333 void CmiSetupMachineRecvBuffersUser(void)
2334 {
2335 #if MPI_DYNAMIC_POST_RECV
2336     buildDynamicRecvBuffers();
2337 #endif
2338 }
2339 /*=======End of Msg Histogram or Dynamic Post-Recv Related Funcs======*/
2340
2341
2342 /*@}*/
2343