43a0082cce21c97fc2a57dd39feb2e9da1dd004e
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #if CMK_SMP
51 /* currently only considering the smp case */
52 #define CMI_DYNAMIC_EXERT_CAP 1
53 /* This macro defines the max number of msgs in the sender msg buffer 
54  * that is allowed for recving operation to continue
55  */
56 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 10
57 #define CMI_DYNAMIC_SEND_CAPSIZE 10
58 #define CMI_DYNAMIC_RECV_CAPSIZE 10
59 /* initial values, -1 indiates there's no cap */
60 static int dynamicSendCap = -1;
61 static int dynamicRecvCap = -1;
62 #endif
63
64 #if CMI_EXERT_SEND_CAP
65 #define SEND_CAP 3
66 #endif
67
68 #if CMI_EXERT_RECV_CAP
69 #define RECV_CAP 2
70 #endif
71
72
73 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
74 #define CMI_MPI_TRACE_MOREDETAILED 0
75 #undef CMI_MPI_TRACE_USEREVENTS
76 #define CMI_MPI_TRACE_USEREVENTS 1
77 #else
78 #undef CMK_SMP_TRACE_COMMTHREAD
79 #define CMK_SMP_TRACE_COMMTHREAD 0
80 #endif
81
82 #define CMK_TRACE_COMMOVERHEAD 0
83 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
84 #undef CMI_MPI_TRACE_USEREVENTS
85 #define CMI_MPI_TRACE_USEREVENTS 1
86 #else
87 #undef CMK_TRACE_COMMOVERHEAD
88 #define CMK_TRACE_COMMOVERHEAD 0
89 #endif
90
91 #include "machine.h"
92
93 #include "pcqueue.h"
94
95 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
96
97 #if CMK_BLUEGENEL
98 #define MAX_QLEN 8
99 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
100 #else
101 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
102 #define MAX_QLEN 200
103 #endif
104
105 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
106 CpvStaticDeclare(double, projTraceStart);
107 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
108 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
109 #else
110 # define  START_EVENT()
111 # define  END_EVENT(x)
112 #endif
113
114 /*
115     To reduce the buffer used in broadcast and distribute the load from
116   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
117   spanning tree broadcast algorithm.
118     This will use the fourth short in message as an indicator of spanning tree
119   root.
120 */
121 #define CMK_BROADCAST_SPANNING_TREE    1
122 #define CMK_BROADCAST_HYPERCUBE        0
123
124 #define BROADCAST_SPANNING_FACTOR      4
125 /* The number of children used when a msg is broadcast inside a node */
126 #define BROADCAST_SPANNING_INTRA_FACTOR      8
127
128 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
129 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
130
131 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
132 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
133
134 /* FIXME: need a random number that everyone agrees ! */
135 #define CHARM_MAGIC_NUMBER               126
136
137 #if CMK_ERROR_CHECKING
138 static int checksum_flag = 0;
139 #define CMI_SET_CHECKSUM(msg, len)      \
140         if (checksum_flag)  {   \
141           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
142           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
143         }
144 #define CMI_CHECK_CHECKSUM(msg, len)    \
145         if (checksum_flag)      \
146           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
147             CmiAbort("Fatal error: checksum doesn't agree!\n");
148 #else
149 #define CMI_SET_CHECKSUM(msg, len)
150 #define CMI_CHECK_CHECKSUM(msg, len)
151 #endif
152
153 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
154 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
155 #else
156 #define CMI_SET_BROADCAST_ROOT(msg, root) 
157 #endif
158
159
160 /** 
161     If MPI_POST_RECV is defined, we provide default values for size 
162     and number of posted recieves. If MPI_POST_RECV_COUNT is set
163     then a default value for MPI_POST_RECV_SIZE is used if not specified
164     by the user.
165 */
166 #ifdef MPI_POST_RECV
167 #define MPI_POST_RECV_COUNT 10
168 #undef MPI_POST_RECV
169 #endif
170 #if MPI_POST_RECV_COUNT > 0
171 #warning "Using MPI posted receives which have not yet been tested"
172 #ifndef MPI_POST_RECV_SIZE
173 #define MPI_POST_RECV_SIZE 200
174 #endif
175 /* #undef  MPI_POST_RECV_DEBUG  */
176 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
177 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
178 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
179 CpvDeclare(char*,CmiPostedRecvBuffers);
180 #endif
181
182 /*
183  to avoid MPI's in order delivery, changing MPI Tag all the time
184 */
185 #define TAG     1375
186
187 #if MPI_POST_RECV_COUNT > 0
188 #define POST_RECV_TAG TAG+1
189 #define BARRIER_ZERO_TAG TAG
190 #else
191 #define BARRIER_ZERO_TAG     1375
192 #endif
193
194 #include <signal.h>
195 void (*signal_int)(int);
196
197 /*
198 static int mpi_tag = TAG;
199 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
200 */
201
202 static int        _thread_provided = -1;
203 int               _Cmi_numpes;
204 int               _Cmi_mynode;    /* Which address space am I */
205 int               _Cmi_mynodesize;/* Number of processors in my address space */
206 int               _Cmi_numnodes;  /* Total number of address spaces */
207 int               _Cmi_numpes;    /* Total number of processors */
208 static int        Cmi_nodestart; /* First processor in this address space */
209 CpvDeclare(void*, CmiLocalQueue);
210
211 /*Network progress utility variables. Period controls the rate at
212   which the network poll is called */
213 CpvDeclare(unsigned , networkProgressCount);
214 int networkProgressPeriod;
215
216 int               idleblock = 0;
217
218 #define BLK_LEN  512
219
220 #if CMK_NODE_QUEUE_AVAILABLE
221 #define DGRAM_NODEMESSAGE   (0xFB)
222
223 #define NODE_BROADCAST_OTHERS (-1)
224 #define NODE_BROADCAST_ALL    (-2)
225 #endif
226
227 #if 0
228 static void **recdQueue_blk;
229 static unsigned int recdQueue_blk_len;
230 static unsigned int recdQueue_first;
231 static unsigned int recdQueue_len;
232 static void recdQueueInit(void);
233 static void recdQueueAddToBack(void *element);
234 static void *recdQueueRemoveFromFront(void);
235 #endif
236
237 static void ConverseRunPE(int everReturn);
238 static void CommunicationServer(int sleepTime);
239 static void CommunicationServerThread(int sleepTime);
240
241 typedef struct msg_list {
242      char *msg;
243      struct msg_list *next;
244      int size, destpe;
245
246 #if CMK_SMP_TRACE_COMMTHREAD
247         int srcpe;
248 #endif  
249         
250      MPI_Request req;
251 } SMSG_LIST;
252
253 int MsgQueueLen=0;
254 static int request_max;
255
256 static SMSG_LIST *sent_msgs=0;
257 static SMSG_LIST *end_sent=0;
258
259 static int Cmi_dim;
260
261 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
262
263 #if NODE_0_IS_CONVHOST
264 int inside_comm = 0;
265 #endif
266
267 void CmiAbort(const char *message);
268 static void PerrorExit(const char *msg);
269
270 void SendSpanningChildren(int size, char *msg);
271 void SendHypercube(int size, char *msg);
272
273 static void PerrorExit(const char *msg)
274 {
275   perror(msg);
276   exit(1);
277 }
278
279 extern unsigned char computeCheckSum(unsigned char *data, int len);
280
281 /**************************  TIMER FUNCTIONS **************************/
282
283 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
284
285 /* MPI calls are not threadsafe, even the timer on some machines */
286 static CmiNodeLock  timerLock = 0;
287 static int _absoluteTime = 0;
288 static double starttimer = 0;
289 static int _is_global = 0;
290
291 int CmiTimerIsSynchronized()
292 {
293   int  flag;
294   void *v;
295
296   /*  check if it using synchronized timer */
297   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
298     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
299   if (flag) {
300     _is_global = *(int*)v;
301     if (_is_global && CmiMyPe() == 0)
302       printf("Charm++> MPI timer is synchronized!\n");
303   }
304   return _is_global;
305 }
306
307 int CmiTimerAbsolute()
308 {       
309   return _absoluteTime;
310 }
311
312 double CmiStartTimer()
313 {
314   return 0.0;
315 }
316
317 double CmiInitTime()
318 {
319   return starttimer;
320 }
321
322 void CmiTimerInit(char **argv)
323 {
324   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
325
326   _is_global = CmiTimerIsSynchronized();
327
328   if (_is_global) {
329     if (CmiMyRank() == 0) {
330       double minTimer;
331 #if CMK_TIMER_USE_XT3_DCLOCK
332       starttimer = dclock();
333 #else
334       starttimer = MPI_Wtime();
335 #endif
336
337       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
338                                   MPI_COMM_WORLD );
339       starttimer = minTimer;
340     }
341   }
342   else {  /* we don't have a synchronous timer, set our own start time */
343     CmiBarrier();
344     CmiBarrier();
345     CmiBarrier();
346 #if CMK_TIMER_USE_XT3_DCLOCK
347     starttimer = dclock();
348 #else
349     starttimer = MPI_Wtime();
350 #endif
351   }
352
353 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
354   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
355     timerLock = CmiCreateLock();
356 #endif
357   CmiNodeAllBarrier();          /* for smp */
358 }
359
360 /**
361  * Since the timerLock is never created, and is
362  * always NULL, then all the if-condition inside
363  * the timer functions could be disabled right
364  * now in the case of SMP. --Chao Mei
365  */
366 double CmiTimer(void)
367 {
368   double t;
369 #if 0 && CMK_SMP
370   if (timerLock) CmiLock(timerLock);
371 #endif
372
373 #if CMK_TIMER_USE_XT3_DCLOCK
374   t = dclock();
375 #else
376   t = MPI_Wtime();
377 #endif
378
379 #if 0 && CMK_SMP
380   if (timerLock) CmiUnlock(timerLock);
381 #endif
382
383   return _absoluteTime?t: (t-starttimer);
384 }
385
386 double CmiWallTimer(void)
387 {
388   double t;
389 #if 0 && CMK_SMP
390   if (timerLock) CmiLock(timerLock);
391 #endif
392
393 #if CMK_TIMER_USE_XT3_DCLOCK
394   t = dclock();
395 #else
396   t = MPI_Wtime();
397 #endif
398
399 #if 0 && CMK_SMP
400   if (timerLock) CmiUnlock(timerLock);
401 #endif
402
403   return _absoluteTime? t: (t-starttimer);
404 }
405
406 double CmiCpuTimer(void)
407 {
408   double t;
409 #if 0 && CMK_SMP
410   if (timerLock) CmiLock(timerLock);
411 #endif
412 #if CMK_TIMER_USE_XT3_DCLOCK
413   t = dclock() - starttimer;
414 #else
415   t = MPI_Wtime() - starttimer;
416 #endif
417 #if 0 && CMK_SMP
418   if (timerLock) CmiUnlock(timerLock);
419 #endif
420   return t;
421 }
422
423 #endif
424
425 /* must be called on all ranks including comm thread in SMP */
426 int CmiBarrier()
427 {
428 #if CMK_SMP
429     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
430   CmiNodeAllBarrier();
431   if (CmiMyRank() == CmiMyNodeSize()) 
432 #else
433   if (CmiMyRank() == 0) 
434 #endif
435   {
436 /**
437  *  The call of CmiBarrier is usually before the initialization
438  *  of trace module of Charm++, therefore, the START_EVENT
439  *  and END_EVENT are disabled here. -Chao Mei
440  */     
441     /*START_EVENT();*/
442
443     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
444         CmiAbort("Timernit: MPI_Barrier failed!\n");
445
446     /*END_EVENT(10);*/
447   }
448   CmiNodeAllBarrier();
449   return 0;
450 }
451
452 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
453 int CmiBarrierZero()
454 {
455   int i;
456 #if CMK_SMP
457   if (CmiMyRank() == CmiMyNodeSize()) 
458 #else
459   if (CmiMyRank() == 0) 
460 #endif
461   {
462     char msg[1];
463     MPI_Status sts;
464     if (CmiMyNode() == 0)  {
465       for (i=0; i<CmiNumNodes()-1; i++) {
466          START_EVENT();
467
468          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
469             CmiPrintf("MPI_Recv failed!\n");
470
471          END_EVENT(30);
472       }
473     }
474     else {
475       START_EVENT();
476
477       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
478          printf("MPI_Send failed!\n");
479
480       END_EVENT(20);
481     }
482   }
483   CmiNodeAllBarrier();
484   return 0;
485 }
486
487 typedef struct ProcState {
488 #if MULTI_SENDQUEUE
489 PCQueue      sendMsgBuf;       /* per processor message sending queue */
490 #endif
491 CmiNodeLock  recvLock;              /* for cs->recv */
492 } ProcState;
493
494 static ProcState  *procState;
495
496 #if CMK_SMP
497
498 #if !MULTI_SENDQUEUE
499 static PCQueue sendMsgBuf;
500 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
501 #endif
502
503 #endif
504
505 /************************************************************
506  *
507  * Processor state structure
508  *
509  ************************************************************/
510
511 /* fake Cmi_charmrun_fd */
512 static int Cmi_charmrun_fd = 0;
513 #include "machine-smp.c"
514
515 CsvDeclare(CmiNodeState, NodeState);
516
517 #include "immediate.c"
518
519 #if CMK_SHARED_VARS_UNAVAILABLE
520 /************ non SMP **************/
521 static struct CmiStateStruct Cmi_state;
522 int _Cmi_mype;
523 int _Cmi_myrank;
524
525 void CmiMemLock() {}
526 void CmiMemUnlock() {}
527
528 #define CmiGetState() (&Cmi_state)
529 #define CmiGetStateN(n) (&Cmi_state)
530
531 void CmiYield(void) { sleep(0); }
532
533 static void CmiStartThreads(char **argv)
534 {
535   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
536   _Cmi_mype = Cmi_nodestart;
537   _Cmi_myrank = 0;
538 }
539 #endif  /* non smp */
540
541 /*Add a message to this processor's receive queue, pe is a rank */
542 void CmiPushPE(int pe,void *msg)
543 {
544   CmiState cs = CmiGetStateN(pe);
545   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
546 #if CMK_IMMEDIATE_MSG
547   if (CmiIsImmediate(msg)) {
548 /*
549 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
550     CmiHandleMessage(msg);
551 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
552 */
553     /**(CmiUInt2 *)msg = pe;*/
554     CMI_DEST_RANK(msg) = pe;
555     CmiPushImmediateMsg(msg);
556     return;
557   }
558 #endif
559
560 #if CMK_SMP
561   CmiLock(procState[pe].recvLock);
562 #endif
563   PCQueuePush(cs->recv,msg);
564 #if CMK_SMP
565   CmiUnlock(procState[pe].recvLock);
566 #endif
567   CmiIdleLock_addMessage(&cs->idle);
568   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
569 }
570
571 #if CMK_NODE_QUEUE_AVAILABLE
572 /*Add a message to this processor's receive queue */
573 static void CmiPushNode(void *msg)
574 {
575   MACHSTATE(3,"Pushing message into NodeRecv queue");
576 #if CMK_IMMEDIATE_MSG
577   if (CmiIsImmediate(msg)) {
578     CMI_DEST_RANK(msg) = 0;
579     CmiPushImmediateMsg(msg);
580     return;
581   }
582 #endif
583   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
584   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
585   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
586   {
587   CmiState cs=CmiGetStateN(0);
588   CmiIdleLock_addMessage(&cs->idle);
589   }
590 }
591 #endif
592
593 #ifndef CmiMyPe
594 int CmiMyPe(void)
595 {
596   return CmiGetState()->pe;
597 }
598 #endif
599
600 #ifndef CmiMyRank
601 int CmiMyRank(void)
602 {
603   return CmiGetState()->rank;
604 }
605 #endif
606
607 #ifndef CmiNodeFirst
608 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
609 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
610 #endif
611
612 #ifndef CmiNodeOf
613 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
614 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
615 #endif
616
617 static size_t CmiAllAsyncMsgsSent(void)
618 {
619    SMSG_LIST *msg_tmp = sent_msgs;
620    MPI_Status sts;
621    int done;
622
623    while(msg_tmp!=0) {
624     done = 0;
625     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
626       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
627     if(!done)
628       return 0;
629     msg_tmp = msg_tmp->next;
630 /*    MsgQueueLen--; ????? */
631    }
632    return 1;
633 }
634
635 int CmiAsyncMsgSent(CmiCommHandle c) {
636
637   SMSG_LIST *msg_tmp = sent_msgs;
638   int done;
639   MPI_Status sts;
640
641   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
642     msg_tmp = msg_tmp->next;
643   if(msg_tmp) {
644     done = 0;
645     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
646       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
647     return ((done)?1:0);
648   } else {
649     return 1;
650   }
651 }
652
653 void CmiReleaseCommHandle(CmiCommHandle c)
654 {
655   return;
656 }
657
658 #if CMK_BLUEGENEL
659 extern void MPID_Progress_test();
660 #endif
661
662 void CmiReleaseSentMessages(void)
663 {
664   SMSG_LIST *msg_tmp=sent_msgs;
665   SMSG_LIST *prev=0;
666   SMSG_LIST *temp;
667   int done;
668   MPI_Status sts;
669
670 #if CMK_BLUEGENEL
671   MPID_Progress_test();
672 #endif
673
674   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
675   while(msg_tmp!=0) {
676     done =0;
677 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
678     double startT = CmiWallTimer();
679 #endif
680     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
681       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
682     if(done) {
683       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
684       MsgQueueLen--;
685       /* Release the message */
686       temp = msg_tmp->next;
687       if(prev==0)  /* first message */
688         sent_msgs = temp;
689       else
690         prev->next = temp;
691       CmiFree(msg_tmp->msg);
692       CmiFree(msg_tmp);
693       msg_tmp = temp;
694     } else {
695       prev = msg_tmp;
696       msg_tmp = msg_tmp->next;
697     }
698 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
699     {
700     double endT = CmiWallTimer();
701     /* only record the event if it takes more than 1ms */
702     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
703     }
704 #endif
705   }
706   end_sent = prev;
707   MACHSTATE(2,"} CmiReleaseSentMessages end");
708 }
709
710 int PumpMsgs(void)
711 {
712   int nbytes, flg, res;
713   char *msg;
714   MPI_Status sts;
715   int recd=0;
716
717 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
718   int recvCnt=0;
719 #endif
720         
721 #if CMK_BLUEGENEL
722   MPID_Progress_test();
723 #endif
724
725   MACHSTATE(2,"PumpMsgs begin {");
726
727         
728   while(1) {
729 #if CMI_EXERT_RECV_CAP
730         if(recvCnt==RECV_CAP) break;
731 #elif CMI_DYNAMIC_EXERT_CAP
732         if(recvCnt == dynamicRecvCap) break;
733 #endif
734           
735     /* First check posted recvs then do  probe unmatched outstanding messages */
736 #if MPI_POST_RECV_COUNT > 0 
737     int completed_index=-1;
738     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
739         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
740     if(flg){
741         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
742             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
743
744         recd = 1;
745         msg = (char *) CmiAlloc(nbytes);
746         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
747         /* and repost the recv */
748
749         START_EVENT();
750
751         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
752             MPI_POST_RECV_SIZE,
753             MPI_BYTE,
754             MPI_ANY_SOURCE,
755             POST_RECV_TAG,
756             MPI_COMM_WORLD,
757             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
758                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
759
760         END_EVENT(50);
761
762         CpvAccess(Cmi_posted_recv_total)++;
763     }
764     else {
765         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
766         if(res != MPI_SUCCESS)
767         CmiAbort("MPI_Iprobe failed\n");
768         if(!flg) break;
769         recd = 1;
770         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
771         msg = (char *) CmiAlloc(nbytes);
772
773         START_EVENT();
774
775         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
776             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
777
778         END_EVENT(30);
779
780         CpvAccess(Cmi_unposted_recv_total)++;
781     }
782 #else
783     /* Original version */
784 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
785   double startT = CmiWallTimer(); 
786 #endif
787     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
788     if(res != MPI_SUCCESS)
789       CmiAbort("MPI_Iprobe failed\n");
790
791     if(!flg) break;
792 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
793     {
794     double endT = CmiWallTimer();
795     /* only trace the probe that last longer than 1ms */
796     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
797     }
798 #endif
799
800     recd = 1;
801     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
802     msg = (char *) CmiAlloc(nbytes);
803     
804     START_EVENT();
805
806     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
807       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
808
809     /*END_EVENT(30);*/
810
811 #endif
812
813 #if CMK_SMP_TRACE_COMMTHREAD
814         traceBeginCommOp(msg);
815         traceChangeLastTimestamp(CpvAccess(projTraceStart));
816         traceEndCommOp(msg);
817         #if CMI_MPI_TRACE_MOREDETAILED
818         char tmp[32];
819         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
820         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
821         #endif
822 #elif CMK_TRACE_COMMOVERHEAD
823         char tmp[32];
824         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
825         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
826 #endif
827         
828         
829     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
830     CMI_CHECK_CHECKSUM(msg, nbytes);
831 #if CMK_ERROR_CHECKING
832     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
833       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
834       CmiFree(msg);
835       CmiAbort("Abort!\n");
836       continue;
837     }
838 #endif
839         
840 #if CMK_BROADCAST_SPANNING_TREE
841     if (CMI_BROADCAST_ROOT(msg))
842       SendSpanningChildren(nbytes, msg);
843 #elif CMK_BROADCAST_HYPERCUBE
844     if (CMI_BROADCAST_ROOT(msg))
845       SendHypercube(nbytes, msg);
846 #endif
847         
848         /* In SMP mode, this push operation needs to be executed
849      * after forwarding broadcast messages. If it is executed
850      * earlier, then during the bcast msg forwarding period,    
851          * the msg could be already freed on the worker thread.
852          * As a result, the forwarded message could be wrong! 
853          * --Chao Mei
854          */
855 #if CMK_NODE_QUEUE_AVAILABLE
856     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
857       CmiPushNode(msg);
858     else
859 #endif
860         CmiPushPE(CMI_DEST_RANK(msg), msg);     
861         
862 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
863         recvCnt++;
864         /* check sendMsgBuf  to get the  number of messages that have not been sent */
865         /* MsgQueueLen indicates the number of messages that have not been released by MPI */
866         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD){
867                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
868         }
869 #endif  
870         
871   }
872
873   
874 #if CMK_IMMEDIATE_MSG && !CMK_SMP
875   CmiHandleImmediate();
876 #endif
877   
878   MACHSTATE(2,"} PumpMsgs end ");
879   return recd;
880 }
881
882 /* blocking version */
883 static void PumpMsgsBlocking(void)
884 {
885   static int maxbytes = 20000000;
886   static char *buf = NULL;
887   int nbytes, flg;
888   MPI_Status sts;
889   char *msg;
890   int recd=0;
891
892   if (!PCQueueEmpty(CmiGetState()->recv)) return;
893   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
894   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
895   if (sent_msgs)  return;
896
897 #if 0
898   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
899 #endif
900
901   if (buf == NULL) {
902     buf = (char *) CmiAlloc(maxbytes);
903     _MEMCHECK(buf);
904   }
905
906
907 #if MPI_POST_RECV_COUNT > 0
908 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
909 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
910 #endif
911
912   START_EVENT();
913
914   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
915       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
916
917   /*END_EVENT(30);*/
918     
919    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
920    msg = (char *) CmiAlloc(nbytes);
921    memcpy(msg, buf, nbytes);
922
923 #if CMK_SMP_TRACE_COMMTHREAD
924         traceBeginCommOp(msg);
925         traceChangeLastTimestamp(CpvAccess(projTraceStart));
926         traceEndCommOp(msg);
927         #if CMI_MPI_TRACE_MOREDETAILED
928         char tmp[32];
929         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
930         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
931         #endif
932 #endif
933
934 #if CMK_BROADCAST_SPANNING_TREE
935    if (CMI_BROADCAST_ROOT(msg))
936       SendSpanningChildren(nbytes, msg);
937 #elif CMK_BROADCAST_HYPERCUBE
938    if (CMI_BROADCAST_ROOT(msg))
939       SendHypercube(nbytes, msg);
940 #endif
941   
942         /* In SMP mode, this push operation needs to be executed
943      * after forwarding broadcast messages. If it is executed
944      * earlier, then during the bcast msg forwarding period,    
945          * the msg could be already freed on the worker thread.
946          * As a result, the forwarded message could be wrong! 
947          * --Chao Mei
948          */  
949 #if CMK_NODE_QUEUE_AVAILABLE
950    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
951       CmiPushNode(msg);
952    else
953 #endif
954       CmiPushPE(CMI_DEST_RANK(msg), msg);
955 }
956
957 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
958
959 #if CMK_SMP
960
961 static int inexit = 0;
962 static CmiNodeLock  exitLock = 0;
963
964 static int MsgQueueEmpty()
965 {
966   int i;
967 #if MULTI_SENDQUEUE
968   for (i=0; i<_Cmi_mynodesize; i++)
969     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
970 #else
971   return PCQueueEmpty(sendMsgBuf);
972 #endif
973   return 1;
974 }
975
976 static int SendMsgBuf();
977
978 /* test if all processors recv queues are empty */
979 static int RecvQueueEmpty()
980 {
981   int i;
982   for (i=0; i<_Cmi_mynodesize; i++) {
983     CmiState cs=CmiGetStateN(i);
984     if (!PCQueueEmpty(cs->recv)) return 0;
985   }
986   return 1;
987 }
988
989 /**
990 CommunicationServer calls MPI to send messages in the queues and probe message from network.
991 */
992
993 #define REPORT_COMM_METRICS 0
994 #if REPORT_COMM_METRICS
995 static double pumptime = 0.0;
996 static double releasetime = 0.0;
997 static double sendtime = 0.0;
998 #endif
999
1000 static void CommunicationServer(int sleepTime)
1001 {
1002   int static count=0;
1003 /*
1004   count ++;
1005   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
1006 */
1007 #if REPORT_COMM_METRICS
1008   double t1, t2, t3, t4;
1009   t1 = CmiWallTimer();
1010 #endif
1011   PumpMsgs();
1012 #if REPORT_COMM_METRICS
1013   t2 = CmiWallTimer();
1014 #endif
1015   CmiReleaseSentMessages();
1016 #if REPORT_COMM_METRICS
1017   t3 = CmiWallTimer();
1018 #endif
1019   SendMsgBuf();
1020 #if REPORT_COMM_METRICS
1021   t4 = CmiWallTimer();
1022   pumptime += (t2-t1);
1023   releasetime += (t3-t2);
1024   sendtime += (t4-t3);
1025 #endif
1026 /*
1027   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1028 */
1029   if (inexit == CmiMyNodeSize()) {
1030     MACHSTATE(2, "CommunicationServer exiting {");
1031 #if 0
1032     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1033 #endif
1034     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1035       CmiReleaseSentMessages();
1036       SendMsgBuf();
1037       PumpMsgs();
1038     }
1039     MACHSTATE(2, "CommunicationServer barrier begin {");
1040
1041     START_EVENT();
1042
1043     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1044       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1045
1046     END_EVENT(10);
1047
1048     MACHSTATE(2, "} CommunicationServer barrier end");
1049 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1050     if (CmiMyNode() == 0){
1051       CmiPrintf("End of program\n");
1052     }
1053 #endif
1054     MACHSTATE(2, "} CommunicationServer EXIT");
1055
1056     ConverseCommonExit();   
1057 #if REPORT_COMM_METRICS
1058     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1059 #endif
1060
1061 #if ! CMK_AUTOBUILD
1062     signal(SIGINT, signal_int);
1063     MPI_Finalize();
1064     #endif
1065     exit(0);
1066   }
1067 }
1068
1069 #endif
1070
1071 static void CommunicationServerThread(int sleepTime)
1072 {
1073 #if CMK_SMP
1074   CommunicationServer(sleepTime);
1075 #endif
1076 #if CMK_IMMEDIATE_MSG
1077   CmiHandleImmediate();
1078 #endif
1079 }
1080
1081 #if CMK_NODE_QUEUE_AVAILABLE
1082 char *CmiGetNonLocalNodeQ(void)
1083 {
1084   CmiState cs = CmiGetState();
1085   char *result = 0;
1086   CmiIdleLock_checkMessage(&cs->idle);
1087 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1088     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1089     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1090     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1091     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1092     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1093 /*  }  */
1094
1095   return result;
1096 }
1097 #endif
1098
1099 void *CmiGetNonLocal(void)
1100 {
1101   static int count=0;
1102   CmiState cs = CmiGetState();
1103   void *msg;
1104
1105 #if ! CMK_SMP
1106   if (CmiNumPes() == 1) return NULL;
1107 #endif
1108
1109   CmiIdleLock_checkMessage(&cs->idle);
1110   /* although it seems that lock is not needed, I found it crashes very often
1111      on mpi-smp without lock */
1112
1113 #if ! CMK_SMP
1114   CmiReleaseSentMessages();
1115   PumpMsgs();
1116 #endif
1117
1118   /* CmiLock(procState[cs->rank].recvLock); */
1119   msg =  PCQueuePop(cs->recv);
1120   /* CmiUnlock(procState[cs->rank].recvLock); */
1121
1122 /*
1123   if (msg) {
1124     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1125   else {
1126     count++;
1127     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1128   }
1129 */
1130 #if ! CMK_SMP
1131   if (no_outstanding_sends) {
1132     while (MsgQueueLen>0) {
1133       CmiReleaseSentMessages();
1134       PumpMsgs();
1135     }
1136   }
1137
1138   if(!msg) {
1139     CmiReleaseSentMessages();
1140     if (PumpMsgs())
1141       return  PCQueuePop(cs->recv);
1142     else
1143       return 0;
1144   }
1145 #endif
1146
1147   return msg;
1148 }
1149
1150 /* called in non-smp mode */
1151 void CmiNotifyIdle(void)
1152 {
1153   CmiReleaseSentMessages();
1154   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1155 }
1156
1157
1158 /********************************************************
1159     The call to probe immediate messages has been renamed to
1160     CmiMachineProgressImpl
1161 ******************************************************/
1162 /* user call to handle immediate message, only useful in non SMP version
1163    using polling method to schedule message.
1164 */
1165 /*
1166 #if CMK_IMMEDIATE_MSG
1167 void CmiProbeImmediateMsg()
1168 {
1169 #if !CMK_SMP
1170   PumpMsgs();
1171   CmiHandleImmediate();
1172 #endif
1173 }
1174 #endif
1175 */
1176
1177 /* Network progress function is used to poll the network when for
1178    messages. This flushes receive buffers on some  implementations*/
1179 #if CMK_MACHINE_PROGRESS_DEFINED
1180 void CmiMachineProgressImpl()
1181 {
1182 #if !CMK_SMP
1183     PumpMsgs();
1184 #if CMK_IMMEDIATE_MSG
1185     CmiHandleImmediate();
1186 #endif
1187 #else
1188     /*Not implemented yet. Communication server does not seem to be
1189       thread safe, so only communication thread call it */
1190     if (CmiMyRank() == CmiMyNodeSize())
1191         CommunicationServerThread(0);
1192 #endif
1193 }
1194 #endif
1195
1196 /********************* MESSAGE SEND FUNCTIONS ******************/
1197
1198 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1199
1200 static void CmiSendSelf(char *msg)
1201 {
1202 #if CMK_IMMEDIATE_MSG
1203     if (CmiIsImmediate(msg)) {
1204       /* CmiBecomeNonImmediate(msg); */
1205       CmiPushImmediateMsg(msg);
1206       CmiHandleImmediate();
1207       return;
1208     }
1209 #endif
1210     CQdCreate(CpvAccess(cQdState), 1);
1211     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1212 }
1213
1214 void CmiSyncSendFn(int destPE, int size, char *msg)
1215 {
1216   CmiState cs = CmiGetState();
1217   char *dupmsg = (char *) CmiAlloc(size);
1218   memcpy(dupmsg, msg, size);
1219
1220   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1221
1222   if (cs->pe==destPE) {
1223     CmiSendSelf(dupmsg);
1224   }
1225   else
1226     CmiAsyncSendFn_(destPE, size, dupmsg);
1227 }
1228
1229 #if CMK_SMP
1230
1231 /* called by communication thread in SMP */
1232 static int SendMsgBuf()
1233 {
1234   SMSG_LIST *msg_tmp;
1235   char *msg;
1236   int node, rank, size;
1237   int i;
1238   int sent = 0;
1239
1240 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1241         int sentCnt = 0;
1242 #endif  
1243         
1244   MACHSTATE(2,"SendMsgBuf begin {");
1245 #if MULTI_SENDQUEUE
1246   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1247   {
1248     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1249     {
1250       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1251 #else
1252     /* single message sending queue */
1253     /* CmiLock(sendMsgBufLock); */
1254     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1255     /* CmiUnlock(sendMsgBufLock); */
1256     while (NULL != msg_tmp)
1257     {
1258 #endif
1259       node = msg_tmp->destpe;
1260       size = msg_tmp->size;
1261       msg = msg_tmp->msg;
1262       msg_tmp->next = 0;
1263                 
1264 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1265       while (MsgQueueLen > request_max) {
1266                 CmiReleaseSentMessages();
1267                 PumpMsgs();
1268       }
1269 #endif
1270           
1271       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1272 #if CMK_ERROR_CHECKING
1273       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1274 #endif
1275       CMI_SET_CHECKSUM(msg, size);
1276
1277 #if MPI_POST_RECV_COUNT > 0
1278         if(size <= MPI_POST_RECV_SIZE){
1279
1280           START_EVENT();
1281           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1282                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1283
1284           STOP_EVENT(40);
1285         }
1286         else {
1287             START_EVENT();
1288             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1289                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1290             STOP_EVENT(40);
1291         }
1292 #else
1293         START_EVENT();
1294         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1295             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1296         /*END_EVENT(40);*/
1297 #endif
1298         
1299 #if CMK_SMP_TRACE_COMMTHREAD
1300         traceBeginCommOp(msg);
1301         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1302         /* traceSendMsgComm must execute after traceBeginCommOp because
1303          * we pretend we execute an entry method, and inside this we
1304          * pretend we will send another message. Otherwise how could
1305          * a message creation just before an entry method invocation?
1306          * If such logic is broken, the projections will not trace
1307          * messages correctly! -Chao Mei
1308          */
1309         traceSendMsgComm(msg);
1310         traceEndCommOp(msg);
1311         #if CMI_MPI_TRACE_MOREDETAILED
1312         char tmp[64];
1313         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1314         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1315         #endif
1316 #endif
1317                 
1318                 
1319       MACHSTATE(3,"}MPI_send end");
1320       MsgQueueLen++;
1321       if(sent_msgs==0)
1322         sent_msgs = msg_tmp;
1323       else
1324         end_sent->next = msg_tmp;
1325       end_sent = msg_tmp;
1326       sent=1;
1327           
1328 #if CMI_EXERT_SEND_CAP    
1329           if(++sentCnt == SEND_CAP) break;
1330 #elif CMI_DYNAMIC_EXERT_CAP
1331           if(++sentCnt == dynamicSendCap) break;
1332           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1333                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1334 #endif    
1335           
1336 #if ! MULTI_SENDQUEUE
1337       /* CmiLock(sendMsgBufLock); */
1338       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1339       /* CmiUnlock(sendMsgBufLock); */
1340 #endif
1341     }
1342 #if MULTI_SENDQUEUE
1343   }
1344 #endif
1345   MACHSTATE(2,"}SendMsgBuf end ");
1346   return sent;
1347 }
1348
1349 void EnqueueMsg(void *m, int size, int node)
1350 {
1351   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1352   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1353   msg_tmp->msg = m;
1354   msg_tmp->size = size;
1355   msg_tmp->destpe = node;
1356         
1357 #if CMK_SMP_TRACE_COMMTHREAD
1358         msg_tmp->srcpe = CmiMyPe();
1359 #endif  
1360
1361 #if MULTI_SENDQUEUE
1362   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1363 #else
1364   CmiLock(sendMsgBufLock);
1365   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1366   CmiUnlock(sendMsgBufLock);
1367 #endif
1368         
1369   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1370 }
1371
1372 #endif
1373
1374 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1375 {
1376   CmiState cs = CmiGetState();
1377   SMSG_LIST *msg_tmp;
1378   CmiUInt2  rank, node;
1379
1380   if(destPE == cs->pe) {
1381     char *dupmsg = (char *) CmiAlloc(size);
1382     memcpy(dupmsg, msg, size);
1383     CmiSendSelf(dupmsg);
1384     return 0;
1385   }
1386   CQdCreate(CpvAccess(cQdState), 1);
1387 #if CMK_SMP
1388   node = CmiNodeOf(destPE);
1389   rank = CmiRankOf(destPE);
1390   if (node == CmiMyNode())  {
1391     CmiPushPE(rank, msg);
1392     return 0;
1393   }
1394   CMI_DEST_RANK(msg) = rank;
1395   EnqueueMsg(msg, size, node);
1396   return 0;
1397 #else
1398   /* non smp */
1399   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1400   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1401   msg_tmp->msg = msg;
1402   msg_tmp->next = 0;
1403   while (MsgQueueLen > request_max) {
1404         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1405         CmiReleaseSentMessages();
1406         PumpMsgs();
1407   }
1408 #if CMK_ERROR_CHECKING
1409   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1410 #endif
1411   CMI_SET_CHECKSUM(msg, size);
1412
1413 #if MPI_POST_RECV_COUNT > 0
1414         if(size <= MPI_POST_RECV_SIZE){
1415
1416           START_EVENT();
1417           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1418                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1419           END_EVENT(40);
1420         }
1421         else {
1422           START_EVENT();
1423           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1424                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1425           END_EVENT(40);
1426         }
1427 #else
1428   START_EVENT();
1429   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1430     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1431   /*END_EVENT(40);*/
1432   #if CMK_TRACE_COMMOVERHEAD
1433         char tmp[64];
1434         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1435         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1436   #endif
1437 #endif
1438
1439   MsgQueueLen++;
1440   if(sent_msgs==0)
1441     sent_msgs = msg_tmp;
1442   else
1443     end_sent->next = msg_tmp;
1444   end_sent = msg_tmp;
1445   return (CmiCommHandle) &(msg_tmp->req);
1446 #endif              /* non-smp */
1447 }
1448
1449 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1450 {
1451   CMI_SET_BROADCAST_ROOT(msg, 0);
1452   CmiAsyncSendFn_(destPE, size, msg);
1453 }
1454
1455 void CmiFreeSendFn(int destPE, int size, char *msg)
1456 {
1457   CmiState cs = CmiGetState();
1458   CMI_SET_BROADCAST_ROOT(msg, 0);
1459
1460   if (cs->pe==destPE) {
1461     CmiSendSelf(msg);
1462   } else {
1463     CmiAsyncSendFn_(destPE, size, msg);
1464   }
1465 }
1466
1467 /*********************** BROADCAST FUNCTIONS **********************/
1468
1469 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1470 void CmiSyncSendFn1(int destPE, int size, char *msg)
1471 {
1472   CmiState cs = CmiGetState();
1473   char *dupmsg = (char *) CmiAlloc(size);
1474   memcpy(dupmsg, msg, size);
1475   if (cs->pe==destPE)
1476     CmiSendSelf(dupmsg);
1477   else
1478     CmiAsyncSendFn_(destPE, size, dupmsg);
1479 }
1480
1481 /* send msg to its spanning children in broadcast. G. Zheng */
1482 void SendSpanningChildren(int size, char *msg)
1483 {
1484   CmiState cs = CmiGetState();
1485   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1486   int startnode = CmiNodeOf(startpe);
1487   int i, exceptRank;
1488         
1489    /* first send msgs to other nodes */
1490   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1491   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1492     int nd = CmiMyNode()-startnode;
1493     if (nd<0) nd+=CmiNumNodes();
1494     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1495     if (nd > CmiNumNodes() - 1) break;
1496     nd += startnode;
1497     nd = nd%CmiNumNodes();
1498     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1499         #if CMK_SMP
1500         /* always send to the first rank of other nodes */
1501         char *newmsg = CmiCopyMsg(msg, size);
1502         CMI_DEST_RANK(newmsg) = 0;
1503     EnqueueMsg(newmsg, size, nd);
1504         #else
1505         CmiSyncSendFn1(nd, size, msg);
1506         #endif
1507   }
1508 #if CMK_SMP  
1509    /* second send msgs to my peers on this node */
1510   /* FIXME: now it's just a flat p2p send!! When node size is large,
1511    * it should also be sent in a tree
1512    */
1513    exceptRank = CMI_DEST_RANK(msg);
1514    for(i=0; i<exceptRank; i++){
1515            CmiPushPE(i, CmiCopyMsg(msg, size));
1516    }
1517    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1518            CmiPushPE(i, CmiCopyMsg(msg, size));
1519    }
1520 #endif
1521 }
1522
1523 #include <math.h>
1524
1525 /* send msg along the hypercube in broadcast. (Sameer) */
1526 void SendHypercube(int size, char *msg)
1527 {
1528   CmiState cs = CmiGetState();
1529   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1530   int startnode = CmiNodeOf(startpe);
1531   int i, exceptRank, cnt, tmp, relPE;
1532   int dims=0;
1533
1534   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1535   tmp = CmiNumNodes()-1;
1536   while(tmp>0){
1537           dims++;
1538           tmp = tmp >> 1;
1539   }
1540   if(CmiNumNodes()==1) dims=1;
1541   
1542    /* first send msgs to other nodes */  
1543   relPE = CmiMyNode()-startnode;
1544   if(relPE < 0) relPE += CmiNumNodes();
1545   cnt=0;
1546   tmp = relPE;
1547   /* count how many zeros (in binary format) relPE has */
1548   for(i=0; i<dims; i++, cnt++){
1549     if(tmp & 1 == 1) break;
1550     tmp = tmp >> 1;
1551   }
1552   
1553   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1554   for (i = cnt-1; i >= 0; i--) {
1555     int nd = relPE + (1 << i);
1556         if(nd >= CmiNumNodes()) continue;
1557         nd = (nd+startnode)%CmiNumNodes();
1558         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1559 #if CMK_SMP
1560     /* always send to the first rank of other nodes */
1561     char *newmsg = CmiCopyMsg(msg, size);
1562     CMI_DEST_RANK(newmsg) = 0;
1563     EnqueueMsg(newmsg, size, nd);
1564 #else
1565         CmiSyncSendFn1(nd, size, msg);
1566 #endif
1567   }
1568   
1569 #if CMK_SMP
1570    /* second send msgs to my peers on this node */
1571    /* FIXME: now it's just a flat p2p send!! When node size is large,
1572     * it should also be sent in a tree
1573     */
1574    exceptRank = CMI_DEST_RANK(msg);
1575    for(i=0; i<exceptRank; i++){
1576            CmiPushPE(i, CmiCopyMsg(msg, size));
1577    }
1578    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1579            CmiPushPE(i, CmiCopyMsg(msg, size));
1580    }
1581 #endif
1582 }
1583
1584 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1585 {
1586   CmiState cs = CmiGetState();
1587
1588 #if CMK_SMP     
1589   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1590   CMI_DEST_RANK(msg) = CmiMyRank();
1591 #endif
1592         
1593 #if CMK_BROADCAST_SPANNING_TREE
1594   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1595   SendSpanningChildren(size, msg);
1596
1597 #elif CMK_BROADCAST_HYPERCUBE
1598   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1599   SendHypercube(size, msg);
1600
1601 #else
1602   int i;
1603
1604   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1605     CmiSyncSendFn(i, size,msg) ;
1606   for ( i=0; i<cs->pe; i++ )
1607     CmiSyncSendFn(i, size,msg) ;
1608 #endif
1609
1610   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1611 }
1612
1613
1614 /*  FIXME: luckily async is never used  G. Zheng */
1615 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1616 {
1617   CmiState cs = CmiGetState();
1618   int i ;
1619
1620   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1621     CmiAsyncSendFn(i,size,msg) ;
1622   for ( i=0; i<cs->pe; i++ )
1623     CmiAsyncSendFn(i,size,msg) ;
1624
1625   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1626   CmiAbort("CmiAsyncBroadcastFn should never be called");
1627   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1628 }
1629
1630 void CmiFreeBroadcastFn(int size, char *msg)
1631 {
1632    CmiSyncBroadcastFn(size,msg);
1633    CmiFree(msg);
1634 }
1635
1636 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1637 {
1638
1639 #if CMK_SMP     
1640   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1641   CMI_DEST_RANK(msg) = CmiMyRank();
1642 #endif
1643
1644 #if CMK_BROADCAST_SPANNING_TREE
1645   CmiState cs = CmiGetState();
1646   CmiSyncSendFn(cs->pe, size,msg) ;
1647   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1648   SendSpanningChildren(size, msg);
1649
1650 #elif CMK_BROADCAST_HYPERCUBE
1651   CmiState cs = CmiGetState();
1652   CmiSyncSendFn(cs->pe, size,msg) ;
1653   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1654   SendHypercube(size, msg);
1655
1656 #else
1657     int i ;
1658
1659   for ( i=0; i<_Cmi_numpes; i++ )
1660     CmiSyncSendFn(i,size,msg) ;
1661 #endif
1662
1663   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1664 }
1665
1666 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1667 {
1668   int i ;
1669
1670   for ( i=1; i<_Cmi_numpes; i++ )
1671     CmiAsyncSendFn(i,size,msg) ;
1672
1673   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1674
1675   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1676 }
1677
1678 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1679 {
1680 #if CMK_SMP     
1681   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1682   CMI_DEST_RANK(msg) = CmiMyRank();
1683 #endif
1684
1685 #if CMK_BROADCAST_SPANNING_TREE
1686   CmiState cs = CmiGetState();
1687   CmiSyncSendFn(cs->pe, size,msg) ;
1688   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1689   SendSpanningChildren(size, msg);
1690
1691 #elif CMK_BROADCAST_HYPERCUBE
1692   CmiState cs = CmiGetState();
1693   CmiSyncSendFn(cs->pe, size,msg) ;
1694   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1695   SendHypercube(size, msg);
1696
1697 #else
1698   int i ;
1699
1700   for ( i=0; i<_Cmi_numpes; i++ )
1701     CmiSyncSendFn(i,size,msg) ;
1702 #endif
1703   CmiFree(msg) ;
1704   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1705 }
1706
1707 #if CMK_NODE_QUEUE_AVAILABLE
1708
1709 static void CmiSendNodeSelf(char *msg)
1710 {
1711 #if CMK_IMMEDIATE_MSG
1712 #if 0
1713     if (CmiIsImmediate(msg) && !_immRunning) {
1714       /*CmiHandleImmediateMessage(msg); */
1715       CmiPushImmediateMsg(msg);
1716       CmiHandleImmediate();
1717       return;
1718     }
1719 #endif
1720     if (CmiIsImmediate(msg))
1721     {
1722       CmiPushImmediateMsg(msg);
1723       if (!_immRunning) CmiHandleImmediate();
1724       return;
1725     }
1726 #endif
1727     CQdCreate(CpvAccess(cQdState), 1);
1728     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1729     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1730     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1731 }
1732
1733 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1734 {
1735   int i;
1736   SMSG_LIST *msg_tmp;
1737   char *dupmsg;
1738
1739   CMI_SET_BROADCAST_ROOT(msg, 0);
1740   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1741   switch (dstNode) {
1742   case NODE_BROADCAST_ALL:
1743     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1744   case NODE_BROADCAST_OTHERS:
1745     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1746     for (i=0; i<_Cmi_numnodes; i++)
1747       if (i!=_Cmi_mynode) {
1748         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1749       }
1750     break;
1751   default:
1752     dupmsg = (char *)CmiCopyMsg(msg,size);
1753     if(dstNode == _Cmi_mynode) {
1754       CmiSendNodeSelf(dupmsg);
1755     }
1756     else {
1757       CQdCreate(CpvAccess(cQdState), 1);
1758       EnqueueMsg(dupmsg, size, dstNode);
1759     }
1760   }
1761   return 0;
1762 }
1763
1764 void CmiSyncNodeSendFn(int p, int s, char *m)
1765 {
1766   CmiAsyncNodeSendFn(p, s, m);
1767 }
1768
1769 /* need */
1770 void CmiFreeNodeSendFn(int p, int s, char *m)
1771 {
1772   CmiAsyncNodeSendFn(p, s, m);
1773   CmiFree(m);
1774 }
1775
1776 /* need */
1777 void CmiSyncNodeBroadcastFn(int s, char *m)
1778 {
1779   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1780 }
1781
1782 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1783 {
1784 }
1785
1786 /* need */
1787 void CmiFreeNodeBroadcastFn(int s, char *m)
1788 {
1789   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1790   CmiFree(m);
1791 }
1792
1793 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1794 {
1795   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1796 }
1797
1798 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1799 {
1800   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1801 }
1802
1803 /* need */
1804 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1805 {
1806   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1807   CmiFree(m);
1808 }
1809 #endif
1810
1811 /************************** MAIN ***********************************/
1812 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1813
1814 void ConverseExit(void)
1815 {
1816 #if ! CMK_SMP
1817   while(!CmiAllAsyncMsgsSent()) {
1818     PumpMsgs();
1819     CmiReleaseSentMessages();
1820   }
1821   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1822     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1823
1824   ConverseCommonExit();
1825 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1826   if (CmiMyPe() == 0){
1827     CmiPrintf("End of program\n");
1828 #if MPI_POST_RECV_COUNT > 0
1829     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1830 #endif
1831 }
1832 #endif
1833 #if ! CMK_AUTOBUILD
1834   signal(SIGINT, signal_int);
1835   MPI_Finalize();
1836 #endif
1837   exit(0);
1838
1839 #else
1840     /* SMP version, communication thread will exit */
1841   ConverseCommonExit();
1842   /* atomic increment */
1843   CmiLock(exitLock);
1844   inexit++;
1845   CmiUnlock(exitLock);
1846   while (1) CmiYield();
1847 #endif
1848 }
1849
1850 static void registerMPITraceEvents() {
1851 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1852     traceRegisterUserEvent("MPI_Barrier", 10);
1853     traceRegisterUserEvent("MPI_Send", 20);
1854     traceRegisterUserEvent("MPI_Recv", 30);
1855     traceRegisterUserEvent("MPI_Isend", 40);
1856     traceRegisterUserEvent("MPI_Irecv", 50);
1857     traceRegisterUserEvent("MPI_Test", 60);
1858     traceRegisterUserEvent("MPI_Iprobe", 70);
1859 #endif
1860 }
1861
1862
1863 static char     **Cmi_argv;
1864 static char     **Cmi_argvcopy;
1865 static CmiStartFn Cmi_startfn;   /* The start function */
1866 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1867
1868 typedef struct {
1869   int sleepMs; /*Milliseconds to sleep while idle*/
1870   int nIdles; /*Number of times we've been idle in a row*/
1871   CmiState cs; /*Machine state*/
1872 } CmiIdleState;
1873
1874 static CmiIdleState *CmiNotifyGetState(void)
1875 {
1876   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1877   s->sleepMs=0;
1878   s->nIdles=0;
1879   s->cs=CmiGetState();
1880   return s;
1881 }
1882
1883 static void CmiNotifyBeginIdle(CmiIdleState *s)
1884 {
1885   s->sleepMs=0;
1886   s->nIdles=0;
1887 }
1888
1889 static void CmiNotifyStillIdle(CmiIdleState *s)
1890 {
1891 #if ! CMK_SMP
1892   CmiReleaseSentMessages();
1893   PumpMsgs();
1894 #else
1895 /*  CmiYield();  */
1896 #endif
1897
1898 #if 1
1899   {
1900   int nSpins=20; /*Number of times to spin before sleeping*/
1901   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1902   s->nIdles++;
1903   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1904     s->sleepMs+=2;
1905     if (s->sleepMs>10) s->sleepMs=10;
1906   }
1907   /*Comm. thread will listen on sockets-- just sleep*/
1908   if (s->sleepMs>0) {
1909     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1910     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1911     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1912   }
1913   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1914   }
1915 #endif
1916 }
1917
1918 #if MACHINE_DEBUG_LOG
1919 FILE *debugLog = NULL;
1920 #endif
1921
1922 static int machine_exit_idx;
1923 static void machine_exit(char *m) {
1924   EmergencyExit();
1925   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1926   fflush(stdout);
1927   CmiNodeBarrier();
1928   if (CmiMyRank() == 0) {
1929     MPI_Barrier(MPI_COMM_WORLD);
1930     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1931     MPI_Abort(MPI_COMM_WORLD, 1);
1932   } else {
1933     while (1) CmiYield();
1934   }
1935 }
1936
1937 static void KillOnAllSigs(int sigNo) {
1938   static int already_in_signal_handler = 0;
1939   char *m;
1940   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1941   already_in_signal_handler = 1;
1942 #if CMK_CCS_AVAILABLE
1943   if (CpvAccess(cmiArgDebugFlag)) {
1944     CpdNotify(CPD_SIGNAL, sigNo);
1945     CpdFreeze();
1946   }
1947 #endif
1948   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1949       "Signal: %d\n",CmiMyPe(),sigNo);
1950   CmiPrintStackTrace(1);
1951
1952   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1953   CmiSetHandler(m, machine_exit_idx);
1954   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1955   machine_exit(m);
1956 }
1957
1958 static void ConverseRunPE(int everReturn)
1959 {
1960   CmiIdleState *s=CmiNotifyGetState();
1961   CmiState cs;
1962   char** CmiMyArgv;
1963
1964   CmiNodeAllBarrier();
1965
1966   cs = CmiGetState();
1967   CpvInitialize(void *,CmiLocalQueue);
1968   CpvAccess(CmiLocalQueue) = cs->localqueue;
1969
1970   if (CmiMyRank())
1971     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1972   else
1973     CmiMyArgv=Cmi_argv;
1974
1975   CthInit(CmiMyArgv);
1976
1977   ConverseCommonInit(CmiMyArgv);
1978   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1979
1980 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1981   CpvInitialize(double, projTraceStart);
1982   /* only PE 0 needs to care about registration (to generate sts file). */
1983   if (CmiMyPe() == 0) {
1984     registerMachineUserEventsFunction(&registerMPITraceEvents);
1985   }
1986 #endif
1987
1988   /* initialize the network progress counter*/
1989   /* Network progress function is used to poll the network when for
1990      messages. This flushes receive buffers on some  implementations*/
1991   CpvInitialize(int , networkProgressCount);
1992   CpvAccess(networkProgressCount) = 0;
1993
1994 #if CMK_SMP
1995   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1996   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1997 #else
1998   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1999 #endif
2000
2001 #if MACHINE_DEBUG_LOG
2002   if (CmiMyRank() == 0) {
2003     char ln[200];
2004     sprintf(ln,"debugLog.%d",CmiMyNode());
2005     debugLog=fopen(ln,"w");
2006   }
2007 #endif
2008
2009   /* Converse initialization finishes, immediate messages can be processed.
2010      node barrier previously should take care of the node synchronization */
2011   _immediateReady = 1;
2012
2013   /* communication thread */
2014   if (CmiMyRank() == CmiMyNodeSize()) {
2015     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2016     while (1) CommunicationServerThread(5);
2017   }
2018   else {  /* worker thread */
2019   if (!everReturn) {
2020     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2021     if (Cmi_usrsched==0) CsdScheduler(-1);
2022     ConverseExit();
2023   }
2024   }
2025 }
2026
2027 static char *thread_level_tostring(int thread_level)
2028 {
2029 #if CMK_MPI_INIT_THREAD
2030   switch (thread_level) {
2031   case MPI_THREAD_SINGLE:
2032       return "MPI_THREAD_SINGLE";
2033   case MPI_THREAD_FUNNELED:
2034       return "MPI_THREAD_FUNNELED";
2035   case MPI_THREAD_SERIALIZED:
2036       return "MPI_THREAD_SERIALIZED";
2037   case MPI_THREAD_MULTIPLE :
2038       return "MPI_THREAD_MULTIPLE ";
2039   default: {
2040       char *str = (char*)malloc(5);
2041       sprintf(str,"%d", thread_level);
2042       return str;
2043       }
2044   }
2045   return  "unknown";
2046 #else
2047   char *str = (char*)malloc(5);
2048   sprintf(str,"%d", thread_level);
2049   return str;
2050 #endif
2051 }
2052
2053 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2054 {
2055   int n,i;
2056   int ver, subver;
2057   int provided;
2058   int thread_level;
2059
2060 #if MACHINE_DEBUG
2061   debugLog=NULL;
2062 #endif
2063 #if CMK_USE_HP_MAIN_FIX
2064 #if FOR_CPLUS
2065   _main(argc,argv);
2066 #endif
2067 #endif
2068
2069 #if CMK_MPI_INIT_THREAD
2070 #if CMK_SMP
2071   thread_level = MPI_THREAD_FUNNELED;
2072 #else
2073   thread_level = MPI_THREAD_SINGLE;
2074 #endif
2075   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2076   _thread_provided = provided;
2077 #else
2078   MPI_Init(&argc, &argv);
2079   thread_level = 0;
2080   provided = -1;
2081 #endif
2082   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2083   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2084
2085   MPI_Get_version(&ver, &subver);
2086   if (_Cmi_mynode == 0) {
2087     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2088   }
2089
2090   /* processor per node */
2091   _Cmi_mynodesize = 1;
2092   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2093     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2094 #if ! CMK_SMP
2095   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2096     CmiAbort("+ppn cannot be used in non SMP version!\n");
2097 #endif
2098   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2099   if (idleblock && _Cmi_mynode == 0) {
2100     printf("Charm++: Running in idle blocking mode.\n");
2101   }
2102
2103   /* setup signal handlers */
2104   signal(SIGSEGV, KillOnAllSigs);
2105   signal(SIGFPE, KillOnAllSigs);
2106   signal(SIGILL, KillOnAllSigs);
2107   signal_int = signal(SIGINT, KillOnAllSigs);
2108   signal(SIGTERM, KillOnAllSigs);
2109   signal(SIGABRT, KillOnAllSigs);
2110 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2111   signal(SIGQUIT, KillOnAllSigs);
2112   signal(SIGBUS, KillOnAllSigs);
2113 /*#     if CMK_HANDLE_SIGUSR
2114   signal(SIGUSR1, HandleUserSignals);
2115   signal(SIGUSR2, HandleUserSignals);
2116 #     endif*/
2117 #   endif /*UNIX*/
2118   
2119 #if CMK_NO_OUTSTANDING_SENDS
2120   no_outstanding_sends=1;
2121 #endif
2122   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2123     no_outstanding_sends = 1;
2124     if (_Cmi_mynode == 0)
2125       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2126         no_outstanding_sends?"":" not");
2127   }
2128   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2129   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2130   Cmi_argvcopy = CmiCopyArgs(argv);
2131   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2132   /* find dim = log2(numpes), to pretend we are a hypercube */
2133   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2134     Cmi_dim++ ;
2135  /* CmiSpanTreeInit();*/
2136   request_max=MAX_QLEN;
2137   CmiGetArgInt(argv,"+requestmax",&request_max);
2138   /*printf("request max=%d\n", request_max);*/
2139
2140   /* checksum flag */
2141   if (CmiGetArgFlag(argv,"+checksum")) {
2142 #if !CMK_OPTIMIZE
2143     checksum_flag = 1;
2144     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2145 #else
2146     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2147 #endif
2148   }
2149
2150   {
2151   int debug = CmiGetArgFlag(argv,"++debug");
2152   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2153   if (debug || debug_no_pause)
2154   {   /*Pause so user has a chance to start and attach debugger*/
2155 #if CMK_HAS_GETPID
2156     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2157     fflush(stdout);
2158     if (!debug_no_pause)
2159       sleep(15);
2160 #else
2161     printf("++debug ignored.\n");
2162 #endif
2163   }
2164   }
2165
2166 #if MPI_POST_RECV_COUNT > 0
2167
2168   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2169   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2170   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2171   CpvInitialize(char*,CmiPostedRecvBuffers);
2172
2173     /* Post some extra recvs to help out with incoming messages */
2174     /* On some MPIs the messages are unexpected and thus slow */
2175
2176     /* An array of request handles for posted recvs */
2177     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2178
2179     /* An array of buffers for posted recvs */
2180     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2181
2182     /* Post Recvs */
2183     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2184         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2185                     MPI_POST_RECV_SIZE,
2186                     MPI_BYTE,
2187                     MPI_ANY_SOURCE,
2188                     POST_RECV_TAG,
2189                     MPI_COMM_WORLD,
2190                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2191           CmiAbort("MPI_Irecv failed\n");
2192     }
2193
2194 #endif
2195
2196
2197
2198   /* CmiTimerInit(); */
2199
2200 #if 0
2201   CthInit(argv);
2202   ConverseCommonInit(argv);
2203
2204   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2205   if (initret==0) {
2206     fn(CmiGetArgc(argv), argv);
2207     if (usched==0) CsdScheduler(-1);
2208     ConverseExit();
2209   }
2210 #endif
2211
2212   CsvInitialize(CmiNodeState, NodeState);
2213   CmiNodeStateInit(&CsvAccess(NodeState));
2214
2215   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2216
2217   for (i=0; i<_Cmi_mynodesize+1; i++) {
2218 #if MULTI_SENDQUEUE
2219     procState[i].sendMsgBuf = PCQueueCreate();
2220 #endif
2221     procState[i].recvLock = CmiCreateLock();
2222   }
2223 #if CMK_SMP
2224 #if !MULTI_SENDQUEUE
2225   sendMsgBuf = PCQueueCreate();
2226   sendMsgBufLock = CmiCreateLock();
2227 #endif
2228   exitLock = CmiCreateLock();            /* exit count lock */
2229 #endif
2230
2231   /* Network progress function is used to poll the network when for
2232      messages. This flushes receive buffers on some  implementations*/
2233   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2234   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2235
2236   CmiStartThreads(argv);
2237   ConverseRunPE(initret);
2238 }
2239
2240 /***********************************************************************
2241  *
2242  * Abort function:
2243  *
2244  ************************************************************************/
2245
2246 void CmiAbort(const char *message)
2247 {
2248   char *m;
2249   /* if CharmDebug is attached simply try to send a message to it */
2250 #if CMK_CCS_AVAILABLE
2251   if (CpvAccess(cmiArgDebugFlag)) {
2252     CpdNotify(CPD_ABORT, message);
2253     CpdFreeze();
2254   }
2255 #endif  
2256   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2257         "Reason: %s\n",CmiMyPe(),message);
2258  /*  CmiError(message); */
2259   CmiPrintStackTrace(0);
2260   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2261   CmiSetHandler(m, machine_exit_idx);
2262   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2263   machine_exit(m);
2264   /* Program never reaches here */
2265   MPI_Abort(MPI_COMM_WORLD, 1);
2266 }
2267
2268
2269 #if 0
2270
2271 /* ****************************************************************** */
2272 /*    The following internal functions implement recd msg queue       */
2273 /* ****************************************************************** */
2274
2275 static void ** AllocBlock(unsigned int len)
2276 {
2277   void ** blk;
2278
2279   blk=(void **)CmiAlloc(len*sizeof(void *));
2280   if(blk==(void **)0) {
2281     CmiError("Cannot Allocate Memory!\n");
2282     MPI_Abort(MPI_COMM_WORLD, 1);
2283   }
2284   return blk;
2285 }
2286
2287 static void
2288 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2289 {
2290   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2291   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2292 }
2293
2294 void recdQueueInit(void)
2295 {
2296   recdQueue_blk = AllocBlock(BLK_LEN);
2297   recdQueue_blk_len = BLK_LEN;
2298   recdQueue_first = 0;
2299   recdQueue_len = 0;
2300 }
2301
2302 void recdQueueAddToBack(void *element)
2303 {
2304 #if NODE_0_IS_CONVHOST
2305   inside_comm = 1;
2306 #endif
2307   if(recdQueue_len==recdQueue_blk_len) {
2308     void **blk;
2309     recdQueue_blk_len *= 3;
2310     blk = AllocBlock(recdQueue_blk_len);
2311     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2312     CmiFree(recdQueue_blk);
2313     recdQueue_blk = blk;
2314     recdQueue_first = 0;
2315   }
2316   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2317 #if NODE_0_IS_CONVHOST
2318   inside_comm = 0;
2319 #endif
2320 }
2321
2322
2323 void * recdQueueRemoveFromFront(void)
2324 {
2325   if(recdQueue_len) {
2326     void *element;
2327     element = recdQueue_blk[recdQueue_first++];
2328     recdQueue_first %= recdQueue_blk_len;
2329     recdQueue_len--;
2330     return element;
2331   }
2332   return 0;
2333 }
2334
2335 #endif
2336
2337 /*@}*/