disable MPI_POST_RECV in default
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {
36     Sleep(1000*secs);
37 }
38 #else
39 #include <unistd.h> /*For getpid()*/
40 #endif
41 #include <stdlib.h> /*For sleep()*/
42
43 #include "machine.h"
44 #include "pcqueue.h"
45
46 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
47 /* Whether to use multiple send queue in SMP mode */
48 #define MULTI_SENDQUEUE    0
49
50 /* ###Beginning of flow control related macros ### */
51 #define CMI_EXERT_SEND_CAP 0
52 #define CMI_EXERT_RECV_CAP 0
53
54 #define CMI_DYNAMIC_EXERT_CAP 0
55 /* This macro defines the max number of msgs in the sender msg buffer
56  * that is allowed for recving operation to continue
57  */
58 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 4
59 #define CMI_DYNAMIC_MAXCAPSIZE 1000
60 #define CMI_DYNAMIC_SEND_CAPSIZE 4
61 #define CMI_DYNAMIC_RECV_CAPSIZE 3
62 /* initial values, -1 indiates there's no cap */
63 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
64 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 3
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73 /* ###End of flow control related macros ### */
74
75 /* ###Beginning of machine-layer-tracing related macros ### */
76 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
77 #define CMI_MPI_TRACE_MOREDETAILED 0
78 #undef CMI_MPI_TRACE_USEREVENTS
79 #define CMI_MPI_TRACE_USEREVENTS 1
80 #else
81 #undef CMK_SMP_TRACE_COMMTHREAD
82 #define CMK_SMP_TRACE_COMMTHREAD 0
83 #endif
84
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
87 #undef CMI_MPI_TRACE_USEREVENTS
88 #define CMI_MPI_TRACE_USEREVENTS 1
89 #else
90 #undef CMK_TRACE_COMMOVERHEAD
91 #define CMK_TRACE_COMMOVERHEAD 0
92 #endif
93
94 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
95 CpvStaticDeclare(double, projTraceStart);
96 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
97 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
98 #else
99 #define  START_EVENT()
100 #define  END_EVENT(x)
101 #endif
102 /* ###End of machine-layer-tracing related macros ### */
103
104 /* ###Beginning of POST_RECV related macros ### */
105 /*
106  * If MPI_POST_RECV is defined, we provide default values for
107  * size and number of posted recieves. If MPI_POST_RECV_COUNT
108  * is set then a default value for MPI_POST_RECV_SIZE is used
109  * if not specified by the user.
110  */
111 #define MPI_POST_RECV 0
112
113 /* Making those parameters configurable for testing them easily */
114
115 #if MPI_POST_RECV
116 static int MPI_POST_RECV_COUNT=10;
117 static int MPI_POST_RECV_LOWERSIZE=2000;
118 static int MPI_POST_RECV_UPPERSIZE=4000;
119 static int MPI_POST_RECV_SIZE;
120
121 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
122 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
123 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
124 CpvDeclare(char*,CmiPostedRecvBuffers);
125 #endif
126
127 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
128 #define TAG     1375
129 #if MPI_POST_RECV
130 #define POST_RECV_TAG       (TAG+1)
131 #define BARRIER_ZERO_TAG  TAG
132 #else
133 #define BARRIER_ZERO_TAG   (TAG-1)
134 #endif
135 /* ###End of POST_RECV related related macros ### */
136
137 #if CMK_BLUEGENEL
138 #define MAX_QLEN 8
139 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
140 #else
141 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
142 #define MAX_QLEN 200
143 #endif
144 /* =======End of Definitions of Performance-Specific Macros =======*/
145
146
147 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
148 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
149 #define CHARM_MAGIC_NUMBER               126
150
151 #if CMK_ERROR_CHECKING
152 extern unsigned char computeCheckSum(unsigned char *data, int len);
153 static int checksum_flag = 0;
154 #define CMI_SET_CHECKSUM(msg, len)      \
155         if (checksum_flag)  {   \
156           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
157           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
158         }
159 #define CMI_CHECK_CHECKSUM(msg, len)    \
160         if (checksum_flag)      \
161           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
162             CmiAbort("Fatal error: checksum doesn't agree!\n");
163 #else
164 #define CMI_SET_CHECKSUM(msg, len)
165 #define CMI_CHECK_CHECKSUM(msg, len)
166 #endif
167 /* =====End of Definitions of Message-Corruption Related Macros=====*/
168
169
170 /* =====Beginning of Declarations of Machine Specific Variables===== */
171 #include <signal.h>
172 void (*signal_int)(int);
173
174 static int _thread_provided = -1; /* Indicating MPI thread level */
175 static int idleblock = 0;
176
177 /* A simple list for msgs that have been sent by MPI_Isend */
178 typedef struct msg_list {
179     char *msg;
180     struct msg_list *next;
181     int size, destpe;
182 #if CMK_SMP_TRACE_COMMTHREAD
183     int srcpe;
184 #endif
185     MPI_Request req;
186 } SMSG_LIST;
187
188 static SMSG_LIST *sent_msgs=0;
189 static SMSG_LIST *end_sent=0;
190
191 int MsgQueueLen=0;
192 static int request_max;
193 /*FLAG: consume outstanding Isends in scheduler loop*/
194 static int no_outstanding_sends=0;
195
196 #if NODE_0_IS_CONVHOST
197 int inside_comm = 0;
198 #endif
199
200 typedef struct ProcState {
201 #if MULTI_SENDQUEUE
202     PCQueue      sendMsgBuf;       /* per processor message sending queue */
203 #endif
204     CmiNodeLock  recvLock;                  /* for cs->recv */
205 } ProcState;
206 static ProcState  *procState;
207
208 #if CMK_SMP && !MULTI_SENDQUEUE
209 static PCQueue sendMsgBuf;
210 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
211 #endif
212 /* =====End of Declarations of Machine Specific Variables===== */
213
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define CmiMachineSpecificSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int argc, char **argv, int *numNodes, int *myNodeID);
238 #define MachineSpecificInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define MachineSpecificPreCommonInit MachinePreCommonInitForMPI
243 #define MachineSpecificPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI();
248 #define MachineSpecificAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define MachineSpecificDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define MachineSpecificExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 void MachinePostNonLocalForMPI();
262 #define MachineSpecificPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-common.c"
274
275 /* The machine specific msg-sending function */
276
277 #if CMK_SMP
278 static void EnqueueMsg(void *m, int size, int node) {
279     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
280     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
281     msg_tmp->msg = m;
282     msg_tmp->size = size;
283     msg_tmp->destpe = node;
284
285 #if CMK_SMP_TRACE_COMMTHREAD
286     msg_tmp->srcpe = CmiMyPe();
287 #endif
288
289 #if MULTI_SENDQUEUE
290     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
291 #else
292     /*CmiLock(sendMsgBufLock);*/
293     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
294     /*CmiUnlock(sendMsgBufLock);*/
295 #endif
296
297     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
298 }
299 #endif
300
301 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
302     /* Ignoring the mode for MPI layer */
303
304     CmiState cs = CmiGetState();
305     SMSG_LIST *msg_tmp;
306     int  rank;
307
308     CmiAssert(destNode != CmiMyNode());
309 #if CMK_SMP
310     EnqueueMsg(msg, size, destNode);
311     return 0;
312 #else
313     /* non smp */
314     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
315     msg_tmp->msg = msg;
316     msg_tmp->next = 0;
317     while (MsgQueueLen > request_max) {
318         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
319         CmiReleaseSentMessages();
320         PumpMsgs();
321     }
322 #if CMK_ERROR_CHECKING
323     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
324 #endif
325     CMI_SET_CHECKSUM(msg, size);
326
327 #if MPI_POST_RECV
328     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
329         START_EVENT();
330         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
331             CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
332         END_EVENT(40);
333     } else {
334         START_EVENT();
335         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
336             CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
337         END_EVENT(40);
338     }
339 #else
340     START_EVENT();
341     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
342         CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
343     /*END_EVENT(40);*/
344 #if CMK_TRACE_COMMOVERHEAD
345     char tmp[64];
346     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destNode);
347     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
348 #endif
349 #endif
350
351     MsgQueueLen++;
352     if (sent_msgs==0)
353         sent_msgs = msg_tmp;
354     else
355         end_sent->next = msg_tmp;
356     end_sent = msg_tmp;
357     return (CmiCommHandle) &(msg_tmp->req);
358 #endif              /* non-smp */
359 }
360
361 static size_t CmiAllAsyncMsgsSent(void) {
362     SMSG_LIST *msg_tmp = sent_msgs;
363     MPI_Status sts;
364     int done;
365
366     while (msg_tmp!=0) {
367         done = 0;
368         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
369             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
370         if (!done)
371             return 0;
372         msg_tmp = msg_tmp->next;
373         /*    MsgQueueLen--; ????? */
374     }
375     return 1;
376 }
377
378 int CmiAsyncMsgSent(CmiCommHandle c) {
379
380     SMSG_LIST *msg_tmp = sent_msgs;
381     int done;
382     MPI_Status sts;
383
384     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
385         msg_tmp = msg_tmp->next;
386     if (msg_tmp) {
387         done = 0;
388         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
389             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
390         return ((done)?1:0);
391     } else {
392         return 1;
393     }
394 }
395
396 void CmiReleaseCommHandle(CmiCommHandle c) {
397     return;
398 }
399
400 /* ######Beginning of functions related with communication progress ###### */
401 static void CmiReleaseSentMessages(void) {
402     SMSG_LIST *msg_tmp=sent_msgs;
403     SMSG_LIST *prev=0;
404     SMSG_LIST *temp;
405     int done;
406     MPI_Status sts;
407
408 #if CMK_BLUEGENEL
409     MPID_Progress_test();
410 #endif
411
412     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
413     while (msg_tmp!=0) {
414         done =0;
415 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
416         double startT = CmiWallTimer();
417 #endif
418         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
419             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
420         if (done) {
421             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
422             MsgQueueLen--;
423             /* Release the message */
424             temp = msg_tmp->next;
425             if (prev==0) /* first message */
426                 sent_msgs = temp;
427             else
428                 prev->next = temp;
429             CmiFree(msg_tmp->msg);
430             CmiFree(msg_tmp);
431             msg_tmp = temp;
432         } else {
433             prev = msg_tmp;
434             msg_tmp = msg_tmp->next;
435         }
436 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
437         {
438             double endT = CmiWallTimer();
439             /* only record the event if it takes more than 1ms */
440             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
441         }
442 #endif
443     }
444     end_sent = prev;
445     MACHSTATE(2,"} CmiReleaseSentMessages end");
446 }
447
448 static int PumpMsgs(void) {
449     int nbytes, flg, res;
450     char *msg;
451     MPI_Status sts;
452     int recd=0;
453
454 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
455     int recvCnt=0;
456 #endif
457
458 #if CMK_BLUEGENEL
459     MPID_Progress_test();
460 #endif
461
462     MACHSTATE(2,"PumpMsgs begin {");
463
464 #if CMI_DYNAMIC_EXERT_CAP
465     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
466 #endif
467
468     while (1) {
469 #if CMI_EXERT_RECV_CAP
470         if (recvCnt==RECV_CAP) break;
471 #elif CMI_DYNAMIC_EXERT_CAP
472         if (recvCnt >= dynamicRecvCap) break;
473 #endif
474
475         /* First check posted recvs then do  probe unmatched outstanding messages */
476 #if MPI_POST_RECV
477         int completed_index=-1;
478         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
479             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
480         if (flg) {
481             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
482                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
483
484             recd = 1;
485             msg = (char *) CmiAlloc(nbytes);
486             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
487             /* and repost the recv */
488
489             START_EVENT();
490
491             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
492                                            MPI_POST_RECV_SIZE,
493                                            MPI_BYTE,
494                                            MPI_ANY_SOURCE,
495                                            POST_RECV_TAG,
496                                            MPI_COMM_WORLD,
497                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
498                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
499
500             END_EVENT(50);
501
502             CpvAccess(Cmi_posted_recv_total)++;
503         } else {
504             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
505             if (res != MPI_SUCCESS)
506                 CmiAbort("MPI_Iprobe failed\n");
507             if (!flg) break;
508             recd = 1;
509             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
510             msg = (char *) CmiAlloc(nbytes);
511
512             START_EVENT();
513
514             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
515                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
516
517             END_EVENT(30);
518
519             CpvAccess(Cmi_unposted_recv_total)++;
520         }
521 #else
522         /* Original version */
523 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
524         double startT = CmiWallTimer();
525 #endif
526         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
527         if (res != MPI_SUCCESS)
528             CmiAbort("MPI_Iprobe failed\n");
529
530         if (!flg) break;
531 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
532         {
533             double endT = CmiWallTimer();
534             /* only trace the probe that last longer than 1ms */
535             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
536         }
537 #endif
538
539         recd = 1;
540         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
541         msg = (char *) CmiAlloc(nbytes);
542
543         START_EVENT();
544
545         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
546             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
547
548         /*END_EVENT(30);*/
549
550 #endif
551
552 #if CMK_SMP_TRACE_COMMTHREAD
553         traceBeginCommOp(msg);
554         traceChangeLastTimestamp(CpvAccess(projTraceStart));
555         traceEndCommOp(msg);
556 #if CMI_MPI_TRACE_MOREDETAILED
557         char tmp[32];
558         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
559         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
560 #endif
561 #elif CMK_TRACE_COMMOVERHEAD
562         char tmp[32];
563         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
564         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
565 #endif
566
567
568         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
569         CMI_CHECK_CHECKSUM(msg, nbytes);
570 #if CMK_ERROR_CHECKING
571         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
572             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
573             CmiFree(msg);
574             CmiAbort("Abort!\n");
575             continue;
576         }
577 #endif
578
579         handleOneRecvedMsg(nbytes, msg);
580
581 #if CMI_EXERT_RECV_CAP
582         recvCnt++;
583 #elif CMI_DYNAMIC_EXERT_CAP
584         recvCnt++;
585 #if CMK_SMP
586         /* check sendMsgBuf to get the  number of messages that have not been sent
587              * which is only available in SMP mode
588          * MsgQueueLen indicates the number of messages that have not been released
589              * by MPI
590              */
591         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
592                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
593             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
594         }
595 #else
596         /* MsgQueueLen indicates the number of messages that have not been released
597              * by MPI
598              */
599         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
600             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
601         }
602 #endif
603
604 #endif
605
606     }
607
608     MACHSTATE(2,"} PumpMsgs end ");
609     return recd;
610 }
611
612 /* blocking version */
613 static void PumpMsgsBlocking(void) {
614     static int maxbytes = 20000000;
615     static char *buf = NULL;
616     int nbytes, flg;
617     MPI_Status sts;
618     char *msg;
619     int recd=0;
620
621     if (!PCQueueEmpty(CmiGetState()->recv)) return;
622     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
623     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
624     if (sent_msgs)  return;
625
626 #if 0
627     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
628 #endif
629
630     if (buf == NULL) {
631         buf = (char *) CmiAlloc(maxbytes);
632         _MEMCHECK(buf);
633     }
634
635
636 #if MPI_POST_RECV
637 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
638     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
639 #endif
640
641     START_EVENT();
642
643     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
644         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
645
646     /*END_EVENT(30);*/
647
648     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
649     msg = (char *) CmiAlloc(nbytes);
650     memcpy(msg, buf, nbytes);
651
652 #if CMK_SMP_TRACE_COMMTHREAD
653     traceBeginCommOp(msg);
654     traceChangeLastTimestamp(CpvAccess(projTraceStart));
655     traceEndCommOp(msg);
656 #if CMI_MPI_TRACE_MOREDETAILED
657     char tmp[32];
658     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
659     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
660 #endif
661 #endif
662
663     handleOneRecvedMsg(nbytes, msg);
664 }
665
666
667 #if CMK_SMP
668
669 /* called by communication thread in SMP */
670 static int SendMsgBuf() {
671     SMSG_LIST *msg_tmp;
672     char *msg;
673     int node, rank, size;
674     int i;
675     int sent = 0;
676
677 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
678     int sentCnt = 0;
679 #endif
680
681 #if CMI_DYNAMIC_EXERT_CAP
682     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
683 #endif
684
685     MACHSTATE(2,"SendMsgBuf begin {");
686 #if MULTI_SENDQUEUE
687     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
688         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
689             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
690 #else
691     /* single message sending queue */
692     /* CmiLock(sendMsgBufLock); */
693     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
694     /* CmiUnlock(sendMsgBufLock); */
695     while (NULL != msg_tmp) {
696 #endif
697             node = msg_tmp->destpe;
698             size = msg_tmp->size;
699             msg = msg_tmp->msg;
700             msg_tmp->next = 0;
701
702 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
703             while (MsgQueueLen > request_max) {
704                 CmiReleaseSentMessages();
705                 PumpMsgs();
706             }
707 #endif
708
709             MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
710 #if CMK_ERROR_CHECKING
711             CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
712 #endif
713             CMI_SET_CHECKSUM(msg, size);
714
715 #if MPI_POST_RECV
716             if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
717                 START_EVENT();
718                 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
719                     CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
720                 END_EVENT(40);
721             } else {
722                 START_EVENT();
723                 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
724                     CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
725                 END_EVENT(40);
726             }
727 #else
728             START_EVENT();
729             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
730                 CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
731             /*END_EVENT(40);*/
732 #endif
733
734 #if CMK_SMP_TRACE_COMMTHREAD
735             traceBeginCommOp(msg);
736             traceChangeLastTimestamp(CpvAccess(projTraceStart));
737             /* traceSendMsgComm must execute after traceBeginCommOp because
738                  * we pretend we execute an entry method, and inside this we
739                  * pretend we will send another message. Otherwise how could
740                  * a message creation just before an entry method invocation?
741                  * If such logic is broken, the projections will not trace
742                  * messages correctly! -Chao Mei
743                  */
744             traceSendMsgComm(msg);
745             traceEndCommOp(msg);
746 #if CMI_MPI_TRACE_MOREDETAILED
747             char tmp[64];
748             sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
749             traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
750 #endif
751 #endif
752
753
754             MACHSTATE(3,"}MPI_send end");
755             MsgQueueLen++;
756             if (sent_msgs==0)
757                 sent_msgs = msg_tmp;
758             else
759                 end_sent->next = msg_tmp;
760             end_sent = msg_tmp;
761             sent=1;
762
763 #if CMI_EXERT_SEND_CAP
764             if (++sentCnt == SEND_CAP) break;
765 #elif CMI_DYNAMIC_EXERT_CAP
766             if (++sentCnt >= dynamicSendCap) break;
767             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
768                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
769 #endif
770
771 #if ! MULTI_SENDQUEUE
772             /* CmiLock(sendMsgBufLock); */
773             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
774             /* CmiUnlock(sendMsgBufLock); */
775 #endif
776         }
777 #if MULTI_SENDQUEUE
778     }
779 #endif
780     MACHSTATE(2,"}SendMsgBuf end ");
781     return sent;
782 }
783
784 static int MsgQueueEmpty() {
785     int i;
786 #if MULTI_SENDQUEUE
787     for (i=0; i<_Cmi_mynodesize; i++)
788         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
789 #else
790     return PCQueueEmpty(sendMsgBuf);
791 #endif
792     return 1;
793 }
794
795 /* test if all processors recv queues are empty */
796 static int RecvQueueEmpty() {
797     int i;
798     for (i=0; i<_Cmi_mynodesize; i++) {
799         CmiState cs=CmiGetStateN(i);
800         if (!PCQueueEmpty(cs->recv)) return 0;
801     }
802     return 1;
803 }
804
805
806 #define REPORT_COMM_METRICS 0
807 #if REPORT_COMM_METRICS
808 static double pumptime = 0.0;
809 static double releasetime = 0.0;
810 static double sendtime = 0.0;
811 #endif
812
813 #endif //end of CMK_SMP
814
815 static void AdvanceCommunicationForMPI() {
816 #if REPORT_COMM_METRICS
817     double t1, t2, t3, t4;
818     t1 = CmiWallTimer();
819 #endif
820
821 #if CMK_SMP
822     PumpMsgs();
823
824 #if REPORT_COMM_METRICS
825     t2 = CmiWallTimer();
826 #endif
827
828     CmiReleaseSentMessages();
829 #if REPORT_COMM_METRICS
830     t3 = CmiWallTimer();
831 #endif
832
833     SendMsgBuf();
834
835 #if REPORT_COMM_METRICS
836     t4 = CmiWallTimer();
837     pumptime += (t2-t1);
838     releasetime += (t3-t2);
839     sendtime += (t4-t3);
840 #endif
841
842 #else /* non-SMP case */
843     CmiReleaseSentMessages();
844
845 #if REPORT_COMM_METRICS
846     t2 = CmiWallTimer();
847 #endif
848     PumpMsgs();
849
850 #if REPORT_COMM_METRICS
851     t3 = CmiWallTimer();
852     pumptime += (t3-t2);
853     releasetime += (t2-t1);
854 #endif
855
856 #endif /* end of #if CMK_SMP */
857 }
858 /* ######End of functions related with communication progress ###### */
859
860 void MachinePostNonLocalForMPI() {
861 #if !CMK_SMP
862     if (no_outstanding_sends) {
863         while (MsgQueueLen>0) {
864             AdvanceCommunicationForMPI();
865         }
866     }
867
868     /* FIXME: I don't think the following codes are needed because
869      * it repeats the same job of the next call of CmiGetNonLocal
870      */
871 #if 0
872     if (!msg) {
873         CmiReleaseSentMessages();
874         if (PumpMsgs())
875             return  PCQueuePop(cs->recv);
876         else
877             return 0;
878     }
879 #endif
880 #endif
881 }
882
883 /* Idle-state related functions: called in non-smp mode */
884 void CmiNotifyIdleForMPI(void) {
885     CmiReleaseSentMessages();
886     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
887 }
888
889 /* Network progress function is used to poll the network when for
890    messages. This flushes receive buffers on some  implementations*/
891 #if CMK_MACHINE_PROGRESS_DEFINED
892 void CmiMachineProgressImpl() {
893 #if !CMK_SMP
894     PumpMsgs();
895 #if CMK_IMMEDIATE_MSG
896     CmiHandleImmediate();
897 #endif
898 #else
899     /*Not implemented yet. Communication server does not seem to be
900       thread safe, so only communication thread call it */
901     if (CmiMyRank() == CmiMyNodeSize())
902         CommunicationServerThread(0);
903 #endif
904 }
905 #endif
906
907 /* ######Beginning of functions related with exiting programs###### */
908 void DrainResourcesForMPI() {
909 #if !CMK_SMP
910     while (!CmiAllAsyncMsgsSent()) {
911         PumpMsgs();
912         CmiReleaseSentMessages();
913     }
914 #else
915     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
916         CmiReleaseSentMessages();
917         SendMsgBuf();
918         PumpMsgs();
919     }
920 #endif
921     MACHSTATE(2, "Machine exit barrier begin {");
922     START_EVENT();
923     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
924         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
925     END_EVENT(10);
926     MACHSTATE(2, "} Machine exit barrier end");
927 }
928
929 void MachineExitForMPI(void) {
930 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
931     int doPrint = 0;
932 #if CMK_SMP
933     if (CmiMyNode()==0) doPrint = 1;
934 #else
935     if (CmiMyPe()==0) doPrint = 1;
936 #endif
937
938     if (doPrint) {
939 #if MPI_POST_RECV
940         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
941 #endif
942     }
943 #endif
944
945 #if REPORT_COMM_METRICS
946 #if CMK_SMP
947     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
948               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
949               pumptime, releasetime, sendtime);
950 #else
951     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
952               CmiMyPe(), pumptime, releasetime, sendtime);
953 #endif
954 #endif
955
956 #if ! CMK_AUTOBUILD
957     signal(SIGINT, signal_int);
958     MPI_Finalize();
959 #endif
960     exit(0);
961 }
962
963 static int machine_exit_idx;
964 static void machine_exit(char *m) {
965     EmergencyExit();
966     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
967     fflush(stdout);
968     CmiNodeBarrier();
969     if (CmiMyRank() == 0) {
970         MPI_Barrier(MPI_COMM_WORLD);
971         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
972         MPI_Abort(MPI_COMM_WORLD, 1);
973     } else {
974         while (1) CmiYield();
975     }
976 }
977
978 static void KillOnAllSigs(int sigNo) {
979     static int already_in_signal_handler = 0;
980     char *m;
981     if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
982     already_in_signal_handler = 1;
983 #if CMK_CCS_AVAILABLE
984     if (CpvAccess(cmiArgDebugFlag)) {
985         CpdNotify(CPD_SIGNAL, sigNo);
986         CpdFreeze();
987     }
988 #endif
989     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
990              "Signal: %d\n",CmiMyPe(),sigNo);
991     CmiPrintStackTrace(1);
992
993     m = CmiAlloc(CmiMsgHeaderSizeBytes);
994     CmiSetHandler(m, machine_exit_idx);
995     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
996     machine_exit(m);
997 }
998 /* ######End of functions related with exiting programs###### */
999
1000
1001 /* ######Beginning of functions related with starting programs###### */
1002 static void registerMPITraceEvents() {
1003 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1004     traceRegisterUserEvent("MPI_Barrier", 10);
1005     traceRegisterUserEvent("MPI_Send", 20);
1006     traceRegisterUserEvent("MPI_Recv", 30);
1007     traceRegisterUserEvent("MPI_Isend", 40);
1008     traceRegisterUserEvent("MPI_Irecv", 50);
1009     traceRegisterUserEvent("MPI_Test", 60);
1010     traceRegisterUserEvent("MPI_Iprobe", 70);
1011 #endif
1012 }
1013
1014 #if MACHINE_DEBUG_LOG
1015 FILE *debugLog = NULL;
1016 #endif
1017
1018 static char *thread_level_tostring(int thread_level) {
1019 #if CMK_MPI_INIT_THREAD
1020     switch (thread_level) {
1021     case MPI_THREAD_SINGLE:
1022         return "MPI_THREAD_SINGLE";
1023     case MPI_THREAD_FUNNELED:
1024         return "MPI_THREAD_FUNNELED";
1025     case MPI_THREAD_SERIALIZED:
1026         return "MPI_THREAD_SERIALIZED";
1027     case MPI_THREAD_MULTIPLE :
1028         return "MPI_THREAD_MULTIPLE ";
1029     default: {
1030         char *str = (char*)malloc(5);
1031         sprintf(str,"%d", thread_level);
1032         return str;
1033     }
1034     }
1035     return  "unknown";
1036 #else
1037     char *str = (char*)malloc(5);
1038     sprintf(str,"%d", thread_level);
1039     return str;
1040 #endif
1041 }
1042
1043 /**
1044  *  Obtain the number of nodes, my node id, and consuming machine layer
1045  *  specific arguments
1046  */
1047 static void MachineInitForMPI(int argc, char **argv, int *numNodes, int *myNodeID) {
1048     int n,i;
1049     int ver, subver;
1050     int provided;
1051     int thread_level;
1052     int myNID;
1053
1054 #if MACHINE_DEBUG
1055     debugLog=NULL;
1056 #endif
1057 #if CMK_USE_HP_MAIN_FIX
1058 #if FOR_CPLUS
1059     _main(argc,argv);
1060 #endif
1061 #endif
1062
1063 #if CMK_MPI_INIT_THREAD
1064 #if CMK_SMP
1065     thread_level = MPI_THREAD_FUNNELED;
1066 #else
1067     thread_level = MPI_THREAD_SINGLE;
1068 #endif
1069     MPI_Init_thread(&argc, &argv, thread_level, &provided);
1070     _thread_provided = provided;
1071 #else
1072     MPI_Init(&argc, &argv);
1073     thread_level = 0;
1074     provided = -1;
1075 #endif
1076     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1077     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1078
1079     myNID = *myNodeID;
1080
1081     MPI_Get_version(&ver, &subver);
1082     if (myNID == 0) {
1083         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1084     }
1085
1086     idleblock = CmiGetArgFlag(argv, "+idleblocking");
1087     if (idleblock && _Cmi_mynode == 0) {
1088         printf("Charm++: Running in idle blocking mode.\n");
1089     }
1090
1091     /* setup signal handlers */
1092     signal(SIGSEGV, KillOnAllSigs);
1093     signal(SIGFPE, KillOnAllSigs);
1094     signal(SIGILL, KillOnAllSigs);
1095     signal_int = signal(SIGINT, KillOnAllSigs);
1096     signal(SIGTERM, KillOnAllSigs);
1097     signal(SIGABRT, KillOnAllSigs);
1098 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1099     signal(SIGQUIT, KillOnAllSigs);
1100     signal(SIGBUS, KillOnAllSigs);
1101 #   endif /*UNIX*/
1102
1103 #if CMK_NO_OUTSTANDING_SENDS
1104     no_outstanding_sends=1;
1105 #endif
1106     if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1107         no_outstanding_sends = 1;
1108         if (myNID == 0)
1109             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1110                    no_outstanding_sends?"":" not");
1111     }
1112
1113     request_max=MAX_QLEN;
1114     CmiGetArgInt(argv,"+requestmax",&request_max);
1115     /*printf("request max=%d\n", request_max);*/
1116
1117 #if MPI_POST_RECV
1118     CmiGetArgInt(argv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1119     CmiGetArgInt(argv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1120     CmiGetArgInt(argv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1121     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1122     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1123     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1124     if (myNID==0) {
1125         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1126                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1127     }
1128 #endif
1129
1130     /* checksum flag */
1131     if (CmiGetArgFlag(argv,"+checksum")) {
1132 #if CMK_ERROR_CHECKING
1133         checksum_flag = 1;
1134         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1135 #else
1136         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1137 #endif
1138     }
1139
1140     {
1141         int debug = CmiGetArgFlag(argv,"++debug");
1142         int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
1143         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1144 #if CMK_HAS_GETPID
1145             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1146             fflush(stdout);
1147             if (!debug_no_pause)
1148                 sleep(15);
1149 #else
1150             printf("++debug ignored.\n");
1151 #endif
1152         }
1153     }
1154
1155     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1156     for (i=0; i<_Cmi_mynodesize+1; i++) {
1157 #if MULTI_SENDQUEUE
1158         procState[i].sendMsgBuf = PCQueueCreate();
1159 #endif
1160         procState[i].recvLock = CmiCreateLock();
1161     }
1162 #if CMK_SMP
1163 #if !MULTI_SENDQUEUE
1164     sendMsgBuf = PCQueueCreate();
1165     sendMsgBufLock = CmiCreateLock();
1166 #endif
1167 #endif
1168 }
1169
1170 static void MachinePreCommonInitForMPI(int everReturn) {
1171 #if MPI_POST_RECV
1172     int doInit = 1;
1173     int i;
1174
1175 #if CMK_SMP
1176     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1177 #endif
1178
1179     /* Currently, in mpi smp, the main thread will be the comm thread, so
1180      *  only the comm thread should post recvs. Cpvs, however, need to be
1181      * created on rank 0 (the ptrs to the actual cpv memory), while
1182      * other ranks are busy waiting for this to finish. So cpv initialize
1183      * routines have to be called on every ranks, although they are only
1184      * useful on comm thread (whose rank is not zero) -Chao Mei
1185      */
1186     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1187     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1188     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1189     CpvInitialize(char*,CmiPostedRecvBuffers);
1190
1191     if (doInit) {
1192         /* Post some extra recvs to help out with incoming messages */
1193         /* On some MPIs the messages are unexpected and thus slow */
1194
1195         /* An array of request handles for posted recvs */
1196         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1197
1198         /* An array of buffers for posted recvs */
1199         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1200
1201         /* Post Recvs */
1202         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1203             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1204                                            MPI_POST_RECV_SIZE,
1205                                            MPI_BYTE,
1206                                            MPI_ANY_SOURCE,
1207                                            POST_RECV_TAG,
1208                                            MPI_COMM_WORLD,
1209                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1210                 CmiAbort("MPI_Irecv failed\n");
1211         }
1212     }
1213 #endif
1214
1215 }
1216
1217 static void MachinePostCommonInitForMPI(int everReturn) {
1218     CmiIdleState *s=CmiNotifyGetState();
1219     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1220
1221 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1222     CpvInitialize(double, projTraceStart);
1223     /* only PE 0 needs to care about registration (to generate sts file). */
1224     if (CmiMyPe() == 0) {
1225         registerMachineUserEventsFunction(&registerMPITraceEvents);
1226     }
1227 #endif
1228
1229 #if CMK_SMP
1230     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1231     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1232 #else
1233     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1234 #endif
1235
1236 #if MACHINE_DEBUG_LOG
1237     if (CmiMyRank() == 0) {
1238         char ln[200];
1239         sprintf(ln,"debugLog.%d",CmiMyNode());
1240         debugLog=fopen(ln,"w");
1241     }
1242 #endif
1243 }
1244 /* ######End of functions related with starting programs###### */
1245
1246 /***********************************************************************
1247  *
1248  * Abort function:
1249  *
1250  ************************************************************************/
1251
1252 void CmiAbort(const char *message) {
1253     char *m;
1254     /* if CharmDebug is attached simply try to send a message to it */
1255 #if CMK_CCS_AVAILABLE
1256     if (CpvAccess(cmiArgDebugFlag)) {
1257         CpdNotify(CPD_ABORT, message);
1258         CpdFreeze();
1259     }
1260 #endif
1261     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1262              "Reason: %s\n",CmiMyPe(),message);
1263     /*  CmiError(message); */
1264     CmiPrintStackTrace(0);
1265     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1266     CmiSetHandler(m, machine_exit_idx);
1267     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1268     machine_exit(m);
1269     /* Program never reaches here */
1270     MPI_Abort(MPI_COMM_WORLD, 1);
1271 }
1272
1273 /**************************  TIMER FUNCTIONS **************************/
1274 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1275
1276 /* MPI calls are not threadsafe, even the timer on some machines */
1277 static CmiNodeLock  timerLock = 0;
1278 static int _absoluteTime = 0;
1279 static double starttimer = 0;
1280 static int _is_global = 0;
1281
1282 int CmiTimerIsSynchronized() {
1283     int  flag;
1284     void *v;
1285
1286     /*  check if it using synchronized timer */
1287     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1288         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1289     if (flag) {
1290         _is_global = *(int*)v;
1291         if (_is_global && CmiMyPe() == 0)
1292             printf("Charm++> MPI timer is synchronized\n");
1293     }
1294     return _is_global;
1295 }
1296
1297 int CmiTimerAbsolute() {
1298     return _absoluteTime;
1299 }
1300
1301 double CmiStartTimer() {
1302     return 0.0;
1303 }
1304
1305 double CmiInitTime() {
1306     return starttimer;
1307 }
1308
1309 void CmiTimerInit(char **argv) {
1310     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1311     if (_absoluteTime && CmiMyPe() == 0)
1312         printf("Charm++> absolute MPI timer is used\n");
1313
1314     _is_global = CmiTimerIsSynchronized();
1315
1316     if (_is_global) {
1317         if (CmiMyRank() == 0) {
1318             double minTimer;
1319 #if CMK_TIMER_USE_XT3_DCLOCK
1320             starttimer = dclock();
1321 #else
1322             starttimer = MPI_Wtime();
1323 #endif
1324
1325             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1326                           MPI_COMM_WORLD );
1327             starttimer = minTimer;
1328         }
1329     } else { /* we don't have a synchronous timer, set our own start time */
1330         CmiBarrier();
1331         CmiBarrier();
1332         CmiBarrier();
1333 #if CMK_TIMER_USE_XT3_DCLOCK
1334         starttimer = dclock();
1335 #else
1336         starttimer = MPI_Wtime();
1337 #endif
1338     }
1339
1340 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1341     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1342         timerLock = CmiCreateLock();
1343 #endif
1344     CmiNodeAllBarrier();          /* for smp */
1345 }
1346
1347 /**
1348  * Since the timerLock is never created, and is
1349  * always NULL, then all the if-condition inside
1350  * the timer functions could be disabled right
1351  * now in the case of SMP. --Chao Mei
1352  */
1353 double CmiTimer(void) {
1354     double t;
1355 #if 0 && CMK_SMP
1356     if (timerLock) CmiLock(timerLock);
1357 #endif
1358
1359 #if CMK_TIMER_USE_XT3_DCLOCK
1360     t = dclock();
1361 #else
1362     t = MPI_Wtime();
1363 #endif
1364
1365 #if 0 && CMK_SMP
1366     if (timerLock) CmiUnlock(timerLock);
1367 #endif
1368
1369     return _absoluteTime?t: (t-starttimer);
1370 }
1371
1372 double CmiWallTimer(void) {
1373     double t;
1374 #if 0 && CMK_SMP
1375     if (timerLock) CmiLock(timerLock);
1376 #endif
1377
1378 #if CMK_TIMER_USE_XT3_DCLOCK
1379     t = dclock();
1380 #else
1381     t = MPI_Wtime();
1382 #endif
1383
1384 #if 0 && CMK_SMP
1385     if (timerLock) CmiUnlock(timerLock);
1386 #endif
1387
1388     return _absoluteTime? t: (t-starttimer);
1389 }
1390
1391 double CmiCpuTimer(void) {
1392     double t;
1393 #if 0 && CMK_SMP
1394     if (timerLock) CmiLock(timerLock);
1395 #endif
1396 #if CMK_TIMER_USE_XT3_DCLOCK
1397     t = dclock() - starttimer;
1398 #else
1399     t = MPI_Wtime() - starttimer;
1400 #endif
1401 #if 0 && CMK_SMP
1402     if (timerLock) CmiUnlock(timerLock);
1403 #endif
1404     return t;
1405 }
1406
1407 #endif
1408
1409 /************Barrier Related Functions****************/
1410 /* must be called on all ranks including comm thread in SMP */
1411 int CmiBarrier() {
1412 #if CMK_SMP
1413     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1414     CmiNodeAllBarrier();
1415     if (CmiMyRank() == CmiMyNodeSize())
1416 #else
1417     if (CmiMyRank() == 0)
1418 #endif
1419     {
1420         /**
1421          *  The call of CmiBarrier is usually before the initialization
1422          *  of trace module of Charm++, therefore, the START_EVENT
1423          *  and END_EVENT are disabled here. -Chao Mei
1424          */
1425         /*START_EVENT();*/
1426
1427         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1428             CmiAbort("Timernit: MPI_Barrier failed!\n");
1429
1430         /*END_EVENT(10);*/
1431     }
1432     CmiNodeAllBarrier();
1433     return 0;
1434 }
1435
1436 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1437 int CmiBarrierZero() {
1438     int i;
1439 #if CMK_SMP
1440     if (CmiMyRank() == CmiMyNodeSize())
1441 #else
1442     if (CmiMyRank() == 0)
1443 #endif
1444     {
1445         char msg[1];
1446         MPI_Status sts;
1447         if (CmiMyNode() == 0)  {
1448             for (i=0; i<CmiNumNodes()-1; i++) {
1449                 START_EVENT();
1450
1451                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1452                     CmiPrintf("MPI_Recv failed!\n");
1453
1454                 END_EVENT(30);
1455             }
1456         } else {
1457             START_EVENT();
1458
1459             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1460                 printf("MPI_Send failed!\n");
1461
1462             END_EVENT(20);
1463         }
1464     }
1465     CmiNodeAllBarrier();
1466     return 0;
1467 }
1468
1469 /*@}*/
1470