Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22
23 #ifdef AMPI
24 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
25 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
26 #  error "Can't build Charm++ using AMPI version of mpi.h header"
27 #endif
28
29 /*Support for ++debug: */
30 #if defined(_WIN32) && ! defined(__CYGWIN__)
31 #include <windows.h>
32 #include <wincon.h>
33 #include <sys/types.h>
34 #include <sys/timeb.h>
35 static void sleep(int secs) {Sleep(1000*secs);}
36 #else
37 #include <unistd.h> /*For getpid()*/
38 #endif
39 #include <stdlib.h> /*For sleep()*/
40
41 #define MULTI_SENDQUEUE    0
42
43 #define CMI_EXERT_SEND_CAP 0
44 #define CMI_EXERT_RECV_CAP 0
45
46 #if CMI_EXERT_SEND_CAP
47 #define SEND_CAP 3
48 #endif
49
50 #if CMI_EXERT_RECV_CAP
51 #define RECV_CAP 2
52 #endif
53
54 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
55 #define CMK_SMP 1
56 #endif
57
58
59 #if CMK_SMP_TRACE_COMMTHREAD
60 #define CMI_MPI_TRACE_MOREDETAILED 0
61 #undef CMI_MPI_TRACE_USEREVENTS
62 #define CMI_MPI_TRACE_USEREVENTS 1
63 #endif
64
65 #define CMK_TRACE_COMMOVERHEAD 0
66 #if CMK_TRACE_COMMOVERHEAD
67 #undef CMI_MPI_TRACE_USEREVENTS
68 #define CMI_MPI_TRACE_USEREVENTS 1
69 #endif
70
71 #include "machine.h"
72
73 #include "pcqueue.h"
74
75 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
76
77 #if CMK_BLUEGENEL
78 #define MAX_QLEN 8
79 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
80 #else
81 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
82 #define MAX_QLEN 200
83 #endif
84
85 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
86 CpvStaticDeclare(double, projTraceStart);
87 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
88 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
89 #else
90 # define  START_EVENT()
91 # define  END_EVENT(x)
92 #endif
93
94 /*
95     To reduce the buffer used in broadcast and distribute the load from
96   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
97   spanning tree broadcast algorithm.
98     This will use the fourth short in message as an indicator of spanning tree
99   root.
100 */
101 #define CMK_BROADCAST_SPANNING_TREE    1
102 #define CMK_BROADCAST_HYPERCUBE        0
103
104 #define BROADCAST_SPANNING_FACTOR      4
105 /* The number of children used when a msg is broadcast inside a node */
106 #define BROADCAST_SPANNING_INTRA_FACTOR      8
107
108 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
109 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
110
111 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
112 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
113
114 /* FIXME: need a random number that everyone agrees ! */
115 #define CHARM_MAGIC_NUMBER               126
116
117 #if CMK_ERROR_CHECKING
118 static int checksum_flag = 0;
119 #define CMI_SET_CHECKSUM(msg, len)      \
120         if (checksum_flag)  {   \
121           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
122           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
123         }
124 #define CMI_CHECK_CHECKSUM(msg, len)    \
125         if (checksum_flag)      \
126           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
127             CmiAbort("Fatal error: checksum doesn't agree!\n");
128 #else
129 #define CMI_SET_CHECKSUM(msg, len)
130 #define CMI_CHECK_CHECKSUM(msg, len)
131 #endif
132
133 #if CMK_BROADCAST_SPANNING_TREE || CMK_BROADCAST_HYPERCUBE
134 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
135 #else
136 #define CMI_SET_BROADCAST_ROOT(msg, root) 
137 #endif
138
139
140 /** 
141     If MPI_POST_RECV is defined, we provide default values for size 
142     and number of posted recieves. If MPI_POST_RECV_COUNT is set
143     then a default value for MPI_POST_RECV_SIZE is used if not specified
144     by the user.
145 */
146 #ifdef MPI_POST_RECV
147 #define MPI_POST_RECV_COUNT 10
148 #undef MPI_POST_RECV
149 #endif
150 #if MPI_POST_RECV_COUNT > 0
151 #warning "Using MPI posted receives which have not yet been tested"
152 #ifndef MPI_POST_RECV_SIZE
153 #define MPI_POST_RECV_SIZE 200
154 #endif
155 /* #undef  MPI_POST_RECV_DEBUG  */
156 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
157 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
158 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
159 CpvDeclare(char*,CmiPostedRecvBuffers);
160 #endif
161
162 /*
163  to avoid MPI's in order delivery, changing MPI Tag all the time
164 */
165 #define TAG     1375
166
167 #if MPI_POST_RECV_COUNT > 0
168 #define POST_RECV_TAG TAG+1
169 #define BARRIER_ZERO_TAG TAG
170 #else
171 #define BARRIER_ZERO_TAG     1375
172 #endif
173
174 #include <signal.h>
175 void (*signal_int)(int);
176
177 /*
178 static int mpi_tag = TAG;
179 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
180 */
181
182 static int        _thread_provided = -1;
183 int               _Cmi_numpes;
184 int               _Cmi_mynode;    /* Which address space am I */
185 int               _Cmi_mynodesize;/* Number of processors in my address space */
186 int               _Cmi_numnodes;  /* Total number of address spaces */
187 int               _Cmi_numpes;    /* Total number of processors */
188 static int        Cmi_nodestart; /* First processor in this address space */
189 CpvDeclare(void*, CmiLocalQueue);
190
191 /*Network progress utility variables. Period controls the rate at
192   which the network poll is called */
193 CpvDeclare(unsigned , networkProgressCount);
194 int networkProgressPeriod;
195
196 int               idleblock = 0;
197
198 #define BLK_LEN  512
199
200 #if CMK_NODE_QUEUE_AVAILABLE
201 #define DGRAM_NODEMESSAGE   (0xFB)
202
203 #define NODE_BROADCAST_OTHERS (-1)
204 #define NODE_BROADCAST_ALL    (-2)
205 #endif
206
207 #if 0
208 static void **recdQueue_blk;
209 static unsigned int recdQueue_blk_len;
210 static unsigned int recdQueue_first;
211 static unsigned int recdQueue_len;
212 static void recdQueueInit(void);
213 static void recdQueueAddToBack(void *element);
214 static void *recdQueueRemoveFromFront(void);
215 #endif
216
217 static void ConverseRunPE(int everReturn);
218 static void CommunicationServer(int sleepTime);
219 static void CommunicationServerThread(int sleepTime);
220
221 typedef struct msg_list {
222      char *msg;
223      struct msg_list *next;
224      int size, destpe;
225
226 #if CMK_SMP_TRACE_COMMTHREAD
227         int srcpe;
228 #endif  
229         
230      MPI_Request req;
231 } SMSG_LIST;
232
233 int MsgQueueLen=0;
234 static int request_max;
235
236 static SMSG_LIST *sent_msgs=0;
237 static SMSG_LIST *end_sent=0;
238
239 static int Cmi_dim;
240
241 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
242
243 #if NODE_0_IS_CONVHOST
244 int inside_comm = 0;
245 #endif
246
247 void CmiAbort(const char *message);
248 static void PerrorExit(const char *msg);
249
250 void SendSpanningChildren(int size, char *msg);
251 void SendHypercube(int size, char *msg);
252
253 static void PerrorExit(const char *msg)
254 {
255   perror(msg);
256   exit(1);
257 }
258
259 extern unsigned char computeCheckSum(unsigned char *data, int len);
260
261 /**************************  TIMER FUNCTIONS **************************/
262
263 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
264
265 /* MPI calls are not threadsafe, even the timer on some machines */
266 static CmiNodeLock  timerLock = 0;
267 static double starttimer = 0;
268 static int _is_global = 0;
269
270 int CmiTimerIsSynchronized()
271 {
272   int  flag;
273   void *v;
274
275   /*  check if it using synchronized timer */
276   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
277     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
278   if (flag) {
279     _is_global = *(int*)v;
280     if (_is_global && CmiMyPe() == 0)
281       printf("Charm++> MPI timer is synchronized!\n");
282   }
283   return _is_global;
284 }
285
286 void CmiTimerInit()
287 {
288   _is_global = CmiTimerIsSynchronized();
289
290   if (_is_global) {
291     if (CmiMyRank() == 0) {
292       double minTimer;
293 #if CMK_TIMER_USE_XT3_DCLOCK
294       starttimer = dclock();
295 #else
296       starttimer = MPI_Wtime();
297 #endif
298
299       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
300                                   MPI_COMM_WORLD );
301       starttimer = minTimer;
302     }
303   }
304   else {  /* we don't have a synchronous timer, set our own start time */
305     CmiBarrier();
306     CmiBarrier();
307     CmiBarrier();
308 #if CMK_TIMER_USE_XT3_DCLOCK
309     starttimer = dclock();
310 #else
311     starttimer = MPI_Wtime();
312 #endif
313   }
314
315 #if 0 && CMK_SMP && CMK_MPI_INIT_THREAD
316   if (CmiMyRank()==0 && _thread_provided == MPI_THREAD_SINGLE)
317     timerLock = CmiCreateLock();
318 #endif
319   CmiNodeAllBarrier();          /* for smp */
320 }
321
322 /**
323  * Since the timerLock is never created, and is
324  * always NULL, then all the if-condition inside
325  * the timer functions could be disabled right
326  * now in the case of SMP. --Chao Mei
327  */
328 double CmiTimer(void)
329 {
330   double t;
331 #if 0 && CMK_SMP
332   if (timerLock) CmiLock(timerLock);
333 #endif
334 #if CMK_TIMER_USE_XT3_DCLOCK
335   t = dclock() - starttimer;
336 #else
337   t = MPI_Wtime() - starttimer;
338 #endif
339
340 #if 0 && CMK_SMP
341   if (timerLock) CmiUnlock(timerLock);
342 #endif
343
344   return t;
345 }
346
347 double CmiWallTimer(void)
348 {
349   double t;
350 #if 0 && CMK_SMP
351   if (timerLock) CmiLock(timerLock);
352 #endif
353 #if CMK_TIMER_USE_XT3_DCLOCK
354   t = dclock() - starttimer;
355 #else
356   t = MPI_Wtime() - starttimer;
357 #endif
358 #if 0 && CMK_SMP
359   if (timerLock) CmiUnlock(timerLock);
360 #endif
361   return t;
362 }
363
364 double CmiCpuTimer(void)
365 {
366   double t;
367 #if 0 && CMK_SMP
368   if (timerLock) CmiLock(timerLock);
369 #endif
370 #if CMK_TIMER_USE_XT3_DCLOCK
371   t = dclock() - starttimer;
372 #else
373   t = MPI_Wtime() - starttimer;
374 #endif
375 #if 0 && CMK_SMP
376   if (timerLock) CmiUnlock(timerLock);
377 #endif
378   return t;
379 }
380
381 #endif
382
383 /* must be called on all ranks including comm thread in SMP */
384 int CmiBarrier()
385 {
386 #if CMK_SMP
387     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */
388   CmiNodeAllBarrier();
389   if (CmiMyRank() == CmiMyNodeSize()) 
390 #else
391   if (CmiMyRank() == 0) 
392 #endif
393   {
394 /**
395  *  The call of CmiBarrier is usually before the initialization
396  *  of trace module of Charm++, therefore, the START_EVENT
397  *  and END_EVENT are disabled here. -Chao Mei
398  */     
399     /*START_EVENT();*/
400
401     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
402         CmiAbort("Timernit: MPI_Barrier failed!\n");
403
404     /*END_EVENT(10);*/
405   }
406   CmiNodeAllBarrier();
407   return 0;
408 }
409
410 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
411 int CmiBarrierZero()
412 {
413   int i;
414 #if CMK_SMP
415   if (CmiMyRank() == CmiMyNodeSize()) 
416 #else
417   if (CmiMyRank() == 0) 
418 #endif
419   {
420     char msg[1];
421     MPI_Status sts;
422     if (CmiMyNode() == 0)  {
423       for (i=0; i<CmiNumNodes()-1; i++) {
424          START_EVENT();
425
426          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
427             CmiPrintf("MPI_Recv failed!\n");
428
429          END_EVENT(30);
430       }
431     }
432     else {
433       START_EVENT();
434
435       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
436          printf("MPI_Send failed!\n");
437
438       END_EVENT(20);
439     }
440   }
441   CmiNodeAllBarrier();
442   return 0;
443 }
444
445 typedef struct ProcState {
446 #if MULTI_SENDQUEUE
447 PCQueue      sendMsgBuf;       /* per processor message sending queue */
448 #endif
449 CmiNodeLock  recvLock;              /* for cs->recv */
450 } ProcState;
451
452 static ProcState  *procState;
453
454 #if CMK_SMP
455
456 #if !MULTI_SENDQUEUE
457 static PCQueue sendMsgBuf;
458 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
459 #endif
460
461 #endif
462
463 /************************************************************
464  *
465  * Processor state structure
466  *
467  ************************************************************/
468
469 /* fake Cmi_charmrun_fd */
470 static int Cmi_charmrun_fd = 0;
471 #include "machine-smp.c"
472
473 CsvDeclare(CmiNodeState, NodeState);
474
475 #include "immediate.c"
476
477 #if CMK_SHARED_VARS_UNAVAILABLE
478 /************ non SMP **************/
479 static struct CmiStateStruct Cmi_state;
480 int _Cmi_mype;
481 int _Cmi_myrank;
482
483 void CmiMemLock() {}
484 void CmiMemUnlock() {}
485
486 #define CmiGetState() (&Cmi_state)
487 #define CmiGetStateN(n) (&Cmi_state)
488
489 void CmiYield(void) { sleep(0); }
490
491 static void CmiStartThreads(char **argv)
492 {
493   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
494   _Cmi_mype = Cmi_nodestart;
495   _Cmi_myrank = 0;
496 }
497 #endif  /* non smp */
498
499 /*Add a message to this processor's receive queue, pe is a rank */
500 void CmiPushPE(int pe,void *msg)
501 {
502   CmiState cs = CmiGetStateN(pe);
503   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
504 #if CMK_IMMEDIATE_MSG
505   if (CmiIsImmediate(msg)) {
506 /*
507 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
508     CmiHandleMessage(msg);
509 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
510 */
511     /**(CmiUInt2 *)msg = pe;*/
512     CMI_DEST_RANK(msg) = pe;
513     CmiPushImmediateMsg(msg);
514     return;
515   }
516 #endif
517
518 #if CMK_SMP
519   CmiLock(procState[pe].recvLock);
520 #endif
521   PCQueuePush(cs->recv,msg);
522 #if CMK_SMP
523   CmiUnlock(procState[pe].recvLock);
524 #endif
525   CmiIdleLock_addMessage(&cs->idle);
526   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
527 }
528
529 #if CMK_NODE_QUEUE_AVAILABLE
530 /*Add a message to this processor's receive queue */
531 static void CmiPushNode(void *msg)
532 {
533   MACHSTATE(3,"Pushing message into NodeRecv queue");
534 #if CMK_IMMEDIATE_MSG
535   if (CmiIsImmediate(msg)) {
536     CMI_DEST_RANK(msg) = 0;
537     CmiPushImmediateMsg(msg);
538     return;
539   }
540 #endif
541   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
542   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
543   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
544   {
545   CmiState cs=CmiGetStateN(0);
546   CmiIdleLock_addMessage(&cs->idle);
547   }
548 }
549 #endif
550
551 #ifndef CmiMyPe
552 int CmiMyPe(void)
553 {
554   return CmiGetState()->pe;
555 }
556 #endif
557
558 #ifndef CmiMyRank
559 int CmiMyRank(void)
560 {
561   return CmiGetState()->rank;
562 }
563 #endif
564
565 #ifndef CmiNodeFirst
566 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
567 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
568 #endif
569
570 #ifndef CmiNodeOf
571 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
572 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
573 #endif
574
575 static size_t CmiAllAsyncMsgsSent(void)
576 {
577    SMSG_LIST *msg_tmp = sent_msgs;
578    MPI_Status sts;
579    int done;
580
581    while(msg_tmp!=0) {
582     done = 0;
583     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
584       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
585     if(!done)
586       return 0;
587     msg_tmp = msg_tmp->next;
588 /*    MsgQueueLen--; ????? */
589    }
590    return 1;
591 }
592
593 int CmiAsyncMsgSent(CmiCommHandle c) {
594
595   SMSG_LIST *msg_tmp = sent_msgs;
596   int done;
597   MPI_Status sts;
598
599   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
600     msg_tmp = msg_tmp->next;
601   if(msg_tmp) {
602     done = 0;
603     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
604       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
605     return ((done)?1:0);
606   } else {
607     return 1;
608   }
609 }
610
611 void CmiReleaseCommHandle(CmiCommHandle c)
612 {
613   return;
614 }
615
616 #if CMK_BLUEGENEL
617 extern void MPID_Progress_test();
618 #endif
619
620 void CmiReleaseSentMessages(void)
621 {
622   SMSG_LIST *msg_tmp=sent_msgs;
623   SMSG_LIST *prev=0;
624   SMSG_LIST *temp;
625   int done;
626   MPI_Status sts;
627
628 #if CMK_BLUEGENEL
629   MPID_Progress_test();
630 #endif
631
632   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
633   while(msg_tmp!=0) {
634     done =0;
635 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
636     double startT = CmiWallTimer();
637 #endif
638     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
639       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
640     if(done) {
641       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
642       MsgQueueLen--;
643       /* Release the message */
644       temp = msg_tmp->next;
645       if(prev==0)  /* first message */
646         sent_msgs = temp;
647       else
648         prev->next = temp;
649       CmiFree(msg_tmp->msg);
650       CmiFree(msg_tmp);
651       msg_tmp = temp;
652     } else {
653       prev = msg_tmp;
654       msg_tmp = msg_tmp->next;
655     }
656 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
657     {
658     double endT = CmiWallTimer();
659     /* only record the event if it takes more than 1ms */
660     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Test: release a msg", 60, startT, endT);
661     }
662 #endif
663   }
664   end_sent = prev;
665   MACHSTATE(2,"} CmiReleaseSentMessages end");
666 }
667
668 int PumpMsgs(void)
669 {
670   int nbytes, flg, res;
671   char *msg;
672   MPI_Status sts;
673   int recd=0;
674
675 #if CMI_EXERT_RECV_CAP
676   int recvCnt=0;
677 #endif
678         
679 #if CMK_BLUEGENEL
680   MPID_Progress_test();
681 #endif
682
683   MACHSTATE(2,"PumpMsgs begin {");
684
685         
686   while(1) {
687 #if CMI_EXERT_RECV_CAP
688         if(recvCnt==RECV_CAP) break;
689 #endif
690           
691     /* First check posted recvs then do  probe unmatched outstanding messages */
692 #if MPI_POST_RECV_COUNT > 0 
693     int completed_index=-1;
694     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
695         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
696     if(flg){
697         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
698             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
699
700         recd = 1;
701         msg = (char *) CmiAlloc(nbytes);
702         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
703         /* and repost the recv */
704
705         START_EVENT();
706
707         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
708             MPI_POST_RECV_SIZE,
709             MPI_BYTE,
710             MPI_ANY_SOURCE,
711             POST_RECV_TAG,
712             MPI_COMM_WORLD,
713             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
714                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
715
716         END_EVENT(50);
717
718         CpvAccess(Cmi_posted_recv_total)++;
719     }
720     else {
721         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
722         if(res != MPI_SUCCESS)
723         CmiAbort("MPI_Iprobe failed\n");
724         if(!flg) break;
725         recd = 1;
726         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
727         msg = (char *) CmiAlloc(nbytes);
728
729         START_EVENT();
730
731         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
732             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
733
734         END_EVENT(30);
735
736         CpvAccess(Cmi_unposted_recv_total)++;
737     }
738 #else
739     /* Original version */
740 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
741   double startT = CmiWallTimer(); 
742 #endif
743     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
744     if(res != MPI_SUCCESS)
745       CmiAbort("MPI_Iprobe failed\n");
746
747     if(!flg) break;
748 #if CMK_SMP_TRACE_COMMTHREAD || CMK_TRACE_COMMOVERHEAD
749     {
750     double endT = CmiWallTimer();
751     /* only trace the probe that last longer than 1ms */
752     if(endT-startT>=0.001) traceUserSuppliedBracketedNote("MPI_Iprobe before a recv call", 70, startT, endT);
753     }
754 #endif
755
756     recd = 1;
757     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
758     msg = (char *) CmiAlloc(nbytes);
759     
760     START_EVENT();
761
762     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
763       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
764
765     /*END_EVENT(30);*/
766
767 #endif
768
769 #if CMK_SMP_TRACE_COMMTHREAD
770         traceBeginCommOp(msg);
771         traceChangeLastTimestamp(CpvAccess(projTraceStart));
772         traceEndCommOp(msg);
773         #if CMI_MPI_TRACE_MOREDETAILED
774         char tmp[32];
775         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
776         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
777         #endif
778 #elif CMK_TRACE_COMMOVERHEAD
779         char tmp[32];
780         sprintf(tmp, "MPI_Recv: to proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
781         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
782 #endif
783         
784         
785     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
786     CMI_CHECK_CHECKSUM(msg, nbytes);
787 #if CMK_ERROR_CHECKING
788     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
789       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
790       CmiFree(msg);
791       CmiAbort("Abort!\n");
792       continue;
793     }
794 #endif
795         
796 #if CMK_BROADCAST_SPANNING_TREE
797     if (CMI_BROADCAST_ROOT(msg))
798       SendSpanningChildren(nbytes, msg);
799 #elif CMK_BROADCAST_HYPERCUBE
800     if (CMI_BROADCAST_ROOT(msg))
801       SendHypercube(nbytes, msg);
802 #endif
803         
804         /* In SMP mode, this push operation needs to be executed
805      * after forwarding broadcast messages. If it is executed
806      * earlier, then during the bcast msg forwarding period,    
807          * the msg could be already freed on the worker thread.
808          * As a result, the forwarded message could be wrong! 
809          * --Chao Mei
810          */
811 #if CMK_NODE_QUEUE_AVAILABLE
812     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
813       CmiPushNode(msg);
814     else
815 #endif
816         CmiPushPE(CMI_DEST_RANK(msg), msg);     
817         
818 #if CMI_EXERT_RECV_CAP
819         recvCnt++;
820 #endif  
821   }
822
823   
824 #if CMK_IMMEDIATE_MSG && !CMK_SMP
825   CmiHandleImmediate();
826 #endif
827   
828   MACHSTATE(2,"} PumpMsgs end ");
829   return recd;
830 }
831
832 /* blocking version */
833 static void PumpMsgsBlocking(void)
834 {
835   static int maxbytes = 20000000;
836   static char *buf = NULL;
837   int nbytes, flg;
838   MPI_Status sts;
839   char *msg;
840   int recd=0;
841
842   if (!PCQueueEmpty(CmiGetState()->recv)) return;
843   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
844   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
845   if (sent_msgs)  return;
846
847 #if 0
848   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
849 #endif
850
851   if (buf == NULL) {
852     buf = (char *) CmiAlloc(maxbytes);
853     _MEMCHECK(buf);
854   }
855
856
857 #if MPI_POST_RECV_COUNT > 0
858 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
859 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
860 #endif
861
862   START_EVENT();
863
864   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
865       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
866
867   /*END_EVENT(30);*/
868     
869    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
870    msg = (char *) CmiAlloc(nbytes);
871    memcpy(msg, buf, nbytes);
872
873 #if CMK_SMP_TRACE_COMMTHREAD
874         traceBeginCommOp(msg);
875         traceChangeLastTimestamp(CpvAccess(projTraceStart));
876         traceEndCommOp(msg);
877         #if CMI_MPI_TRACE_MOREDETAILED
878         char tmp[32];
879         sprintf(tmp, "To proc %d", CmiNodeFirst(CmiMyNode())+CMI_DEST_RANK(msg));
880         traceUserSuppliedBracketedNote(tmp, 30, CpvAccess(projTraceStart), CmiWallTimer());
881         #endif
882 #endif
883
884 #if CMK_BROADCAST_SPANNING_TREE
885    if (CMI_BROADCAST_ROOT(msg))
886       SendSpanningChildren(nbytes, msg);
887 #elif CMK_BROADCAST_HYPERCUBE
888    if (CMI_BROADCAST_ROOT(msg))
889       SendHypercube(nbytes, msg);
890 #endif
891   
892         /* In SMP mode, this push operation needs to be executed
893      * after forwarding broadcast messages. If it is executed
894      * earlier, then during the bcast msg forwarding period,    
895          * the msg could be already freed on the worker thread.
896          * As a result, the forwarded message could be wrong! 
897          * --Chao Mei
898          */  
899 #if CMK_NODE_QUEUE_AVAILABLE
900    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
901       CmiPushNode(msg);
902    else
903 #endif
904       CmiPushPE(CMI_DEST_RANK(msg), msg);
905 }
906
907 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
908
909 #if CMK_SMP
910
911 static int inexit = 0;
912 static CmiNodeLock  exitLock = 0;
913
914 static int MsgQueueEmpty()
915 {
916   int i;
917 #if MULTI_SENDQUEUE
918   for (i=0; i<_Cmi_mynodesize; i++)
919     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
920 #else
921   return PCQueueEmpty(sendMsgBuf);
922 #endif
923   return 1;
924 }
925
926 static int SendMsgBuf();
927
928 /* test if all processors recv queues are empty */
929 static int RecvQueueEmpty()
930 {
931   int i;
932   for (i=0; i<_Cmi_mynodesize; i++) {
933     CmiState cs=CmiGetStateN(i);
934     if (!PCQueueEmpty(cs->recv)) return 0;
935   }
936   return 1;
937 }
938
939 /**
940 CommunicationServer calls MPI to send messages in the queues and probe message from network.
941 */
942
943 #define REPORT_COMM_METRICS 0
944 #if REPORT_COMM_METRICS
945 static double pumptime = 0.0;
946 static double releasetime = 0.0;
947 static double sendtime = 0.0;
948 #endif
949
950 static void CommunicationServer(int sleepTime)
951 {
952   int static count=0;
953 /*
954   count ++;
955   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
956 */
957 #if REPORT_COMM_METRICS
958   double t1, t2, t3, t4;
959   t1 = CmiWallTimer();
960 #endif
961   PumpMsgs();
962 #if REPORT_COMM_METRICS
963   t2 = CmiWallTimer();
964 #endif
965   CmiReleaseSentMessages();
966 #if REPORT_COMM_METRICS
967   t3 = CmiWallTimer();
968 #endif
969   SendMsgBuf();
970 #if REPORT_COMM_METRICS
971   t4 = CmiWallTimer();
972   pumptime += (t2-t1);
973   releasetime += (t3-t2);
974   sendtime += (t4-t3);
975 #endif
976 /*
977   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
978 */
979   if (inexit == CmiMyNodeSize()) {
980     MACHSTATE(2, "CommunicationServer exiting {");
981 #if 0
982     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
983 #endif
984     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
985       CmiReleaseSentMessages();
986       SendMsgBuf();
987       PumpMsgs();
988     }
989     MACHSTATE(2, "CommunicationServer barrier begin {");
990
991     START_EVENT();
992
993     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
994       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
995
996     END_EVENT(10);
997
998     MACHSTATE(2, "} CommunicationServer barrier end");
999 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1000     if (CmiMyNode() == 0){
1001       CmiPrintf("End of program\n");
1002     }
1003 #endif
1004     MACHSTATE(2, "} CommunicationServer EXIT");
1005
1006     ConverseCommonExit();   
1007 #if REPORT_COMM_METRICS
1008     CmiPrintf("Report comm metrics from node %d[%d-%d]: pumptime: %f, releasetime: %f, senttime: %f\n", CmiMyNode(), CmiNodeFirst(CmiMyNode()), CmiNodeFirst(CmiMyNode())+CmiMyNodeSize()-1, pumptime, releasetime, sendtime);
1009 #endif
1010
1011 #if ! CMK_AUTOBUILD
1012     signal(SIGINT, signal_int);
1013     MPI_Finalize();
1014     #endif
1015     exit(0);
1016   }
1017 }
1018
1019 #endif
1020
1021 static void CommunicationServerThread(int sleepTime)
1022 {
1023 #if CMK_SMP
1024   CommunicationServer(sleepTime);
1025 #endif
1026 #if CMK_IMMEDIATE_MSG
1027   CmiHandleImmediate();
1028 #endif
1029 }
1030
1031 #if CMK_NODE_QUEUE_AVAILABLE
1032 char *CmiGetNonLocalNodeQ(void)
1033 {
1034   CmiState cs = CmiGetState();
1035   char *result = 0;
1036   CmiIdleLock_checkMessage(&cs->idle);
1037 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1038     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1039     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1040     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1041     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1042     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1043 /*  }  */
1044
1045   return result;
1046 }
1047 #endif
1048
1049 void *CmiGetNonLocal(void)
1050 {
1051   static int count=0;
1052   CmiState cs = CmiGetState();
1053   void *msg;
1054
1055 #if ! CMK_SMP
1056   if (CmiNumPes() == 1) return NULL;
1057 #endif
1058
1059   CmiIdleLock_checkMessage(&cs->idle);
1060   /* although it seems that lock is not needed, I found it crashes very often
1061      on mpi-smp without lock */
1062
1063 #if ! CMK_SMP
1064   CmiReleaseSentMessages();
1065   PumpMsgs();
1066 #endif
1067
1068   /* CmiLock(procState[cs->rank].recvLock); */
1069   msg =  PCQueuePop(cs->recv);
1070   /* CmiUnlock(procState[cs->rank].recvLock); */
1071
1072 /*
1073   if (msg) {
1074     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1075   else {
1076     count++;
1077     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1078   }
1079 */
1080 #if ! CMK_SMP
1081   if (no_outstanding_sends) {
1082     while (MsgQueueLen>0) {
1083       CmiReleaseSentMessages();
1084       PumpMsgs();
1085     }
1086   }
1087
1088   if(!msg) {
1089     CmiReleaseSentMessages();
1090     if (PumpMsgs())
1091       return  PCQueuePop(cs->recv);
1092     else
1093       return 0;
1094   }
1095 #endif
1096
1097   return msg;
1098 }
1099
1100 /* called in non-smp mode */
1101 void CmiNotifyIdle(void)
1102 {
1103   CmiReleaseSentMessages();
1104   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1105 }
1106
1107
1108 /********************************************************
1109     The call to probe immediate messages has been renamed to
1110     CmiMachineProgressImpl
1111 ******************************************************/
1112 /* user call to handle immediate message, only useful in non SMP version
1113    using polling method to schedule message.
1114 */
1115 /*
1116 #if CMK_IMMEDIATE_MSG
1117 void CmiProbeImmediateMsg()
1118 {
1119 #if !CMK_SMP
1120   PumpMsgs();
1121   CmiHandleImmediate();
1122 #endif
1123 }
1124 #endif
1125 */
1126
1127 /* Network progress function is used to poll the network when for
1128    messages. This flushes receive buffers on some  implementations*/
1129 #if CMK_MACHINE_PROGRESS_DEFINED
1130 void CmiMachineProgressImpl()
1131 {
1132 #if !CMK_SMP
1133     PumpMsgs();
1134 #if CMK_IMMEDIATE_MSG
1135     CmiHandleImmediate();
1136 #endif
1137 #else
1138     /*Not implemented yet. Communication server does not seem to be
1139       thread safe, so only communication thread call it */
1140     if (CmiMyRank() == CmiMyNodeSize())
1141         CommunicationServerThread(0);
1142 #endif
1143 }
1144 #endif
1145
1146 /********************* MESSAGE SEND FUNCTIONS ******************/
1147
1148 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1149
1150 static void CmiSendSelf(char *msg)
1151 {
1152 #if CMK_IMMEDIATE_MSG
1153     if (CmiIsImmediate(msg)) {
1154       /* CmiBecomeNonImmediate(msg); */
1155       CmiPushImmediateMsg(msg);
1156       CmiHandleImmediate();
1157       return;
1158     }
1159 #endif
1160     CQdCreate(CpvAccess(cQdState), 1);
1161     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1162 }
1163
1164 void CmiSyncSendFn(int destPE, int size, char *msg)
1165 {
1166   CmiState cs = CmiGetState();
1167   char *dupmsg = (char *) CmiAlloc(size);
1168   memcpy(dupmsg, msg, size);
1169
1170   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1171
1172   if (cs->pe==destPE) {
1173     CmiSendSelf(dupmsg);
1174   }
1175   else
1176     CmiAsyncSendFn_(destPE, size, dupmsg);
1177 }
1178
1179 #if CMK_SMP
1180
1181 /* called by communication thread in SMP */
1182 static int SendMsgBuf()
1183 {
1184   SMSG_LIST *msg_tmp;
1185   char *msg;
1186   int node, rank, size;
1187   int i;
1188   int sent = 0;
1189
1190 #if CMI_EXERT_SEND_CAP
1191         int sentCnt = 0;
1192 #endif  
1193         
1194   MACHSTATE(2,"SendMsgBuf begin {");
1195 #if MULTI_SENDQUEUE
1196   for (i=0; i<_Cmi_mynodesize=1; i++)  /* subtle: including comm thread */
1197   {
1198     if (!PCQueueEmpty(procState[i].sendMsgBuf))
1199     {
1200       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1201 #else
1202     /* single message sending queue */
1203     /* CmiLock(sendMsgBufLock); */
1204     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1205     /* CmiUnlock(sendMsgBufLock); */
1206     while (NULL != msg_tmp)
1207     {
1208 #endif
1209       node = msg_tmp->destpe;
1210       size = msg_tmp->size;
1211       msg = msg_tmp->msg;
1212       msg_tmp->next = 0;
1213       while (MsgQueueLen > request_max) {
1214         CmiReleaseSentMessages();
1215         PumpMsgs();
1216       }
1217       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1218 #if CMK_ERROR_CHECKING
1219       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1220 #endif
1221       CMI_SET_CHECKSUM(msg, size);
1222
1223 #if MPI_POST_RECV_COUNT > 0
1224         if(size <= MPI_POST_RECV_SIZE){
1225
1226           START_EVENT();
1227           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1228                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1229
1230           STOP_EVENT(40);
1231         }
1232         else {
1233             START_EVENT();
1234             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1235                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1236             STOP_EVENT(40);
1237         }
1238 #else
1239         START_EVENT();
1240         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1241             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1242         /*END_EVENT(40);*/
1243 #endif
1244         
1245 #if CMK_SMP_TRACE_COMMTHREAD
1246         traceBeginCommOp(msg);
1247         traceChangeLastTimestamp(CpvAccess(projTraceStart));
1248         /* traceSendMsgComm must execute after traceBeginCommOp because
1249          * we pretend we execute an entry method, and inside this we
1250          * pretend we will send another message. Otherwise how could
1251          * a message creation just before an entry method invocation?
1252          * If such logic is broken, the projections will not trace
1253          * messages correctly! -Chao Mei
1254          */
1255         traceSendMsgComm(msg);
1256         traceEndCommOp(msg);
1257         #if CMI_MPI_TRACE_MOREDETAILED
1258         char tmp[64];
1259         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", msg_tmp->srcpe, CmiNodeFirst(node)+CMI_DEST_RANK(msg));
1260         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1261         #endif
1262 #endif
1263                 
1264                 
1265       MACHSTATE(3,"}MPI_send end");
1266       MsgQueueLen++;
1267       if(sent_msgs==0)
1268         sent_msgs = msg_tmp;
1269       else
1270         end_sent->next = msg_tmp;
1271       end_sent = msg_tmp;
1272       sent=1;
1273           
1274 #if CMI_EXERT_SEND_CAP    
1275           if(++sentCnt == SEND_CAP) break;
1276 #endif    
1277           
1278 #if ! MULTI_SENDQUEUE
1279       /* CmiLock(sendMsgBufLock); */
1280       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1281       /* CmiUnlock(sendMsgBufLock); */
1282 #endif
1283     }
1284 #if MULTI_SENDQUEUE
1285   }
1286 #endif
1287   MACHSTATE(2,"}SendMsgBuf end ");
1288   return sent;
1289 }
1290
1291 void EnqueueMsg(void *m, int size, int node)
1292 {
1293   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1294   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1295   msg_tmp->msg = m;
1296   msg_tmp->size = size;
1297   msg_tmp->destpe = node;
1298         
1299 #if CMK_SMP_TRACE_COMMTHREAD
1300         msg_tmp->srcpe = CmiMyPe();
1301 #endif  
1302
1303 #if MULTI_SENDQUEUE
1304   PCQueuePush(procState[CmiMyRank()].sendMsgBuf,(char *)msg_tmp);
1305 #else
1306   CmiLock(sendMsgBufLock);
1307   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1308   CmiUnlock(sendMsgBufLock);
1309 #endif
1310         
1311   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1312 }
1313
1314 #endif
1315
1316 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1317 {
1318   CmiState cs = CmiGetState();
1319   SMSG_LIST *msg_tmp;
1320   CmiUInt2  rank, node;
1321
1322   if(destPE == cs->pe) {
1323     char *dupmsg = (char *) CmiAlloc(size);
1324     memcpy(dupmsg, msg, size);
1325     CmiSendSelf(dupmsg);
1326     return 0;
1327   }
1328   CQdCreate(CpvAccess(cQdState), 1);
1329 #if CMK_SMP
1330   node = CmiNodeOf(destPE);
1331   rank = CmiRankOf(destPE);
1332   if (node == CmiMyNode())  {
1333     CmiPushPE(rank, msg);
1334     return 0;
1335   }
1336   CMI_DEST_RANK(msg) = rank;
1337   EnqueueMsg(msg, size, node);
1338   return 0;
1339 #else
1340   /* non smp */
1341   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1342   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1343   msg_tmp->msg = msg;
1344   msg_tmp->next = 0;
1345   while (MsgQueueLen > request_max) {
1346         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1347         CmiReleaseSentMessages();
1348         PumpMsgs();
1349   }
1350 #if CMK_ERROR_CHECKING
1351   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1352 #endif
1353   CMI_SET_CHECKSUM(msg, size);
1354
1355 #if MPI_POST_RECV_COUNT > 0
1356         if(size <= MPI_POST_RECV_SIZE){
1357
1358           START_EVENT();
1359           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1360                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1361           END_EVENT(40);
1362         }
1363         else {
1364           START_EVENT();
1365           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1366                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1367           END_EVENT(40);
1368         }
1369 #else
1370   START_EVENT();
1371   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1372     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1373   /*END_EVENT(40);*/
1374   #if CMK_TRACE_COMMOVERHEAD
1375         char tmp[64];
1376         sprintf(tmp, "MPI_Isend: from proc %d to proc %d", CmiMyPe(), destPE);
1377         traceUserSuppliedBracketedNote(tmp, 40, CpvAccess(projTraceStart), CmiWallTimer());
1378   #endif
1379 #endif
1380
1381   MsgQueueLen++;
1382   if(sent_msgs==0)
1383     sent_msgs = msg_tmp;
1384   else
1385     end_sent->next = msg_tmp;
1386   end_sent = msg_tmp;
1387   return (CmiCommHandle) &(msg_tmp->req);
1388 #endif              /* non-smp */
1389 }
1390
1391 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1392 {
1393   CMI_SET_BROADCAST_ROOT(msg, 0);
1394   CmiAsyncSendFn_(destPE, size, msg);
1395 }
1396
1397 void CmiFreeSendFn(int destPE, int size, char *msg)
1398 {
1399   CmiState cs = CmiGetState();
1400   CMI_SET_BROADCAST_ROOT(msg, 0);
1401
1402   if (cs->pe==destPE) {
1403     CmiSendSelf(msg);
1404   } else {
1405     CmiAsyncSendFn_(destPE, size, msg);
1406   }
1407 }
1408
1409 /*********************** BROADCAST FUNCTIONS **********************/
1410
1411 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1412 void CmiSyncSendFn1(int destPE, int size, char *msg)
1413 {
1414   CmiState cs = CmiGetState();
1415   char *dupmsg = (char *) CmiAlloc(size);
1416   memcpy(dupmsg, msg, size);
1417   if (cs->pe==destPE)
1418     CmiSendSelf(dupmsg);
1419   else
1420     CmiAsyncSendFn_(destPE, size, dupmsg);
1421 }
1422
1423 /* send msg to its spanning children in broadcast. G. Zheng */
1424 void SendSpanningChildren(int size, char *msg)
1425 {
1426   CmiState cs = CmiGetState();
1427   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1428   int startnode = CmiNodeOf(startpe);
1429   int i, exceptRank;
1430         
1431    /* first send msgs to other nodes */
1432   CmiAssert(startnode >=0 &&  startnode<CmiNumNodes());
1433   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1434     int nd = CmiMyNode()-startnode;
1435     if (nd<0) nd+=CmiNumNodes();
1436     nd = BROADCAST_SPANNING_FACTOR*nd + i;
1437     if (nd > CmiNumNodes() - 1) break;
1438     nd += startnode;
1439     nd = nd%CmiNumNodes();
1440     CmiAssert(nd>=0 && nd!=CmiMyNode());        
1441         #if CMK_SMP
1442         /* always send to the first rank of other nodes */
1443         char *newmsg = CmiCopyMsg(msg, size);
1444         CMI_DEST_RANK(newmsg) = 0;
1445     EnqueueMsg(newmsg, size, nd);
1446         #else
1447         CmiSyncSendFn1(nd, size, msg);
1448         #endif
1449   }
1450 #if CMK_SMP  
1451    /* second send msgs to my peers on this node */
1452   /* FIXME: now it's just a flat p2p send!! When node size is large,
1453    * it should also be sent in a tree
1454    */
1455    exceptRank = CMI_DEST_RANK(msg);
1456    for(i=0; i<exceptRank; i++){
1457            CmiPushPE(i, CmiCopyMsg(msg, size));
1458    }
1459    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1460            CmiPushPE(i, CmiCopyMsg(msg, size));
1461    }
1462 #endif
1463 }
1464
1465 #include <math.h>
1466
1467 /* send msg along the hypercube in broadcast. (Sameer) */
1468 void SendHypercube(int size, char *msg)
1469 {
1470   CmiState cs = CmiGetState();
1471   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1472   int startnode = CmiNodeOf(startpe);
1473   int i, exceptRank, cnt, tmp, relPE;
1474   int dims=0;
1475
1476   /* dims = ceil(log2(CmiNumNodes)) except when #nodes is 1*/
1477   tmp = CmiNumNodes()-1;
1478   while(tmp>0){
1479           dims++;
1480           tmp = tmp >> 1;
1481   }
1482   if(CmiNumNodes()==1) dims=1;
1483   
1484    /* first send msgs to other nodes */  
1485   relPE = CmiMyNode()-startnode;
1486   if(relPE < 0) relPE += CmiNumNodes();
1487   cnt=0;
1488   tmp = relPE;
1489   /* count how many zeros (in binary format) relPE has */
1490   for(i=0; i<dims; i++, cnt++){
1491     if(tmp & 1 == 1) break;
1492     tmp = tmp >> 1;
1493   }
1494   
1495   /*CmiPrintf("ND[%d]: SendHypercube with spe=%d, snd=%d, relpe=%d, cnt=%d\n", CmiMyNode(), startpe, startnode, relPE, cnt);*/
1496   for (i = cnt-1; i >= 0; i--) {
1497     int nd = relPE + (1 << i);
1498         if(nd >= CmiNumNodes()) continue;
1499         nd = (nd+startnode)%CmiNumNodes();
1500         /*CmiPrintf("ND[%d]: send to node %d\n", CmiMyNode(), nd);*/
1501 #if CMK_SMP
1502     /* always send to the first rank of other nodes */
1503     char *newmsg = CmiCopyMsg(msg, size);
1504     CMI_DEST_RANK(newmsg) = 0;
1505     EnqueueMsg(newmsg, size, nd);
1506 #else
1507         CmiSyncSendFn1(nd, size, msg);
1508 #endif
1509   }
1510   
1511 #if CMK_SMP
1512    /* second send msgs to my peers on this node */
1513    /* FIXME: now it's just a flat p2p send!! When node size is large,
1514     * it should also be sent in a tree
1515     */
1516    exceptRank = CMI_DEST_RANK(msg);
1517    for(i=0; i<exceptRank; i++){
1518            CmiPushPE(i, CmiCopyMsg(msg, size));
1519    }
1520    for(i=exceptRank+1; i<CmiMyNodeSize(); i++){
1521            CmiPushPE(i, CmiCopyMsg(msg, size));
1522    }
1523 #endif
1524 }
1525
1526 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1527 {
1528   CmiState cs = CmiGetState();
1529
1530 #if CMK_SMP     
1531   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1532   CMI_DEST_RANK(msg) = CmiMyRank();
1533 #endif
1534         
1535 #if CMK_BROADCAST_SPANNING_TREE
1536   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1537   SendSpanningChildren(size, msg);
1538
1539 #elif CMK_BROADCAST_HYPERCUBE
1540   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1541   SendHypercube(size, msg);
1542
1543 #else
1544   int i;
1545
1546   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1547     CmiSyncSendFn(i, size,msg) ;
1548   for ( i=0; i<cs->pe; i++ )
1549     CmiSyncSendFn(i, size,msg) ;
1550 #endif
1551
1552   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1553 }
1554
1555
1556 /*  FIXME: luckily async is never used  G. Zheng */
1557 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1558 {
1559   CmiState cs = CmiGetState();
1560   int i ;
1561
1562   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1563     CmiAsyncSendFn(i,size,msg) ;
1564   for ( i=0; i<cs->pe; i++ )
1565     CmiAsyncSendFn(i,size,msg) ;
1566
1567   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1568   CmiAbort("CmiAsyncBroadcastFn should never be called");
1569   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1570 }
1571
1572 void CmiFreeBroadcastFn(int size, char *msg)
1573 {
1574    CmiSyncBroadcastFn(size,msg);
1575    CmiFree(msg);
1576 }
1577
1578 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1579 {
1580
1581 #if CMK_SMP     
1582   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1583   CMI_DEST_RANK(msg) = CmiMyRank();
1584 #endif
1585
1586 #if CMK_BROADCAST_SPANNING_TREE
1587   CmiState cs = CmiGetState();
1588   CmiSyncSendFn(cs->pe, size,msg) ;
1589   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1590   SendSpanningChildren(size, msg);
1591
1592 #elif CMK_BROADCAST_HYPERCUBE
1593   CmiState cs = CmiGetState();
1594   CmiSyncSendFn(cs->pe, size,msg) ;
1595   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1596   SendHypercube(size, msg);
1597
1598 #else
1599     int i ;
1600
1601   for ( i=0; i<_Cmi_numpes; i++ )
1602     CmiSyncSendFn(i,size,msg) ;
1603 #endif
1604
1605   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1606 }
1607
1608 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1609 {
1610   int i ;
1611
1612   for ( i=1; i<_Cmi_numpes; i++ )
1613     CmiAsyncSendFn(i,size,msg) ;
1614
1615   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1616
1617   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1618 }
1619
1620 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1621 {
1622 #if CMK_SMP     
1623   /* record the rank to avoid re-sending the msg in SendSpanningChildren */
1624   CMI_DEST_RANK(msg) = CmiMyRank();
1625 #endif
1626
1627 #if CMK_BROADCAST_SPANNING_TREE
1628   CmiState cs = CmiGetState();
1629   CmiSyncSendFn(cs->pe, size,msg) ;
1630   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1631   SendSpanningChildren(size, msg);
1632
1633 #elif CMK_BROADCAST_HYPERCUBE
1634   CmiState cs = CmiGetState();
1635   CmiSyncSendFn(cs->pe, size,msg) ;
1636   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1637   SendHypercube(size, msg);
1638
1639 #else
1640   int i ;
1641
1642   for ( i=0; i<_Cmi_numpes; i++ )
1643     CmiSyncSendFn(i,size,msg) ;
1644 #endif
1645   CmiFree(msg) ;
1646   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1647 }
1648
1649 #if CMK_NODE_QUEUE_AVAILABLE
1650
1651 static void CmiSendNodeSelf(char *msg)
1652 {
1653 #if CMK_IMMEDIATE_MSG
1654 #if 0
1655     if (CmiIsImmediate(msg) && !_immRunning) {
1656       /*CmiHandleImmediateMessage(msg); */
1657       CmiPushImmediateMsg(msg);
1658       CmiHandleImmediate();
1659       return;
1660     }
1661 #endif
1662     if (CmiIsImmediate(msg))
1663     {
1664       CmiPushImmediateMsg(msg);
1665       if (!_immRunning) CmiHandleImmediate();
1666       return;
1667     }
1668 #endif
1669     CQdCreate(CpvAccess(cQdState), 1);
1670     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1671     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1672     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1673 }
1674
1675 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1676 {
1677   int i;
1678   SMSG_LIST *msg_tmp;
1679   char *dupmsg;
1680
1681   CMI_SET_BROADCAST_ROOT(msg, 0);
1682   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1683   switch (dstNode) {
1684   case NODE_BROADCAST_ALL:
1685     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1686   case NODE_BROADCAST_OTHERS:
1687     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1688     for (i=0; i<_Cmi_numnodes; i++)
1689       if (i!=_Cmi_mynode) {
1690         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1691       }
1692     break;
1693   default:
1694     dupmsg = (char *)CmiCopyMsg(msg,size);
1695     if(dstNode == _Cmi_mynode) {
1696       CmiSendNodeSelf(dupmsg);
1697     }
1698     else {
1699       CQdCreate(CpvAccess(cQdState), 1);
1700       EnqueueMsg(dupmsg, size, dstNode);
1701     }
1702   }
1703   return 0;
1704 }
1705
1706 void CmiSyncNodeSendFn(int p, int s, char *m)
1707 {
1708   CmiAsyncNodeSendFn(p, s, m);
1709 }
1710
1711 /* need */
1712 void CmiFreeNodeSendFn(int p, int s, char *m)
1713 {
1714   CmiAsyncNodeSendFn(p, s, m);
1715   CmiFree(m);
1716 }
1717
1718 /* need */
1719 void CmiSyncNodeBroadcastFn(int s, char *m)
1720 {
1721   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1722 }
1723
1724 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1725 {
1726 }
1727
1728 /* need */
1729 void CmiFreeNodeBroadcastFn(int s, char *m)
1730 {
1731   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1732   CmiFree(m);
1733 }
1734
1735 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1736 {
1737   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1738 }
1739
1740 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1741 {
1742   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1743 }
1744
1745 /* need */
1746 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1747 {
1748   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1749   CmiFree(m);
1750 }
1751 #endif
1752
1753 /************************** MAIN ***********************************/
1754 #define MPI_REQUEST_MAX 16      /* 1024*10 */
1755
1756 void ConverseExit(void)
1757 {
1758 #if ! CMK_SMP
1759   while(!CmiAllAsyncMsgsSent()) {
1760     PumpMsgs();
1761     CmiReleaseSentMessages();
1762   }
1763   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1764     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1765
1766   ConverseCommonExit();
1767 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1768   if (CmiMyPe() == 0){
1769     CmiPrintf("End of program\n");
1770 #if MPI_POST_RECV_COUNT > 0
1771     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1772 #endif
1773 }
1774 #endif
1775 #if ! CMK_AUTOBUILD
1776   signal(SIGINT, signal_int);
1777   MPI_Finalize();
1778 #endif
1779   exit(0);
1780
1781 #else
1782     /* SMP version, communication thread will exit */
1783   ConverseCommonExit();
1784   /* atomic increment */
1785   CmiLock(exitLock);
1786   inexit++;
1787   CmiUnlock(exitLock);
1788   while (1) CmiYield();
1789 #endif
1790 }
1791
1792 static void registerMPITraceEvents() {
1793 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1794     traceRegisterUserEvent("MPI_Barrier", 10);
1795     traceRegisterUserEvent("MPI_Send", 20);
1796     traceRegisterUserEvent("MPI_Recv", 30);
1797     traceRegisterUserEvent("MPI_Isend", 40);
1798     traceRegisterUserEvent("MPI_Irecv", 50);
1799     traceRegisterUserEvent("MPI_Test", 60);
1800     traceRegisterUserEvent("MPI_Iprobe", 70);
1801 #endif
1802 }
1803
1804
1805 static char     **Cmi_argv;
1806 static char     **Cmi_argvcopy;
1807 static CmiStartFn Cmi_startfn;   /* The start function */
1808 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1809
1810 typedef struct {
1811   int sleepMs; /*Milliseconds to sleep while idle*/
1812   int nIdles; /*Number of times we've been idle in a row*/
1813   CmiState cs; /*Machine state*/
1814 } CmiIdleState;
1815
1816 static CmiIdleState *CmiNotifyGetState(void)
1817 {
1818   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1819   s->sleepMs=0;
1820   s->nIdles=0;
1821   s->cs=CmiGetState();
1822   return s;
1823 }
1824
1825 static void CmiNotifyBeginIdle(CmiIdleState *s)
1826 {
1827   s->sleepMs=0;
1828   s->nIdles=0;
1829 }
1830
1831 static void CmiNotifyStillIdle(CmiIdleState *s)
1832 {
1833 #if ! CMK_SMP
1834   CmiReleaseSentMessages();
1835   PumpMsgs();
1836 #else
1837 /*  CmiYield();  */
1838 #endif
1839
1840 #if 1
1841   {
1842   int nSpins=20; /*Number of times to spin before sleeping*/
1843   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1844   s->nIdles++;
1845   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1846     s->sleepMs+=2;
1847     if (s->sleepMs>10) s->sleepMs=10;
1848   }
1849   /*Comm. thread will listen on sockets-- just sleep*/
1850   if (s->sleepMs>0) {
1851     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1852     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1853     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1854   }
1855   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1856   }
1857 #endif
1858 }
1859
1860 #if MACHINE_DEBUG_LOG
1861 FILE *debugLog = NULL;
1862 #endif
1863
1864 static int machine_exit_idx;
1865 static void machine_exit(char *m) {
1866   EmergencyExit();
1867   /*printf("--> %d: machine_exit\n",CmiMyPe());*/
1868   fflush(stdout);
1869   CmiNodeBarrier();
1870   if (CmiMyRank() == 0) {
1871     MPI_Barrier(MPI_COMM_WORLD);
1872     /*printf("==> %d: passed barrier\n",CmiMyPe());*/
1873     MPI_Abort(MPI_COMM_WORLD, 1);
1874   } else {
1875     while (1) CmiYield();
1876   }
1877 }
1878
1879 static void KillOnAllSigs(int sigNo) {
1880   static int already_in_signal_handler = 0;
1881   char *m;
1882   if (already_in_signal_handler) MPI_Abort(MPI_COMM_WORLD,1);
1883   already_in_signal_handler = 1;
1884 #if CMK_CCS_AVAILABLE
1885   if (CpvAccess(cmiArgDebugFlag)) {
1886     CpdNotify(CPD_SIGNAL, sigNo);
1887     CpdFreeze();
1888   }
1889 #endif
1890   CmiError("------------- Processor %d Exiting: Caught Signal ------------\n"
1891       "Signal: %d\n",CmiMyPe(),sigNo);
1892   CmiPrintStackTrace(1);
1893
1894   m = CmiAlloc(CmiMsgHeaderSizeBytes);
1895   CmiSetHandler(m, machine_exit_idx);
1896   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
1897   machine_exit(m);
1898 }
1899
1900 static void ConverseRunPE(int everReturn)
1901 {
1902   CmiIdleState *s=CmiNotifyGetState();
1903   CmiState cs;
1904   char** CmiMyArgv;
1905
1906   CmiNodeAllBarrier();
1907
1908   cs = CmiGetState();
1909   CpvInitialize(void *,CmiLocalQueue);
1910   CpvAccess(CmiLocalQueue) = cs->localqueue;
1911
1912   if (CmiMyRank())
1913     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1914   else
1915     CmiMyArgv=Cmi_argv;
1916
1917   CthInit(CmiMyArgv);
1918
1919   ConverseCommonInit(CmiMyArgv);
1920   machine_exit_idx = CmiRegisterHandler((CmiHandler)machine_exit);
1921
1922 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1923   CpvInitialize(double, projTraceStart);
1924   /* only PE 0 needs to care about registration (to generate sts file). */
1925   if (CmiMyPe() == 0) {
1926     registerMachineUserEventsFunction(&registerMPITraceEvents);
1927   }
1928 #endif
1929
1930   /* initialize the network progress counter*/
1931   /* Network progress function is used to poll the network when for
1932      messages. This flushes receive buffers on some  implementations*/
1933   CpvInitialize(int , networkProgressCount);
1934   CpvAccess(networkProgressCount) = 0;
1935
1936 #if CMK_SMP
1937   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1938   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1939 #else
1940   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1941 #endif
1942
1943 #if MACHINE_DEBUG_LOG
1944   if (CmiMyRank() == 0) {
1945     char ln[200];
1946     sprintf(ln,"debugLog.%d",CmiMyNode());
1947     debugLog=fopen(ln,"w");
1948   }
1949 #endif
1950
1951   /* Converse initialization finishes, immediate messages can be processed.
1952      node barrier previously should take care of the node synchronization */
1953   _immediateReady = 1;
1954
1955   /* communication thread */
1956   if (CmiMyRank() == CmiMyNodeSize()) {
1957     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1958     while (1) CommunicationServerThread(5);
1959   }
1960   else {  /* worker thread */
1961   if (!everReturn) {
1962     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1963     if (Cmi_usrsched==0) CsdScheduler(-1);
1964     ConverseExit();
1965   }
1966   }
1967 }
1968
1969 static char *thread_level_tostring(int thread_level)
1970 {
1971 #if CMK_MPI_INIT_THREAD
1972   switch (thread_level) {
1973   case MPI_THREAD_SINGLE:
1974       return "MPI_THREAD_SINGLE";
1975   case MPI_THREAD_FUNNELED:
1976       return "MPI_THREAD_FUNNELED";
1977   case MPI_THREAD_SERIALIZED:
1978       return "MPI_THREAD_SERIALIZED";
1979   case MPI_THREAD_MULTIPLE :
1980       return "MPI_THREAD_MULTIPLE ";
1981   default: {
1982       char *str = (char*)malloc(5);
1983       sprintf(str,"%d", thread_level);
1984       return str;
1985       }
1986   }
1987   return  "unknown";
1988 #else
1989   char *str = (char*)malloc(5);
1990   sprintf(str,"%d", thread_level);
1991   return str;
1992 #endif
1993 }
1994
1995 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1996 {
1997   int n,i;
1998   int ver, subver;
1999   int provided;
2000   int thread_level;
2001
2002 #if MACHINE_DEBUG
2003   debugLog=NULL;
2004 #endif
2005 #if CMK_USE_HP_MAIN_FIX
2006 #if FOR_CPLUS
2007   _main(argc,argv);
2008 #endif
2009 #endif
2010
2011 #if CMK_MPI_INIT_THREAD
2012 #if CMK_SMP
2013   thread_level = MPI_THREAD_FUNNELED;
2014 #else
2015   thread_level = MPI_THREAD_SINGLE;
2016 #endif
2017   MPI_Init_thread(&argc, &argv, thread_level, &provided);
2018   _thread_provided = provided;
2019 #else
2020   MPI_Init(&argc, &argv);
2021   thread_level = 0;
2022   provided = -1;
2023 #endif
2024   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
2025   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
2026
2027   MPI_Get_version(&ver, &subver);
2028   if (_Cmi_mynode == 0) {
2029     printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
2030   }
2031
2032   /* processor per node */
2033   _Cmi_mynodesize = 1;
2034   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
2035     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
2036 #if ! CMK_SMP
2037   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
2038     CmiAbort("+ppn cannot be used in non SMP version!\n");
2039 #endif
2040   idleblock = CmiGetArgFlag(argv, "+idleblocking");
2041   if (idleblock && _Cmi_mynode == 0) {
2042     printf("Charm++: Running in idle blocking mode.\n");
2043   }
2044
2045   /* setup signal handlers */
2046   signal(SIGSEGV, KillOnAllSigs);
2047   signal(SIGFPE, KillOnAllSigs);
2048   signal(SIGILL, KillOnAllSigs);
2049   signal_int = signal(SIGINT, KillOnAllSigs);
2050   signal(SIGTERM, KillOnAllSigs);
2051   signal(SIGABRT, KillOnAllSigs);
2052 #   if !defined(_WIN32) || defined(__CYGWIN__) /*UNIX-only signals*/
2053   signal(SIGQUIT, KillOnAllSigs);
2054   signal(SIGBUS, KillOnAllSigs);
2055 /*#     if CMK_HANDLE_SIGUSR
2056   signal(SIGUSR1, HandleUserSignals);
2057   signal(SIGUSR2, HandleUserSignals);
2058 #     endif*/
2059 #   endif /*UNIX*/
2060   
2061 #if CMK_NO_OUTSTANDING_SENDS
2062   no_outstanding_sends=1;
2063 #endif
2064   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2065     no_outstanding_sends = 1;
2066     if (_Cmi_mynode == 0)
2067       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2068         no_outstanding_sends?"":" not");
2069   }
2070   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2071   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2072   Cmi_argvcopy = CmiCopyArgs(argv);
2073   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2074   /* find dim = log2(numpes), to pretend we are a hypercube */
2075   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2076     Cmi_dim++ ;
2077  /* CmiSpanTreeInit();*/
2078   request_max=MAX_QLEN;
2079   CmiGetArgInt(argv,"+requestmax",&request_max);
2080   /*printf("request max=%d\n", request_max);*/
2081
2082   /* checksum flag */
2083   if (CmiGetArgFlag(argv,"+checksum")) {
2084 #if !CMK_OPTIMIZE
2085     checksum_flag = 1;
2086     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2087 #else
2088     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2089 #endif
2090   }
2091
2092   {
2093   int debug = CmiGetArgFlag(argv,"++debug");
2094   int debug_no_pause = CmiGetArgFlag(argv,"++debug-no-pause");
2095   if (debug || debug_no_pause)
2096   {   /*Pause so user has a chance to start and attach debugger*/
2097 #if CMK_HAS_GETPID
2098     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2099     fflush(stdout);
2100     if (!debug_no_pause)
2101       sleep(15);
2102 #else
2103     printf("++debug ignored.\n");
2104 #endif
2105   }
2106   }
2107
2108 #if MPI_POST_RECV_COUNT > 0
2109
2110   CpvInitialize(unsigned long long, Cmi_posted_recv_total);
2111   CpvInitialize(unsigned long long, Cmi_unposted_recv_total);
2112   CpvInitialize(MPI_Request*, CmiPostedRecvRequests); 
2113   CpvInitialize(char*,CmiPostedRecvBuffers);
2114
2115     /* Post some extra recvs to help out with incoming messages */
2116     /* On some MPIs the messages are unexpected and thus slow */
2117
2118     /* An array of request handles for posted recvs */
2119     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2120
2121     /* An array of buffers for posted recvs */
2122     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2123
2124     /* Post Recvs */
2125     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2126         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2127                     MPI_POST_RECV_SIZE,
2128                     MPI_BYTE,
2129                     MPI_ANY_SOURCE,
2130                     POST_RECV_TAG,
2131                     MPI_COMM_WORLD,
2132                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2133           CmiAbort("MPI_Irecv failed\n");
2134     }
2135
2136 #endif
2137
2138
2139
2140   /* CmiTimerInit(); */
2141
2142 #if 0
2143   CthInit(argv);
2144   ConverseCommonInit(argv);
2145
2146   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2147   if (initret==0) {
2148     fn(CmiGetArgc(argv), argv);
2149     if (usched==0) CsdScheduler(-1);
2150     ConverseExit();
2151   }
2152 #endif
2153
2154   CsvInitialize(CmiNodeState, NodeState);
2155   CmiNodeStateInit(&CsvAccess(NodeState));
2156
2157   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2158
2159   for (i=0; i<_Cmi_mynodesize+1; i++) {
2160 #if MULTI_SENDQUEUE
2161     procState[i].sendMsgBuf = PCQueueCreate();
2162 #endif
2163     procState[i].recvLock = CmiCreateLock();
2164   }
2165 #if CMK_SMP
2166 #if !MULTI_SENDQUEUE
2167   sendMsgBuf = PCQueueCreate();
2168   sendMsgBufLock = CmiCreateLock();
2169 #endif
2170   exitLock = CmiCreateLock();            /* exit count lock */
2171 #endif
2172
2173   /* Network progress function is used to poll the network when for
2174      messages. This flushes receive buffers on some  implementations*/
2175   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2176   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2177
2178   CmiStartThreads(argv);
2179   ConverseRunPE(initret);
2180 }
2181
2182 /***********************************************************************
2183  *
2184  * Abort function:
2185  *
2186  ************************************************************************/
2187
2188 void CmiAbort(const char *message)
2189 {
2190   char *m;
2191   /* if CharmDebug is attached simply try to send a message to it */
2192 #if CMK_CCS_AVAILABLE
2193   if (CpvAccess(cmiArgDebugFlag)) {
2194     CpdNotify(CPD_ABORT, message);
2195     CpdFreeze();
2196   }
2197 #endif  
2198   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2199         "Reason: %s\n",CmiMyPe(),message);
2200  /*  CmiError(message); */
2201   CmiPrintStackTrace(0);
2202   m = CmiAlloc(CmiMsgHeaderSizeBytes);
2203   CmiSetHandler(m, machine_exit_idx);
2204   CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes, m);
2205   machine_exit(m);
2206   /* Program never reaches here */
2207   MPI_Abort(MPI_COMM_WORLD, 1);
2208 }
2209
2210
2211 #if 0
2212
2213 /* ****************************************************************** */
2214 /*    The following internal functions implement recd msg queue       */
2215 /* ****************************************************************** */
2216
2217 static void ** AllocBlock(unsigned int len)
2218 {
2219   void ** blk;
2220
2221   blk=(void **)CmiAlloc(len*sizeof(void *));
2222   if(blk==(void **)0) {
2223     CmiError("Cannot Allocate Memory!\n");
2224     MPI_Abort(MPI_COMM_WORLD, 1);
2225   }
2226   return blk;
2227 }
2228
2229 static void
2230 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2231 {
2232   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2233   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2234 }
2235
2236 void recdQueueInit(void)
2237 {
2238   recdQueue_blk = AllocBlock(BLK_LEN);
2239   recdQueue_blk_len = BLK_LEN;
2240   recdQueue_first = 0;
2241   recdQueue_len = 0;
2242 }
2243
2244 void recdQueueAddToBack(void *element)
2245 {
2246 #if NODE_0_IS_CONVHOST
2247   inside_comm = 1;
2248 #endif
2249   if(recdQueue_len==recdQueue_blk_len) {
2250     void **blk;
2251     recdQueue_blk_len *= 3;
2252     blk = AllocBlock(recdQueue_blk_len);
2253     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2254     CmiFree(recdQueue_blk);
2255     recdQueue_blk = blk;
2256     recdQueue_first = 0;
2257   }
2258   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2259 #if NODE_0_IS_CONVHOST
2260   inside_comm = 0;
2261 #endif
2262 }
2263
2264
2265 void * recdQueueRemoveFromFront(void)
2266 {
2267   if(recdQueue_len) {
2268     void *element;
2269     element = recdQueue_blk[recdQueue_first++];
2270     recdQueue_first %= recdQueue_blk_len;
2271     recdQueue_len--;
2272     return element;
2273   }
2274   return 0;
2275 }
2276
2277 #endif
2278
2279 /*@}*/