remove all machinespecicfor** macro
[charm.git] / src / arch / mpi / machine.c
1
2 /** @file
3  * MPI based machine layer
4  * @ingroup Machine
5  */
6 /*@{*/
7
8 #include <stdio.h>
9 #include <errno.h>
10 #include "converse.h"
11 #include <mpi.h>
12 #if CMK_TIMER_USE_XT3_DCLOCK
13 #include <catamount/dclock.h>
14 #endif
15
16
17 #ifdef AMPI
18 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
19 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
20 #  error "Can't build Charm++ using AMPI version of mpi.h header"
21 #endif
22
23 /*Support for ++debug: */
24 #if defined(_WIN32) && ! defined(__CYGWIN__)
25 #include <windows.h>
26 #include <wincon.h>
27 #include <sys/types.h>
28 #include <sys/timeb.h>
29 static void sleep(int secs) {
30     Sleep(1000*secs);
31 }
32 #else
33 #include <unistd.h> /*For getpid()*/
34 #endif
35 #include <stdlib.h> /*For sleep()*/
36
37 #include "machine.h"
38 #include "pcqueue.h"
39
40 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
41 /* Whether to use multiple send queue in SMP mode */
42 #define MULTI_SENDQUEUE    0
43
44 /* ###Beginning of flow control related macros ### */
45 #define CMI_EXERT_SEND_CAP 0
46 #define CMI_EXERT_RECV_CAP 0
47
48 #define CMI_DYNAMIC_EXERT_CAP 0
49 /* This macro defines the max number of msgs in the sender msg buffer
50  * that is allowed for recving operation to continue
51  */
52 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
53 #define CMI_DYNAMIC_MAXCAPSIZE 1000
54 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
55 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
56 /* initial values, -1 indiates there's no cap */
57 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
58 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
59 MPI_Comm charmComm;
60
61 #if CMI_EXERT_SEND_CAP
62 static int SEND_CAP=3;
63 #endif
64
65 #if CMI_EXERT_RECV_CAP
66 static int RECV_CAP=2;
67 #endif
68 /* ###End of flow control related macros ### */
69
70 /* ###Beginning of machine-layer-tracing related macros ### */
71 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
72 #define CMI_MPI_TRACE_MOREDETAILED 0
73 #undef CMI_MPI_TRACE_USEREVENTS
74 #define CMI_MPI_TRACE_USEREVENTS 1
75 #else
76 #undef CMK_SMP_TRACE_COMMTHREAD
77 #define CMK_SMP_TRACE_COMMTHREAD 0
78 #endif
79
80 #define CMK_TRACE_COMMOVERHEAD 0
81 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
84 #else
85 #undef CMK_TRACE_COMMOVERHEAD
86 #define CMK_TRACE_COMMOVERHEAD 0
87 #endif
88
89 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
90 CpvStaticDeclare(double, projTraceStart);
91 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
92 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
93 #else
94 #define  START_EVENT()
95 #define  END_EVENT(x)
96 #endif
97
98 #if CMK_SMP_TRACE_COMMTHREAD
99 #define START_TRACE_SENDCOMM(msg)  \
100                         int isTraceEligible = traceBeginCommOp(msg); \
101                         if(isTraceEligible) traceSendMsgComm(msg);
102 #define END_TRACE_SENDCOMM(msg) if(isTraceEligible) traceEndCommOp(msg);
103 #define START_TRACE_RECVCOMM(msg) CpvAccess(projTraceStart) = CmiWallTimer();
104 #define END_TRACE_RECVCOMM(msg) \
105                         if(traceBeginCommOp(msg)){ \
106                             traceChangeLastTimestamp(CpvAccess(projTraceStart)); \
107                             traceSendMsgComm(msg); \
108                             traceEndCommOp(msg); \
109                         }
110 #define CONDITIONAL_TRACE_USER_EVENT(x) \
111                         do{ \
112                             double etime = CmiWallTimer(); \
113                             if(etime - CpvAccess(projTraceStart) > 5*1e-6){ \
114                                 traceUserBracketEvent(x, CpvAccess(projTraceStart), etime); \
115                             }\
116                         }while(0);
117 #else
118 #define START_TRACE_SENDCOMM(msg)
119 #define END_TRACE_SENDCOMM(msg)
120 #define START_TRACE_RECVCOMM(msg)
121 #define END_TRACE_RECVCOMM(msg)
122 #define CONDITIONAL_TRACE_USER_EVENT(x)
123 #endif
124
125 /* ###End of machine-layer-tracing related macros ### */
126
127 /* ###Beginning of POST_RECV related macros ### */
128 /*
129  * If MPI_POST_RECV is defined, we provide default values for
130  * size and number of posted recieves. If MPI_POST_RECV_COUNT
131  * is set then a default value for MPI_POST_RECV_SIZE is used
132  * if not specified by the user.
133  */
134 #define MPI_POST_RECV 0
135
136 /* Making those parameters configurable for testing them easily */
137
138 #if MPI_POST_RECV
139 #define MPI_DYNAMIC_POST_RECV 0
140
141 /* Note the tag offset of a msg is determined by
142  * (its size - MPI_RECV_LOWERSIZE)/MPI_POST_RECV_INC.
143  * based on POST_RECV_TAG.
144  */
145 static int MPI_POST_RECV_COUNT=10;
146
147 /* The range of msgs to be tracked for histogramming */
148 static int MPI_POST_RECV_LOWERSIZE=8000;
149 static int MPI_POST_RECV_UPPERSIZE=64000;
150
151 /* The increment of msg size to be tracked, i.e. the histogram bucket size */
152 static int MPI_POST_RECV_INC = 1000;
153
154 /* The unit increment of msg cnt for increase #buf for a post recved msg */
155 static int MPI_POST_RECV_MSG_INC = 400;
156
157 /* If the #msg exceeds this value, post recv is created for such msg */
158 static int MPI_POST_RECV_MSG_CNT_THRESHOLD = 200;
159
160 /* The frequency of checking the existing posted recv buffers in the unit of #msgs */
161 static int MPI_POST_RECV_FREQ = 1000;
162
163 static int MPI_POST_RECV_SIZE;
164
165 typedef struct mpiPostRecvList {
166     /* POST_RECV_TAG + msgSizeIdx is the recv tag;
167      * Based on this value, this buf corresponds to msg size ranging
168      * [msgSizeIdx*MPI_POST_RECV_INC, (msgSizeIdx+1)*MPI_POST_RECV_INC)
169      */
170     int msgSizeIdx;
171     int bufCnt;
172     MPI_Request *postedRecvReqs;
173     char **postedRecvBufs;
174     struct mpiPostRecvList *next;
175 } MPIPostRecvList;
176 CpvDeclare(MPIPostRecvList *, postRecvListHdr);
177 CpvDeclare(MPIPostRecvList *, curPostRecvPtr);
178 CpvDeclare(int, msgRecvCnt);
179
180 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
181 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
182 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
183 CpvDeclare(char**,CmiPostedRecvBuffers);
184
185 /* Note: currently MPI doesn't provide a function whether a request is in progress.
186  * For example, a irecv has been filled partially. Then a call to MPI_Test still returns
187  * indicating it has not been finished. If only relying on this result, then calling
188  * MPI_Cancel will result in a loss of this msg. The dynamic post recv mechanism
189  * can only be safely used in a synchronized point such as load balancing.
190  */
191 #if MPI_DYNAMIC_POST_RECV
192 static int MSG_HISTOGRAM_BINSIZE;
193 static int MAX_HISTOGRAM_BUCKETS; /* only cares msg size less 2 MB */
194 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
195 static void recordMsgHistogramInfo(int size);
196 static void reportMsgHistogramInfo();
197 #endif /* end of MPI_DYNAMIC_POST_RECV defined */
198
199 #endif /* end of MPI_POST_RECV defined */
200
201 /* Defining this macro will use MPI_Irecv instead of MPI_Recv for
202  * large messages. This could save synchronization overhead caused by
203  * the rzv protocol used by MPI
204  */
205 #define USE_ASYNC_RECV_FUNC 0
206
207 #ifdef USE_ASYNC_RECV_FUNC
208 static int IRECV_MSG_THRESHOLD = 8000;
209 typedef struct IRecvListEntry{
210     MPI_Request req;
211     char *msg;
212     int size;
213     struct IRecvListEntry *next;
214 }*IRecvList;
215
216 static IRecvList freedIrecvList = NULL; /* used to recycle the entries */
217 static IRecvList waitIrecvListHead = NULL; /* points to the guardian entry, i.e., the next of it points to the first entry */
218 static IRecvList waitIrecvListTail = NULL; /* points to the last entry */
219
220 static IRecvList irecvListEntryAllocate(){
221     IRecvList ret;
222     if(freedIrecvList == NULL) {
223         ret = (IRecvList)malloc(sizeof(struct IRecvListEntry));        
224         return ret;
225     } else {
226         ret = freedIrecvList;
227         freedIrecvList = freedIrecvList->next;
228         return ret;
229     }
230 }
231 static void irecvListEntryFree(IRecvList used){
232     used->next = freedIrecvList;
233     freedIrecvList = used;
234 }
235
236 #endif /* end of USE_ASYNC_RECV_FUNC */
237
238 /* Providing functions for external usage to set up the dynamic recv buffer
239  * when the user is aware that it's safe to call such function
240  */
241 void CmiSetupMachineRecvBuffers();
242
243 #define CAPTURE_MSG_HISTOGRAM 0
244 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
245 static int MSG_HISTOGRAM_BINSIZE=1000;
246 static int MAX_HISTOGRAM_BUCKETS=2000; /* only cares msg size less 2 MB */
247 CpvDeclare(int *, MSG_HISTOGRAM_ARRAY);
248 static void recordMsgHistogramInfo(int size);
249 static void reportMsgHistogramInfo();
250 #endif
251
252 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
253 #define TAG     1375
254 #if MPI_POST_RECV
255 #define POST_RECV_TAG       (TAG+1)
256 #define BARRIER_ZERO_TAG  TAG
257 #else
258 #define BARRIER_ZERO_TAG   (TAG-1)
259 #endif
260 /* ###End of POST_RECV related related macros ### */
261
262 #if CMK_BLUEGENEL
263 #define MAX_QLEN 8
264 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
265 #else
266 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
267 #define MAX_QLEN 200
268 #endif
269 /* =======End of Definitions of Performance-Specific Macros =======*/
270
271
272 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
273 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
274 #define CHARM_MAGIC_NUMBER               126
275
276 #if CMK_ERROR_CHECKING
277 extern unsigned char computeCheckSum(unsigned char *data, int len);
278 static int checksum_flag = 0;
279 #define CMI_SET_CHECKSUM(msg, len)      \
280         if (checksum_flag)  {   \
281           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
282           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
283         }
284 #define CMI_CHECK_CHECKSUM(msg, len)    \
285         if (checksum_flag)      \
286           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
287             CmiAbort("Fatal error: checksum doesn't agree!\n");
288 #else
289 #define CMI_SET_CHECKSUM(msg, len)
290 #define CMI_CHECK_CHECKSUM(msg, len)
291 #endif
292 /* =====End of Definitions of Message-Corruption Related Macros=====*/
293
294 /* =====Beginning of Declarations of Machine Specific Variables===== */
295 #include <signal.h>
296 void (*signal_int)(int);
297
298 static int _thread_provided = -1; /* Indicating MPI thread level */
299 static int idleblock = 0;
300
301 /* A simple list for msgs that have been sent by MPI_Isend */
302 typedef struct msg_list {
303     char *msg;
304     struct msg_list *next;
305     int size, destpe, mode;
306     MPI_Request req;
307 } SMSG_LIST;
308
309 CpvStaticDeclare(SMSG_LIST *, sent_msgs);
310 CpvStaticDeclare(SMSG_LIST *, end_sent);
311
312 CpvStaticDeclare(int, MsgQueueLen);
313 static int request_max;
314 /*FLAG: consume outstanding Isends in scheduler loop*/
315 static int no_outstanding_sends=0;
316
317 #if NODE_0_IS_CONVHOST
318 int inside_comm = 0;
319 #endif
320
321 typedef struct ProcState {
322 #if MULTI_SENDQUEUE
323     PCQueue      sendMsgBuf;       /* per processor message sending queue */
324 #endif
325     CmiNodeLock  recvLock;                  /* for cs->recv */
326 } ProcState;
327 static ProcState  *procState;
328
329 #if CMK_SMP && !MULTI_SENDQUEUE
330 static PCQueue sendMsgBuf;
331 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
332 #endif
333 /* =====End of Declarations of Machine Specific Variables===== */
334
335 #if CMK_MEM_CHECKPOINT
336 #define FAIL_TAG   1200
337 int num_workpes, total_pes;
338 int *petorank = NULL;
339 int  nextrank;
340 void mpi_end_spare();
341 #endif
342
343 /* =====Beginning of Declarations of Machine Specific Functions===== */
344 /* Utility functions */
345 #if CMK_BLUEGENEL
346 extern void MPID_Progress_test();
347 #endif
348 static size_t CmiAllAsyncMsgsSent(void);
349 static void CmiReleaseSentMessages(void);
350 static int PumpMsgs(void);
351 static void PumpMsgsBlocking(void);
352
353 #if CMK_SMP
354 static int MsgQueueEmpty();
355 static int RecvQueueEmpty();
356 static int SendMsgBuf();
357 static  void EnqueueMsg(void *m, int size, int node, int mode);
358 #endif
359
360 /* ### End of Machine-running Related Functions ### */
361
362 /* ### Beginning of Idle-state Related Functions ### */
363 void CmiNotifyIdleForMPI(void);
364 /* ### End of Idle-state Related Functions ### */
365
366 /* =====End of Declarations of Machine Specific Functions===== */
367
368 /**
369  *  Macros that overwrites the common codes, such as
370  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
371  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
372  *  CMK_OFFLOAD_BCAST_PROCESS etc.
373  */
374 #define CMK_HAS_SIZE_IN_MSGHDR 0
375 #include "machine-lrts.h"
376 #include "machine-common-core.c"
377
378 /* The machine specific msg-sending function */
379
380 #if CMK_SMP
381 static void EnqueueMsg(void *m, int size, int node, int mode) {
382     /*SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
383     SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
384     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
385     msg_tmp->msg = m;
386     msg_tmp->size = size;
387     msg_tmp->destpe = node;
388     msg_tmp->next = 0;
389     msg_tmp->mode = mode;
390
391 #if MULTI_SENDQUEUE
392     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
393 #else
394     /*CmiLock(sendMsgBufLock);*/
395     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
396     /*CmiUnlock(sendMsgBufLock);*/
397 #endif
398
399     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
400 }
401 #endif
402
403 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
404 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
405     int node = smsg->destpe;
406     int size = smsg->size;
407     char *msg = smsg->msg;
408     int mode = smsg->mode;
409     int dstrank;
410
411     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
412 #if CMK_ERROR_CHECKING
413     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
414     CMI_SET_CHECKSUM(msg, size);
415 #endif
416
417 #if MPI_POST_RECV
418     if (size>=MPI_POST_RECV_LOWERSIZE && size < MPI_POST_RECV_UPPERSIZE) {
419 #if MPI_DYNAMIC_POST_RECV
420         int sendTagOffset = (size-MPI_POST_RECV_LOWERSIZE)/MPI_POST_RECV_INC+1;
421         START_TRACE_SENDCOMM(msg);
422         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG+sendTagOffset,charmComm,&(smsg->req)))
423             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
424         END_TRACE_SENDCOMM(msg);
425 #else
426         START_TRACE_SENDCOMM(msg);
427         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,charmComm,&(smsg->req)))
428             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
429         END_TRACE_SENDCOMM(msg);
430 #endif
431     } else {
432         START_TRACE_SENDCOMM(msg);
433             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,charmComm,&(smsg->req)))
434             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
435         END_TRACE_SENDCOMM(msg);
436     }
437 #else
438 /* branch not using MPI_POST_RECV */
439
440 #if CMK_MEM_CHECKPOINT
441         dstrank = petorank[node];
442 #else
443         dstrank=node;
444 #endif
445     START_TRACE_SENDCOMM(msg)
446     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,charmComm,&(smsg->req)))
447         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
448     END_TRACE_SENDCOMM(msg)
449 #endif /* end of #if MPI_POST_RECV */
450
451     MACHSTATE(3,"}MPI_Isend end");
452     CpvAccess(MsgQueueLen)++;
453     if (CpvAccess(sent_msgs)==0)
454         CpvAccess(sent_msgs) = smsg;
455     else
456         CpvAccess(end_sent)->next = smsg;
457     CpvAccess(end_sent) = smsg;
458
459 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
460     if (mode == P2P_SYNC || mode == P2P_ASYNC)
461     {
462     while (CpvAccess(MsgQueueLen) > request_max) {
463         CmiReleaseSentMessages();
464         PumpMsgs();
465     }
466     }
467 #endif
468
469     return (CmiCommHandle) &(smsg->req);
470 }
471
472 CmiCommHandle LrtsSendFunc(int destPE, int size, char *msg, int mode) {
473     /* Ignoring the mode for MPI layer */
474
475     int destNode = CmiNodeOf(destPE);
476     CmiState cs = CmiGetState();
477     SMSG_LIST *msg_tmp;
478     int  rank;
479
480     CmiAssert(destNode != CmiMyNode());
481 #if CMK_SMP
482     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV) {
483       EnqueueMsg(msg, size, destNode, mode);
484       return 0;
485     }
486 #endif
487     /* non smp */
488     /*msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));*/
489     msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
490     msg_tmp->msg = msg;
491     msg_tmp->destpe = destNode;
492     msg_tmp->size = size;
493     msg_tmp->next = 0;
494     msg_tmp->mode = mode;
495     return MPISendOneMsg(msg_tmp);
496 }
497
498 static size_t CmiAllAsyncMsgsSent(void) {
499     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
500     MPI_Status sts;
501     int done;
502
503     while (msg_tmp!=0) {
504         done = 0;
505         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
506             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
507         if (!done)
508             return 0;
509         msg_tmp = msg_tmp->next;
510         /*    MsgQueueLen--; ????? */
511     }
512     return 1;
513 }
514
515 int CmiAsyncMsgSent(CmiCommHandle c) {
516
517     SMSG_LIST *msg_tmp = CpvAccess(sent_msgs);
518     int done;
519     MPI_Status sts;
520
521     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
522         msg_tmp = msg_tmp->next;
523     if (msg_tmp) {
524         done = 0;
525         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
526             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
527         return ((done)?1:0);
528     } else {
529         return 1;
530     }
531 }
532
533 void CmiReleaseCommHandle(CmiCommHandle c) {
534     return;
535 }
536
537 /* ######Beginning of functions related with communication progress ###### */
538 static void CmiReleaseSentMessages(void) {
539     SMSG_LIST *msg_tmp=CpvAccess(sent_msgs);
540     SMSG_LIST *prev=0;
541     SMSG_LIST *temp;
542     int done;
543     MPI_Status sts;
544
545 #if CMK_BLUEGENEL
546     MPID_Progress_test();
547 #endif
548
549     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
550     while (msg_tmp!=0) {
551         done =0;
552 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
553         double startT = CmiWallTimer();
554 #endif
555         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
556             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
557         if (done) {
558             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
559             CpvAccess(MsgQueueLen)--;
560             /* Release the message */
561             temp = msg_tmp->next;
562             if (prev==0) /* first message */
563                 CpvAccess(sent_msgs) = temp;
564             else
565                 prev->next = temp;
566             CmiFree(msg_tmp->msg);
567             /* CmiFree(msg_tmp); */
568             free(msg_tmp);
569             msg_tmp = temp;
570         } else {
571             prev = msg_tmp;
572             msg_tmp = msg_tmp->next;
573         }
574 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
575         {
576             double endT = CmiWallTimer();
577             /* only record the event if it takes more than 1ms */
578             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
579         }
580 #endif
581     }
582     CpvAccess(end_sent) = prev;
583     MACHSTATE(2,"} CmiReleaseSentMessages end");
584 }
585
586 static int PumpMsgs(void) {
587     int nbytes, flg, res;
588     char *msg;
589     MPI_Status sts;
590     int recd=0;
591
592 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
593     int recvCnt=0;
594 #endif
595
596 #if CMK_BLUEGENEL
597     MPID_Progress_test();
598 #endif
599
600     MACHSTATE(2,"PumpMsgs begin {");
601
602 #if CMI_DYNAMIC_EXERT_CAP
603     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
604 #endif
605
606     while (1) {
607         int doSyncRecv = 1;
608 #if CMI_EXERT_RECV_CAP
609         if (recvCnt==RECV_CAP) break;
610 #elif CMI_DYNAMIC_EXERT_CAP
611         if (recvCnt >= dynamicRecvCap) break;
612 #endif
613
614         START_TRACE_RECVCOMM(NULL);
615
616         /* First check posted recvs then do  probe unmatched outstanding messages */
617 #if MPI_POST_RECV
618         MPIPostRecvList *postedOne = NULL;
619         int completed_index = -1;
620         flg = 0;
621 #if MPI_DYNAMIC_POST_RECV
622         MPIPostRecvList *oldPostRecvPtr = CpvAccess(curPostRecvPtr);
623         if (oldPostRecvPtr) {
624             /* post recv buf inited */
625             do {
626                 /* round-robin iteration over the list */
627                 MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
628                 if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
629                     CmiAbort("PumpMsgs: MPI_Testany failed!\n");
630
631                 if (flg) {
632                     postedOne = cur;
633                     break;
634                 }
635                 CpvAccess(curPostRecvPtr) = CpvAccess(curPostRecvPtr)->next;
636             } while (CpvAccess(curPostRecvPtr) != oldPostRecvPtr);
637         }
638 #else
639         MPIPostRecvList *cur = CpvAccess(curPostRecvPtr);
640         if (MPI_SUCCESS != MPI_Testany(cur->bufCnt, cur->postedRecvReqs, &completed_index, &flg, &sts))
641             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
642 #endif
643         CONDITIONAL_TRACE_USER_EVENT(60); /* MPI_Test related user event */
644         if (flg) {
645             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
646                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
647
648             recd = 1;
649 #if !MPI_DYNAMIC_POST_RECV
650             postedOne = CpvAccess(curPostRecvPtr);
651 #endif
652             msg = (postedOne->postedRecvBufs)[completed_index];
653             (postedOne->postedRecvBufs)[completed_index] = NULL;
654
655             CpvAccess(Cmi_posted_recv_total)++;
656         } else {
657             START_EVENT();
658             res = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, charmComm, &flg, &sts);
659             if (res != MPI_SUCCESS)
660                 CmiAbort("MPI_Iprobe failed\n");
661             if (!flg) break;
662             
663             CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
664             recd = 1;
665             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
666             msg = (char *) CmiAlloc(nbytes);
667
668 #if USE_ASYNC_RECV_FUNC
669             if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
670 #endif            
671             if(doSyncRecv){
672                 START_EVENT();
673                 if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
674                     CmiAbort("PumpMsgs: MPI_Recv failed!\n");                
675             }
676 #if USE_ASYNC_RECV_FUNC        
677             else {
678                 START_EVENT();
679                 IRecvList one = irecvListEntryAllocate();
680                 if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
681                     CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
682                 /*printf("[%d]: irecv msg=%p, nbytes=%d, src=%d, tag=%d\n", CmiMyPe(), msg, nbytes, sts.MPI_SOURCE, sts.MPI_TAG);*/
683                 one->msg = msg;
684                 one->size = nbytes;
685                 one->next = NULL;
686                 waitIrecvListTail->next = one;
687                 waitIrecvListTail = one;
688                 CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
689             }
690 #endif
691             CpvAccess(Cmi_unposted_recv_total)++;
692         }
693 #else
694         /* Original version */
695         START_EVENT();
696         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, charmComm, &flg, &sts);
697         if (res != MPI_SUCCESS)
698             CmiAbort("MPI_Iprobe failed\n");
699
700         if (!flg) break;
701         CONDITIONAL_TRACE_USER_EVENT(70); /* MPI_Iprobe related user event */
702         
703         recd = 1;
704         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
705         msg = (char *) CmiAlloc(nbytes);
706
707 #if USE_ASYNC_RECV_FUNC
708         if(nbytes >= IRECV_MSG_THRESHOLD) doSyncRecv = 0;
709 #endif        
710         if(doSyncRecv){
711             START_EVENT();
712             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, charmComm,&sts))
713                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");            
714         }
715 #if USE_ASYNC_RECV_FUNC        
716         else {
717             START_EVENT();
718             IRecvList one = irecvListEntryAllocate();
719             if(MPI_SUCCESS != MPI_Irecv(msg, nbytes, MPI_BYTE, sts.MPI_SOURCE, sts.MPI_TAG, charmComm, &(one->req)))
720                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
721             one->msg = msg;
722             one->size = nbytes;
723             one->next = NULL;
724             waitIrecvListTail->next = one;
725             waitIrecvListTail = one;
726             /*printf("PE[%d]: MPI_Irecv msg=%p, size=%d, entry=%p\n", CmiMyPe(), msg, nbytes, one);*/
727             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
728         }
729 #endif
730
731 #endif /*end of not MPI_POST_RECV */
732
733         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
734         CMI_CHECK_CHECKSUM(msg, nbytes);
735 #if CMK_ERROR_CHECKING
736         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
737             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
738             CmiFree(msg);
739             CmiAbort("Abort!\n");
740             continue;
741         }
742 #endif
743
744         if(doSyncRecv){
745             END_TRACE_RECVCOMM(msg);
746             handleOneRecvedMsg(nbytes, msg);
747         }
748         
749 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
750         recordMsgHistogramInfo(nbytes);
751 #endif
752
753 #if  MPI_POST_RECV
754 #if MPI_DYNAMIC_POST_RECV
755         if (postedOne) {
756             //printf("[%d]: get one posted recv\n", CmiMyPe());
757             /* Get the upper size of this buffer */
758             int postRecvBufSize = postedOne->msgSizeIdx*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
759             int postRecvTag = POST_RECV_TAG + postedOne->msgSizeIdx;
760             /* Has to re-allocate the buffer for the message */
761             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(postRecvBufSize);
762
763             /* and repost the recv */
764             START_EVENT();
765
766             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
767                                          postRecvBufSize,
768                                          MPI_BYTE,
769                                          MPI_ANY_SOURCE,
770                                          postRecvTag,
771                                          charmComm,
772                                          &((postedOne->postedRecvReqs)[completed_index])  ))
773                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
774             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
775         }
776 #else
777         if (postedOne) {
778             /* Has to re-allocate the buffer for the message */
779             (postedOne->postedRecvBufs)[completed_index] = (char *)CmiAlloc(MPI_POST_RECV_SIZE);
780
781             /* and repost the recv */
782             START_EVENT();
783             if (MPI_SUCCESS != MPI_Irecv((postedOne->postedRecvBufs)[completed_index] ,
784                                          MPI_POST_RECV_SIZE,
785                                          MPI_BYTE,
786                                          MPI_ANY_SOURCE,
787                                          POST_RECV_TAG,
788                                          charmComm,
789                                          &((postedOne->postedRecvReqs)[completed_index])  ))
790                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
791             CONDITIONAL_TRACE_USER_EVENT(50); /* MPI_Irecv related user events */
792         }
793 #endif /* not MPI_DYNAMIC_POST_RECV */
794 #endif
795
796 #if CMI_EXERT_RECV_CAP
797         recvCnt++;
798 #elif CMI_DYNAMIC_EXERT_CAP
799         recvCnt++;
800 #if CMK_SMP
801         /* check sendMsgBuf to get the  number of messages that have not been sent
802              * which is only available in SMP mode
803          * MsgQueueLen indicates the number of messages that have not been released
804              * by MPI
805              */
806         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
807                 || CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
808             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
809         }
810 #else
811         /* MsgQueueLen indicates the number of messages that have not been released
812              * by MPI
813              */
814         if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
815             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
816         }
817 #endif
818
819 #endif
820
821     }
822
823 #if USE_ASYNC_RECV_FUNC
824 /* Another loop to check the irecved msgs list */
825 {
826     IRecvList irecvEnt;
827     int irecvDone = 0;
828     MPI_Status sts;
829     while(waitIrecvListHead->next) {
830         IRecvList irecvEnt = waitIrecvListHead->next;
831
832         START_EVENT();
833                 
834         /*printf("PE[%d]: check irecv entry=%p\n", CmiMyPe(), irecvEnt);*/
835         if(MPI_SUCCESS != MPI_Test(&(irecvEnt->req), &irecvDone, &sts))
836             CmiAbort("PumpMsgs: MPI_Test failed!\n");
837         if(!irecvDone) break; /* in-order recv */
838
839         END_TRACE_RECVCOMM((irecvEnt->msg));
840         /*printf("PE[%d]: irecv entry=%p finished with size=%d, msg=%p\n", CmiMyPe(), irecvEnt, irecvEnt->size, irecvEnt->msg);*/
841         
842         handleOneRecvedMsg(irecvEnt->size, irecvEnt->msg);
843         waitIrecvListHead->next = irecvEnt->next;
844         irecvListEntryFree(irecvEnt);
845         recd = 1;        
846     }
847     if(waitIrecvListHead->next == NULL)
848         waitIrecvListTail = waitIrecvListHead;
849 }
850 #endif
851
852
853     MACHSTATE(2,"} PumpMsgs end ");
854     return recd;
855 }
856
857 /* blocking version */
858 static void PumpMsgsBlocking(void) {
859     static int maxbytes = 20000000;
860     static char *buf = NULL;
861     int nbytes, flg;
862     MPI_Status sts;
863     char *msg;
864     int recd=0;
865
866     if (!PCQueueEmpty(CmiGetState()->recv)) return;
867     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
868     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
869     if (CpvAccess(sent_msgs))  return;
870
871 #if 0
872     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
873 #endif
874
875     if (buf == NULL) {
876         buf = (char *) CmiAlloc(maxbytes);
877         _MEMCHECK(buf);
878     }
879
880
881 #if MPI_POST_RECV
882 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
883     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
884 #endif
885
886     START_TRACE_RECVCOMM(NULL);
887     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, charmComm,&sts))
888         CmiAbort("PumpMsgs: PMP_Recv failed!\n");    
889
890     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
891     msg = (char *) CmiAlloc(nbytes);
892     memcpy(msg, buf, nbytes);
893     END_TRACE_RECVCOMM(msg);
894
895 #if CMK_SMP_TRACE_COMMTHREAD && CMI_MPI_TRACE_MOREDETAILED
896     char tmp[32];
897     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
898     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
899 #endif
900
901     handleOneRecvedMsg(nbytes, msg);
902 }
903
904
905 #if CMK_SMP
906
907 /* called by communication thread in SMP */
908 static int SendMsgBuf() {
909     SMSG_LIST *msg_tmp;
910     char *msg;
911     int node, rank, size;
912     int i;
913     int sent = 0;
914
915 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
916     int sentCnt = 0;
917 #endif
918
919 #if CMI_DYNAMIC_EXERT_CAP
920     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
921 #endif
922
923     MACHSTATE(2,"SendMsgBuf begin {");
924 #if MULTI_SENDQUEUE
925     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
926         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
927             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
928 #else
929     /* single message sending queue */
930     /* CmiLock(sendMsgBufLock); */
931     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
932     /* CmiUnlock(sendMsgBufLock); */
933     while (NULL != msg_tmp) {
934 #endif
935             MPISendOneMsg(msg_tmp);
936             sent=1;
937
938 #if CMI_EXERT_SEND_CAP
939             if (++sentCnt == SEND_CAP) break;
940 #elif CMI_DYNAMIC_EXERT_CAP
941             if (++sentCnt >= dynamicSendCap) break;
942             if (CpvAccess(MsgQueueLen) > CMI_DYNAMIC_OUTGOING_THRESHOLD)
943                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
944 #endif
945
946 #if ! MULTI_SENDQUEUE
947             /* CmiLock(sendMsgBufLock); */
948             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
949             /* CmiUnlock(sendMsgBufLock); */
950 #endif
951         }
952 #if MULTI_SENDQUEUE
953     }
954 #endif
955     MACHSTATE(2,"}SendMsgBuf end ");
956     return sent;
957 }
958
959 static int MsgQueueEmpty() {
960     int i;
961 #if MULTI_SENDQUEUE
962     for (i=0; i<_Cmi_mynodesize; i++)
963         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
964 #else
965     return PCQueueEmpty(sendMsgBuf);
966 #endif
967     return 1;
968 }
969
970 /* test if all processors recv queues are empty */
971 static int RecvQueueEmpty() {
972     int i;
973     for (i=0; i<_Cmi_mynodesize; i++) {
974         CmiState cs=CmiGetStateN(i);
975         if (!PCQueueEmpty(cs->recv)) return 0;
976     }
977     return 1;
978 }
979
980
981 #define REPORT_COMM_METRICS 0
982 #if REPORT_COMM_METRICS
983 static double pumptime = 0.0;
984 static double releasetime = 0.0;
985 static double sendtime = 0.0;
986 #endif
987
988 #endif //end of CMK_SMP
989
990 void LrtsAdvanceCommunication(int whenidle) {
991 #if REPORT_COMM_METRICS
992     double t1, t2, t3, t4;
993     t1 = CmiWallTimer();
994 #endif
995
996 #if CMK_SMP
997     PumpMsgs();
998
999 #if REPORT_COMM_METRICS
1000     t2 = CmiWallTimer();
1001 #endif
1002
1003     CmiReleaseSentMessages();
1004 #if REPORT_COMM_METRICS
1005     t3 = CmiWallTimer();
1006 #endif
1007
1008     SendMsgBuf();
1009
1010 #if REPORT_COMM_METRICS
1011     t4 = CmiWallTimer();
1012     pumptime += (t2-t1);
1013     releasetime += (t3-t2);
1014     sendtime += (t4-t3);
1015 #endif
1016
1017 #else /* non-SMP case */
1018     CmiReleaseSentMessages();
1019
1020 #if REPORT_COMM_METRICS
1021     t2 = CmiWallTimer();
1022 #endif
1023     PumpMsgs();
1024
1025 #if REPORT_COMM_METRICS
1026     t3 = CmiWallTimer();
1027     pumptime += (t3-t2);
1028     releasetime += (t2-t1);
1029 #endif
1030
1031 #endif /* end of #if CMK_SMP */
1032 }
1033 /* ######End of functions related with communication progress ###### */
1034
1035 void LrtsPostNonLocal() {
1036 #if !CMK_SMP
1037     if (no_outstanding_sends) {
1038         while (CpvAccess(MsgQueueLen)>0) {
1039             LrtsAdvanceCommunication(0);
1040         }
1041     }
1042
1043     /* FIXME: I don't think the following codes are needed because
1044      * it repeats the same job of the next call of CmiGetNonLocal
1045      */
1046 #if 0
1047     if (!msg) {
1048         CmiReleaseSentMessages();
1049         if (PumpMsgs())
1050             return  PCQueuePop(cs->recv);
1051         else
1052             return 0;
1053     }
1054 #endif
1055 #else
1056   if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1057         CmiReleaseSentMessages();       
1058         /* ??? SendMsgBuf is a not a thread-safe function. If it is put
1059          * here and this function will be called in CmiNotifyStillIdle,
1060          * then a data-race problem occurs */
1061         /*SendMsgBuf();*/
1062   }
1063 #endif
1064 }
1065
1066 /* Idle-state related functions: called in non-smp mode */
1067 void CmiNotifyIdleForMPI(void) {
1068     CmiReleaseSentMessages();
1069     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1070 }
1071
1072 /* Network progress function is used to poll the network when for
1073    messages. This flushes receive buffers on some  implementations*/
1074 #if CMK_MACHINE_PROGRESS_DEFINED
1075 void CmiMachineProgressImpl() {
1076 #if !CMK_SMP
1077     PumpMsgs();
1078 #if CMK_IMMEDIATE_MSG
1079     CmiHandleImmediate();
1080 #endif
1081 #else
1082     /*Not implemented yet. Communication server does not seem to be
1083       thread safe, so only communication thread call it */
1084     if (CmiMyRank() == CmiMyNodeSize())
1085         CommunicationServerThread(0);
1086 #endif
1087 }
1088 #endif
1089
1090 /* ######Beginning of functions related with exiting programs###### */
1091 void LrtsDrainResources() {
1092 #if !CMK_SMP
1093     while (!CmiAllAsyncMsgsSent()) {
1094         PumpMsgs();
1095         CmiReleaseSentMessages();
1096     }
1097 #else
1098     if(Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV){
1099         while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1100             CmiReleaseSentMessages();
1101             SendMsgBuf();
1102             PumpMsgs();
1103         }
1104     }else if(Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV) {
1105         while(!CmiAllAsyncMsgsSent()) {
1106             CmiReleaseSentMessages();
1107         }
1108     }
1109 #endif
1110 #if CMK_MEM_CHECKPOINT
1111     if (CmiMyPe() == 0) mpi_end_spare();
1112 #endif
1113     MACHSTATE(2, "Machine exit barrier begin {");
1114     START_EVENT();
1115     if (MPI_SUCCESS != MPI_Barrier(charmComm))
1116         CmiAbort("LrtsDrainResources: MPI_Barrier failed!\n");
1117     END_EVENT(10);
1118     MACHSTATE(2, "} Machine exit barrier end");
1119 }
1120
1121 void LrtsExit() {
1122     int i;
1123 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1124     int doPrint = 0;
1125     if (CmiMyNode()==0) doPrint = 1;
1126
1127     if (doPrint /*|| CmiMyNode()%11==0 */) {
1128 #if MPI_POST_RECV
1129         CmiPrintf("node[%d]: %llu posted receives,  %llu unposted receives\n", CmiMyNode(), CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1130 #endif
1131     }
1132 #endif
1133
1134 #if MPI_POST_RECV
1135     {
1136         MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
1137         if (ptr) {
1138             do {
1139                 for (i=0; i<ptr->bufCnt; i++) MPI_Cancel(ptr->postedRecvReqs+i);
1140                 ptr = ptr->next;
1141             } while (ptr!=CpvAccess(postRecvListHdr));
1142         }
1143     }
1144 #endif
1145
1146 #if REPORT_COMM_METRICS
1147 #if CMK_SMP
1148     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
1149               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
1150               pumptime, releasetime, sendtime);
1151 #else
1152     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
1153               CmiMyPe(), pumptime, releasetime, sendtime);
1154 #endif
1155 #endif
1156
1157    if(!CharmLibInterOperate) {
1158 #if ! CMK_AUTOBUILD
1159       signal(SIGINT, signal_int);
1160       MPI_Finalize();
1161 #endif
1162       exit(0);
1163     }
1164 }
1165
1166 static int machine_exit_idx;
1167 static void machine_exit(char *m) {
1168     EmergencyExit();
1169     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1170     fflush(stdout);
1171     CmiNodeBarrier();
1172     if (CmiMyRank() == 0) {
1173         MPI_Barrier(charmComm);
1174         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1175         MPI_Abort(charmComm, 1);
1176     } else {
1177         while (1) CmiYield();
1178     }
1179 }
1180
1181 static void KillOnAllSigs(int sigNo) {
1182     static int already_in_signal_handler = 0;
1183     char *m;
1184     if (already_in_signal_handler) return;   /* MPI_Abort(charmComm,1); */
1185     already_in_signal_handler = 1;
1186 #if CMK_CCS_AVAILABLE
1187     if (CpvAccess(cmiArgDebugFlag)) {
1188         CpdNotify(CPD_SIGNAL, sigNo);
1189         CpdFreeze();
1190     }
1191 #endif
1192     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1193              "Signal: %d\n",CmiMyPe(),sigNo);
1194     CmiPrintStackTrace(1);
1195
1196     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1197     CmiSetHandler(m, machine_exit_idx);
1198     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1199     machine_exit(m);
1200 }
1201 /* ######End of functions related with exiting programs###### */
1202
1203
1204 /* ######Beginning of functions related with starting programs###### */
1205 static void registerMPITraceEvents() {
1206 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1207     traceRegisterUserEvent("MPI_Barrier", 10);
1208     traceRegisterUserEvent("MPI_Send", 20);
1209     traceRegisterUserEvent("MPI_Recv", 30);
1210     traceRegisterUserEvent("MPI_Isend", 40);
1211     traceRegisterUserEvent("MPI_Irecv", 50);
1212     traceRegisterUserEvent("MPI_Test[any]", 60);
1213     traceRegisterUserEvent("MPI_Iprobe", 70);
1214 #endif
1215 }
1216
1217 #if MACHINE_DEBUG_LOG
1218 FILE *debugLog = NULL;
1219 #endif
1220
1221 static char *thread_level_tostring(int thread_level) {
1222 #if CMK_MPI_INIT_THREAD
1223     switch (thread_level) {
1224     case MPI_THREAD_SINGLE:
1225         return "MPI_THREAD_SINGLE";
1226     case MPI_THREAD_FUNNELED:
1227         return "MPI_THREAD_FUNNELED";
1228     case MPI_THREAD_SERIALIZED:
1229         return "MPI_THREAD_SERIALIZED";
1230     case MPI_THREAD_MULTIPLE :
1231         return "MPI_THREAD_MULTIPLE";
1232     default: {
1233         char *str = (char*)malloc(5);
1234         sprintf(str,"%d", thread_level);
1235         return str;
1236     }
1237     }
1238     return  "unknown";
1239 #else
1240     char *str = (char*)malloc(5);
1241     sprintf(str,"%d", thread_level);
1242     return str;
1243 #endif
1244 }
1245
1246 /**
1247  *  Obtain the number of nodes, my node id, and consuming machine layer
1248  *  specific arguments
1249  */
1250 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
1251     int n,i;
1252     int ver, subver;
1253     int provided;
1254     int thread_level;
1255     int myNID;
1256     int largc=*argc;
1257     char** largv=*argv;
1258
1259 #if MACHINE_DEBUG
1260     debugLog=NULL;
1261 #endif
1262 #if CMK_USE_HP_MAIN_FIX
1263 #if FOR_CPLUS
1264     _main(largc,largv);
1265 #endif
1266 #endif
1267
1268     if (CmiGetArgFlag(largv, "+comm_thread_only_recv")) {
1269 #if CMK_SMP
1270       Cmi_smp_mode_setting = COMM_THREAD_ONLY_RECV;
1271 #else
1272       CmiAbort("+comm_thread_only_recv option can only be used with SMP version of Charm++");
1273 #endif
1274     }
1275
1276     *argc = CmiGetArgc(largv);     /* update it in case it is out of sync */
1277
1278     if(!CharmLibInterOperate) {
1279 #if CMK_MPI_INIT_THREAD
1280 #if CMK_SMP
1281     if (Cmi_smp_mode_setting == COMM_THREAD_SEND_RECV)
1282         thread_level = MPI_THREAD_FUNNELED;
1283       else
1284         thread_level = MPI_THREAD_MULTIPLE;
1285 #else
1286       thread_level = MPI_THREAD_SINGLE;
1287 #endif
1288       MPI_Init_thread(argc, argv, thread_level, &provided);
1289       _thread_provided = provided;
1290 #else
1291       MPI_Init(argc, argv);
1292       thread_level = 0;
1293       _thread_provided = -1;
1294 #endif
1295     }
1296
1297     largc = *argc;
1298     largv = *argv;
1299     if(!CharmLibInterOperate) {
1300                         MPI_Comm_dup(MPI_COMM_WORLD,&charmComm);
1301       MPI_Comm_size(charmComm, numNodes);
1302                         MPI_Comm_rank(charmComm, myNodeID);
1303     }
1304
1305     MPI_Bcast(&_Cmi_mynodesize, 1, MPI_INT, 0, MPI_COMM_WORLD);
1306
1307     myNID = *myNodeID;
1308
1309     MPI_Get_version(&ver, &subver);
1310     if(!CharmLibInterOperate) {
1311       if (myNID == 0) {
1312         printf("Charm++> Running on MPI version: %d.%d\n", ver, subver);
1313         printf("Charm++> level of thread support used: %s (desired: %s)\n", thread_level_tostring(_thread_provided), thread_level_tostring(thread_level));
1314       }
1315     }
1316
1317 #if CMK_SMP
1318     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV && _thread_provided != MPI_THREAD_MULTIPLE) {
1319         Cmi_smp_mode_setting = COMM_THREAD_SEND_RECV; 
1320         if (myNID == 0) {
1321           printf("Charm++> +comm_thread_only_recv disabled\n");
1322         }
1323     }
1324 #endif
1325
1326     {
1327         int debug = CmiGetArgFlag(largv,"++debug");
1328         int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
1329         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1330 #if CMK_HAS_GETPID
1331             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1332             fflush(stdout);
1333             if (!debug_no_pause)
1334                 sleep(15);
1335 #else
1336             printf("++debug ignored.\n");
1337 #endif
1338         }
1339     }
1340
1341
1342 #if CMK_MEM_CHECKPOINT
1343     if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
1344        CmiAssert(num_workpes <= *numNodes);
1345        total_pes = *numNodes;
1346        *numNodes = num_workpes;
1347     }
1348     else
1349        total_pes = num_workpes = *numNodes;
1350     if (*myNodeID == 0)
1351        CmiPrintf("Charm++> FT using %d processors and %d spare processors.\n", num_workpes, total_pes-num_workpes);
1352     petorank = (int *)malloc(sizeof(int) * num_workpes);
1353     for (i=0; i<num_workpes; i++)  petorank[i] = i;
1354     nextrank = num_workpes;
1355
1356     if (*myNodeID >= num_workpes) {    /* is spare processor */
1357       MPI_Status sts;
1358       int vals[2];
1359       MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, charmComm,&sts);
1360       int newpe = vals[0];
1361       CpvAccess(_curRestartPhase) = vals[1];
1362
1363       if (newpe == -1) {
1364           MPI_Barrier(charmComm);
1365           MPI_Finalize();
1366           exit(0);
1367       }
1368
1369       CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
1370         /* update petorank */
1371       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
1372       nextrank = *myNodeID + 1;
1373       *myNodeID = newpe;
1374       myNID = newpe;
1375
1376        /* add +restartaftercrash to argv */
1377       char *phase_str;
1378       char **restart_argv;
1379       int i=0;
1380       while(largv[i]!= NULL) i++;
1381       restart_argv = (char **)malloc(sizeof(char *)*(i+3));
1382       i=0;
1383       while(largv[i]!= NULL){
1384                 restart_argv[i] = largv[i];
1385                 i++;
1386       }
1387       restart_argv[i] = "+restartaftercrash";
1388       phase_str = (char*)malloc(10);
1389       sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
1390       restart_argv[i+1]=phase_str;
1391       restart_argv[i+2]=NULL;
1392       *argv = restart_argv;
1393       *argc = i+2;
1394       largc = *argc;
1395       largv = *argv;
1396     }
1397 #endif
1398
1399     idleblock = CmiGetArgFlag(largv, "+idleblocking");
1400     if (idleblock && _Cmi_mynode == 0) {
1401         printf("Charm++: Running in idle blocking mode.\n");
1402     }
1403
1404 #if CMK_CHARMDEBUG
1405     /* setup signal handlers */
1406     signal(SIGSEGV, KillOnAllSigs);
1407     signal(SIGFPE, KillOnAllSigs);
1408     signal(SIGILL, KillOnAllSigs);
1409     signal_int = signal(SIGINT, KillOnAllSigs);
1410     signal(SIGTERM, KillOnAllSigs);
1411     signal(SIGABRT, KillOnAllSigs);
1412 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1413     signal(SIGQUIT, KillOnAllSigs);
1414     signal(SIGBUS, KillOnAllSigs);
1415 #   endif /*UNIX*/
1416 #endif
1417
1418 #if CMK_NO_OUTSTANDING_SENDS
1419     no_outstanding_sends=1;
1420 #endif
1421     if (CmiGetArgFlag(largv,"+no_outstanding_sends")) {
1422         no_outstanding_sends = 1;
1423         if (myNID == 0)
1424             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1425                    no_outstanding_sends?"":" not");
1426     }
1427
1428     request_max=MAX_QLEN;
1429     CmiGetArgInt(largv,"+requestmax",&request_max);
1430     /*printf("request max=%d\n", request_max);*/
1431
1432 #if MPI_POST_RECV
1433     CmiGetArgInt(largv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1434     CmiGetArgInt(largv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1435     CmiGetArgInt(largv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1436     CmiGetArgInt(largv, "+postRecvThreshold", &MPI_POST_RECV_MSG_CNT_THRESHOLD);
1437     CmiGetArgInt(largv, "+postRecvBucketSize", &MPI_POST_RECV_INC);
1438     CmiGetArgInt(largv, "+postRecvMsgInc", &MPI_POST_RECV_MSG_INC);
1439     CmiGetArgInt(largv, "+postRecvCheckFreq", &MPI_POST_RECV_FREQ);
1440     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1441     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1442     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1443     if (myNID==0) {
1444         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes) with msg count threshold %d and msg histogram bucket size %d, #buf increment every %d msgs. The buffers are checked every %d msgs\n",
1445                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE,
1446                MPI_POST_RECV_MSG_CNT_THRESHOLD, MPI_POST_RECV_INC, MPI_POST_RECV_MSG_INC, MPI_POST_RECV_FREQ);
1447     }
1448 #endif
1449
1450 #if CMI_EXERT_SEND_CAP
1451     CmiGetArgInt(largv, "+dynCapSend", &SEND_CAP);
1452     if (myNID==0) {
1453         printf("Charm++: using static send cap %d\n", SEND_CAP);
1454     }
1455 #endif
1456 #if CMI_EXERT_RECV_CAP
1457     CmiGetArgInt(largv, "+dynCapRecv", &RECV_CAP);
1458     if (myNID==0) {
1459         printf("Charm++: using static recv cap %d\n", RECV_CAP);
1460     }
1461 #endif
1462 #if CMI_DYNAMIC_EXERT_CAP 
1463     CmiGetArgInt(largv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1464     CmiGetArgInt(largv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1465     CmiGetArgInt(largv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1466     if (myNID==0) {
1467         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1468                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1469     }
1470 #endif
1471
1472 #if USE_ASYNC_RECV_FUNC
1473     CmiGetArgInt(largv, "+irecvMsgThreshold", &IRECV_MSG_THRESHOLD);
1474     if(myNID==0) {
1475         printf("Charm++: for msg size larger than %d, MPI_Irecv is going to be used.\n", IRECV_MSG_THRESHOLD);
1476     }
1477 #endif
1478
1479     /* checksum flag */
1480     if (CmiGetArgFlag(largv,"+checksum")) {
1481 #if CMK_ERROR_CHECKING
1482         checksum_flag = 1;
1483         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1484 #else
1485         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1486 #endif
1487     }
1488
1489     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1490     for (i=0; i<_Cmi_mynodesize+1; i++) {
1491 #if MULTI_SENDQUEUE
1492         procState[i].sendMsgBuf = PCQueueCreate();
1493 #endif
1494         procState[i].recvLock = CmiCreateLock();
1495     }
1496 #if CMK_SMP
1497 #if !MULTI_SENDQUEUE
1498     sendMsgBuf = PCQueueCreate();
1499     sendMsgBufLock = CmiCreateLock();
1500 #endif
1501 #endif
1502 }
1503
1504 void LrtsPreCommonInit(int everReturn) {
1505
1506 #if MPI_POST_RECV
1507     int doInit = 1;
1508     int i;
1509
1510 #if CMK_SMP
1511     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1512 #endif
1513
1514     /* Currently, in mpi smp, the main thread will be the comm thread, so
1515      *  only the comm thread should post recvs. Cpvs, however, need to be
1516      * created on rank 0 (the ptrs to the actual cpv memory), while
1517      * other ranks are busy waiting for this to finish. So cpv initialize
1518      * routines have to be called on every ranks, although they are only
1519      * useful on comm thread (whose rank is not zero) -Chao Mei
1520      */
1521     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1522     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1523     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1524     CpvInitialize(char **, CmiPostedRecvBuffers);
1525
1526     CpvAccess(CmiPostedRecvRequests) = NULL;
1527     CpvAccess(CmiPostedRecvBuffers) = NULL;
1528
1529     CpvInitialize(MPIPostRecvList *, postRecvListHdr);
1530     CpvInitialize(MPIPostRecvList *, curPostRecvPtr);
1531     CpvInitialize(int, msgRecvCnt);
1532
1533     CpvAccess(postRecvListHdr) = NULL;
1534     CpvAccess(curPostRecvPtr) = NULL;
1535     CpvAccess(msgRecvCnt) = 0;
1536
1537 #if MPI_DYNAMIC_POST_RECV
1538     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1539 #endif
1540
1541     if (doInit) {
1542 #if MPI_DYNAMIC_POST_RECV
1543         MSG_HISTOGRAM_BINSIZE = MPI_POST_RECV_INC;
1544         /* including two more buckets that are out of the range [LOWERSIZE, UPPERSIZE] */
1545         MAX_HISTOGRAM_BUCKETS = (MPI_POST_RECV_UPPERSIZE - MPI_POST_RECV_LOWERSIZE)/MSG_HISTOGRAM_BINSIZE+2;
1546         CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1547         memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1548 #else
1549         /* Post some extra recvs to help out with incoming messages */
1550         /* On some MPIs the messages are unexpected and thus slow */
1551
1552         CpvAccess(postRecvListHdr) = (MPIPostRecvList *)malloc(sizeof(MPIPostRecvList));
1553
1554         /* An array of request handles for posted recvs */
1555         CpvAccess(postRecvListHdr)->msgSizeIdx = -1;
1556         CpvAccess(postRecvListHdr)->bufCnt = MPI_POST_RECV_COUNT;
1557         CpvAccess(postRecvListHdr)->postedRecvReqs = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1558         /* An array of buffers for posted recvs */
1559         CpvAccess(postRecvListHdr)->postedRecvBufs = (char**)malloc(MPI_POST_RECV_COUNT*sizeof(char *));
1560         CpvAccess(postRecvListHdr)->next = CpvAccess(postRecvListHdr);
1561         CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr);
1562
1563         /* Post Recvs */
1564         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1565             char *tmpbuf = (char *)CmiAlloc(MPI_POST_RECV_SIZE); /* Note: could be aligned allocation?? */
1566             CpvAccess(postRecvListHdr)->postedRecvBufs[i] = tmpbuf;
1567             if (MPI_SUCCESS != MPI_Irecv(tmpbuf,
1568                                          MPI_POST_RECV_SIZE,
1569                                          MPI_BYTE,
1570                                          MPI_ANY_SOURCE,
1571                                          POST_RECV_TAG,
1572                                          charmComm,
1573                                          CpvAccess(postRecvListHdr)->postedRecvReqs+i  ))
1574                 CmiAbort("MPI_Irecv failed\n");
1575         }
1576 #endif
1577     }
1578 #endif /* end of MPI_POST_RECV */
1579
1580 #if CAPTURE_MSG_HISTOGRAM && !MPI_DYNAMIC_POST_RECV
1581     CpvInitialize(int *, MSG_HISTOGRAM_ARRAY);
1582     CpvAccess(MSG_HISTOGRAM_ARRAY) = (int *)malloc(sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1583     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
1584 #endif
1585
1586 #if USE_ASYNC_RECV_FUNC
1587 #if CMK_SMP
1588     /* allocate the guardian entry only on comm thread considering NUMA */
1589     if(CmiMyRank() == CmiMyNodeSize()) {
1590         waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
1591         waitIrecvListHead->next = NULL;
1592     }
1593 #else    
1594     waitIrecvListHead = waitIrecvListTail = irecvListEntryAllocate();
1595     waitIrecvListHead->next = NULL;
1596 #endif
1597 #endif
1598 }
1599
1600 void LrtsPostCommonInit(int everReturn) {
1601
1602     CmiIdleState *s=CmiNotifyGetState();
1603
1604     CpvInitialize(SMSG_LIST *, sent_msgs);
1605     CpvInitialize(SMSG_LIST *, end_sent);
1606     CpvInitialize(int, MsgQueueLen);
1607     CpvAccess(sent_msgs) = NULL;
1608     CpvAccess(end_sent) = NULL;
1609     CpvAccess(MsgQueueLen) = 0;
1610
1611     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1612
1613 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1614     CpvInitialize(double, projTraceStart);
1615     /* only PE 0 needs to care about registration (to generate sts file). */
1616     if (CmiMyPe() == 0) {
1617         registerMachineUserEventsFunction(&registerMPITraceEvents);
1618     }
1619 #endif
1620
1621 #if CMK_SMP
1622     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1623     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1624     if (Cmi_smp_mode_setting == COMM_THREAD_ONLY_RECV)
1625       CcdCallOnConditionKeep(CcdPERIODIC,(CcdVoidFn)LrtsPostNonLocal,NULL);
1626 #else
1627     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1628 #endif
1629
1630 #if MACHINE_DEBUG_LOG
1631     if (CmiMyRank() == 0) {
1632         char ln[200];
1633         sprintf(ln,"debugLog.%d",CmiMyNode());
1634         debugLog=fopen(ln,"w");
1635     }
1636 #endif
1637 }
1638 /* ######End of functions related with starting programs###### */
1639
1640 /***********************************************************************
1641  *
1642  * Abort function:
1643  *
1644  ************************************************************************/
1645
1646 void LrtsAbort(const char *message) {
1647     char *m;
1648     /* if CharmDebug is attached simply try to send a message to it */
1649 #if CMK_CCS_AVAILABLE
1650     if (CpvAccess(cmiArgDebugFlag)) {
1651         CpdNotify(CPD_ABORT, message);
1652         CpdFreeze();
1653     }
1654 #endif
1655     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1656              "Reason: %s\n",CmiMyPe(),message);
1657     /*  CmiError(message); */
1658     CmiPrintStackTrace(0);
1659     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1660     CmiSetHandler(m, machine_exit_idx);
1661     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1662     machine_exit(m);
1663     /* Program never reaches here */
1664     MPI_Abort(charmComm, 1);
1665 }
1666
1667 /**************************  TIMER FUNCTIONS **************************/
1668 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1669
1670 /* MPI calls are not threadsafe, even the timer on some machines */
1671 static CmiNodeLock  timerLock = 0;
1672                                 static int _absoluteTime = 0;
1673                                                            static double starttimer = 0;
1674                                                                                       static int _is_global = 0;
1675
1676 int CmiTimerIsSynchronized() {
1677     int  flag;
1678     void *v;
1679
1680     /*  check if it using synchronized timer */
1681     if (MPI_SUCCESS != MPI_Attr_get(charmComm, MPI_WTIME_IS_GLOBAL, &v, &flag))
1682         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1683     if (flag) {
1684         _is_global = *(int*)v;
1685         if (_is_global && CmiMyPe() == 0)
1686             printf("Charm++> MPI timer is synchronized\n");
1687     }
1688     return _is_global;
1689 }
1690
1691 int CmiTimerAbsolute() {
1692     return _absoluteTime;
1693 }
1694
1695 double CmiStartTimer() {
1696     return 0.0;
1697 }
1698
1699 double CmiInitTime() {
1700     return starttimer;
1701 }
1702
1703 void CmiTimerInit(char **argv) {
1704     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1705     if (_absoluteTime && CmiMyPe() == 0)
1706         printf("Charm++> absolute MPI timer is used\n");
1707
1708 #if ! CMK_MEM_CHECKPOINT
1709     _is_global = CmiTimerIsSynchronized();
1710 #else
1711     _is_global = 0;
1712 #endif
1713
1714     if (_is_global) {
1715         if (CmiMyRank() == 0) {
1716             double minTimer;
1717 #if CMK_TIMER_USE_XT3_DCLOCK
1718             starttimer = dclock();
1719 #else
1720             starttimer = MPI_Wtime();
1721 #endif
1722
1723             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1724                           charmComm );
1725             starttimer = minTimer;
1726         }
1727     } else { /* we don't have a synchronous timer, set our own start time */
1728 #if ! CMK_MEM_CHECKPOINT
1729         CmiBarrier();
1730         CmiBarrier();
1731         CmiBarrier();
1732 #endif
1733 #if CMK_TIMER_USE_XT3_DCLOCK
1734         starttimer = dclock();
1735 #else
1736         starttimer = MPI_Wtime();
1737 #endif
1738     }
1739
1740 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1741     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1742         timerLock = CmiCreateLock();
1743 #endif
1744     CmiNodeAllBarrier();          /* for smp */
1745 }
1746
1747 /**
1748  * Since the timerLock is never created, and is
1749  * always NULL, then all the if-condition inside
1750  * the timer functions could be disabled right
1751  * now in the case of SMP. --Chao Mei
1752  */
1753 double CmiTimer(void) {
1754     double t;
1755 #if 0 && CMK_SMP
1756     if (timerLock) CmiLock(timerLock);
1757 #endif
1758
1759 #if CMK_TIMER_USE_XT3_DCLOCK
1760     t = dclock();
1761 #else
1762     t = MPI_Wtime();
1763 #endif
1764
1765 #if 0 && CMK_SMP
1766     if (timerLock) CmiUnlock(timerLock);
1767 #endif
1768
1769     return _absoluteTime?t: (t-starttimer);
1770 }
1771
1772 double CmiWallTimer(void) {
1773     double t;
1774 #if 0 && CMK_SMP
1775     if (timerLock) CmiLock(timerLock);
1776 #endif
1777
1778 #if CMK_TIMER_USE_XT3_DCLOCK
1779     t = dclock();
1780 #else
1781     t = MPI_Wtime();
1782 #endif
1783
1784 #if 0 && CMK_SMP
1785     if (timerLock) CmiUnlock(timerLock);
1786 #endif
1787
1788     return _absoluteTime? t: (t-starttimer);
1789 }
1790
1791 double CmiCpuTimer(void) {
1792     double t;
1793 #if 0 && CMK_SMP
1794     if (timerLock) CmiLock(timerLock);
1795 #endif
1796 #if CMK_TIMER_USE_XT3_DCLOCK
1797     t = dclock() - starttimer;
1798 #else
1799     t = MPI_Wtime() - starttimer;
1800 #endif
1801 #if 0 && CMK_SMP
1802     if (timerLock) CmiUnlock(timerLock);
1803 #endif
1804     return t;
1805 }
1806
1807 #endif     /* CMK_TIMER_USE_SPECIAL */
1808
1809 /************Barrier Related Functions****************/
1810 /* must be called on all ranks including comm thread in SMP */
1811 int CmiBarrier() {
1812 #if CMK_SMP
1813     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1814     CmiNodeAllBarrier();
1815     if (CmiMyRank() == CmiMyNodeSize())
1816 #else
1817     if (CmiMyRank() == 0)
1818 #endif
1819     {
1820         /**
1821          *  The call of CmiBarrier is usually before the initialization
1822          *  of trace module of Charm++, therefore, the START_EVENT
1823          *  and END_EVENT are disabled here. -Chao Mei
1824          */
1825         /*START_EVENT();*/
1826
1827         if (MPI_SUCCESS != MPI_Barrier(charmComm))
1828             CmiAbort("Timernit: MPI_Barrier failed!\n");
1829
1830         /*END_EVENT(10);*/
1831     }
1832     CmiNodeAllBarrier();
1833     return 0;
1834 }
1835
1836 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1837 int CmiBarrierZero() {
1838     int i;
1839 #if CMK_SMP
1840     if (CmiMyRank() == CmiMyNodeSize())
1841 #else
1842     if (CmiMyRank() == 0)
1843 #endif
1844     {
1845         char msg[1];
1846         MPI_Status sts;
1847         if (CmiMyNode() == 0)  {
1848             for (i=0; i<CmiNumNodes()-1; i++) {
1849                 START_EVENT();
1850
1851                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, charmComm,&sts))
1852                     CmiPrintf("MPI_Recv failed!\n");
1853
1854                 END_EVENT(30);
1855             }
1856         } else {
1857             START_EVENT();
1858
1859             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,charmComm))
1860                 printf("MPI_Send failed!\n");
1861
1862             END_EVENT(20);
1863         }
1864     }
1865     CmiNodeAllBarrier();
1866     return 0;
1867 }
1868
1869
1870 #if CMK_MEM_CHECKPOINT
1871
1872 void mpi_restart_crashed(int pe, int rank)
1873 {
1874     int vals[2];
1875     vals[0] = pe;
1876     vals[1] = CpvAccess(_curRestartPhase)+1;
1877     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,charmComm);
1878     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,charmComm);
1879 }
1880
1881 /* notify spare processors to exit */
1882 void mpi_end_spare()
1883 {
1884     int i;
1885     for (i=nextrank; i<total_pes; i++) {
1886         int vals[2] = {-1,-1};
1887         MPI_Send((void *)vals,2,MPI_INT,i,FAIL_TAG,charmComm);
1888     }
1889 }
1890
1891 int find_spare_mpirank(int pe)
1892 {
1893     if (nextrank == total_pes) {
1894       CmiAbort("Charm++> No spare processor available.");
1895     }
1896     petorank[pe] = nextrank;
1897     nextrank++;
1898     return nextrank-1;
1899 }
1900
1901 void CkDieNow()
1902 {
1903     CmiPrintf("[%d] die now.\n", CmiMyPe());
1904
1905       /* release old messages */
1906     while (!CmiAllAsyncMsgsSent()) {
1907         PumpMsgs();
1908         CmiReleaseSentMessages();
1909     }
1910     MPI_Barrier(charmComm);
1911     MPI_Finalize();
1912     exit(0);
1913 }
1914
1915 #endif
1916
1917 /*======Beginning of Msg Histogram or Dynamic Post-Recv Related Funcs=====*/
1918 #if CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV
1919 /* Functions related with capturing msg histogram */
1920
1921 #if MPI_DYNAMIC_POST_RECV
1922 /* Consume all messages in the request buffers */
1923 static void consumeAllMsgs()
1924 {
1925     MPIPostRecvList *ptr = CpvAccess(curPostRecvPtr);
1926     if (ptr) {
1927         do {
1928             int i;
1929             for (i=0; i<ptr->bufCnt; i++) {
1930                 int done = 0;
1931                 MPI_Status sts;
1932
1933                 /* Indicating this entry has been tested before */
1934                 if (ptr->postedRecvBufs[i] == NULL) continue;
1935
1936                 START_TRACE_RECVCOMM(NULL);
1937                 if (MPI_SUCCESS != MPI_Test(ptr->postedRecvReqs+i, &done, &sts))
1938                     CmiAbort("consumeAllMsgs failed in MPI_Test!\n");
1939                 if (done) {
1940                     int nbytes;
1941                     char *msg;                    
1942                     
1943                     if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
1944                         CmiAbort("consumeAllMsgs failed in MPI_Get_count!\n");
1945                     /* ready to handle this msg */
1946                     msg = (ptr->postedRecvBufs)[i];
1947                     (ptr->postedRecvBufs)[i] = NULL;
1948                     
1949                     END_TRACE_RECVCOMM(msg);
1950                     handleOneRecvedMsg(nbytes, msg);
1951                 } else {
1952                     if (MPI_SUCCESS != MPI_Cancel(ptr->postedRecvReqs+i))
1953                         CmiAbort("consumeAllMsgs failed in MPI_Cancel!\n");
1954                 }
1955             }
1956             ptr = ptr->next;
1957         } while (ptr != CpvAccess(curPostRecvPtr));
1958     }
1959 }
1960
1961 static void recordMsgHistogramInfo(int size)
1962 {
1963     int idx = 0;
1964     size -= MPI_POST_RECV_LOWERSIZE;
1965     if (size > 0)
1966         idx = (size/MSG_HISTOGRAM_BINSIZE + 1);
1967
1968     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
1969     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
1970 }
1971
1972 #define POST_RECV_USE_STATIC_PARAM 0
1973 #define POST_RECV_REPORT_STS 0
1974
1975 #if POST_RECV_REPORT_STS
1976 static int buildDynCallCnt = 0;
1977 #endif
1978
1979 static void buildDynamicRecvBuffers()
1980 {
1981     int i;
1982
1983     int local_MSG_CNT_THRESHOLD;
1984     int local_MSG_INC;
1985
1986 #if POST_RECV_REPORT_STS
1987     buildDynCallCnt++;
1988 #endif
1989
1990     /* For debugging usage */
1991     reportMsgHistogramInfo();
1992
1993     CpvAccess(msgRecvCnt) = 0;
1994     /* consume all outstanding msgs */
1995     consumeAllMsgs();
1996
1997 #if POST_RECV_USE_STATIC_PARAM
1998     local_MSG_CNT_THRESHOLD = MPI_POST_RECV_MSG_CNT_THRESHOLD;
1999     local_MSG_INC = MPI_POST_RECV_MSG_INC;
2000 #else
2001     {
2002         int total = 0;
2003         int count = 0;
2004         for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2005             int tmp = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2006             /* avg is temporarily used for counting how many buckets are non-zero */
2007             if (tmp > 0)  {
2008                 total += tmp;
2009                 count++;
2010             }
2011         }
2012         if (count == 1) local_MSG_CNT_THRESHOLD = 1; /* Just filter out those zero-count msgs */
2013         else local_MSG_CNT_THRESHOLD = total / count /3; /* Catch >50% msgs NEED-BETTER-SCHEME HERE!!*/
2014         local_MSG_INC = total/count; /* Not having a good heuristic right now */
2015 #if POST_RECV_REPORT_STS
2016         printf("sel_histo[%d]: critia_threshold=%d, critia_msginc=%d\n", CmiMyPe(), local_MSG_CNT_THRESHOLD, local_MSG_INC);
2017 #endif
2018     }
2019 #endif
2020
2021     /* First continue to find the first msg range that requires post recv */
2022     /* Ignore the fist and the last one because they are not tracked */
2023     MPIPostRecvList *newHdr = NULL;
2024     MPIPostRecvList *newListPtr = newHdr;
2025     MPIPostRecvList *ptr = CpvAccess(postRecvListHdr);
2026     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2027         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2028         if (count >= local_MSG_CNT_THRESHOLD) {
2029
2030 #if POST_RECV_REPORT_STS
2031             /* Report histogram results */
2032             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
2033             int high = low + MSG_HISTOGRAM_BINSIZE;
2034             int reportCnt;
2035             if (count == local_MSG_CNT_THRESHOLD) reportCnt = 1;
2036             else reportCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2037             printf("sel_histo[%d]-%d: msg size [%.2f, %.2f) with count=%d (%d)\n", CmiMyPe(), buildDynCallCnt, low/1000.0, high/1000.0, count, reportCnt);
2038 #endif
2039             /* find if this msg idx exists, the "i" is the msgSizeIdx, in the current list */
2040             int notFound = 1;
2041             MPIPostRecvList *newEntry = NULL;
2042             while (ptr) {
2043                 if (ptr->msgSizeIdx < i) {
2044                     /* free the buffer for this range of msg size */
2045                     MPIPostRecvList *nextptr = ptr->next;
2046
2047                     free(ptr->postedRecvReqs);
2048                     int j;
2049                     for (j=0; j<ptr->bufCnt; j++) {
2050                         if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2051                     }
2052                     free(ptr->postedRecvBufs);
2053                     ptr = nextptr;
2054                 } else if (ptr->msgSizeIdx == i) {
2055                     int newBufCnt, j;
2056                     int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
2057                     newEntry = ptr;
2058                     /* Do some adjustment according to the current statistics */
2059                     if (count == local_MSG_CNT_THRESHOLD) newBufCnt = 1;
2060                     else newBufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2061                     if (newBufCnt != ptr->bufCnt) {
2062                         /* free old buffers, and allocate new buffers */
2063                         free(ptr->postedRecvReqs);
2064                         ptr->postedRecvReqs = (MPI_Request *)malloc(newBufCnt * sizeof(MPI_Request));
2065                         for (j=0; j<ptr->bufCnt; j++) {
2066                             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2067                         }
2068                         free(ptr->postedRecvBufs);
2069                         ptr->postedRecvBufs = (char **)malloc(newBufCnt * sizeof(char *));
2070                     }
2071
2072                     /* re-post those buffers */
2073                     ptr->bufCnt = newBufCnt;
2074                     for (j=0; j<ptr->bufCnt; j++) {
2075                         ptr->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
2076                         if (MPI_SUCCESS != MPI_Irecv(ptr->postedRecvBufs[j], bufSize, MPI_BYTE,
2077                                                      MPI_ANY_SOURCE, POST_RECV_TAG+ptr->msgSizeIdx,
2078                                                      charmComm, ptr->postedRecvReqs+j))
2079                             CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
2080                     }
2081
2082                     /* We already posted bufs for this range of msg size */
2083                     ptr = ptr->next;
2084                     /* Need to set ptr to NULL as the buf list comes to an end and the while loop exits */
2085                     if (ptr == CpvAccess(postRecvListHdr)) ptr = NULL;
2086                     notFound = 0;
2087                     break;
2088                 } else {
2089                     /* The msgSizeIdx is larger than i */
2090                     break;
2091                 }
2092                 if (ptr == CpvAccess(postRecvListHdr)) {
2093                     ptr = NULL;
2094                     break;
2095                 }
2096             } /* end while(ptr): iterating the posted recv buffer list */
2097
2098             if (notFound) {
2099                 /* the current range of msg size is not found in the list */
2100                 int j;
2101                 int bufSize = i*MPI_POST_RECV_INC + MPI_POST_RECV_LOWERSIZE - 1;
2102                 newEntry = malloc(sizeof(MPIPostRecvList));
2103                 MPIPostRecvList *one = newEntry;
2104                 one->msgSizeIdx = i;
2105                 if (count == local_MSG_CNT_THRESHOLD) one->bufCnt = 1;
2106                 else one->bufCnt = (count - local_MSG_CNT_THRESHOLD)/local_MSG_INC + 1;
2107                 one->postedRecvReqs = (MPI_Request *)malloc(sizeof(MPI_Request)*one->bufCnt);
2108                 one->postedRecvBufs = (char **)malloc(one->bufCnt * sizeof(char *));
2109                 for (j=0; j<one->bufCnt; j++) {
2110                     one->postedRecvBufs[j] = (char *)CmiAlloc(bufSize);
2111                     if (MPI_SUCCESS != MPI_Irecv(one->postedRecvBufs[j], bufSize, MPI_BYTE,
2112                                                  MPI_ANY_SOURCE, POST_RECV_TAG+one->msgSizeIdx,
2113                                                  charmComm, one->postedRecvReqs+j))
2114                         CmiAbort("MPI_Irecv failed in buildDynamicRecvBuffers!\n");
2115                 }
2116             } /* end if notFound */
2117
2118             /* Update the new list with the newEntry */
2119             CmiAssert(newEntry != NULL);
2120             if (newHdr == NULL) {
2121                 newHdr = newEntry;
2122                 newListPtr = newEntry;
2123                 newHdr->next = newHdr;
2124             } else {
2125                 newListPtr->next = newEntry;
2126                 newListPtr = newEntry;
2127                 newListPtr->next = newHdr;
2128             }
2129         } /* end if the count of this msg size range exceeds the threshold */
2130     } /* end for loop over the histogram buckets */
2131
2132     /* Free remaining entries in the list */
2133     while (ptr) {
2134         /* free the buffer for this range of msg size */
2135         MPIPostRecvList *nextptr = ptr->next;
2136
2137         free(ptr->postedRecvReqs);
2138         int j;
2139         for (j=0; j<ptr->bufCnt; j++) {
2140             if ((ptr->postedRecvBufs)[j]) CmiFree((ptr->postedRecvBufs)[j]);
2141         }
2142         free(ptr->postedRecvBufs);
2143         ptr = nextptr;
2144         if (ptr == CpvAccess(postRecvListHdr)) break;
2145     }
2146
2147     CpvAccess(curPostRecvPtr) = CpvAccess(postRecvListHdr) = newHdr;
2148     memset(CpvAccess(MSG_HISTOGRAM_ARRAY), 0, sizeof(int)*MAX_HISTOGRAM_BUCKETS);
2149 } /* end of function buildDynamicRecvBuffers */
2150
2151 static void examineMsgHistogramInfo(int size)
2152 {
2153     int total = CpvAccess(msgRecvCnt)++;
2154     if (total < MPI_POST_RECV_FREQ) {
2155         recordMsgHistogramInfo(size);
2156     } else {
2157         buildDynamicRecvBuffers();
2158     }
2159 }
2160 #else
2161 /* case when CAPTURE_MSG_HISTOGRAM is defined */
2162 static void recordMsgHistogramInfo(int size)
2163 {
2164     int idx = size/MSG_HISTOGRAM_BINSIZE;
2165     if (idx >= MAX_HISTOGRAM_BUCKETS) idx = MAX_HISTOGRAM_BUCKETS-1;
2166     CpvAccess(MSG_HISTOGRAM_ARRAY)[idx]++;
2167 }
2168 #endif /* end of MPI_DYNAMIC_POST_RECV */
2169
2170 void reportMsgHistogramInfo()
2171 {
2172 #if MPI_DYNAMIC_POST_RECV
2173     int i, count;
2174     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[0];
2175     if (count > 0) {
2176         printf("msg_histo[%d]: %d for msg [0, %.2fK)\n", CmiMyNode(), count, MPI_POST_RECV_LOWERSIZE/1000.0);
2177     }
2178     for (i=1; i<MAX_HISTOGRAM_BUCKETS-1; i++) {
2179         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2180         if (count > 0) {
2181             int low = (i-1)*MSG_HISTOGRAM_BINSIZE + MPI_POST_RECV_LOWERSIZE;
2182             int high = low + MSG_HISTOGRAM_BINSIZE;
2183             printf("msg_histo[%d]: %d for msg [%.2fK, %.2fK)\n", CmiMyNode(), count, low/1000.0, high/1000.0);
2184         }
2185     }
2186     count = CpvAccess(MSG_HISTOGRAM_ARRAY)[MAX_HISTOGRAM_BUCKETS-1];
2187     if (count > 0) {
2188         printf("msg_histo[%d]: %d for msg [%.2fK, +inf)\n", CmiMyNode(), count, MPI_POST_RECV_UPPERSIZE/1000.0);
2189     }
2190 #else
2191     int i;
2192     for (i=0; i<MAX_HISTOGRAM_BUCKETS; i++) {
2193         int count = CpvAccess(MSG_HISTOGRAM_ARRAY)[i];
2194         if (count > 0) {
2195             int low = i*MSG_HISTOGRAM_BINSIZE;
2196             int high = low + MSG_HISTOGRAM_BINSIZE;
2197             printf("msg_histo[%d]: %d for msg [%dK, %dK)\n", CmiMyNode(), count, low/1000, high/1000);
2198         }
2199     }
2200 #endif
2201 }
2202 #endif /* end of CAPTURE_MSG_HISTOGRAM || MPI_DYNAMIC_POST_RECV */
2203
2204 void CmiSetupMachineRecvBuffersUser()
2205 {
2206 #if MPI_DYNAMIC_POST_RECV
2207     buildDynamicRecvBuffers();
2208 #endif
2209 }
2210 /*=======End of Msg Histogram or Dynamic Post-Recv Related Funcs======*/
2211
2212
2213 /*@}*/
2214