Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {
36     Sleep(1000*secs);
37 }
38 #else
39 #include <unistd.h> /*For getpid()*/
40 #endif
41 #include <stdlib.h> /*For sleep()*/
42
43 #include "machine.h"
44 #include "pcqueue.h"
45
46 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
47 /* Whether to use multiple send queue in SMP mode */
48 #define MULTI_SENDQUEUE    0
49
50 /* ###Beginning of flow control related macros ### */
51 #define CMI_EXERT_SEND_CAP 0
52 #define CMI_EXERT_RECV_CAP 0
53
54 #define CMI_DYNAMIC_EXERT_CAP 0
55 /* This macro defines the max number of msgs in the sender msg buffer
56  * that is allowed for recving operation to continue
57  */
58 static int CMI_DYNAMIC_OUTGOING_THRESHOLD=4;
59 #define CMI_DYNAMIC_MAXCAPSIZE 1000
60 static int CMI_DYNAMIC_SEND_CAPSIZE=4;
61 static int CMI_DYNAMIC_RECV_CAPSIZE=3;
62 /* initial values, -1 indiates there's no cap */
63 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
64 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 3
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73 /* ###End of flow control related macros ### */
74
75 /* ###Beginning of machine-layer-tracing related macros ### */
76 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
77 #define CMI_MPI_TRACE_MOREDETAILED 0
78 #undef CMI_MPI_TRACE_USEREVENTS
79 #define CMI_MPI_TRACE_USEREVENTS 1
80 #else
81 #undef CMK_SMP_TRACE_COMMTHREAD
82 #define CMK_SMP_TRACE_COMMTHREAD 0
83 #endif
84
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
87 #undef CMI_MPI_TRACE_USEREVENTS
88 #define CMI_MPI_TRACE_USEREVENTS 1
89 #else
90 #undef CMK_TRACE_COMMOVERHEAD
91 #define CMK_TRACE_COMMOVERHEAD 0
92 #endif
93
94 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
95 CpvStaticDeclare(double, projTraceStart);
96 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
97 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
98 #else
99 #define  START_EVENT()
100 #define  END_EVENT(x)
101 #endif
102 /* ###End of machine-layer-tracing related macros ### */
103
104 /* ###Beginning of POST_RECV related macros ### */
105 /*
106  * If MPI_POST_RECV is defined, we provide default values for
107  * size and number of posted recieves. If MPI_POST_RECV_COUNT
108  * is set then a default value for MPI_POST_RECV_SIZE is used
109  * if not specified by the user.
110  */
111 #define MPI_POST_RECV 0
112
113 /* Making those parameters configurable for testing them easily */
114
115 #if MPI_POST_RECV
116 static int MPI_POST_RECV_COUNT=10;
117 static int MPI_POST_RECV_LOWERSIZE=2000;
118 static int MPI_POST_RECV_UPPERSIZE=4000;
119 static int MPI_POST_RECV_SIZE;
120
121 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
122 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
123 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
124 CpvDeclare(char*,CmiPostedRecvBuffers);
125 #endif
126
127 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
128 #define TAG     1375
129 #if MPI_POST_RECV
130 #define POST_RECV_TAG       (TAG+1)
131 #define BARRIER_ZERO_TAG  TAG
132 #else
133 #define BARRIER_ZERO_TAG   (TAG-1)
134 #endif
135 /* ###End of POST_RECV related related macros ### */
136
137 #if CMK_BLUEGENEL
138 #define MAX_QLEN 8
139 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
140 #else
141 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
142 #define MAX_QLEN 200
143 #endif
144 /* =======End of Definitions of Performance-Specific Macros =======*/
145
146
147 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
148 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
149 #define CHARM_MAGIC_NUMBER               126
150
151 #if CMK_ERROR_CHECKING
152 extern unsigned char computeCheckSum(unsigned char *data, int len);
153 static int checksum_flag = 0;
154 #define CMI_SET_CHECKSUM(msg, len)      \
155         if (checksum_flag)  {   \
156           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
157           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
158         }
159 #define CMI_CHECK_CHECKSUM(msg, len)    \
160         if (checksum_flag)      \
161           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
162             CmiAbort("Fatal error: checksum doesn't agree!\n");
163 #else
164 #define CMI_SET_CHECKSUM(msg, len)
165 #define CMI_CHECK_CHECKSUM(msg, len)
166 #endif
167 /* =====End of Definitions of Message-Corruption Related Macros=====*/
168
169
170 /* =====Beginning of Declarations of Machine Specific Variables===== */
171 #include <signal.h>
172 void (*signal_int)(int);
173
174 static int _thread_provided = -1; /* Indicating MPI thread level */
175 static int idleblock = 0;
176
177 /* A simple list for msgs that have been sent by MPI_Isend */
178 typedef struct msg_list {
179     char *msg;
180     struct msg_list *next;
181     int size, destpe;
182 #if CMK_SMP_TRACE_COMMTHREAD
183     int srcpe;
184 #endif
185     MPI_Request req;
186 } SMSG_LIST;
187
188 static SMSG_LIST *sent_msgs=0;
189 static SMSG_LIST *end_sent=0;
190
191 int MsgQueueLen=0;
192 static int request_max;
193 /*FLAG: consume outstanding Isends in scheduler loop*/
194 static int no_outstanding_sends=0;
195
196 #if NODE_0_IS_CONVHOST
197 int inside_comm = 0;
198 #endif
199
200 typedef struct ProcState {
201 #if MULTI_SENDQUEUE
202     PCQueue      sendMsgBuf;       /* per processor message sending queue */
203 #endif
204     CmiNodeLock  recvLock;                  /* for cs->recv */
205 } ProcState;
206 static ProcState  *procState;
207
208 #if CMK_SMP && !MULTI_SENDQUEUE
209 static PCQueue sendMsgBuf;
210 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
211 #endif
212 /* =====End of Declarations of Machine Specific Variables===== */
213
214
215 /* =====Beginning of Declarations of Machine Specific Functions===== */
216 /* Utility functions */
217 #if CMK_BLUEGENEL
218 extern void MPID_Progress_test();
219 #endif
220 static size_t CmiAllAsyncMsgsSent(void);
221 static void CmiReleaseSentMessages(void);
222 static int PumpMsgs(void);
223 static void PumpMsgsBlocking(void);
224
225 #if CMK_SMP
226 static int MsgQueueEmpty();
227 static int RecvQueueEmpty();
228 static int SendMsgBuf();
229 static  void EnqueueMsg(void *m, int size, int node);
230 #endif
231
232 /* The machine-specific send function */
233 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
234 #define CmiMachineSpecificSendFunc MachineSpecificSendForMPI
235
236 /* ### Beginning of Machine-startup Related Functions ### */
237 static void MachineInitForMPI(int argc, char **argv, int *numNodes, int *myNodeID);
238 #define MachineSpecificInit MachineInitForMPI
239
240 static void MachinePreCommonInitForMPI(int everReturn);
241 static void MachinePostCommonInitForMPI(int everReturn);
242 #define MachineSpecificPreCommonInit MachinePreCommonInitForMPI
243 #define MachineSpecificPostCommonInit MachinePostCommonInitForMPI
244 /* ### End of Machine-startup Related Functions ### */
245
246 /* ### Beginning of Machine-running Related Functions ### */
247 static void AdvanceCommunicationForMPI();
248 #define MachineSpecificAdvanceCommunication AdvanceCommunicationForMPI
249
250 static void DrainResourcesForMPI(); /* used when exit */
251 #define MachineSpecificDrainResources DrainResourcesForMPI
252
253 static void MachineExitForMPI();
254 #define MachineSpecificExit MachineExitForMPI
255 /* ### End of Machine-running Related Functions ### */
256
257 /* ### Beginning of Idle-state Related Functions ### */
258 void CmiNotifyIdleForMPI(void);
259 /* ### End of Idle-state Related Functions ### */
260
261 void MachinePostNonLocalForMPI();
262 #define MachineSpecificPostNonLocal MachinePostNonLocalForMPI
263
264 /* =====End of Declarations of Machine Specific Functions===== */
265
266 /**
267  *  Macros that overwrites the common codes, such as
268  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
269  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
270  *  CMK_OFFLOAD_BCAST_PROCESS etc.
271  */
272 #define CMK_HAS_SIZE_IN_MSGHDR 0
273 #include "machine-common.c"
274
275 /* The machine specific msg-sending function */
276
277 #if CMK_SMP
278 static void EnqueueMsg(void *m, int size, int node) {
279     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
280     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
281     msg_tmp->msg = m;
282     msg_tmp->size = size;
283     msg_tmp->destpe = node;
284     msg_tmp->next = 0;
285
286 #if CMK_SMP_TRACE_COMMTHREAD
287     msg_tmp->srcpe = CmiMyPe();
288 #endif
289
290 #if MULTI_SENDQUEUE
291     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
292 #else
293     /*CmiLock(sendMsgBufLock);*/
294     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
295     /*CmiUnlock(sendMsgBufLock);*/
296 #endif
297
298     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
299 }
300 #endif
301
302 /* The function that calls MPI_Isend so that both non-SMP and SMP could use */
303 static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
304     int node = smsg->destpe;
305     int size = smsg->size;
306     char *msg = smsg->msg;
307
308 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
309     while (MsgQueueLen > request_max) {
310         CmiReleaseSentMessages();
311         PumpMsgs();
312     }
313 #endif
314
315     MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
316 #if CMK_ERROR_CHECKING
317     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
318     CMI_SET_CHECKSUM(msg, size);
319 #endif
320
321 #if MPI_POST_RECV
322     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
323         START_EVENT();
324         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(smsg->req)))
325             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
326         /*END_EVENT(40);*/
327     } else {
328         START_EVENT();
329         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
330             CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
331         /*END_EVENT(40);*/
332     }
333 #else
334     START_EVENT();
335     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
336         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
337     /*END_EVENT(40);*/
338 #endif
339
340 #if CMK_SMP_TRACE_COMMTHREAD
341     traceBeginCommOp(msg);
342     traceChangeLastTimestamp(CpvAccess(projTraceStart));
343     /* traceSendMsgComm must execute after traceBeginCommOp because
344          * we pretend we execute an entry method, and inside this we
345          * pretend we will send another message. Otherwise how could
346          * a message creation just before an entry method invocation?
347          * If such logic is broken, the projections will not trace
348          * messages correctly! -Chao Mei
349          */
350     traceSendMsgComm(msg);
351     traceEndCommOp(msg);
352 #if CMI_MPI_TRACE_MOREDETAILED
353     char tmp[64];
354     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", smsg->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
355     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
356 #endif
357 #endif
358
359     MACHSTATE(3,"}MPI_send end");
360     MsgQueueLen++;
361     if (sent_msgs==0)
362         sent_msgs = smsg;
363     else
364         end_sent->next = smsg;
365     end_sent = smsg;
366     return (CmiCommHandle) &(smsg->req);
367 }
368
369 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
370     /* Ignoring the mode for MPI layer */
371
372     CmiState cs = CmiGetState();
373     SMSG_LIST *msg_tmp;
374     int  rank;
375
376     CmiAssert(destNode != CmiMyNode());
377 #if CMK_SMP
378     EnqueueMsg(msg, size, destNode);
379     return 0;
380 #else
381     /* non smp */
382     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
383     msg_tmp->msg = msg;
384     msg_tmp->destpe = destNode;
385     msg_tmp->size = size;
386     msg_tmp->next = 0;
387     return MPISendOneMsg(msg_tmp);
388 #endif
389 }
390
391 static size_t CmiAllAsyncMsgsSent(void) {
392     SMSG_LIST *msg_tmp = sent_msgs;
393     MPI_Status sts;
394     int done;
395
396     while (msg_tmp!=0) {
397         done = 0;
398         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
399             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
400         if (!done)
401             return 0;
402         msg_tmp = msg_tmp->next;
403         /*    MsgQueueLen--; ????? */
404     }
405     return 1;
406 }
407
408 int CmiAsyncMsgSent(CmiCommHandle c) {
409
410     SMSG_LIST *msg_tmp = sent_msgs;
411     int done;
412     MPI_Status sts;
413
414     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
415         msg_tmp = msg_tmp->next;
416     if (msg_tmp) {
417         done = 0;
418         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
419             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
420         return ((done)?1:0);
421     } else {
422         return 1;
423     }
424 }
425
426 void CmiReleaseCommHandle(CmiCommHandle c) {
427     return;
428 }
429
430 /* ######Beginning of functions related with communication progress ###### */
431 static void CmiReleaseSentMessages(void) {
432     SMSG_LIST *msg_tmp=sent_msgs;
433     SMSG_LIST *prev=0;
434     SMSG_LIST *temp;
435     int done;
436     MPI_Status sts;
437
438 #if CMK_BLUEGENEL
439     MPID_Progress_test();
440 #endif
441
442     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
443     while (msg_tmp!=0) {
444         done =0;
445 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
446         double startT = CmiWallTimer();
447 #endif
448         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
449             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
450         if (done) {
451             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
452             MsgQueueLen--;
453             /* Release the message */
454             temp = msg_tmp->next;
455             if (prev==0) /* first message */
456                 sent_msgs = temp;
457             else
458                 prev->next = temp;
459             CmiFree(msg_tmp->msg);
460             CmiFree(msg_tmp);
461             msg_tmp = temp;
462         } else {
463             prev = msg_tmp;
464             msg_tmp = msg_tmp->next;
465         }
466 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
467         {
468             double endT = CmiWallTimer();
469             /* only record the event if it takes more than 1ms */
470             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
471         }
472 #endif
473     }
474     end_sent = prev;
475     MACHSTATE(2,"} CmiReleaseSentMessages end");
476 }
477
478 static int PumpMsgs(void) {
479     int nbytes, flg, res;
480     char *msg;
481     MPI_Status sts;
482     int recd=0;
483
484 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
485     int recvCnt=0;
486 #endif
487
488 #if CMK_BLUEGENEL
489     MPID_Progress_test();
490 #endif
491
492     MACHSTATE(2,"PumpMsgs begin {");
493
494 #if CMI_DYNAMIC_EXERT_CAP
495     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
496 #endif
497
498     while (1) {
499 #if CMI_EXERT_RECV_CAP
500         if (recvCnt==RECV_CAP) break;
501 #elif CMI_DYNAMIC_EXERT_CAP
502         if (recvCnt >= dynamicRecvCap) break;
503 #endif
504
505         /* First check posted recvs then do  probe unmatched outstanding messages */
506 #if MPI_POST_RECV
507         int completed_index=-1;
508         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
509             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
510         if (flg) {
511             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
512                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
513
514             recd = 1;
515             msg = (char *) CmiAlloc(nbytes);
516             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
517             /* and repost the recv */
518
519             START_EVENT();
520
521             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
522                                            MPI_POST_RECV_SIZE,
523                                            MPI_BYTE,
524                                            MPI_ANY_SOURCE,
525                                            POST_RECV_TAG,
526                                            MPI_COMM_WORLD,
527                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
528                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
529
530             END_EVENT(50);
531
532             CpvAccess(Cmi_posted_recv_total)++;
533         } else {
534             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
535             if (res != MPI_SUCCESS)
536                 CmiAbort("MPI_Iprobe failed\n");
537             if (!flg) break;
538             recd = 1;
539             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
540             msg = (char *) CmiAlloc(nbytes);
541
542             START_EVENT();
543
544             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
545                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
546
547             END_EVENT(30);
548
549             CpvAccess(Cmi_unposted_recv_total)++;
550         }
551 #else
552         /* Original version */
553 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
554         double startT = CmiWallTimer();
555 #endif
556         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
557         if (res != MPI_SUCCESS)
558             CmiAbort("MPI_Iprobe failed\n");
559
560         if (!flg) break;
561 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
562         {
563             double endT = CmiWallTimer();
564             /* only trace the probe that last longer than 1ms */
565             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
566         }
567 #endif
568
569         recd = 1;
570         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
571         msg = (char *) CmiAlloc(nbytes);
572
573         START_EVENT();
574
575         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
576             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
577
578         /*END_EVENT(30);*/
579
580 #endif
581
582 #if CMK_SMP_TRACE_COMMTHREAD
583         traceBeginCommOp(msg);
584         traceChangeLastTimestamp(CpvAccess(projTraceStart));
585         traceEndCommOp(msg);
586 #if CMI_MPI_TRACE_MOREDETAILED
587         char tmp[32];
588         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
589         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
590 #endif
591 #elif CMK_TRACE_COMMOVERHEAD
592         char tmp[32];
593         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
594         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
595 #endif
596
597
598         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
599         CMI_CHECK_CHECKSUM(msg, nbytes);
600 #if CMK_ERROR_CHECKING
601         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
602             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
603             CmiFree(msg);
604             CmiAbort("Abort!\n");
605             continue;
606         }
607 #endif
608
609         handleOneRecvedMsg(nbytes, msg);
610
611 #if CMI_EXERT_RECV_CAP
612         recvCnt++;
613 #elif CMI_DYNAMIC_EXERT_CAP
614         recvCnt++;
615 #if CMK_SMP
616         /* check sendMsgBuf to get the  number of messages that have not been sent
617              * which is only available in SMP mode
618          * MsgQueueLen indicates the number of messages that have not been released
619              * by MPI
620              */
621         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
622                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
623             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
624         }
625 #else
626         /* MsgQueueLen indicates the number of messages that have not been released
627              * by MPI
628              */
629         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
630             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
631         }
632 #endif
633
634 #endif
635
636     }
637
638     MACHSTATE(2,"} PumpMsgs end ");
639     return recd;
640 }
641
642 /* blocking version */
643 static void PumpMsgsBlocking(void) {
644     static int maxbytes = 20000000;
645     static char *buf = NULL;
646     int nbytes, flg;
647     MPI_Status sts;
648     char *msg;
649     int recd=0;
650
651     if (!PCQueueEmpty(CmiGetState()->recv)) return;
652     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
653     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
654     if (sent_msgs)  return;
655
656 #if 0
657     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
658 #endif
659
660     if (buf == NULL) {
661         buf = (char *) CmiAlloc(maxbytes);
662         _MEMCHECK(buf);
663     }
664
665
666 #if MPI_POST_RECV
667 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
668     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
669 #endif
670
671     START_EVENT();
672
673     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
674         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
675
676     /*END_EVENT(30);*/
677
678     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
679     msg = (char *) CmiAlloc(nbytes);
680     memcpy(msg, buf, nbytes);
681
682 #if CMK_SMP_TRACE_COMMTHREAD
683     traceBeginCommOp(msg);
684     traceChangeLastTimestamp(CpvAccess(projTraceStart));
685     traceEndCommOp(msg);
686 #if CMI_MPI_TRACE_MOREDETAILED
687     char tmp[32];
688     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
689     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
690 #endif
691 #endif
692
693     handleOneRecvedMsg(nbytes, msg);
694 }
695
696
697 #if CMK_SMP
698
699 /* called by communication thread in SMP */
700 static int SendMsgBuf() {
701     SMSG_LIST *msg_tmp;
702     char *msg;
703     int node, rank, size;
704     int i;
705     int sent = 0;
706
707 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
708     int sentCnt = 0;
709 #endif
710
711 #if CMI_DYNAMIC_EXERT_CAP
712     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
713 #endif
714
715     MACHSTATE(2,"SendMsgBuf begin {");
716 #if MULTI_SENDQUEUE
717     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
718         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
719             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
720 #else
721     /* single message sending queue */
722     /* CmiLock(sendMsgBufLock); */
723     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
724     /* CmiUnlock(sendMsgBufLock); */
725     while (NULL != msg_tmp) {
726 #endif
727             MPISendOneMsg(msg_tmp);
728             sent=1;
729
730 #if CMI_EXERT_SEND_CAP
731             if (++sentCnt == SEND_CAP) break;
732 #elif CMI_DYNAMIC_EXERT_CAP
733             if (++sentCnt >= dynamicSendCap) break;
734             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
735                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
736 #endif
737
738 #if ! MULTI_SENDQUEUE
739             /* CmiLock(sendMsgBufLock); */
740             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
741             /* CmiUnlock(sendMsgBufLock); */
742 #endif
743         }
744 #if MULTI_SENDQUEUE
745     }
746 #endif
747     MACHSTATE(2,"}SendMsgBuf end ");
748     return sent;
749 }
750
751 static int MsgQueueEmpty() {
752     int i;
753 #if MULTI_SENDQUEUE
754     for (i=0; i<_Cmi_mynodesize; i++)
755         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
756 #else
757     return PCQueueEmpty(sendMsgBuf);
758 #endif
759     return 1;
760 }
761
762 /* test if all processors recv queues are empty */
763 static int RecvQueueEmpty() {
764     int i;
765     for (i=0; i<_Cmi_mynodesize; i++) {
766         CmiState cs=CmiGetStateN(i);
767         if (!PCQueueEmpty(cs->recv)) return 0;
768     }
769     return 1;
770 }
771
772
773 #define REPORT_COMM_METRICS 0
774 #if REPORT_COMM_METRICS
775 static double pumptime = 0.0;
776                          static double releasetime = 0.0;
777                                                      static double sendtime = 0.0;
778 #endif
779
780 #endif //end of CMK_SMP
781
782 static void AdvanceCommunicationForMPI() {
783 #if REPORT_COMM_METRICS
784     double t1, t2, t3, t4;
785     t1 = CmiWallTimer();
786 #endif
787
788 #if CMK_SMP
789     PumpMsgs();
790
791 #if REPORT_COMM_METRICS
792     t2 = CmiWallTimer();
793 #endif
794
795     CmiReleaseSentMessages();
796 #if REPORT_COMM_METRICS
797     t3 = CmiWallTimer();
798 #endif
799
800     SendMsgBuf();
801
802 #if REPORT_COMM_METRICS
803     t4 = CmiWallTimer();
804     pumptime += (t2-t1);
805     releasetime += (t3-t2);
806     sendtime += (t4-t3);
807 #endif
808
809 #else /* non-SMP case */
810     CmiReleaseSentMessages();
811
812 #if REPORT_COMM_METRICS
813     t2 = CmiWallTimer();
814 #endif
815     PumpMsgs();
816
817 #if REPORT_COMM_METRICS
818     t3 = CmiWallTimer();
819     pumptime += (t3-t2);
820     releasetime += (t2-t1);
821 #endif
822
823 #endif /* end of #if CMK_SMP */
824 }
825 /* ######End of functions related with communication progress ###### */
826
827 void MachinePostNonLocalForMPI() {
828 #if !CMK_SMP
829     if (no_outstanding_sends) {
830         while (MsgQueueLen>0) {
831             AdvanceCommunicationForMPI();
832         }
833     }
834
835     /* FIXME: I don't think the following codes are needed because
836      * it repeats the same job of the next call of CmiGetNonLocal
837      */
838 #if 0
839     if (!msg) {
840         CmiReleaseSentMessages();
841         if (PumpMsgs())
842             return  PCQueuePop(cs->recv);
843         else
844             return 0;
845     }
846 #endif
847 #endif
848 }
849
850 /* Idle-state related functions: called in non-smp mode */
851 void CmiNotifyIdleForMPI(void) {
852     CmiReleaseSentMessages();
853     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
854 }
855
856 /* Network progress function is used to poll the network when for
857    messages. This flushes receive buffers on some  implementations*/
858 #if CMK_MACHINE_PROGRESS_DEFINED
859 void CmiMachineProgressImpl() {
860 #if !CMK_SMP
861     PumpMsgs();
862 #if CMK_IMMEDIATE_MSG
863     CmiHandleImmediate();
864 #endif
865 #else
866     /*Not implemented yet. Communication server does not seem to be
867       thread safe, so only communication thread call it */
868     if (CmiMyRank() == CmiMyNodeSize())
869         CommunicationServerThread(0);
870 #endif
871 }
872 #endif
873
874 /* ######Beginning of functions related with exiting programs###### */
875 void DrainResourcesForMPI() {
876 #if !CMK_SMP
877     while (!CmiAllAsyncMsgsSent()) {
878         PumpMsgs();
879         CmiReleaseSentMessages();
880     }
881 #else
882     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
883         CmiReleaseSentMessages();
884         SendMsgBuf();
885         PumpMsgs();
886     }
887 #endif
888     MACHSTATE(2, "Machine exit barrier begin {");
889     START_EVENT();
890     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
891         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
892     END_EVENT(10);
893     MACHSTATE(2, "} Machine exit barrier end");
894 }
895
896 void MachineExitForMPI(void) {
897 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
898     int doPrint = 0;
899 #if CMK_SMP
900     if (CmiMyNode()==0) doPrint = 1;
901 #else
902     if (CmiMyPe()==0) doPrint = 1;
903 #endif
904
905     if (doPrint) {
906 #if MPI_POST_RECV
907         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
908 #endif
909     }
910 #endif
911
912 #if REPORT_COMM_METRICS
913 #if CMK_SMP
914     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
915               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
916               pumptime, releasetime, sendtime);
917 #else
918     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
919               CmiMyPe(), pumptime, releasetime, sendtime);
920 #endif
921 #endif
922
923 #if ! CMK_AUTOBUILD
924     signal(SIGINT, signal_int);
925     MPI_Finalize();
926 #endif
927     exit(0);
928 }
929
930 static int machine_exit_idx;
931 static void machine_exit(char *m) {
932     EmergencyExit();
933     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
934     fflush(stdout);
935     CmiNodeBarrier();
936     if (CmiMyRank() == 0) {
937         MPI_Barrier(MPI_COMM_WORLD);
938         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
939         MPI_Abort(MPI_COMM_WORLD, 1);
940     } else {
941         while (1) CmiYield();
942     }
943 }
944
945 static void KillOnAllSigs(int sigNo) {
946     static int already_in_signal_handler = 0;
947     char *m;
948     if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
949     already_in_signal_handler = 1;
950 #if CMK_CCS_AVAILABLE
951     if (CpvAccess(cmiArgDebugFlag)) {
952         CpdNotify(CPD_SIGNAL, sigNo);
953         CpdFreeze();
954     }
955 #endif
956     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
957              "Signal: %d\n",CmiMyPe(),sigNo);
958     CmiPrintStackTrace(1);
959
960     m = CmiAlloc(CmiMsgHeaderSizeBytes);
961     CmiSetHandler(m, machine_exit_idx);
962     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
963     machine_exit(m);
964 }
965 /* ######End of functions related with exiting programs###### */
966
967
968 /* ######Beginning of functions related with starting programs###### */
969 static void registerMPITraceEvents() {
970 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
971     traceRegisterUserEvent("MPI_Barrier", 10);
972     traceRegisterUserEvent("MPI_Send", 20);
973     traceRegisterUserEvent("MPI_Recv", 30);
974     traceRegisterUserEvent("MPI_Isend", 40);
975     traceRegisterUserEvent("MPI_Irecv", 50);
976     traceRegisterUserEvent("MPI_Test", 60);
977     traceRegisterUserEvent("MPI_Iprobe", 70);
978 #endif
979 }
980
981 #if MACHINE_DEBUG_LOG
982 FILE *debugLog = NULL;
983 #endif
984
985 static char *thread_level_tostring(int thread_level) {
986 #if CMK_MPI_INIT_THREAD
987     switch (thread_level) {
988     case MPI_THREAD_SINGLE:
989         return "MPI_THREAD_SINGLE";
990     case MPI_THREAD_FUNNELED:
991         return "MPI_THREAD_FUNNELED";
992     case MPI_THREAD_SERIALIZED:
993         return "MPI_THREAD_SERIALIZED";
994     case MPI_THREAD_MULTIPLE :
995         return "MPI_THREAD_MULTIPLE ";
996     default: {
997         char *str = (char*)malloc(5);
998         sprintf(str,"%d", thread_level);
999         return str;
1000     }
1001     }
1002     return  "unknown";
1003 #else
1004     char *str = (char*)malloc(5);
1005     sprintf(str,"%d", thread_level);
1006     return str;
1007 #endif
1008 }
1009
1010 /**
1011  *  Obtain the number of nodes, my node id, and consuming machine layer
1012  *  specific arguments
1013  */
1014 static void MachineInitForMPI(int argc, char **argv, int *numNodes, int *myNodeID) {
1015     int n,i;
1016     int ver, subver;
1017     int provided;
1018     int thread_level;
1019     int myNID;
1020
1021 #if MACHINE_DEBUG
1022     debugLog=NULL;
1023 #endif
1024 #if CMK_USE_HP_MAIN_FIX
1025 #if FOR_CPLUS
1026     _main(argc,argv);
1027 #endif
1028 #endif
1029
1030 #if CMK_MPI_INIT_THREAD
1031 #if CMK_SMP
1032     thread_level = MPI_THREAD_FUNNELED;
1033 #else
1034     thread_level = MPI_THREAD_SINGLE;
1035 #endif
1036     MPI_Init_thread(&argc, &argv, thread_level, &provided);
1037     _thread_provided = provided;
1038 #else
1039     MPI_Init(&argc, &argv);
1040     thread_level = 0;
1041     provided = -1;
1042 #endif
1043     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1044     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1045
1046     myNID = *myNodeID;
1047
1048     MPI_Get_version(&ver, &subver);
1049     if (myNID == 0) {
1050         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1051     }
1052
1053     idleblock = CmiGetArgFlag(argv, "+idleblocking");
1054     if (idleblock && _Cmi_mynode == 0) {
1055         printf("Charm++: Running in idle blocking mode.\n");
1056     }
1057
1058     /* setup signal handlers */
1059     signal(SIGSEGV, KillOnAllSigs);
1060     signal(SIGFPE, KillOnAllSigs);
1061     signal(SIGILL, KillOnAllSigs);
1062     signal_int = signal(SIGINT, KillOnAllSigs);
1063     signal(SIGTERM, KillOnAllSigs);
1064     signal(SIGABRT, KillOnAllSigs);
1065 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1066     signal(SIGQUIT, KillOnAllSigs);
1067     signal(SIGBUS, KillOnAllSigs);
1068 #   endif /*UNIX*/
1069
1070 #if CMK_NO_OUTSTANDING_SENDS
1071     no_outstanding_sends=1;
1072 #endif
1073     if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1074         no_outstanding_sends = 1;
1075         if (myNID == 0)
1076             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1077                    no_outstanding_sends?"":" not");
1078     }
1079
1080     request_max=MAX_QLEN;
1081     CmiGetArgInt(argv,"+requestmax",&request_max);
1082     /*printf("request max=%d\n", request_max);*/
1083
1084 #if MPI_POST_RECV
1085     CmiGetArgInt(argv, "+postRecvCnt", &MPI_POST_RECV_COUNT);
1086     CmiGetArgInt(argv, "+postRecvLowerSize", &MPI_POST_RECV_LOWERSIZE);
1087     CmiGetArgInt(argv, "+postRecvUpperSize", &MPI_POST_RECV_UPPERSIZE);
1088     if (MPI_POST_RECV_COUNT<=0) MPI_POST_RECV_COUNT=1;
1089     if (MPI_POST_RECV_LOWERSIZE>MPI_POST_RECV_UPPERSIZE) MPI_POST_RECV_UPPERSIZE = MPI_POST_RECV_LOWERSIZE;
1090     MPI_POST_RECV_SIZE = MPI_POST_RECV_UPPERSIZE;
1091     if (myNID==0) {
1092         printf("Charm++: using post-recv scheme with %d pre-posted recvs ranging from %d to %d (bytes)\n",
1093                MPI_POST_RECV_COUNT, MPI_POST_RECV_LOWERSIZE, MPI_POST_RECV_UPPERSIZE);
1094     }
1095 #endif
1096
1097 #if CMI_DYNAMIC_EXERT_CAP
1098     CmiGetArgInt(argv, "+dynCapThreshold", &CMI_DYNAMIC_OUTGOING_THRESHOLD);
1099     CmiGetArgInt(argv, "+dynCapSend", &CMI_DYNAMIC_SEND_CAPSIZE);
1100     CmiGetArgInt(argv, "+dynCapRecv", &CMI_DYNAMIC_RECV_CAPSIZE);
1101     if (myNID==0) {
1102         printf("Charm++: using dynamic flow control with outgoing threshold %d, send cap %d, recv cap %d\n",
1103                CMI_DYNAMIC_OUTGOING_THRESHOLD, CMI_DYNAMIC_SEND_CAPSIZE, CMI_DYNAMIC_RECV_CAPSIZE);
1104     }
1105 #endif
1106
1107     /* checksum flag */
1108     if (CmiGetArgFlag(argv,"+checksum")) {
1109 #if CMK_ERROR_CHECKING
1110         checksum_flag = 1;
1111         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1112 #else
1113         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1114 #endif
1115     }
1116
1117     {
1118         int debug = CmiGetArgFlag(argv,"++debug");
1119         int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
1120         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1121 #if CMK_HAS_GETPID
1122             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1123             fflush(stdout);
1124             if (!debug_no_pause)
1125                 sleep(15);
1126 #else
1127             printf("++debug ignored.\n");
1128 #endif
1129         }
1130     }
1131
1132     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1133     for (i=0; i<_Cmi_mynodesize+1; i++) {
1134 #if MULTI_SENDQUEUE
1135         procState[i].sendMsgBuf = PCQueueCreate();
1136 #endif
1137         procState[i].recvLock = CmiCreateLock();
1138     }
1139 #if CMK_SMP
1140 #if !MULTI_SENDQUEUE
1141     sendMsgBuf = PCQueueCreate();
1142     sendMsgBufLock = CmiCreateLock();
1143 #endif
1144 #endif
1145 }
1146
1147 static void MachinePreCommonInitForMPI(int everReturn) {
1148 #if MPI_POST_RECV
1149     int doInit = 1;
1150     int i;
1151
1152 #if CMK_SMP
1153     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1154 #endif
1155
1156     /* Currently, in mpi smp, the main thread will be the comm thread, so
1157      *  only the comm thread should post recvs. Cpvs, however, need to be
1158      * created on rank 0 (the ptrs to the actual cpv memory), while
1159      * other ranks are busy waiting for this to finish. So cpv initialize
1160      * routines have to be called on every ranks, although they are only
1161      * useful on comm thread (whose rank is not zero) -Chao Mei
1162      */
1163     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1164     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1165     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1166     CpvInitialize(char*,CmiPostedRecvBuffers);
1167
1168     if (doInit) {
1169         /* Post some extra recvs to help out with incoming messages */
1170         /* On some MPIs the messages are unexpected and thus slow */
1171
1172         /* An array of request handles for posted recvs */
1173         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1174
1175         /* An array of buffers for posted recvs */
1176         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1177
1178         /* Post Recvs */
1179         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1180             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1181                                            MPI_POST_RECV_SIZE,
1182                                            MPI_BYTE,
1183                                            MPI_ANY_SOURCE,
1184                                            POST_RECV_TAG,
1185                                            MPI_COMM_WORLD,
1186                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1187                 CmiAbort("MPI_Irecv failed\n");
1188         }
1189     }
1190 #endif
1191
1192 }
1193
1194 static void MachinePostCommonInitForMPI(int everReturn) {
1195     CmiIdleState *s=CmiNotifyGetState();
1196     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1197
1198 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1199     CpvInitialize(double, projTraceStart);
1200     /* only PE 0 needs to care about registration (to generate sts file). */
1201     if (CmiMyPe() == 0) {
1202         registerMachineUserEventsFunction(&registerMPITraceEvents);
1203     }
1204 #endif
1205
1206 #if CMK_SMP
1207     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1208     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1209 #else
1210     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1211 #endif
1212
1213 #if MACHINE_DEBUG_LOG
1214     if (CmiMyRank() == 0) {
1215         char ln[200];
1216         sprintf(ln,"debugLog.%d",CmiMyNode());
1217         debugLog=fopen(ln,"w");
1218     }
1219 #endif
1220 }
1221 /* ######End of functions related with starting programs###### */
1222
1223 /***********************************************************************
1224  *
1225  * Abort function:
1226  *
1227  ************************************************************************/
1228
1229 void CmiAbort(const char *message) {
1230     char *m;
1231     /* if CharmDebug is attached simply try to send a message to it */
1232 #if CMK_CCS_AVAILABLE
1233     if (CpvAccess(cmiArgDebugFlag)) {
1234         CpdNotify(CPD_ABORT, message);
1235         CpdFreeze();
1236     }
1237 #endif
1238     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1239              "Reason: %s\n",CmiMyPe(),message);
1240     /*  CmiError(message); */
1241     CmiPrintStackTrace(0);
1242     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1243     CmiSetHandler(m, machine_exit_idx);
1244     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1245     machine_exit(m);
1246     /* Program never reaches here */
1247     MPI_Abort(MPI_COMM_WORLD, 1);
1248 }
1249
1250 /**************************  TIMER FUNCTIONS **************************/
1251 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1252
1253 /* MPI calls are not threadsafe, even the timer on some machines */
1254 static CmiNodeLock  timerLock = 0;
1255                                 static int _absoluteTime = 0;
1256                                                            static double starttimer = 0;
1257                                                                                       static int _is_global = 0;
1258
1259 int CmiTimerIsSynchronized() {
1260     int  flag;
1261     void *v;
1262
1263     /*  check if it using synchronized timer */
1264     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1265         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1266     if (flag) {
1267         _is_global = *(int*)v;
1268         if (_is_global && CmiMyPe() == 0)
1269             printf("Charm++> MPI timer is synchronized\n");
1270     }
1271     return _is_global;
1272 }
1273
1274 int CmiTimerAbsolute() {
1275     return _absoluteTime;
1276 }
1277
1278 double CmiStartTimer() {
1279     return 0.0;
1280 }
1281
1282 double CmiInitTime() {
1283     return starttimer;
1284 }
1285
1286 void CmiTimerInit(char **argv) {
1287     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1288     if (_absoluteTime && CmiMyPe() == 0)
1289         printf("Charm++> absolute MPI timer is used\n");
1290
1291     _is_global = CmiTimerIsSynchronized();
1292
1293     if (_is_global) {
1294         if (CmiMyRank() == 0) {
1295             double minTimer;
1296 #if CMK_TIMER_USE_XT3_DCLOCK
1297             starttimer = dclock();
1298 #else
1299             starttimer = MPI_Wtime();
1300 #endif
1301
1302             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1303                           MPI_COMM_WORLD );
1304             starttimer = minTimer;
1305         }
1306     } else { /* we don't have a synchronous timer, set our own start time */
1307         CmiBarrier();
1308         CmiBarrier();
1309         CmiBarrier();
1310 #if CMK_TIMER_USE_XT3_DCLOCK
1311         starttimer = dclock();
1312 #else
1313         starttimer = MPI_Wtime();
1314 #endif
1315     }
1316
1317 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1318     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1319         timerLock = CmiCreateLock();
1320 #endif
1321     CmiNodeAllBarrier();          /* for smp */
1322 }
1323
1324 /**
1325  * Since the timerLock is never created, and is
1326  * always NULL, then all the if-condition inside
1327  * the timer functions could be disabled right
1328  * now in the case of SMP. --Chao Mei
1329  */
1330 double CmiTimer(void) {
1331     double t;
1332 #if 0 && CMK_SMP
1333     if (timerLock) CmiLock(timerLock);
1334 #endif
1335
1336 #if CMK_TIMER_USE_XT3_DCLOCK
1337     t = dclock();
1338 #else
1339     t = MPI_Wtime();
1340 #endif
1341
1342 #if 0 && CMK_SMP
1343     if (timerLock) CmiUnlock(timerLock);
1344 #endif
1345
1346     return _absoluteTime?t: (t-starttimer);
1347 }
1348
1349 double CmiWallTimer(void) {
1350     double t;
1351 #if 0 && CMK_SMP
1352     if (timerLock) CmiLock(timerLock);
1353 #endif
1354
1355 #if CMK_TIMER_USE_XT3_DCLOCK
1356     t = dclock();
1357 #else
1358     t = MPI_Wtime();
1359 #endif
1360
1361 #if 0 && CMK_SMP
1362     if (timerLock) CmiUnlock(timerLock);
1363 #endif
1364
1365     return _absoluteTime? t: (t-starttimer);
1366 }
1367
1368 double CmiCpuTimer(void) {
1369     double t;
1370 #if 0 && CMK_SMP
1371     if (timerLock) CmiLock(timerLock);
1372 #endif
1373 #if CMK_TIMER_USE_XT3_DCLOCK
1374     t = dclock() - starttimer;
1375 #else
1376     t = MPI_Wtime() - starttimer;
1377 #endif
1378 #if 0 && CMK_SMP
1379     if (timerLock) CmiUnlock(timerLock);
1380 #endif
1381     return t;
1382 }
1383
1384 #endif
1385
1386 /************Barrier Related Functions****************/
1387 /* must be called on all ranks including comm thread in SMP */
1388 int CmiBarrier() {
1389 #if CMK_SMP
1390     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1391     CmiNodeAllBarrier();
1392     if (CmiMyRank() == CmiMyNodeSize())
1393 #else
1394     if (CmiMyRank() == 0)
1395 #endif
1396     {
1397         /**
1398          *  The call of CmiBarrier is usually before the initialization
1399          *  of trace module of Charm++, therefore, the START_EVENT
1400          *  and END_EVENT are disabled here. -Chao Mei
1401          */
1402         /*START_EVENT();*/
1403
1404         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1405             CmiAbort("Timernit: MPI_Barrier failed!\n");
1406
1407         /*END_EVENT(10);*/
1408     }
1409     CmiNodeAllBarrier();
1410     return 0;
1411 }
1412
1413 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1414 int CmiBarrierZero() {
1415     int i;
1416 #if CMK_SMP
1417     if (CmiMyRank() == CmiMyNodeSize())
1418 #else
1419     if (CmiMyRank() == 0)
1420 #endif
1421     {
1422         char msg[1];
1423         MPI_Status sts;
1424         if (CmiMyNode() == 0)  {
1425             for (i=0; i<CmiNumNodes()-1; i++) {
1426                 START_EVENT();
1427
1428                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1429                     CmiPrintf("MPI_Recv failed!\n");
1430
1431                 END_EVENT(30);
1432             }
1433         } else {
1434             START_EVENT();
1435
1436             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1437                 printf("MPI_Send failed!\n");
1438
1439             END_EVENT(20);
1440         }
1441     }
1442     CmiNodeAllBarrier();
1443     return 0;
1444 }
1445
1446 /*@}*/
1447