indentation change
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
44 #define CMK_SMP 1
45 #endif
46
47 #define CMI_EXERT_SEND_CAP 0
48 #define CMI_EXERT_RECV_CAP 0
49
50 #if CMK_SMP
51 /* currently only considering the smp case */
52 #define CMI_DYNAMIC_EXERT_CAP 0
53 /* This macro defines the max number of msgs in the sender msg buffer 
54  * that is allowed for recving operation to continue
55  */
56 #define CMI_DYNAMIC_OUTGOING_THRESHOLD 8
57 #define CMI_DYNAMIC_MAXCAPSIZE 1000
58 #define CMI_DYNAMIC_SEND_CAPSIZE 6
59 #define CMI_DYNAMIC_RECV_CAPSIZE 6
60 /* initial values, -1 indiates there's no cap */
61 static int dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
62 static int dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
63 #endif
64
65 #if CMI_EXERT_SEND_CAP
66 #define SEND_CAP 3
67 #endif
68
69 #if CMI_EXERT_RECV_CAP
70 #define RECV_CAP 2
71 #endif
72
73
74 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
75 #define CMI_MPI_TRACE_MOREDETAILED 0
76 #undef CMI_MPI_TRACE_USEREVENTS
77 #define CMI_MPI_TRACE_USEREVENTS 1
78 #else
79 #undef CMK_SMP_TRACE_COMMTHREAD
80 #define CMK_SMP_TRACE_COMMTHREAD 0
81 #endif
82
83 #define CMK_TRACE_COMMOVERHEAD 0
84 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
85 #undef CMI_MPI_TRACE_USEREVENTS
86 #define CMI_MPI_TRACE_USEREVENTS 1
87 #else
88 #undef CMK_TRACE_COMMOVERHEAD
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #endif
91
92 #include "machine.h"
93
94 #include "pcqueue.h"
95
96 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
97
98 #if CMK_BLUEGENEL
99 #define MAX_QLEN 8
100 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
101 #else
102 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
103 #define MAX_QLEN 200
104 #endif
105
106 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
107 CpvStaticDeclare(double, projTraceStart);
108 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
109 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
110 #else
111 # define  START_EVENT()
112 # define  END_EVENT(x)
113 #endif
114
115 /*
116     To reduce the buffer used in broadcast and distribute the load from
117   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
118   spanning tree broadcast algorithm.
119     This will use the fourth short in message as an indicator of spanning tree
120   root.
121 */
122 #define CMK_BROADCAST_SPANNING_TREE    1
123 #define CMK_BROADCAST_HYPERCUBE        0
124
125 #define BROADCAST_SPANNING_FACTOR      4
126 /* The number of children used when a msg is broadcast inside a node */
127 #define BROADCAST_SPANNING_INTRA_FACTOR      8
128
129 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
130 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
131
132 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
133 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
134
135 /* FIXME: need a random number that everyone agrees ! */
136 #define CHARM_MAGIC_NUMBER               126
137
138 #if CMK_ERROR_CHECKING
139 static int checksum_flag = 0;
140 #define CMI_SET_CHECKSUM(msg, len)      \
141         if (checksum_flag)  {   \
142           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
143           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
144         }
145 #define CMI_CHECK_CHECKSUM(msg, len)    \
146         if (checksum_flag)      \
147           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
148             CmiAbort("Fatal error: checksum doesn't agree!\n");
149 #else
150 #define CMI_SET_CHECKSUM(msg, len)
151 #define CMI_CHECK_CHECKSUM(msg, len)
152 #endif
153
154 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
155 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
156 #else
157 #define CMI_SET_BROADCAST_ROOT(msg, root) 
158 #endif
159
160
161 /** 
162     If MPI_POST_RECV is defined, we provide default values for size 
163     and number of posted recieves. If MPI_POST_RECV_COUNT is set
164     then a default value for MPI_POST_RECV_SIZE is used if not specified
165     by the user.
166 */
167 #ifdef MPI_POST_RECV
168 #define MPI_POST_RECV_COUNT 10
169 #undef MPI_POST_RECV
170 #endif
171 #if MPI_POST_RECV_COUNT > 0
172 #warning "Using MPI posted receives which have not yet been tested"
173 #ifndef MPI_POST_RECV_SIZE
174 #define MPI_POST_RECV_SIZE 200
175 #endif
176 /* #undef  MPI_POST_RECV_DEBUG  */
177 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
178 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
179 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
180 CpvDeclare(char*,CmiPostedRecvBuffers);
181 #endif
182
183 /*
184  to avoid MPI's in order delivery, changing MPI Tag all the time
185 */
186 #define TAG     1375
187
188 #if MPI_POST_RECV_COUNT > 0
189 #define POST_RECV_TAG TAG+1
190 #define BARRIER_ZERO_TAG TAG
191 #else
192 #define BARRIER_ZERO_TAG     1375
193 #endif
194
195 #include <signal.h>
196 void (*signal_int)(int);
197
198 /*
199 static int mpi_tag = TAG;
200 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
201 */
202
203 static int        _thread_provided = -1;
204 int               _Cmi_numpes;
205 int               _Cmi_mynode;    /* Which address space am I */
206 int               _Cmi_mynodesize;/* Number of processors in my address space */
207 int               _Cmi_numnodes;  /* Total number of address spaces */
208 int               _Cmi_numpes;    /* Total number of processors */
209 static int        Cmi_nodestart; /* First processor in this address space */
210 CpvDeclare(void*, CmiLocalQueue);
211
212 /*Network progress utility variables. Period controls the rate at
213   which the network poll is called */
214 CpvDeclare(unsigned , networkProgressCount);
215 int networkProgressPeriod;
216
217 int               idleblock = 0;
218
219 #define BLK_LEN  512
220
221 #if CMK_NODE_QUEUE_AVAILABLE
222 #define DGRAM_NODEMESSAGE   (0xFB)
223
224 #define NODE_BROADCAST_OTHERS (-1)
225 #define NODE_BROADCAST_ALL    (-2)
226 #endif
227
228 #if 0
229 static void **recdQueue_blk;
230 static unsigned int recdQueue_blk_len;
231 static unsigned int recdQueue_first;
232 static unsigned int recdQueue_len;
233 static void recdQueueInit(void);
234 static void recdQueueAddToBack(void *element);
235 static void *recdQueueRemoveFromFront(void);
236 #endif
237
238 static void ConverseRunPE(int everReturn);
239 static void CommunicationServer(int sleepTime);
240 static void CommunicationServerThread(int sleepTime);
241
242 typedef struct msg_list {
243      char *msg;
244      struct msg_list *next;
245      int size, destpe;
246
247 #if CMK_SMP_TRACE_COMMTHREAD
248         int srcpe;
249 #endif  
250         
251      MPI_Request req;
252 } SMSG_LIST;
253
254 int MsgQueueLen=0;
255 static int request_max;
256
257 static SMSG_LIST *sent_msgs=0;
258 static SMSG_LIST *end_sent=0;
259
260 static int Cmi_dim;
261
262 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
263
264 #if NODE_0_IS_CONVHOST
265 int inside_comm = 0;
266 #endif
267
268 void CmiAbort(const char *message);
269 static void PerrorExit(const char *msg);
270
271 void SendSpanningChildren(int size, char *msg);
272 void SendHypercube(int size, char *msg);
273
274 static void PerrorExit(const char *msg)
275 {
276   perror(msg);
277   exit(1);
278 }
279
280 extern unsigned char computeCheckSum(unsigned char *data, int len);
281
282 /**************************  TIMER FUNCTIONS **************************/
283
284 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
285
286 /* MPI calls are not threadsafe, even the timer on some machines */
287 static CmiNodeLock  timerLock = 0;
288 static int _absoluteTime = 0;
289 static double starttimer = 0;
290 static int _is_global = 0;
291
292 int CmiTimerIsSynchronized()
293 {
294   int  flag;
295   void *v;
296
297   /*  check if it using synchronized timer */
298   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
299     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
300   if (flag) {
301     _is_global = *(int*)v;
302     if (_is_global && CmiMyPe() == 0)
303       printf("Charm++> MPI timer is synchronized\n");
304   }
305   return _is_global;
306 }
307
308 int CmiTimerAbsolute()
309 {       
310   return _absoluteTime;
311 }
312
313 double CmiStartTimer()
314 {
315   return 0.0;
316 }
317
318 double CmiInitTime()
319 {
320   return starttimer;
321 }
322
323 void CmiTimerInit(char **argv)
324 {
325   _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
326   if (_absoluteTime && CmiMyPe() == 0)
327       printf("Charm++> absolute MPI timer is used\n");
328
329   _is_global = CmiTimerIsSynchronized();
330
331   if (_is_global) {
332     if (CmiMyRank() == 0) {
333       double minTimer;
334 #if CMK_TIMER_USE_XT3_DCLOCK
335       starttimer = dclock();
336 #else
337       starttimer = MPI_Wtime();
338 #endif
339
340       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
341                                   MPI_COMM_WORLD );
342       starttimer = minTimer;
343     }
344   }
345   else {  /* we don't have a synchronous timer, set our own start time */
346     CmiBarrier();
347     CmiBarrier();
348     CmiBarrier();
349 #if CMK_TIMER_USE_XT3_DCLOCK
350     starttimer = dclock();
351 #else
352     starttimer = MPI_Wtime();
353 #endif
354   }
355
356 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
357   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
358     timerLock = CmiCreateLock();
359 #endif
360   CmiNodeAllBarrier();          /* for smp */
361 }
362
363 /**
364  * Since the timerLock is never created, and is
365  * always NULL, then all the if-condition inside
366  * the timer functions could be disabled right
367  * now in the case of SMP. --Chao Mei
368  */
369 double CmiTimer(void)
370 {
371   double t;
372 #if 0 && CMK_SMP
373   if (timerLock) CmiLock(timerLock);
374 #endif
375
376 #if CMK_TIMER_USE_XT3_DCLOCK
377   t = dclock();
378 #else
379   t = MPI_Wtime();
380 #endif
381
382 #if 0 && CMK_SMP
383   if (timerLock) CmiUnlock(timerLock);
384 #endif
385
386   return _absoluteTime?t: (t-starttimer);
387 }
388
389 double CmiWallTimer(void)
390 {
391   double t;
392 #if 0 && CMK_SMP
393   if (timerLock) CmiLock(timerLock);
394 #endif
395
396 #if CMK_TIMER_USE_XT3_DCLOCK
397   t = dclock();
398 #else
399   t = MPI_Wtime();
400 #endif
401
402 #if 0 && CMK_SMP
403   if (timerLock) CmiUnlock(timerLock);
404 #endif
405
406   return _absoluteTime? t: (t-starttimer);
407 }
408
409 double CmiCpuTimer(void)
410 {
411   double t;
412 #if 0 && CMK_SMP
413   if (timerLock) CmiLock(timerLock);
414 #endif
415 #if CMK_TIMER_USE_XT3_DCLOCK
416   t = dclock() - starttimer;
417 #else
418   t = MPI_Wtime() - starttimer;
419 #endif
420 #if 0 && CMK_SMP
421   if (timerLock) CmiUnlock(timerLock);
422 #endif
423   return t;
424 }
425
426 #endif
427
428 /* must be called on all ranks including comm thread in SMP */
429 int CmiBarrier()
430 {
431 #if CMK_SMP
432     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
433   CmiNodeAllBarrier();
434   if (CmiMyRank() == CmiMyNodeSize()) 
435 #else
436   if (CmiMyRank() == 0) 
437 #endif
438   {
439 /**
440  *  The call of CmiBarrier is usually before the initialization
441  *  of trace module of Charm++, therefore, the START_EVENT
442  *  and END_EVENT are disabled here. -Chao Mei
443  */     
444     /*START_EVENT();*/
445
446     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
447         CmiAbort("Timernit: MPI_Barrier failed!\n");
448
449     /*END_EVENT(10);*/
450   }
451   CmiNodeAllBarrier();
452   return 0;
453 }
454
455 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
456 int CmiBarrierZero()
457 {
458   int i;
459 #if CMK_SMP
460   if (CmiMyRank() == CmiMyNodeSize()) 
461 #else
462   if (CmiMyRank() == 0) 
463 #endif
464   {
465     char msg[1];
466     MPI_Status sts;
467     if (CmiMyNode() == 0)  {
468       for (i=0; i<CmiNumNodes()-1; i++) {
469          START_EVENT();
470
471          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
472             CmiPrintf("MPI_Recv failed!\n");
473
474          END_EVENT(30);
475       }
476     }
477     else {
478       START_EVENT();
479
480       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
481          printf("MPI_Send failed!\n");
482
483       END_EVENT(20);
484     }
485   }
486   CmiNodeAllBarrier();
487   return 0;
488 }
489
490 typedef struct ProcState {
491 #if MULTI_SENDQUEUE
492 PCQueue      sendMsgBuf;       /* per processor message sending queue */
493 #endif
494 CmiNodeLock  recvLock;              /* for cs->recv */
495 } ProcState;
496
497 static ProcState  *procState;
498
499 #if CMK_SMP
500
501 #if !MULTI_SENDQUEUE
502 static PCQueue sendMsgBuf;
503 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
504 #endif
505
506 #endif
507
508 /************************************************************
509  *
510  * Processor state structure
511  *
512  ************************************************************/
513
514 /* fake Cmi_charmrun_fd */
515 static int Cmi_charmrun_fd = 0;
516 #include "machine-smp.c"
517
518 CsvDeclare(CmiNodeState, NodeState);
519
520 #include "immediate.c"
521
522 #if CMK_SHARED_VARS_UNAVAILABLE
523 /************ non SMP **************/
524 static struct CmiStateStruct Cmi_state;
525 int _Cmi_mype;
526 int _Cmi_myrank;
527
528 void CmiMemLock() {}
529 void CmiMemUnlock() {}
530
531 #define CmiGetState() (&Cmi_state)
532 #define CmiGetStateN(n) (&Cmi_state)
533
534 void CmiYield(void) { sleep(0); }
535
536 static void CmiStartThreads(char **argv)
537 {
538   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
539   _Cmi_mype = Cmi_nodestart;
540   _Cmi_myrank = 0;
541 }
542 #endif  /* non smp */
543
544 /*Add a message to this processor's receive queue, pe is a rank */
545 void CmiPushPE(int pe,void *msg)
546 {
547   CmiState cs = CmiGetStateN(pe);
548   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
549 #if CMK_IMMEDIATE_MSG
550   if (CmiIsImmediate(msg)) {
551 /*
552 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
553     CmiHandleMessage(msg);
554 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
555 */
556     /**(CmiUInt2 *)msg = pe;*/
557     CMI_DEST_RANK(msg) = pe;
558     CmiPushImmediateMsg(msg);
559     return;
560   }
561 #endif
562
563 #if CMK_SMP
564   CmiLock(procState[pe].recvLock);
565 #endif
566   PCQueuePush(cs->recv,msg);
567 #if CMK_SMP
568   CmiUnlock(procState[pe].recvLock);
569 #endif
570   CmiIdleLock_addMessage(&cs->idle);
571   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
572 }
573
574 #if CMK_NODE_QUEUE_AVAILABLE
575 /*Add a message to this processor's receive queue */
576 static void CmiPushNode(void *msg)
577 {
578   MACHSTATE(3,"Pushing message into NodeRecv queue");
579 #if CMK_IMMEDIATE_MSG
580   if (CmiIsImmediate(msg)) {
581     CMI_DEST_RANK(msg) = 0;
582     CmiPushImmediateMsg(msg);
583     return;
584   }
585 #endif
586   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
587   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
588   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
589   {
590   CmiState cs=CmiGetStateN(0);
591   CmiIdleLock_addMessage(&cs->idle);
592   }
593 }
594 #endif
595
596 #ifndef CmiMyPe
597 int CmiMyPe(void)
598 {
599   return CmiGetState()->pe;
600 }
601 #endif
602
603 #ifndef CmiMyRank
604 int CmiMyRank(void)
605 {
606   return CmiGetState()->rank;
607 }
608 #endif
609
610 #ifndef CmiNodeFirst
611 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
612 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
613 #endif
614
615 #ifndef CmiNodeOf
616 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
617 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
618 #endif
619
620 static size_t CmiAllAsyncMsgsSent(void)
621 {
622    SMSG_LIST *msg_tmp = sent_msgs;
623    MPI_Status sts;
624    int done;
625
626    while(msg_tmp!=0) {
627     done = 0;
628     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
629       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
630     if(!done)
631       return 0;
632     msg_tmp = msg_tmp->next;
633 /*    MsgQueueLen--; ????? */
634    }
635    return 1;
636 }
637
638 int CmiAsyncMsgSent(CmiCommHandle c) {
639
640   SMSG_LIST *msg_tmp = sent_msgs;
641   int done;
642   MPI_Status sts;
643
644   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
645     msg_tmp = msg_tmp->next;
646   if(msg_tmp) {
647     done = 0;
648     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
649       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
650     return ((done)?1:0);
651   } else {
652     return 1;
653   }
654 }
655
656 void CmiReleaseCommHandle(CmiCommHandle c)
657 {
658   return;
659 }
660
661 #if CMK_BLUEGENEL
662 extern void MPID_Progress_test();
663 #endif
664
665 void CmiReleaseSentMessages(void)
666 {
667   SMSG_LIST *msg_tmp=sent_msgs;
668   SMSG_LIST *prev=0;
669   SMSG_LIST *temp;
670   int done;
671   MPI_Status sts;
672
673 #if CMK_BLUEGENEL
674   MPID_Progress_test();
675 #endif
676
677   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
678   while(msg_tmp!=0) {
679     done =0;
680 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
681     double startT = CmiWallTimer();
682 #endif
683     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
684       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
685     if(done) {
686       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
687       MsgQueueLen--;
688       /* Release the message */
689       temp = msg_tmp->next;
690       if(prev==0)  /* first message */
691         sent_msgs = temp;
692       else
693         prev->next = temp;
694       CmiFree(msg_tmp->msg);
695       CmiFree(msg_tmp);
696       msg_tmp = temp;
697     } else {
698       prev = msg_tmp;
699       msg_tmp = msg_tmp->next;
700     }
701 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
702     {
703     double endT = CmiWallTimer();
704     /* only record the event if it takes more than 1ms */
705     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
706     }
707 #endif
708   }
709   end_sent = prev;
710   MACHSTATE(2,"} CmiReleaseSentMessages end");
711 }
712
713 int PumpMsgs(void)
714 {
715   int nbytes, flg, res;
716   char *msg;
717   MPI_Status sts;
718   int recd=0;
719
720 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
721   int recvCnt=0;
722 #endif
723         
724 #if CMK_BLUEGENEL
725   MPID_Progress_test();
726 #endif
727
728   MACHSTATE(2,"PumpMsgs begin {");
729
730 #if CMI_DYNAMIC_EXERT_CAP
731   dynamicRecvCap = CMI_DYNAMIC_MAXCAPSIZE;
732 #endif
733         
734   while(1) {
735 #if CMI_EXERT_RECV_CAP
736         if(recvCnt==RECV_CAP) break;
737 #elif CMI_DYNAMIC_EXERT_CAP
738         if(recvCnt >= dynamicRecvCap) break;
739 #endif
740           
741     /* First check posted recvs then do  probe unmatched outstanding messages */
742 #if MPI_POST_RECV_COUNT > 0 
743     int completed_index=-1;
744     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
745         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
746     if(flg){
747         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
748             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
749
750         recd = 1;
751         msg = (char *) CmiAlloc(nbytes);
752         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
753         /* and repost the recv */
754
755         START_EVENT();
756
757         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
758             MPI_POST_RECV_SIZE,
759             MPI_BYTE,
760             MPI_ANY_SOURCE,
761             POST_RECV_TAG,
762             MPI_COMM_WORLD,
763             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
764                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
765
766         END_EVENT(50);
767
768         CpvAccess(Cmi_posted_recv_total)++;
769     }
770     else {
771         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
772         if(res != MPI_SUCCESS)
773         CmiAbort("MPI_Iprobe failed\n");
774         if(!flg) break;
775         recd = 1;
776         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
777         msg = (char *) CmiAlloc(nbytes);
778
779         START_EVENT();
780
781         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
782             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
783
784         END_EVENT(30);
785
786         CpvAccess(Cmi_unposted_recv_total)++;
787     }
788 #else
789     /* Original version */
790 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
791   double startT = CmiWallTimer(); 
792 #endif
793     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
794     if(res != MPI_SUCCESS)
795       CmiAbort("MPI_Iprobe failed\n");
796
797     if(!flg) break;
798 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
799     {
800     double endT = CmiWallTimer();
801     /* only trace the probe that last longer than 1ms */
802     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
803     }
804 #endif
805
806     recd = 1;
807     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
808     msg = (char *) CmiAlloc(nbytes);
809     
810     START_EVENT();
811
812     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
813       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
814
815     /*END_EVENT(30);*/
816
817 #endif
818
819 #if CMK_SMP_TRACE_COMMTHREAD
820         traceBeginCommOp(msg);
821         traceChangeLastTimestamp(CpvAccess(projTraceStart));
822         traceEndCommOp(msg);
823         #if CMI_MPI_TRACE_MOREDETAILED
824         char tmp[32];
825         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
826         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
827         #endif
828 #elif CMK_TRACE_COMMOVERHEAD
829         char tmp[32];
830         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
831         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
832 #endif
833         
834         
835     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
836     CMI_CHECK_CHECKSUM(msg, nbytes);
837 #if CMK_ERROR_CHECKING
838     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
839       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
840       CmiFree(msg);
841       CmiAbort("Abort!\n");
842       continue;
843     }
844 #endif
845         
846 #if CMK_BROADCAST_SPANNING_TREE
847     if (CMI_BROADCAST_ROOT(msg))
848       SendSpanningChildren(nbytes, msg);
849 #elif CMK_BROADCAST_HYPERCUBE
850     if (CMI_BROADCAST_ROOT(msg))
851       SendHypercube(nbytes, msg);
852 #endif
853         
854         /* In SMP mode, this push operation needs to be executed
855      * after forwarding broadcast messages. If it is executed
856      * earlier, then during the bcast msg forwarding period,    
857          * the msg could be already freed on the worker thread.
858          * As a result, the forwarded message could be wrong! 
859          * --Chao Mei
860          */
861 #if CMK_NODE_QUEUE_AVAILABLE
862     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
863       CmiPushNode(msg);
864     else
865 #endif
866         CmiPushPE(CMI_DEST_RANK(msg), msg);     
867         
868 #if CMI_EXERT_RECV_CAP || CMI_DYNAMIC_EXERT_CAP
869         recvCnt++;
870         /* check sendMsgBuf  to get the  number of messages that have not been sent */
871         /* MsgQueueLen indicates the number of messages that have not been released by MPI */
872         if(PCQueueLength(sendMsgBuf) > CMI_DYNAMIC_OUTGOING_THRESHOLD
873                 || MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD){
874                 dynamicRecvCap = CMI_DYNAMIC_RECV_CAPSIZE;
875         }
876 #endif  
877         
878   }
879
880   
881 #if CMK_IMMEDIATE_MSG && !CMK_SMP
882   CmiHandleImmediate();
883 #endif
884   
885   MACHSTATE(2,"} PumpMsgs end ");
886   return recd;
887 }
888
889 /* blocking version */
890 static void PumpMsgsBlocking(void)
891 {
892   static int maxbytes = 20000000;
893   static char *buf = NULL;
894   int nbytes, flg;
895   MPI_Status sts;
896   char *msg;
897   int recd=0;
898
899   if (!PCQueueEmpty(CmiGetState()->recv)) return;
900   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
901   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
902   if (sent_msgs)  return;
903
904 #if 0
905   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
906 #endif
907
908   if (buf == NULL) {
909     buf = (char *) CmiAlloc(maxbytes);
910     _MEMCHECK(buf);
911   }
912
913
914 #if MPI_POST_RECV_COUNT > 0
915 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
916 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
917 #endif
918
919   START_EVENT();
920
921   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
922       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
923
924   /*END_EVENT(30);*/
925     
926    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
927    msg = (char *) CmiAlloc(nbytes);
928    memcpy(msg, buf, nbytes);
929
930 #if CMK_SMP_TRACE_COMMTHREAD
931         traceBeginCommOp(msg);
932         traceChangeLastTimestamp(CpvAccess(projTraceStart));
933         traceEndCommOp(msg);
934         #if CMI_MPI_TRACE_MOREDETAILED
935         char tmp[32];
936         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
937         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
938         #endif
939 #endif
940
941 #if CMK_BROADCAST_SPANNING_TREE
942    if (CMI_BROADCAST_ROOT(msg))
943       SendSpanningChildren(nbytes, msg);
944 #elif CMK_BROADCAST_HYPERCUBE
945    if (CMI_BROADCAST_ROOT(msg))
946       SendHypercube(nbytes, msg);
947 #endif
948   
949         /* In SMP mode, this push operation needs to be executed
950      * after forwarding broadcast messages. If it is executed
951      * earlier, then during the bcast msg forwarding period,    
952          * the msg could be already freed on the worker thread.
953          * As a result, the forwarded message could be wrong! 
954          * --Chao Mei
955          */  
956 #if CMK_NODE_QUEUE_AVAILABLE
957    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
958       CmiPushNode(msg);
959    else
960 #endif
961       CmiPushPE(CMI_DEST_RANK(msg), msg);
962 }
963
964 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
965
966 #if CMK_SMP
967
968 static int inexit = 0;
969 static CmiNodeLock  exitLock = 0;
970
971 static int MsgQueueEmpty()
972 {
973   int i;
974 #if MULTI_SENDQUEUE
975   for (i=0; i<_Cmi_mynodesize; i++)
976     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
977 #else
978   return PCQueueEmpty(sendMsgBuf);
979 #endif
980   return 1;
981 }
982
983 static int SendMsgBuf();
984
985 /* test if all processors recv queues are empty */
986 static int RecvQueueEmpty()
987 {
988   int i;
989   for (i=0; i<_Cmi_mynodesize; i++) {
990     CmiState cs=CmiGetStateN(i);
991     if (!PCQueueEmpty(cs->recv)) return 0;
992   }
993   return 1;
994 }
995
996 /**
997 CommunicationServer calls MPI to send messages in the queues and probe message from network.
998 */
999
1000 #define REPORT_COMM_METRICS 0
1001 #if REPORT_COMM_METRICS
1002 static double pumptime = 0.0;
1003 static double releasetime = 0.0;
1004 static double sendtime = 0.0;
1005 #endif
1006
1007 static void CommunicationServer(int sleepTime)
1008 {
1009   int static count=0;
1010 /*
1011   count ++;
1012   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
1013 */
1014 #if REPORT_COMM_METRICS
1015   double t1, t2, t3, t4;
1016   t1 = CmiWallTimer();
1017 #endif
1018   PumpMsgs();
1019 #if REPORT_COMM_METRICS
1020   t2 = CmiWallTimer();
1021 #endif
1022   CmiReleaseSentMessages();
1023 #if REPORT_COMM_METRICS
1024   t3 = CmiWallTimer();
1025 #endif
1026   SendMsgBuf();
1027 #if REPORT_COMM_METRICS
1028   t4 = CmiWallTimer();
1029   pumptime += (t2-t1);
1030   releasetime += (t3-t2);
1031   sendtime += (t4-t3);
1032 #endif
1033 /*
1034   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
1035 */
1036   if (inexit == CmiMyNodeSize()) {
1037     MACHSTATE(2, "CommunicationServer exiting {");
1038 #if 0
1039     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
1040 #endif
1041     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
1042       CmiReleaseSentMessages();
1043       SendMsgBuf();
1044       PumpMsgs();
1045     }
1046     MACHSTATE(2, "CommunicationServer barrier begin {");
1047
1048     START_EVENT();
1049
1050     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1051       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1052
1053     END_EVENT(10);
1054
1055     MACHSTATE(2, "} CommunicationServer barrier end");
1056 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1057     if (CmiMyNode() == 0){
1058       CmiPrintf("End of program\n");
1059     }
1060 #endif
1061     MACHSTATE(2, "} CommunicationServer EXIT");
1062
1063     ConverseCommonExit();   
1064 #if REPORT_COMM_METRICS
1065     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1066 #endif
1067
1068 #if ! CMK_AUTOBUILD
1069     signal(SIGINT, signal_int);
1070     MPI_Finalize();
1071     #endif
1072     exit(0);
1073   }
1074 }
1075
1076 #endif
1077
1078 static void CommunicationServerThread(int sleepTime)
1079 {
1080 #if CMK_SMP
1081   CommunicationServer(sleepTime);
1082 #endif
1083 #if CMK_IMMEDIATE_MSG
1084   CmiHandleImmediate();
1085 #endif
1086 }
1087
1088 #if CMK_NODE_QUEUE_AVAILABLE
1089 char *CmiGetNonLocalNodeQ(void)
1090 {
1091   CmiState cs = CmiGetState();
1092   char *result = 0;
1093   CmiIdleLock_checkMessage(&cs->idle);
1094 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1095     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1096     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1097     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1098     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1099     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1100 /*  }  */
1101
1102   return result;
1103 }
1104 #endif
1105
1106 void *CmiGetNonLocal(void)
1107 {
1108   static int count=0;
1109   CmiState cs = CmiGetState();
1110   void *msg;
1111
1112 #if ! CMK_SMP
1113   if (CmiNumPes() == 1) return NULL;
1114 #endif
1115
1116   CmiIdleLock_checkMessage(&cs->idle);
1117   /* although it seems that lock is not needed, I found it crashes very often
1118      on mpi-smp without lock */
1119
1120 #if ! CMK_SMP
1121   CmiReleaseSentMessages();
1122   PumpMsgs();
1123 #endif
1124
1125   /* CmiLock(procState[cs->rank].recvLock); */
1126   msg =  PCQueuePop(cs->recv);
1127   /* CmiUnlock(procState[cs->rank].recvLock); */
1128
1129 /*
1130   if (msg) {
1131     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1132   else {
1133     count++;
1134     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1135   }
1136 */
1137 #if ! CMK_SMP
1138   if (no_outstanding_sends) {
1139     while (MsgQueueLen>0) {
1140       CmiReleaseSentMessages();
1141       PumpMsgs();
1142     }
1143   }
1144
1145   if(!msg) {
1146     CmiReleaseSentMessages();
1147     if (PumpMsgs())
1148       return  PCQueuePop(cs->recv);
1149     else
1150       return 0;
1151   }
1152 #endif
1153
1154   return msg;
1155 }
1156
1157 /* called in non-smp mode */
1158 void CmiNotifyIdle(void)
1159 {
1160   CmiReleaseSentMessages();
1161   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1162 }
1163
1164
1165 /********************************************************
1166     The call to probe immediate messages has been renamed to
1167     CmiMachineProgressImpl
1168 ******************************************************/
1169 /* user call to handle immediate message, only useful in non SMP version
1170    using polling method to schedule message.
1171 */
1172 /*
1173 #if CMK_IMMEDIATE_MSG
1174 void CmiProbeImmediateMsg()
1175 {
1176 #if !CMK_SMP
1177   PumpMsgs();
1178   CmiHandleImmediate();
1179 #endif
1180 }
1181 #endif
1182 */
1183
1184 /* Network progress function is used to poll the network when for
1185    messages. This flushes receive buffers on some  implementations*/
1186 #if CMK_MACHINE_PROGRESS_DEFINED
1187 void CmiMachineProgressImpl()
1188 {
1189 #if !CMK_SMP
1190     PumpMsgs();
1191 #if CMK_IMMEDIATE_MSG
1192     CmiHandleImmediate();
1193 #endif
1194 #else
1195     /*Not implemented yet. Communication server does not seem to be
1196       thread safe, so only communication thread call it */
1197     if (CmiMyRank() == CmiMyNodeSize())
1198         CommunicationServerThread(0);
1199 #endif
1200 }
1201 #endif
1202
1203 /********************* MESSAGE SEND FUNCTIONS ******************/
1204
1205 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1206
1207 static void CmiSendSelf(char *msg)
1208 {
1209 #if CMK_IMMEDIATE_MSG
1210     if (CmiIsImmediate(msg)) {
1211       /* CmiBecomeNonImmediate(msg); */
1212       CmiPushImmediateMsg(msg);
1213       CmiHandleImmediate();
1214       return;
1215     }
1216 #endif
1217     CQdCreate(CpvAccess(cQdState), 1);
1218     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1219 }
1220
1221 void CmiSyncSendFn(int destPE, int size, char *msg)
1222 {
1223   CmiState cs = CmiGetState();
1224   char *dupmsg = (char *) CmiAlloc(size);
1225   memcpy(dupmsg, msg, size);
1226
1227   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1228
1229   if (cs->pe==destPE) {
1230     CmiSendSelf(dupmsg);
1231   }
1232   else
1233     CmiAsyncSendFn_(destPE, size, dupmsg);
1234 }
1235
1236 #if CMK_SMP
1237
1238 /* called by communication thread in SMP */
1239 static int SendMsgBuf()
1240 {
1241   SMSG_LIST *msg_tmp;
1242   char *msg;
1243   int node, rank, size;
1244   int i;
1245   int sent = 0;
1246
1247 #if CMI_EXERT_SEND_CAP || CMI_DYNAMIC_EXERT_CAP
1248         int sentCnt = 0;
1249 #endif  
1250         
1251 #if CMI_DYNAMIC_EXERT_CAP
1252         dynamicSendCap = CMI_DYNAMIC_MAXCAPSIZE;
1253 #endif
1254         
1255   MACHSTATE(2,"SendMsgBuf begin {");
1256 #if MULTI_SENDQUEUE
1257   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1258   {
1259     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1260     {
1261       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1262 #else
1263     /* single message sending queue */
1264     /* CmiLock(sendMsgBufLock); */
1265     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1266     /* CmiUnlock(sendMsgBufLock); */
1267     while (NULL != msg_tmp)
1268     {
1269 #endif
1270       node = msg_tmp->destpe;
1271       size = msg_tmp->size;
1272       msg = msg_tmp->msg;
1273       msg_tmp->next = 0;
1274                 
1275 #if !CMI_DYNAMIC_EXERT_CAP && !CMI_EXERT_SEND_CAP
1276       while (MsgQueueLen > request_max) {
1277                 CmiReleaseSentMessages();
1278                 PumpMsgs();
1279       }
1280 #endif
1281           
1282       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1283 #if CMK_ERROR_CHECKING
1284       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1285 #endif
1286       CMI_SET_CHECKSUM(msg, size);
1287
1288 #if MPI_POST_RECV_COUNT > 0
1289         if(size <= MPI_POST_RECV_SIZE){
1290
1291           START_EVENT();
1292           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1293                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1294
1295           STOP_EVENT(40);
1296         }
1297         else {
1298             START_EVENT();
1299             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1300                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1301             STOP_EVENT(40);
1302         }
1303 #else
1304         START_EVENT();
1305         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1306             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1307         /*END_EVENT(40);*/
1308 #endif
1309         
1310 #if CMK_SMP_TRACE_COMMTHREAD
1311         traceBeginCommOp(msg);
1312         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1313         /* traceSendMsgComm must execute after traceBeginCommOp because
1314          * we pretend we execute an entry method, and inside this we
1315          * pretend we will send another message. Otherwise how could
1316          * a message creation just before an entry method invocation?
1317          * If such logic is broken, the projections will not trace
1318          * messages correctly! -Chao Mei
1319          */
1320         traceSendMsgComm(msg);
1321         traceEndCommOp(msg);
1322         #if CMI_MPI_TRACE_MOREDETAILED
1323         char tmp[64];
1324         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1325         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1326         #endif
1327 #endif
1328                 
1329                 
1330       MACHSTATE(3,"}MPI_send end");
1331       MsgQueueLen++;
1332       if(sent_msgs==0)
1333         sent_msgs = msg_tmp;
1334       else
1335         end_sent->next = msg_tmp;
1336       end_sent = msg_tmp;
1337       sent=1;
1338           
1339 #if CMI_EXERT_SEND_CAP    
1340           if(++sentCnt == SEND_CAP) break;
1341 #elif CMI_DYNAMIC_EXERT_CAP
1342           if(++sentCnt >= dynamicSendCap) break;
1343           if(MsgQueueLen > CMI_DYNAMIC_OUTGOING_THRESHOLD)
1344                   dynamicSendCap = CMI_DYNAMIC_SEND_CAPSIZE;
1345 #endif    
1346           
1347 #if ! MULTI_SENDQUEUE
1348       /* CmiLock(sendMsgBufLock); */
1349       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1350       /* CmiUnlock(sendMsgBufLock); */
1351 #endif
1352     }
1353 #if MULTI_SENDQUEUE
1354   }
1355 #endif
1356   MACHSTATE(2,"}SendMsgBuf end ");
1357   return sent;
1358 }
1359
1360 void EnqueueMsg(void *m, int size, int node)
1361 {
1362   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1363   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1364   msg_tmp->msg = m;
1365   msg_tmp->size = size;
1366   msg_tmp->destpe = node;
1367         
1368 #if CMK_SMP_TRACE_COMMTHREAD
1369         msg_tmp->srcpe = CmiMyPe();
1370 #endif  
1371
1372 #if MULTI_SENDQUEUE
1373   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1374 #else
1375   CmiLock(sendMsgBufLock);
1376   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1377   CmiUnlock(sendMsgBufLock);
1378 #endif
1379         
1380   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1381 }
1382
1383 #endif
1384
1385 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1386 {
1387   CmiState cs = CmiGetState();
1388   SMSG_LIST *msg_tmp;
1389   CmiUInt2  rank, node;
1390
1391   if(destPE == cs->pe) {
1392     char *dupmsg = (char *) CmiAlloc(size);
1393     memcpy(dupmsg, msg, size);
1394     CmiSendSelf(dupmsg);
1395     return 0;
1396   }
1397   CQdCreate(CpvAccess(cQdState), 1);
1398 #if CMK_SMP
1399   node = CmiNodeOf(destPE);
1400   rank = CmiRankOf(destPE);
1401   if (node == CmiMyNode())  {
1402     CmiPushPE(rank, msg);
1403     return 0;
1404   }
1405   CMI_DEST_RANK(msg) = rank;
1406   EnqueueMsg(msg, size, node);
1407   return 0;
1408 #else
1409   /* non smp */
1410   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1411   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1412   msg_tmp->msg = msg;
1413   msg_tmp->next = 0;
1414   while (MsgQueueLen > request_max) {
1415         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1416         CmiReleaseSentMessages();
1417         PumpMsgs();
1418   }
1419 #if CMK_ERROR_CHECKING
1420   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1421 #endif
1422   CMI_SET_CHECKSUM(msg, size);
1423
1424 #if MPI_POST_RECV_COUNT > 0
1425         if(size <= MPI_POST_RECV_SIZE){
1426
1427           START_EVENT();
1428           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1429                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1430           END_EVENT(40);
1431         }
1432         else {
1433           START_EVENT();
1434           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1435                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1436           END_EVENT(40);
1437         }
1438 #else
1439   START_EVENT();
1440   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1441     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1442   /*END_EVENT(40);*/
1443   #if CMK_TRACE_COMMOVERHEAD
1444         char tmp[64];
1445         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1446         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1447   #endif
1448 #endif
1449
1450   MsgQueueLen++;
1451   if(sent_msgs==0)
1452     sent_msgs = msg_tmp;
1453   else
1454     end_sent->next = msg_tmp;
1455   end_sent = msg_tmp;
1456   return (CmiCommHandle) &(msg_tmp->req);
1457 #endif              /* non-smp */
1458 }
1459
1460 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1461 {
1462   CMI_SET_BROADCAST_ROOT(msg, 0);
1463   CmiAsyncSendFn_(destPE, size, msg);
1464 }
1465
1466 void CmiFreeSendFn(int destPE, int size, char *msg)
1467 {
1468   CmiState cs = CmiGetState();
1469   CMI_SET_BROADCAST_ROOT(msg, 0);
1470
1471   if (cs->pe==destPE) {
1472     CmiSendSelf(msg);
1473   } else {
1474     CmiAsyncSendFn_(destPE, size, msg);
1475   }
1476 }
1477
1478 /*********************** BROADCAST FUNCTIONS **********************/
1479
1480 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1481 void CmiSyncSendFn1(int destPE, int size, char *msg)
1482 {
1483   CmiState cs = CmiGetState();
1484   char *dupmsg = (char *) CmiAlloc(size);
1485   memcpy(dupmsg, msg, size);
1486   if (cs->pe==destPE)
1487     CmiSendSelf(dupmsg);
1488   else
1489     CmiAsyncSendFn_(destPE, size, dupmsg);
1490 }
1491
1492 /* send msg to its spanning children in broadcast. G. Zheng */
1493 void SendSpanningChildren(int size, char *msg)
1494 {
1495   CmiState cs = CmiGetState();
1496   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1497   int startnode = CmiNodeOf(startpe);
1498   int i, exceptRank;
1499         
1500    /* first send msgs to other nodes */
1501   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1502   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1503     int nd = CmiMyNode()-startnode;
1504     if (nd<0) nd+=CmiNumNodes();
1505     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1506     if (nd > CmiNumNodes() - 1) break;
1507     nd += startnode;
1508     nd = nd%CmiNumNodes();
1509     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1510 #if CMK_SMP
1511       /* always send to the first rank of other nodes */
1512     char *newmsg = CmiCopyMsg(msg, size);
1513     CMI_DEST_RANK(newmsg) = 0;
1514     EnqueueMsg(newmsg, size, nd);
1515 #else
1516     CmiSyncSendFn1(nd, size, msg);
1517 #endif
1518   }
1519 #if CMK_SMP  
1520    /* second send msgs to my peers on this node */
1521   /* FIXME: now it's just a flat p2p send!! When node size is large,
1522    * it should also be sent in a tree
1523    */
1524    exceptRank = CMI_DEST_RANK(msg);
1525    for(i=0; i<exceptRank; i++){
1526            CmiPushPE(i, CmiCopyMsg(msg, size));
1527    }
1528    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1529            CmiPushPE(i, CmiCopyMsg(msg, size));
1530    }
1531 #endif
1532 }
1533
1534 #include <math.h>
1535
1536 /* send msg along the hypercube in broadcast. (Sameer) */
1537 void SendHypercube(int size, char *msg)
1538 {
1539   CmiState cs = CmiGetState();
1540   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1541   int startnode = CmiNodeOf(startpe);
1542   int i, exceptRank, cnt, tmp, relPE;
1543   int dims=0;
1544
1545   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1546   tmp = CmiNumNodes()-1;
1547   while(tmp>0){
1548           dims++;
1549           tmp = tmp >> 1;
1550   }
1551   if(CmiNumNodes()==1) dims=1;
1552   
1553    /* first send msgs to other nodes */  
1554   relPE = CmiMyNode()-startnode;
1555   if(relPE < 0) relPE += CmiNumNodes();
1556   cnt=0;
1557   tmp = relPE;
1558   /* count how many zeros (in binary format) relPE has */
1559   for(i=0; i<dims; i++, cnt++){
1560     if(tmp & 1 == 1) break;
1561     tmp = tmp >> 1;
1562   }
1563   
1564   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1565   for (i = cnt-1; i >= 0; i--) {
1566     int nd = relPE + (1 << i);
1567         if(nd >= CmiNumNodes()) continue;
1568         nd = (nd+startnode)%CmiNumNodes();
1569         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1570 #if CMK_SMP
1571     /* always send to the first rank of other nodes */
1572     char *newmsg = CmiCopyMsg(msg, size);
1573     CMI_DEST_RANK(newmsg) = 0;
1574     EnqueueMsg(newmsg, size, nd);
1575 #else
1576         CmiSyncSendFn1(nd, size, msg);
1577 #endif
1578   }
1579   
1580 #if CMK_SMP
1581    /* second send msgs to my peers on this node */
1582    /* FIXME: now it's just a flat p2p send!! When node size is large,
1583     * it should also be sent in a tree
1584     */
1585    exceptRank = CMI_DEST_RANK(msg);
1586    for(i=0; i<exceptRank; i++){
1587            CmiPushPE(i, CmiCopyMsg(msg, size));
1588    }
1589    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1590            CmiPushPE(i, CmiCopyMsg(msg, size));
1591    }
1592 #endif
1593 }
1594
1595 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1596 {
1597   CmiState cs = CmiGetState();
1598
1599 #if CMK_SMP     
1600   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1601   CMI_DEST_RANK(msg) = CmiMyRank();
1602 #endif
1603         
1604 #if CMK_BROADCAST_SPANNING_TREE
1605   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1606   SendSpanningChildren(size, msg);
1607
1608 #elif CMK_BROADCAST_HYPERCUBE
1609   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1610   SendHypercube(size, msg);
1611
1612 #else
1613   int i;
1614
1615   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1616     CmiSyncSendFn(i, size,msg) ;
1617   for ( i=0; i<cs->pe; i++ )
1618     CmiSyncSendFn(i, size,msg) ;
1619 #endif
1620
1621   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1622 }
1623
1624
1625 /*  FIXME: luckily async is never used  G. Zheng */
1626 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1627 {
1628   CmiState cs = CmiGetState();
1629   int i ;
1630
1631   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1632     CmiAsyncSendFn(i,size,msg) ;
1633   for ( i=0; i<cs->pe; i++ )
1634     CmiAsyncSendFn(i,size,msg) ;
1635
1636   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1637   CmiAbort("CmiAsyncBroadcastFn should never be called");
1638   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1639 }
1640
1641 void CmiFreeBroadcastFn(int size, char *msg)
1642 {
1643    CmiSyncBroadcastFn(size,msg);
1644    CmiFree(msg);
1645 }
1646
1647 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1648 {
1649
1650 #if CMK_SMP     
1651   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1652   CMI_DEST_RANK(msg) = CmiMyRank();
1653 #endif
1654
1655 #if CMK_BROADCAST_SPANNING_TREE
1656   CmiState cs = CmiGetState();
1657   CmiSyncSendFn(cs->pe, size,msg) ;
1658   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1659   SendSpanningChildren(size, msg);
1660
1661 #elif CMK_BROADCAST_HYPERCUBE
1662   CmiState cs = CmiGetState();
1663   CmiSyncSendFn(cs->pe, size,msg) ;
1664   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1665   SendHypercube(size, msg);
1666
1667 #else
1668     int i ;
1669
1670   for ( i=0; i<_Cmi_numpes; i++ )
1671     CmiSyncSendFn(i,size,msg) ;
1672 #endif
1673
1674   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1675 }
1676
1677 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1678 {
1679   int i ;
1680
1681   for ( i=1; i<_Cmi_numpes; i++ )
1682     CmiAsyncSendFn(i,size,msg) ;
1683
1684   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1685
1686   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1687 }
1688
1689 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1690 {
1691 #if CMK_SMP     
1692   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1693   CMI_DEST_RANK(msg) = CmiMyRank();
1694 #endif
1695
1696 #if CMK_BROADCAST_SPANNING_TREE
1697   CmiState cs = CmiGetState();
1698   CmiSyncSendFn(cs->pe, size,msg) ;
1699   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1700   SendSpanningChildren(size, msg);
1701
1702 #elif CMK_BROADCAST_HYPERCUBE
1703   CmiState cs = CmiGetState();
1704   CmiSyncSendFn(cs->pe, size,msg) ;
1705   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1706   SendHypercube(size, msg);
1707
1708 #else
1709   int i ;
1710
1711   for ( i=0; i<_Cmi_numpes; i++ )
1712     CmiSyncSendFn(i,size,msg) ;
1713 #endif
1714   CmiFree(msg) ;
1715   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1716 }
1717
1718 #if CMK_NODE_QUEUE_AVAILABLE
1719
1720 static void CmiSendNodeSelf(char *msg)
1721 {
1722 #if CMK_IMMEDIATE_MSG
1723 #if 0
1724     if (CmiIsImmediate(msg) && !_immRunning) {
1725       /*CmiHandleImmediateMessage(msg); */
1726       CmiPushImmediateMsg(msg);
1727       CmiHandleImmediate();
1728       return;
1729     }
1730 #endif
1731     if (CmiIsImmediate(msg))
1732     {
1733       CmiPushImmediateMsg(msg);
1734       if (!_immRunning) CmiHandleImmediate();
1735       return;
1736     }
1737 #endif
1738     CQdCreate(CpvAccess(cQdState), 1);
1739     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1740     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1741     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1742 }
1743
1744 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1745 {
1746   int i;
1747   SMSG_LIST *msg_tmp;
1748   char *dupmsg;
1749
1750   CMI_SET_BROADCAST_ROOT(msg, 0);
1751   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1752   switch (dstNode) {
1753   case NODE_BROADCAST_ALL:
1754     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1755   case NODE_BROADCAST_OTHERS:
1756     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1757     for (i=0; i<_Cmi_numnodes; i++)
1758       if (i!=_Cmi_mynode) {
1759         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1760       }
1761     break;
1762   default:
1763     dupmsg = (char *)CmiCopyMsg(msg,size);
1764     if(dstNode == _Cmi_mynode) {
1765       CmiSendNodeSelf(dupmsg);
1766     }
1767     else {
1768       CQdCreate(CpvAccess(cQdState), 1);
1769       EnqueueMsg(dupmsg, size, dstNode);
1770     }
1771   }
1772   return 0;
1773 }
1774
1775 void CmiSyncNodeSendFn(int p, int s, char *m)
1776 {
1777   CmiAsyncNodeSendFn(p, s, m);
1778 }
1779
1780 /* need */
1781 void CmiFreeNodeSendFn(int p, int s, char *m)
1782 {
1783   CmiAsyncNodeSendFn(p, s, m);
1784   CmiFree(m);
1785 }
1786
1787 /* need */
1788 void CmiSyncNodeBroadcastFn(int s, char *m)
1789 {
1790   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1791 }
1792
1793 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1794 {
1795 }
1796
1797 /* need */
1798 void CmiFreeNodeBroadcastFn(int s, char *m)
1799 {
1800   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1801   CmiFree(m);
1802 }
1803
1804 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1805 {
1806   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1807 }
1808
1809 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1810 {
1811   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1812 }
1813
1814 /* need */
1815 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1816 {
1817   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1818   CmiFree(m);
1819 }
1820 #endif
1821
1822 /************************** MAIN ***********************************/
1823 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1824
1825 void ConverseExit(void)
1826 {
1827 #if ! CMK_SMP
1828   while(!CmiAllAsyncMsgsSent()) {
1829     PumpMsgs();
1830     CmiReleaseSentMessages();
1831   }
1832   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1833     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1834
1835   ConverseCommonExit();
1836 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1837   if (CmiMyPe() == 0){
1838     CmiPrintf("End of program\n");
1839 #if MPI_POST_RECV_COUNT > 0
1840     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1841 #endif
1842 }
1843 #endif
1844 #if ! CMK_AUTOBUILD
1845   signal(SIGINT, signal_int);
1846   MPI_Finalize();
1847 #endif
1848   exit(0);
1849
1850 #else
1851     /* SMP version, communication thread will exit */
1852   ConverseCommonExit();
1853   /* atomic increment */
1854   CmiLock(exitLock);
1855   inexit++;
1856   CmiUnlock(exitLock);
1857   while (1) CmiYield();
1858 #endif
1859 }
1860
1861 static void registerMPITraceEvents() {
1862 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1863     traceRegisterUserEvent("MPI_Barrier", 10);
1864     traceRegisterUserEvent("MPI_Send", 20);
1865     traceRegisterUserEvent("MPI_Recv", 30);
1866     traceRegisterUserEvent("MPI_Isend", 40);
1867     traceRegisterUserEvent("MPI_Irecv", 50);
1868     traceRegisterUserEvent("MPI_Test", 60);
1869     traceRegisterUserEvent("MPI_Iprobe", 70);
1870 #endif
1871 }
1872
1873
1874 static char     **Cmi_argv;
1875 static char     **Cmi_argvcopy;
1876 static CmiStartFn Cmi_startfn;   /* The start function */
1877 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1878
1879 typedef struct {
1880   int sleepMs; /*Milliseconds to sleep while idle*/
1881   int nIdles; /*Number of times we've been idle in a row*/
1882   CmiState cs; /*Machine state*/
1883 } CmiIdleState;
1884
1885 static CmiIdleState *CmiNotifyGetState(void)
1886 {
1887   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1888   s->sleepMs=0;
1889   s->nIdles=0;
1890   s->cs=CmiGetState();
1891   return s;
1892 }
1893
1894 static void CmiNotifyBeginIdle(CmiIdleState *s)
1895 {
1896   s->sleepMs=0;
1897   s->nIdles=0;
1898 }
1899
1900 static void CmiNotifyStillIdle(CmiIdleState *s)
1901 {
1902 #if ! CMK_SMP
1903   CmiReleaseSentMessages();
1904   PumpMsgs();
1905 #else
1906 /*  CmiYield();  */
1907 #endif
1908
1909 #if 1
1910   {
1911   int nSpins=20; /*Number of times to spin before sleeping*/
1912   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1913   s->nIdles++;
1914   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1915     s->sleepMs+=2;
1916     if (s->sleepMs>10) s->sleepMs=10;
1917   }
1918   /*Comm. thread will listen on sockets-- just sleep*/
1919   if (s->sleepMs>0) {
1920     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1921     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1922     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1923   }
1924   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1925   }
1926 #endif
1927 }
1928
1929 #if MACHINE_DEBUG_LOG
1930 FILE *debugLog = NULL;
1931 #endif
1932
1933 static int machine_exit_idx;
1934 static void machine_exit(char *m) {
1935   EmergencyExit();
1936   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1937   fflush(stdout);
1938   CmiNodeBarrier();
1939   if (CmiMyRank() == 0) {
1940     MPI_Barrier(MPI_COMM_WORLD);
1941     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1942     MPI_Abort(MPI_COMM_WORLD, 1);
1943   } else {
1944     while (1) CmiYield();
1945   }
1946 }
1947
1948 static void KillOnAllSigs(int sigNo) {
1949   static int already_in_signal_handler = 0;
1950   char *m;
1951   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1952   already_in_signal_handler = 1;
1953 #if CMK_CCS_AVAILABLE
1954   if (CpvAccess(cmiArgDebugFlag)) {
1955     CpdNotify(CPD_SIGNAL, sigNo);
1956     CpdFreeze();
1957   }
1958 #endif
1959   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1960       "Signal: %d\n",CmiMyPe(),sigNo);
1961   CmiPrintStackTrace(1);
1962
1963   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1964   CmiSetHandler(m, machine_exit_idx);
1965   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1966   machine_exit(m);
1967 }
1968
1969 static void ConverseRunPE(int everReturn)
1970 {
1971   CmiIdleState *s=CmiNotifyGetState();
1972   CmiState cs;
1973   char** CmiMyArgv;
1974
1975   CmiNodeAllBarrier();
1976
1977   cs = CmiGetState();
1978   CpvInitialize(void *,CmiLocalQueue);
1979   CpvAccess(CmiLocalQueue) = cs->localqueue;
1980
1981   if (CmiMyRank())
1982     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1983   else
1984     CmiMyArgv=Cmi_argv;
1985
1986   CthInit(CmiMyArgv);
1987
1988   ConverseCommonInit(CmiMyArgv);
1989   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1990
1991 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1992   CpvInitialize(double, projTraceStart);
1993   /* only PE 0 needs to care about registration (to generate sts file). */
1994   if (CmiMyPe() == 0) {
1995     registerMachineUserEventsFunction(&registerMPITraceEvents);
1996   }
1997 #endif
1998
1999   /* initialize the network progress counter*/
2000   /* Network progress function is used to poll the network when for
2001      messages. This flushes receive buffers on some  implementations*/
2002   CpvInitialize(int , networkProgressCount);
2003   CpvAccess(networkProgressCount) = 0;
2004
2005 #if CMK_SMP
2006   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
2007   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
2008 #else
2009   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
2010 #endif
2011
2012 #if MACHINE_DEBUG_LOG
2013   if (CmiMyRank() == 0) {
2014     char ln[200];
2015     sprintf(ln,"debugLog.%d",CmiMyNode());
2016     debugLog=fopen(ln,"w");
2017   }
2018 #endif
2019
2020   /* Converse initialization finishes, immediate messages can be processed.
2021      node barrier previously should take care of the node synchronization */
2022   _immediateReady = 1;
2023
2024   /* communication thread */
2025   if (CmiMyRank() == CmiMyNodeSize()) {
2026     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2027     while (1) CommunicationServerThread(5);
2028   }
2029   else {  /* worker thread */
2030   if (!everReturn) {
2031     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
2032     if (Cmi_usrsched==0) CsdScheduler(-1);
2033     ConverseExit();
2034   }
2035   }
2036 }
2037
2038 static char *thread_level_tostring(int thread_level)
2039 {
2040 #if CMK_MPI_INIT_THREAD
2041   switch (thread_level) {
2042   case MPI_THREAD_SINGLE:
2043       return "MPI_THREAD_SINGLE";
2044   case MPI_THREAD_FUNNELED:
2045       return "MPI_THREAD_FUNNELED";
2046   case MPI_THREAD_SERIALIZED:
2047       return "MPI_THREAD_SERIALIZED";
2048   case MPI_THREAD_MULTIPLE :
2049       return "MPI_THREAD_MULTIPLE ";
2050   default: {
2051       char *str = (char*)malloc(5);
2052       sprintf(str,"%d", thread_level);
2053       return str;
2054       }
2055   }
2056   return  "unknown";
2057 #else
2058   char *str = (char*)malloc(5);
2059   sprintf(str,"%d", thread_level);
2060   return str;
2061 #endif
2062 }
2063
2064 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
2065 {
2066   int n,i;
2067   int ver, subver;
2068   int provided;
2069   int thread_level;
2070
2071 #if MACHINE_DEBUG
2072   debugLog=NULL;
2073 #endif
2074 #if CMK_USE_HP_MAIN_FIX
2075 #if FOR_CPLUS
2076   _main(argc,argv);
2077 #endif
2078 #endif
2079
2080 #if CMK_MPI_INIT_THREAD
2081 #if CMK_SMP
2082   thread_level = MPI_THREAD_FUNNELED;
2083 #else
2084   thread_level = MPI_THREAD_SINGLE;
2085 #endif
2086   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2087   _thread_provided = provided;
2088 #else
2089   MPI_Init(&argc, &argv);
2090   thread_level = 0;
2091   provided = -1;
2092 #endif
2093   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2094   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2095
2096   MPI_Get_version(&ver, &subver);
2097   if (_Cmi_mynode == 0) {
2098     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2099   }
2100
2101   /* processor per node */
2102   _Cmi_mynodesize = 1;
2103   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2104     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2105 #if ! CMK_SMP
2106   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2107     CmiAbort("+ppn cannot be used in non SMP version!\n");
2108 #endif
2109   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2110   if (idleblock && _Cmi_mynode == 0) {
2111     printf("Charm++: Running in idle blocking mode.\n");
2112   }
2113
2114   /* setup signal handlers */
2115   signal(SIGSEGV, KillOnAllSigs);
2116   signal(SIGFPE, KillOnAllSigs);
2117   signal(SIGILL, KillOnAllSigs);
2118   signal_int = signal(SIGINT, KillOnAllSigs);
2119   signal(SIGTERM, KillOnAllSigs);
2120   signal(SIGABRT, KillOnAllSigs);
2121 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2122   signal(SIGQUIT, KillOnAllSigs);
2123   signal(SIGBUS, KillOnAllSigs);
2124 /*#     if CMK_HANDLE_SIGUSR
2125   signal(SIGUSR1, HandleUserSignals);
2126   signal(SIGUSR2, HandleUserSignals);
2127 #     endif*/
2128 #   endif /*UNIX*/
2129   
2130 #if CMK_NO_OUTSTANDING_SENDS
2131   no_outstanding_sends=1;
2132 #endif
2133   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2134     no_outstanding_sends = 1;
2135     if (_Cmi_mynode == 0)
2136       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2137         no_outstanding_sends?"":" not");
2138   }
2139   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2140   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2141   Cmi_argvcopy = CmiCopyArgs(argv);
2142   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2143   /* find dim = log2(numpes), to pretend we are a hypercube */
2144   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2145     Cmi_dim++ ;
2146  /* CmiSpanTreeInit();*/
2147   request_max=MAX_QLEN;
2148   CmiGetArgInt(argv,"+requestmax",&request_max);
2149   /*printf("request max=%d\n", request_max);*/
2150
2151   /* checksum flag */
2152   if (CmiGetArgFlag(argv,"+checksum")) {
2153 #if !CMK_OPTIMIZE
2154     checksum_flag = 1;
2155     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2156 #else
2157     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2158 #endif
2159   }
2160
2161   {
2162   int debug = CmiGetArgFlag(argv,"++debug");
2163   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2164   if (debug || debug_no_pause)
2165   {   /*Pause so user has a chance to start and attach debugger*/
2166 #if CMK_HAS_GETPID
2167     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2168     fflush(stdout);
2169     if (!debug_no_pause)
2170       sleep(15);
2171 #else
2172     printf("++debug ignored.\n");
2173 #endif
2174   }
2175   }
2176
2177 #if MPI_POST_RECV_COUNT > 0
2178
2179   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2180   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2181   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2182   CpvInitialize(char*,CmiPostedRecvBuffers);
2183
2184     /* Post some extra recvs to help out with incoming messages */
2185     /* On some MPIs the messages are unexpected and thus slow */
2186
2187     /* An array of request handles for posted recvs */
2188     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2189
2190     /* An array of buffers for posted recvs */
2191     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2192
2193     /* Post Recvs */
2194     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2195         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2196                     MPI_POST_RECV_SIZE,
2197                     MPI_BYTE,
2198                     MPI_ANY_SOURCE,
2199                     POST_RECV_TAG,
2200                     MPI_COMM_WORLD,
2201                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2202           CmiAbort("MPI_Irecv failed\n");
2203     }
2204
2205 #endif
2206
2207
2208
2209   /* CmiTimerInit(); */
2210
2211 #if 0
2212   CthInit(argv);
2213   ConverseCommonInit(argv);
2214
2215   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2216   if (initret==0) {
2217     fn(CmiGetArgc(argv), argv);
2218     if (usched==0) CsdScheduler(-1);
2219     ConverseExit();
2220   }
2221 #endif
2222
2223   CsvInitialize(CmiNodeState, NodeState);
2224   CmiNodeStateInit(&CsvAccess(NodeState));
2225
2226   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2227
2228   for (i=0; i<_Cmi_mynodesize+1; i++) {
2229 #if MULTI_SENDQUEUE
2230     procState[i].sendMsgBuf = PCQueueCreate();
2231 #endif
2232     procState[i].recvLock = CmiCreateLock();
2233   }
2234 #if CMK_SMP
2235 #if !MULTI_SENDQUEUE
2236   sendMsgBuf = PCQueueCreate();
2237   sendMsgBufLock = CmiCreateLock();
2238 #endif
2239   exitLock = CmiCreateLock();            /* exit count lock */
2240 #endif
2241
2242   /* Network progress function is used to poll the network when for
2243      messages. This flushes receive buffers on some  implementations*/
2244   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2245   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2246
2247   CmiStartThreads(argv);
2248   ConverseRunPE(initret);
2249 }
2250
2251 /***********************************************************************
2252  *
2253  * Abort function:
2254  *
2255  ************************************************************************/
2256
2257 void CmiAbort(const char *message)
2258 {
2259   char *m;
2260   /* if CharmDebug is attached simply try to send a message to it */
2261 #if CMK_CCS_AVAILABLE
2262   if (CpvAccess(cmiArgDebugFlag)) {
2263     CpdNotify(CPD_ABORT, message);
2264     CpdFreeze();
2265   }
2266 #endif  
2267   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2268         "Reason: %s\n",CmiMyPe(),message);
2269  /*  CmiError(message); */
2270   CmiPrintStackTrace(0);
2271   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2272   CmiSetHandler(m, machine_exit_idx);
2273   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2274   machine_exit(m);
2275   /* Program never reaches here */
2276   MPI_Abort(MPI_COMM_WORLD, 1);
2277 }
2278
2279
2280 #if 0
2281
2282 /* ****************************************************************** */
2283 /*    The following internal functions implement recd msg queue       */
2284 /* ****************************************************************** */
2285
2286 static void ** AllocBlock(unsigned int len)
2287 {
2288   void ** blk;
2289
2290   blk=(void **)CmiAlloc(len*sizeof(void *));
2291   if(blk==(void **)0) {
2292     CmiError("Cannot Allocate Memory!\n");
2293     MPI_Abort(MPI_COMM_WORLD, 1);
2294   }
2295   return blk;
2296 }
2297
2298 static void
2299 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2300 {
2301   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2302   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2303 }
2304
2305 void recdQueueInit(void)
2306 {
2307   recdQueue_blk = AllocBlock(BLK_LEN);
2308   recdQueue_blk_len = BLK_LEN;
2309   recdQueue_first = 0;
2310   recdQueue_len = 0;
2311 }
2312
2313 void recdQueueAddToBack(void *element)
2314 {
2315 #if NODE_0_IS_CONVHOST
2316   inside_comm = 1;
2317 #endif
2318   if(recdQueue_len==recdQueue_blk_len) {
2319     void **blk;
2320     recdQueue_blk_len *= 3;
2321     blk = AllocBlock(recdQueue_blk_len);
2322     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2323     CmiFree(recdQueue_blk);
2324     recdQueue_blk = blk;
2325     recdQueue_first = 0;
2326   }
2327   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2328 #if NODE_0_IS_CONVHOST
2329   inside_comm = 0;
2330 #endif
2331 }
2332
2333
2334 void * recdQueueRemoveFromFront(void)
2335 {
2336   if(recdQueue_len) {
2337     void *element;
2338     element = recdQueue_blk[recdQueue_first++];
2339     recdQueue_first %= recdQueue_blk_len;
2340     recdQueue_len--;
2341     return element;
2342   }
2343   return 0;
2344 }
2345
2346 #endif
2347
2348 /*@}*/