First check-in for the work on extractin common codes from MPI, LAPI and DCMF layer...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {
36     Sleep(1000*secs);
37 }
38 #else
39 #include <unistd.h> /*For getpid()*/
40 #endif
41 #include <stdlib.h> /*For sleep()*/
42
43 #include "machine.h"
44 #include "pcqueue.h"
45
46 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
47 /* Whether to use multiple send queue in SMP mode */
48 #define MULTI_SENDQUEUE    0
49
50 /* ###Beginning of flow control related macros ### */
51 #define CMI_EXERT_SEND_CAP 0
52 #define CMI_EXERT_RECV_CAP 0
53
54 #define CMI_DYNAMIC_EXERT_CAP 0
55 /* This macro defines the max number of msgs in the sender msg buffer
56  * that is allowed for recving operation to continue
57  */
58 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 4
59 #define CMI_DYNAMIC_MAXCAPSIZE 1000
60 #define CMI_DYNAMIC_SEND_CAPSIZE 4
61 #define CMI_DYNAMIC_RECV_CAPSIZE 3
62 /* initial values, -1 indiates there's no cap */
63 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
64 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
65
66 #if CMI_EXERT_SEND_CAP
67 #define SEND_CAP 3
68 #endif
69
70 #if CMI_EXERT_RECV_CAP
71 #define RECV_CAP 2
72 #endif
73 /* ###End of flow control related macros ### */
74
75 /* ###Beginning of machine-layer-tracing related macros ### */
76 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
77 #define CMI_MPI_TRACE_MOREDETAILED 0
78 #undef CMI_MPI_TRACE_USEREVENTS
79 #define CMI_MPI_TRACE_USEREVENTS 1
80 #else
81 #undef CMK_SMP_TRACE_COMMTHREAD
82 #define CMK_SMP_TRACE_COMMTHREAD 0
83 #endif
84
85 #define CMK_TRACE_COMMOVERHEAD 0
86 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
87 #undef CMI_MPI_TRACE_USEREVENTS
88 #define CMI_MPI_TRACE_USEREVENTS 1
89 #else
90 #undef CMK_TRACE_COMMOVERHEAD
91 #define CMK_TRACE_COMMOVERHEAD 0
92 #endif
93
94 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
95 CpvStaticDeclare(double, projTraceStart);
96 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
97 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
98 #else
99 #define  START_EVENT()
100 #define  END_EVENT(x)
101 #endif
102 /* ###End of machine-layer-tracing related macros ### */
103
104 /* ###Beginning of POST_RECV related macros ### */
105 /*
106  * If MPI_POST_RECV is defined, we provide default values for
107  * size and number of posted recieves. If MPI_POST_RECV_COUNT
108  * is set then a default value for MPI_POST_RECV_SIZE is used
109  * if not specified by the user.
110  */
111 #define MPI_POST_RECV 0
112
113 #if MPI_POST_RECV
114 #define MPI_POST_RECV_COUNT 10
115 #endif
116
117 #if MPI_POST_RECV_COUNT > 0
118 #define MPI_POST_RECV_LOWERSIZE 2000
119 #define MPI_POST_RECV_UPPERSIZE 4000
120 #define MPI_POST_RECV_SIZE MPI_POST_RECV_UPPERSIZE
121
122 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
123 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
124 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
125 CpvDeclare(char*,CmiPostedRecvBuffers);
126 #endif
127
128 /* to avoid MPI's in order delivery, changing MPI Tag all the time */
129 #define TAG     1375
130 #if MPI_POST_RECV_COUNT > 0
131 #define POST_RECV_TAG       (TAG+1)
132 #define BARRIER_ZERO_TAG  TAG
133 #else
134 #define BARRIER_ZERO_TAG   (TAG-1)
135 #endif
136 /* ###End of POST_RECV related related macros ### */
137
138 #if CMK_BLUEGENEL
139 #define MAX_QLEN 8
140 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
141 #else
142 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
143 #define MAX_QLEN 200
144 #endif
145 /* =======End of Definitions of Performance-Specific Macros =======*/
146
147
148 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
149 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
150 #define CHARM_MAGIC_NUMBER               126
151
152 #if CMK_ERROR_CHECKING
153 extern unsigned char computeCheckSum(unsigned char *data, int len);
154 static int checksum_flag = 0;
155 #define CMI_SET_CHECKSUM(msg, len)      \
156         if (checksum_flag)  {   \
157           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
158           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
159         }
160 #define CMI_CHECK_CHECKSUM(msg, len)    \
161         if (checksum_flag)      \
162           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
163             CmiAbort("Fatal error: checksum doesn't agree!\n");
164 #else
165 #define CMI_SET_CHECKSUM(msg, len)
166 #define CMI_CHECK_CHECKSUM(msg, len)
167 #endif
168 /* =====End of Definitions of Message-Corruption Related Macros=====*/
169
170
171 /* =====Beginning of Declarations of Machine Specific Variables===== */
172 #include <signal.h>
173 void (*signal_int)(int);
174
175 static int _thread_provided = -1; /* Indicating MPI thread level */
176 static int idleblock = 0;
177
178 /* A simple list for msgs that have been sent by MPI_Isend */
179 typedef struct msg_list {
180     char *msg;
181     struct msg_list *next;
182     int size, destpe;
183 #if CMK_SMP_TRACE_COMMTHREAD
184     int srcpe;
185 #endif
186     MPI_Request req;
187 } SMSG_LIST;
188
189 static SMSG_LIST *sent_msgs=0;
190 static SMSG_LIST *end_sent=0;
191
192 int MsgQueueLen=0;
193 static int request_max;
194 /*FLAG: consume outstanding Isends in scheduler loop*/
195 static int no_outstanding_sends=0;
196
197 #if NODE_0_IS_CONVHOST
198 int inside_comm = 0;
199 #endif
200
201 typedef struct ProcState {
202 #if MULTI_SENDQUEUE
203     PCQueue      sendMsgBuf;       /* per processor message sending queue */
204 #endif
205     CmiNodeLock  recvLock;                  /* for cs->recv */
206 } ProcState;
207 static ProcState  *procState;
208
209 #if CMK_SMP && !MULTI_SENDQUEUE
210 static PCQueue sendMsgBuf;
211 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
212 #endif
213 /* =====End of Declarations of Machine Specific Variables===== */
214
215
216 /* =====Beginning of Declarations of Machine Specific Functions===== */
217 /* Utility functions */
218 #if CMK_BLUEGENEL
219 extern void MPID_Progress_test();
220 #endif
221 static size_t CmiAllAsyncMsgsSent(void);
222 static void CmiReleaseSentMessages(void);
223 static int PumpMsgs(void);
224 static void PumpMsgsBlocking(void);
225
226 #if CMK_SMP
227 static int MsgQueueEmpty();
228 static int RecvQueueEmpty();
229 static int SendMsgBuf();
230 static  void EnqueueMsg(void *m, int size, int node);
231 #endif
232
233 /* The machine-specific send function */
234 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode);
235 #define CmiMachineSpecificSendFunc MachineSpecificSendForMPI
236
237 /* ### Beginning of Machine-startup Related Functions ### */
238 static void MachineInitForMPI(int argc, char **argv, int *numNodes, int *myNodeID);
239 #define MachineSpecificInit MachineInitForMPI
240
241 static void MachinePreCommonInitForMPI(int everReturn);
242 static void MachinePostCommonInitForMPI(int everReturn);
243 #define MachineSpecificPreCommonInit MachinePreCommonInitForMPI
244 #define MachineSpecificPostCommonInit MachinePostCommonInitForMPI
245 /* ### End of Machine-startup Related Functions ### */
246
247 /* ### Beginning of Machine-running Related Functions ### */
248 static void AdvanceCommunicationForMPI();
249 #define MachineSpecificAdvanceCommunication AdvanceCommunicationForMPI
250
251 static void DrainResourcesForMPI(); /* used when exit */
252 #define MachineSpecificDrainResources DrainResourcesForMPI
253
254 static void MachineExitForMPI();
255 #define MachineSpecificExit MachineExitForMPI
256 /* ### End of Machine-running Related Functions ### */
257
258 /* ### Beginning of Idle-state Related Functions ### */
259 void CmiNotifyIdleForMPI(void);
260 /* ### End of Idle-state Related Functions ### */
261
262 void MachinePostNonLocalForMPI();
263 #define MachineSpecificPostNonLocal MachinePostNonLocalForMPI
264
265 /* =====End of Declarations of Machine Specific Functions===== */
266
267 /**
268  *  Macros that overwrites the common codes, such as
269  *  CMK_SMP_NO_COMMTHD, NETWORK_PROGRESS_PERIOD_DEFAULT,
270  *  USE_COMMON_SYNC_P2P, CMK_HAS_SIZE_IN_MSGHDR,
271  *  CMK_OFFLOAD_BCAST_PROCESS etc.
272  */
273 #define CMK_HAS_SIZE_IN_MSGHDR 0
274 #include "machine-common.c"
275
276 /* The machine specific msg-sending function */
277
278 #if CMK_SMP
279 static void EnqueueMsg(void *m, int size, int node) {
280     SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
281     MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
282     msg_tmp->msg = m;
283     msg_tmp->size = size;
284     msg_tmp->destpe = node;
285
286 #if CMK_SMP_TRACE_COMMTHREAD
287     msg_tmp->srcpe = CmiMyPe();
288 #endif
289
290 #if MULTI_SENDQUEUE
291     PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
292 #else
293     /*CmiLock(sendMsgBufLock);*/
294     PCQueuePush(sendMsgBuf,(char *)msg_tmp);
295     /*CmiUnlock(sendMsgBufLock);*/
296 #endif
297
298     MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
299 }
300 #endif
301
302 static CmiCommHandle MachineSpecificSendForMPI(int destNode, int size, char *msg, int mode) {
303     /* Ignoring the mode for MPI layer */
304
305     CmiState cs = CmiGetState();
306     SMSG_LIST *msg_tmp;
307     int  rank;
308
309     CmiAssert(destNode != CmiMyNode());
310 #if CMK_SMP
311     EnqueueMsg(msg, size, destNode);
312     return 0;
313 #else
314     /* non smp */
315     msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
316     msg_tmp->msg = msg;
317     msg_tmp->next = 0;
318     while (MsgQueueLen > request_max) {
319         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
320         CmiReleaseSentMessages();
321         PumpMsgs();
322     }
323 #if CMK_ERROR_CHECKING
324     CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
325 #endif
326     CMI_SET_CHECKSUM(msg, size);
327
328 #if MPI_POST_RECV_COUNT > 0
329     if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
330         START_EVENT();
331         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
332             CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
333         END_EVENT(40);
334     } else {
335         START_EVENT();
336         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
337             CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
338         END_EVENT(40);
339     }
340 #else
341     START_EVENT();
342     if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destNode,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
343         CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
344     /*END_EVENT(40);*/
345 #if CMK_TRACE_COMMOVERHEAD
346     char tmp[64];
347     sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destNode);
348     traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
349 #endif
350 #endif
351
352     MsgQueueLen++;
353     if (sent_msgs==0)
354         sent_msgs = msg_tmp;
355     else
356         end_sent->next = msg_tmp;
357     end_sent = msg_tmp;
358     return (CmiCommHandle) &(msg_tmp->req);
359 #endif              /* non-smp */
360 }
361
362 static size_t CmiAllAsyncMsgsSent(void) {
363     SMSG_LIST *msg_tmp = sent_msgs;
364     MPI_Status sts;
365     int done;
366
367     while (msg_tmp!=0) {
368         done = 0;
369         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
370             CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
371         if (!done)
372             return 0;
373         msg_tmp = msg_tmp->next;
374         /*    MsgQueueLen--; ????? */
375     }
376     return 1;
377 }
378
379 int CmiAsyncMsgSent(CmiCommHandle c) {
380
381     SMSG_LIST *msg_tmp = sent_msgs;
382     int done;
383     MPI_Status sts;
384
385     while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
386         msg_tmp = msg_tmp->next;
387     if (msg_tmp) {
388         done = 0;
389         if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
390             CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
391         return ((done)?1:0);
392     } else {
393         return 1;
394     }
395 }
396
397 void CmiReleaseCommHandle(CmiCommHandle c) {
398     return;
399 }
400
401 /* ######Beginning of functions related with communication progress ###### */
402 static void CmiReleaseSentMessages(void) {
403     SMSG_LIST *msg_tmp=sent_msgs;
404     SMSG_LIST *prev=0;
405     SMSG_LIST *temp;
406     int done;
407     MPI_Status sts;
408
409 #if CMK_BLUEGENEL
410     MPID_Progress_test();
411 #endif
412
413     MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
414     while (msg_tmp!=0) {
415         done =0;
416 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
417         double startT = CmiWallTimer();
418 #endif
419         if (MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
420             CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
421         if (done) {
422             MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
423             MsgQueueLen--;
424             /* Release the message */
425             temp = msg_tmp->next;
426             if (prev==0) /* first message */
427                 sent_msgs = temp;
428             else
429                 prev->next = temp;
430             CmiFree(msg_tmp->msg);
431             CmiFree(msg_tmp);
432             msg_tmp = temp;
433         } else {
434             prev = msg_tmp;
435             msg_tmp = msg_tmp->next;
436         }
437 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
438         {
439             double endT = CmiWallTimer();
440             /* only record the event if it takes more than 1ms */
441             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
442         }
443 #endif
444     }
445     end_sent = prev;
446     MACHSTATE(2,"} CmiReleaseSentMessages end");
447 }
448
449 static int PumpMsgs(void) {
450     int nbytes, flg, res;
451     char *msg;
452     MPI_Status sts;
453     int recd=0;
454
455 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
456     int recvCnt=0;
457 #endif
458
459 #if CMK_BLUEGENEL
460     MPID_Progress_test();
461 #endif
462
463     MACHSTATE(2,"PumpMsgs begin {");
464
465 #if CMI_DYNAMIC_EXERT_CAP
466     dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
467 #endif
468
469     while (1) {
470 #if CMI_EXERT_RECV_CAP
471         if (recvCnt==RECV_CAP) break;
472 #elif CMI_DYNAMIC_EXERT_CAP
473         if (recvCnt >= dynamicRecvCap) break;
474 #endif
475
476         /* First check posted recvs then do  probe unmatched outstanding messages */
477 #if MPI_POST_RECV_COUNT > 0
478         int completed_index=-1;
479         if (MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
480             CmiAbort("PumpMsgs: MPI_Testany failed!\n");
481         if (flg) {
482             if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
483                 CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
484
485             recd = 1;
486             msg = (char *) CmiAlloc(nbytes);
487             memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
488             /* and repost the recv */
489
490             START_EVENT();
491
492             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])       ,
493                                            MPI_POST_RECV_SIZE,
494                                            MPI_BYTE,
495                                            MPI_ANY_SOURCE,
496                                            POST_RECV_TAG,
497                                            MPI_COMM_WORLD,
498                                            &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
499                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
500
501             END_EVENT(50);
502
503             CpvAccess(Cmi_posted_recv_total)++;
504         } else {
505             res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
506             if (res != MPI_SUCCESS)
507                 CmiAbort("MPI_Iprobe failed\n");
508             if (!flg) break;
509             recd = 1;
510             MPI_Get_count(&sts, MPI_BYTE, &nbytes);
511             msg = (char *) CmiAlloc(nbytes);
512
513             START_EVENT();
514
515             if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
516                 CmiAbort("PumpMsgs: MPI_Recv failed!\n");
517
518             END_EVENT(30);
519
520             CpvAccess(Cmi_unposted_recv_total)++;
521         }
522 #else
523         /* Original version */
524 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
525         double startT = CmiWallTimer();
526 #endif
527         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
528         if (res != MPI_SUCCESS)
529             CmiAbort("MPI_Iprobe failed\n");
530
531         if (!flg) break;
532 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
533         {
534             double endT = CmiWallTimer();
535             /* only trace the probe that last longer than 1ms */
536             if (endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
537         }
538 #endif
539
540         recd = 1;
541         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
542         msg = (char *) CmiAlloc(nbytes);
543
544         START_EVENT();
545
546         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
547             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
548
549         /*END_EVENT(30);*/
550
551 #endif
552
553 #if CMK_SMP_TRACE_COMMTHREAD
554         traceBeginCommOp(msg);
555         traceChangeLastTimestamp(CpvAccess(projTraceStart));
556         traceEndCommOp(msg);
557 #if CMI_MPI_TRACE_MOREDETAILED
558         char tmp[32];
559         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
560         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
561 #endif
562 #elif CMK_TRACE_COMMOVERHEAD
563         char tmp[32];
564         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
565         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
566 #endif
567
568
569         MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
570         CMI_CHECK_CHECKSUM(msg, nbytes);
571 #if CMK_ERROR_CHECKING
572         if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
573             CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
574             CmiFree(msg);
575             CmiAbort("Abort!\n");
576             continue;
577         }
578 #endif
579
580         handleOneRecvedMsg(nbytes, msg);
581
582 #if CMI_EXERT_RECV_CAP
583         recvCnt++;
584 #elif CMI_DYNAMIC_EXERT_CAP
585         recvCnt++;
586 #if CMK_SMP
587         /* check sendMsgBuf to get the  number of messages that have not been sent
588              * which is only available in SMP mode
589          * MsgQueueLen indicates the number of messages that have not been released
590              * by MPI
591              */
592         if (PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
593                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
594             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
595         }
596 #else
597         /* MsgQueueLen indicates the number of messages that have not been released
598              * by MPI
599              */
600         if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD) {
601             dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
602         }
603 #endif
604
605 #endif
606
607     }
608
609     MACHSTATE(2,"} PumpMsgs end ");
610     return recd;
611 }
612
613 /* blocking version */
614 static void PumpMsgsBlocking(void) {
615     static int maxbytes = 20000000;
616     static char *buf = NULL;
617     int nbytes, flg;
618     MPI_Status sts;
619     char *msg;
620     int recd=0;
621
622     if (!PCQueueEmpty(CmiGetState()->recv)) return;
623     if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
624     if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
625     if (sent_msgs)  return;
626
627 #if 0
628     CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
629 #endif
630
631     if (buf == NULL) {
632         buf = (char *) CmiAlloc(maxbytes);
633         _MEMCHECK(buf);
634     }
635
636
637 #if MPI_POST_RECV_COUNT > 0
638 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
639     CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
640 #endif
641
642     START_EVENT();
643
644     if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
645         CmiAbort("PumpMsgs: PMP_Recv failed!\n");
646
647     /*END_EVENT(30);*/
648
649     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
650     msg = (char *) CmiAlloc(nbytes);
651     memcpy(msg, buf, nbytes);
652
653 #if CMK_SMP_TRACE_COMMTHREAD
654     traceBeginCommOp(msg);
655     traceChangeLastTimestamp(CpvAccess(projTraceStart));
656     traceEndCommOp(msg);
657 #if CMI_MPI_TRACE_MOREDETAILED
658     char tmp[32];
659     sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
660     traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
661 #endif
662 #endif
663
664     handleOneRecvedMsg(nbytes, msg);
665 }
666
667
668 #if CMK_SMP
669
670 /* called by communication thread in SMP */
671 static int SendMsgBuf() {
672     SMSG_LIST *msg_tmp;
673     char *msg;
674     int node, rank, size;
675     int i;
676     int sent = 0;
677
678 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
679     int sentCnt = 0;
680 #endif
681
682 #if CMI_DYNAMIC_EXERT_CAP
683     dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
684 #endif
685
686     MACHSTATE(2,"SendMsgBuf begin {");
687 #if MULTI_SENDQUEUE
688     for (i=0; i<_Cmi_mynodesize+1; i++) { /* subtle: including comm thread */
689         if (!PCQueueEmpty(procState[i].sendMsgBuf)) {
690             msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
691 #else
692     /* single message sending queue */
693     /* CmiLock(sendMsgBufLock); */
694     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
695     /* CmiUnlock(sendMsgBufLock); */
696     while (NULL != msg_tmp) {
697 #endif
698             node = msg_tmp->destpe;
699             size = msg_tmp->size;
700             msg = msg_tmp->msg;
701             msg_tmp->next = 0;
702
703 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
704             while (MsgQueueLen > request_max) {
705                 CmiReleaseSentMessages();
706                 PumpMsgs();
707             }
708 #endif
709
710             MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
711 #if CMK_ERROR_CHECKING
712             CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
713 #endif
714             CMI_SET_CHECKSUM(msg, size);
715
716 #if MPI_POST_RECV_COUNT > 0
717             if (size>=MPI_POST_RECV_LOWERSIZE && size <= MPI_POST_RECV_UPPERSIZE) {
718                 START_EVENT();
719                 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
720                     CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
721                 END_EVENT(40);
722             } else {
723                 START_EVENT();
724                 if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
725                     CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
726                 END_EVENT(40);
727             }
728 #else
729             START_EVENT();
730             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
731                 CmiAbort("MachineSpecificSendForMPI: MPI_Isend failed!\n");
732             /*END_EVENT(40);*/
733 #endif
734
735 #if CMK_SMP_TRACE_COMMTHREAD
736             traceBeginCommOp(msg);
737             traceChangeLastTimestamp(CpvAccess(projTraceStart));
738             /* traceSendMsgComm must execute after traceBeginCommOp because
739                  * we pretend we execute an entry method, and inside this we
740                  * pretend we will send another message. Otherwise how could
741                  * a message creation just before an entry method invocation?
742                  * If such logic is broken, the projections will not trace
743                  * messages correctly! -Chao Mei
744                  */
745             traceSendMsgComm(msg);
746             traceEndCommOp(msg);
747 #if CMI_MPI_TRACE_MOREDETAILED
748             char tmp[64];
749             sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
750             traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
751 #endif
752 #endif
753
754
755             MACHSTATE(3,"}MPI_send end");
756             MsgQueueLen++;
757             if (sent_msgs==0)
758                 sent_msgs = msg_tmp;
759             else
760                 end_sent->next = msg_tmp;
761             end_sent = msg_tmp;
762             sent=1;
763
764 #if CMI_EXERT_SEND_CAP
765             if (++sentCnt == SEND_CAP) break;
766 #elif CMI_DYNAMIC_EXERT_CAP
767             if (++sentCnt >= dynamicSendCap) break;
768             if (MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
769                 dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
770 #endif
771
772 #if ! MULTI_SENDQUEUE
773             /* CmiLock(sendMsgBufLock); */
774             msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
775             /* CmiUnlock(sendMsgBufLock); */
776 #endif
777         }
778 #if MULTI_SENDQUEUE
779     }
780 #endif
781     MACHSTATE(2,"}SendMsgBuf end ");
782     return sent;
783 }
784
785 static int MsgQueueEmpty() {
786     int i;
787 #if MULTI_SENDQUEUE
788     for (i=0; i<_Cmi_mynodesize; i++)
789         if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
790 #else
791     return PCQueueEmpty(sendMsgBuf);
792 #endif
793     return 1;
794 }
795
796 /* test if all processors recv queues are empty */
797 static int RecvQueueEmpty() {
798     int i;
799     for (i=0; i<_Cmi_mynodesize; i++) {
800         CmiState cs=CmiGetStateN(i);
801         if (!PCQueueEmpty(cs->recv)) return 0;
802     }
803     return 1;
804 }
805
806
807 #define REPORT_COMM_METRICS 0
808 #if REPORT_COMM_METRICS
809 static double pumptime = 0.0;
810 static double releasetime = 0.0;
811 static double sendtime = 0.0;
812 #endif
813
814 #endif //end of CMK_SMP
815
816 static void AdvanceCommunicationForMPI() {
817 #if REPORT_COMM_METRICS
818     double t1, t2, t3, t4;
819     t1 = CmiWallTimer();
820 #endif
821
822 #if CMK_SMP
823     PumpMsgs();
824
825 #if REPORT_COMM_METRICS
826     t2 = CmiWallTimer();
827 #endif
828
829     CmiReleaseSentMessages();
830 #if REPORT_COMM_METRICS
831     t3 = CmiWallTimer();
832 #endif
833
834     SendMsgBuf();
835
836 #if REPORT_COMM_METRICS
837     t4 = CmiWallTimer();
838     pumptime += (t2-t1);
839     releasetime += (t3-t2);
840     sendtime += (t4-t3);
841 #endif
842
843 #else /* non-SMP case */
844     CmiReleaseSentMessages();
845
846 #if REPORT_COMM_METRICS
847     t2 = CmiWallTimer();
848 #endif
849     PumpMsgs();
850
851 #if REPORT_COMM_METRICS
852     t3 = CmiWallTimer();
853     pumptime += (t3-t2);
854     releasetime += (t2-t1);
855 #endif
856
857 #endif /* end of #if CMK_SMP */
858 }
859 /* ######End of functions related with communication progress ###### */
860
861 void MachinePostNonLocalForMPI() {
862 #if !CMK_SMP
863     if (no_outstanding_sends) {
864         while (MsgQueueLen>0) {
865             AdvanceCommunicationForMPI();
866         }
867     }
868
869     /* FIXME: I don't think the following codes are needed because
870      * it repeats the same job of the next call of CmiGetNonLocal
871      */
872 #if 0
873     if (!msg) {
874         CmiReleaseSentMessages();
875         if (PumpMsgs())
876             return  PCQueuePop(cs->recv);
877         else
878             return 0;
879     }
880 #endif
881 #endif
882 }
883
884 /* Idle-state related functions: called in non-smp mode */
885 void CmiNotifyIdleForMPI(void) {
886     CmiReleaseSentMessages();
887     if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
888 }
889
890 /* Network progress function is used to poll the network when for
891    messages. This flushes receive buffers on some  implementations*/
892 #if CMK_MACHINE_PROGRESS_DEFINED
893 void CmiMachineProgressImpl() {
894 #if !CMK_SMP
895     PumpMsgs();
896 #if CMK_IMMEDIATE_MSG
897     CmiHandleImmediate();
898 #endif
899 #else
900     /*Not implemented yet. Communication server does not seem to be
901       thread safe, so only communication thread call it */
902     if (CmiMyRank() == CmiMyNodeSize())
903         CommunicationServerThread(0);
904 #endif
905 }
906 #endif
907
908 /* ######Beginning of functions related with exiting programs###### */
909 void DrainResourcesForMPI() {
910 #if !CMK_SMP
911     while (!CmiAllAsyncMsgsSent()) {
912         PumpMsgs();
913         CmiReleaseSentMessages();
914     }
915 #else
916     while (!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
917         CmiReleaseSentMessages();
918         SendMsgBuf();
919         PumpMsgs();
920     }
921 #endif
922     MACHSTATE(2, "Machine exit barrier begin {");
923     START_EVENT();
924     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
925         CmiAbort("DrainResourcesForMPI: MPI_Barrier failed!\n");
926     END_EVENT(10);
927     MACHSTATE(2, "} Machine exit barrier end");
928 }
929
930 void MachineExitForMPI(void) {
931 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
932     int doPrint = 0;
933 #if CMK_SMP
934     if (CmiMyNode()==0) doPrint = 1;
935 #else
936     if (CmiMyPe()==0) doPrint = 1;
937 #endif
938
939     if (doPrint) {
940 #if MPI_POST_RECV_COUNT > 0
941         CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
942 #endif
943     }
944 #endif
945
946 #if REPORT_COMM_METRICS
947 #if CMK_SMP
948     CmiPrintf("Report comm metrics for node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n",
949               CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1,
950               pumptime, releasetime, sendtime);
951 #else
952     CmiPrintf("Report comm metrics for proc %d: pumptime: %f, releasetime: %f, senttime: %f\n",
953               CmiMyPe(), pumptime, releasetime, sendtime);
954 #endif
955 #endif
956
957 #if ! CMK_AUTOBUILD
958     signal(SIGINT, signal_int);
959     MPI_Finalize();
960 #endif
961     exit(0);
962 }
963
964 static int machine_exit_idx;
965 static void machine_exit(char *m) {
966     EmergencyExit();
967     /*printf("--> %d: machine_exit\n",CmiMyPe());*/
968     fflush(stdout);
969     CmiNodeBarrier();
970     if (CmiMyRank() == 0) {
971         MPI_Barrier(MPI_COMM_WORLD);
972         /*printf("==> %d: passed barrier\n",CmiMyPe());*/
973         MPI_Abort(MPI_COMM_WORLD, 1);
974     } else {
975         while (1) CmiYield();
976     }
977 }
978
979 static void KillOnAllSigs(int sigNo) {
980     static int already_in_signal_handler = 0;
981     char *m;
982     if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
983     already_in_signal_handler = 1;
984 #if CMK_CCS_AVAILABLE
985     if (CpvAccess(cmiArgDebugFlag)) {
986         CpdNotify(CPD_SIGNAL, sigNo);
987         CpdFreeze();
988     }
989 #endif
990     CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
991              "Signal: %d\n",CmiMyPe(),sigNo);
992     CmiPrintStackTrace(1);
993
994     m = CmiAlloc(CmiMsgHeaderSizeBytes);
995     CmiSetHandler(m, machine_exit_idx);
996     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
997     machine_exit(m);
998 }
999 /* ######End of functions related with exiting programs###### */
1000
1001
1002 /* ######Beginning of functions related with starting programs###### */
1003 static void registerMPITraceEvents() {
1004 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1005     traceRegisterUserEvent("MPI_Barrier", 10);
1006     traceRegisterUserEvent("MPI_Send", 20);
1007     traceRegisterUserEvent("MPI_Recv", 30);
1008     traceRegisterUserEvent("MPI_Isend", 40);
1009     traceRegisterUserEvent("MPI_Irecv", 50);
1010     traceRegisterUserEvent("MPI_Test", 60);
1011     traceRegisterUserEvent("MPI_Iprobe", 70);
1012 #endif
1013 }
1014
1015 #if MACHINE_DEBUG_LOG
1016 FILE *debugLog = NULL;
1017 #endif
1018
1019 static char *thread_level_tostring(int thread_level) {
1020 #if CMK_MPI_INIT_THREAD
1021     switch (thread_level) {
1022     case MPI_THREAD_SINGLE:
1023         return "MPI_THREAD_SINGLE";
1024     case MPI_THREAD_FUNNELED:
1025         return "MPI_THREAD_FUNNELED";
1026     case MPI_THREAD_SERIALIZED:
1027         return "MPI_THREAD_SERIALIZED";
1028     case MPI_THREAD_MULTIPLE :
1029         return "MPI_THREAD_MULTIPLE ";
1030     default: {
1031         char *str = (char*)malloc(5);
1032         sprintf(str,"%d", thread_level);
1033         return str;
1034     }
1035     }
1036     return  "unknown";
1037 #else
1038     char *str = (char*)malloc(5);
1039     sprintf(str,"%d", thread_level);
1040     return str;
1041 #endif
1042 }
1043
1044 /**
1045  *  Obtain the number of nodes, my node id, and consuming machine layer
1046  *  specific arguments
1047  */
1048 static void MachineInitForMPI(int argc, char **argv, int *numNodes, int *myNodeID) {
1049     int n,i;
1050     int ver, subver;
1051     int provided;
1052     int thread_level;
1053     int myNID;
1054
1055 #if MACHINE_DEBUG
1056     debugLog=NULL;
1057 #endif
1058 #if CMK_USE_HP_MAIN_FIX
1059 #if FOR_CPLUS
1060     _main(argc,argv);
1061 #endif
1062 #endif
1063
1064 #if CMK_MPI_INIT_THREAD
1065 #if CMK_SMP
1066     thread_level = MPI_THREAD_FUNNELED;
1067 #else
1068     thread_level = MPI_THREAD_SINGLE;
1069 #endif
1070     MPI_Init_thread(&argc, &argv, thread_level, &provided);
1071     _thread_provided = provided;
1072 #else
1073     MPI_Init(&argc, &argv);
1074     thread_level = 0;
1075     provided = -1;
1076 #endif
1077     MPI_Comm_size(MPI_COMM_WORLD, numNodes);
1078     MPI_Comm_rank(MPI_COMM_WORLD, myNodeID);
1079
1080     myNID = *myNodeID;
1081
1082     MPI_Get_version(&ver, &subver);
1083     if (myNID == 0) {
1084         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
1085     }
1086
1087     idleblock = CmiGetArgFlag(argv, "+idleblocking");
1088     if (idleblock && _Cmi_mynode == 0) {
1089         printf("Charm++: Running in idle blocking mode.\n");
1090     }
1091
1092     /* setup signal handlers */
1093     signal(SIGSEGV, KillOnAllSigs);
1094     signal(SIGFPE, KillOnAllSigs);
1095     signal(SIGILL, KillOnAllSigs);
1096     signal_int = signal(SIGINT, KillOnAllSigs);
1097     signal(SIGTERM, KillOnAllSigs);
1098     signal(SIGABRT, KillOnAllSigs);
1099 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
1100     signal(SIGQUIT, KillOnAllSigs);
1101     signal(SIGBUS, KillOnAllSigs);
1102 #   endif /*UNIX*/
1103
1104 #if CMK_NO_OUTSTANDING_SENDS
1105     no_outstanding_sends=1;
1106 #endif
1107     if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1108         no_outstanding_sends = 1;
1109         if (myNID == 0)
1110             printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1111                    no_outstanding_sends?"":" not");
1112     }
1113
1114     request_max=MAX_QLEN;
1115     CmiGetArgInt(argv,"+requestmax",&request_max);
1116     /*printf("request max=%d\n", request_max);*/
1117
1118     /* checksum flag */
1119     if (CmiGetArgFlag(argv,"+checksum")) {
1120 #if CMK_ERROR_CHECKING
1121         checksum_flag = 1;
1122         if (myNID == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1123 #else
1124         if (myNID == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1125 #endif
1126     }
1127
1128     {
1129         int debug = CmiGetArgFlag(argv,"++debug");
1130         int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
1131         if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
1132 #if CMK_HAS_GETPID
1133             printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
1134             fflush(stdout);
1135             if (!debug_no_pause)
1136                 sleep(15);
1137 #else
1138             printf("++debug ignored.\n");
1139 #endif
1140         }
1141     }
1142
1143     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1144     for (i=0; i<_Cmi_mynodesize+1; i++) {
1145 #if MULTI_SENDQUEUE
1146         procState[i].sendMsgBuf = PCQueueCreate();
1147 #endif
1148         procState[i].recvLock = CmiCreateLock();
1149     }
1150 #if CMK_SMP
1151 #if !MULTI_SENDQUEUE
1152     sendMsgBuf = PCQueueCreate();
1153     sendMsgBufLock = CmiCreateLock();
1154 #endif
1155 #endif
1156 }
1157
1158 static void MachinePreCommonInitForMPI(int everReturn) {
1159 #if MPI_POST_RECV_COUNT > 0
1160     int doInit = 1;
1161     int i;
1162
1163 #if CMK_SMP
1164     if (CmiMyRank() != CmiMyNodeSize()) doInit = 0;
1165 #endif
1166
1167     /* Currently, in mpi smp, the main thread will be the comm thread, so
1168      *  only the comm thread should post recvs. Cpvs, however, need to be
1169      * created on rank 0 (the ptrs to the actual cpv memory), while
1170      * other ranks are busy waiting for this to finish. So cpv initialize
1171      * routines have to be called on every ranks, although they are only
1172      * useful on comm thread (whose rank is not zero) -Chao Mei
1173      */
1174     CpvInitialize(unsigned long long, Cmi_posted_recv_total);
1175     CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
1176     CpvInitialize(MPI_Request*, CmiPostedRecvRequests);
1177     CpvInitialize(char*,CmiPostedRecvBuffers);
1178
1179     if (doInit) {
1180         /* Post some extra recvs to help out with incoming messages */
1181         /* On some MPIs the messages are unexpected and thus slow */
1182
1183         /* An array of request handles for posted recvs */
1184         CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1185
1186         /* An array of buffers for posted recvs */
1187         CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1188
1189         /* Post Recvs */
1190         for (i=0; i<MPI_POST_RECV_COUNT; i++) {
1191             printf("Pre post recv %d\n", i);
1192             if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])     ,
1193                                            MPI_POST_RECV_SIZE,
1194                                            MPI_BYTE,
1195                                            MPI_ANY_SOURCE,
1196                                            POST_RECV_TAG,
1197                                            MPI_COMM_WORLD,
1198                                            &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1199                 CmiAbort("MPI_Irecv failed\n");
1200         }
1201     }
1202 #endif
1203
1204 }
1205
1206 static void MachinePostCommonInitForMPI(int everReturn) {
1207     CmiIdleState *s=CmiNotifyGetState();
1208     machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1209
1210 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1211     CpvInitialize(double, projTraceStart);
1212     /* only PE 0 needs to care about registration (to generate sts file). */
1213     if (CmiMyPe() == 0) {
1214         registerMachineUserEventsFunction(&registerMPITraceEvents);
1215     }
1216 #endif
1217
1218 #if CMK_SMP
1219     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1220     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1221 #else
1222     CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdleForMPI,NULL);
1223 #endif
1224
1225 #if MACHINE_DEBUG_LOG
1226     if (CmiMyRank() == 0) {
1227         char ln[200];
1228         sprintf(ln,"debugLog.%d",CmiMyNode());
1229         debugLog=fopen(ln,"w");
1230     }
1231 #endif
1232 }
1233 /* ######End of functions related with starting programs###### */
1234
1235 /***********************************************************************
1236  *
1237  * Abort function:
1238  *
1239  ************************************************************************/
1240
1241 void CmiAbort(const char *message) {
1242     char *m;
1243     /* if CharmDebug is attached simply try to send a message to it */
1244 #if CMK_CCS_AVAILABLE
1245     if (CpvAccess(cmiArgDebugFlag)) {
1246         CpdNotify(CPD_ABORT, message);
1247         CpdFreeze();
1248     }
1249 #endif
1250     CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1251              "Reason: %s\n",CmiMyPe(),message);
1252     /*  CmiError(message); */
1253     CmiPrintStackTrace(0);
1254     m = CmiAlloc(CmiMsgHeaderSizeBytes);
1255     CmiSetHandler(m, machine_exit_idx);
1256     CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1257     machine_exit(m);
1258     /* Program never reaches here */
1259     MPI_Abort(MPI_COMM_WORLD, 1);
1260 }
1261
1262 /**************************  TIMER FUNCTIONS **************************/
1263 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
1264
1265 /* MPI calls are not threadsafe, even the timer on some machines */
1266 static CmiNodeLock  timerLock = 0;
1267 static int _absoluteTime = 0;
1268 static double starttimer = 0;
1269 static int _is_global = 0;
1270
1271 int CmiTimerIsSynchronized() {
1272     int  flag;
1273     void *v;
1274
1275     /*  check if it using synchronized timer */
1276     if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
1277         printf("MPI_WTIME_IS_GLOBAL not valid!\n");
1278     if (flag) {
1279         _is_global = *(int*)v;
1280         if (_is_global && CmiMyPe() == 0)
1281             printf("Charm++> MPI timer is synchronized\n");
1282     }
1283     return _is_global;
1284 }
1285
1286 int CmiTimerAbsolute() {
1287     return _absoluteTime;
1288 }
1289
1290 double CmiStartTimer() {
1291     return 0.0;
1292 }
1293
1294 double CmiInitTime() {
1295     return starttimer;
1296 }
1297
1298 void CmiTimerInit(char **argv) {
1299     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
1300     if (_absoluteTime && CmiMyPe() == 0)
1301         printf("Charm++> absolute MPI timer is used\n");
1302
1303     _is_global = CmiTimerIsSynchronized();
1304
1305     if (_is_global) {
1306         if (CmiMyRank() == 0) {
1307             double minTimer;
1308 #if CMK_TIMER_USE_XT3_DCLOCK
1309             starttimer = dclock();
1310 #else
1311             starttimer = MPI_Wtime();
1312 #endif
1313
1314             MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
1315                           MPI_COMM_WORLD );
1316             starttimer = minTimer;
1317         }
1318     } else { /* we don't have a synchronous timer, set our own start time */
1319         CmiBarrier();
1320         CmiBarrier();
1321         CmiBarrier();
1322 #if CMK_TIMER_USE_XT3_DCLOCK
1323         starttimer = dclock();
1324 #else
1325         starttimer = MPI_Wtime();
1326 #endif
1327     }
1328
1329 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
1330     if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
1331         timerLock = CmiCreateLock();
1332 #endif
1333     CmiNodeAllBarrier();          /* for smp */
1334 }
1335
1336 /**
1337  * Since the timerLock is never created, and is
1338  * always NULL, then all the if-condition inside
1339  * the timer functions could be disabled right
1340  * now in the case of SMP. --Chao Mei
1341  */
1342 double CmiTimer(void) {
1343     double t;
1344 #if 0 && CMK_SMP
1345     if (timerLock) CmiLock(timerLock);
1346 #endif
1347
1348 #if CMK_TIMER_USE_XT3_DCLOCK
1349     t = dclock();
1350 #else
1351     t = MPI_Wtime();
1352 #endif
1353
1354 #if 0 && CMK_SMP
1355     if (timerLock) CmiUnlock(timerLock);
1356 #endif
1357
1358     return _absoluteTime?t: (t-starttimer);
1359 }
1360
1361 double CmiWallTimer(void) {
1362     double t;
1363 #if 0 && CMK_SMP
1364     if (timerLock) CmiLock(timerLock);
1365 #endif
1366
1367 #if CMK_TIMER_USE_XT3_DCLOCK
1368     t = dclock();
1369 #else
1370     t = MPI_Wtime();
1371 #endif
1372
1373 #if 0 && CMK_SMP
1374     if (timerLock) CmiUnlock(timerLock);
1375 #endif
1376
1377     return _absoluteTime? t: (t-starttimer);
1378 }
1379
1380 double CmiCpuTimer(void) {
1381     double t;
1382 #if 0 && CMK_SMP
1383     if (timerLock) CmiLock(timerLock);
1384 #endif
1385 #if CMK_TIMER_USE_XT3_DCLOCK
1386     t = dclock() - starttimer;
1387 #else
1388     t = MPI_Wtime() - starttimer;
1389 #endif
1390 #if 0 && CMK_SMP
1391     if (timerLock) CmiUnlock(timerLock);
1392 #endif
1393     return t;
1394 }
1395
1396 #endif
1397
1398 /************Barrier Related Functions****************/
1399 /* must be called on all ranks including comm thread in SMP */
1400 int CmiBarrier() {
1401 #if CMK_SMP
1402     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
1403     CmiNodeAllBarrier();
1404     if (CmiMyRank() == CmiMyNodeSize())
1405 #else
1406     if (CmiMyRank() == 0)
1407 #endif
1408     {
1409         /**
1410          *  The call of CmiBarrier is usually before the initialization
1411          *  of trace module of Charm++, therefore, the START_EVENT
1412          *  and END_EVENT are disabled here. -Chao Mei
1413          */
1414         /*START_EVENT();*/
1415
1416         if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1417             CmiAbort("Timernit: MPI_Barrier failed!\n");
1418
1419         /*END_EVENT(10);*/
1420     }
1421     CmiNodeAllBarrier();
1422     return 0;
1423 }
1424
1425 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
1426 int CmiBarrierZero() {
1427     int i;
1428 #if CMK_SMP
1429     if (CmiMyRank() == CmiMyNodeSize())
1430 #else
1431     if (CmiMyRank() == 0)
1432 #endif
1433     {
1434         char msg[1];
1435         MPI_Status sts;
1436         if (CmiMyNode() == 0)  {
1437             for (i=0; i<CmiNumNodes()-1; i++) {
1438                 START_EVENT();
1439
1440                 if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
1441                     CmiPrintf("MPI_Recv failed!\n");
1442
1443                 END_EVENT(30);
1444             }
1445         } else {
1446             START_EVENT();
1447
1448             if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
1449                 printf("MPI_Send failed!\n");
1450
1451             END_EVENT(20);
1452         }
1453     }
1454     CmiNodeAllBarrier();
1455     return 0;
1456 }
1457
1458 /*@}*/
1459