8a8d5178527309964837e83113915068a2da73cb
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22 #ifdef AMPI
23 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
24 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
25 #  error "Can't build Charm++ using AMPI version of mpi.h header"
26 #endif
27
28 /*Support for ++debug: */
29 #include <unistd.h> /*For getpid()*/
30 #include <stdlib.h> /*For sleep()*/
31
32 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
33 #define CMK_SMP 1
34 #endif
35
36 #include "machine.h"
37
38 #include "pcqueue.h"
39
40 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
41
42 #if CMK_VERSION_BLUEGENE
43 #define MAX_QLEN 8
44 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
45 #else
46 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
47 #define MAX_QLEN 200
48 #endif
49
50 #if CMI_MPI_TRACE_USEREVENTS
51 #ifndef CMK_OPTIMIZE
52 #if ! CMK_TRACE_IN_CHARM
53 CpvDeclare(double, projTraceStart);
54 #endif
55 #endif
56 #endif
57
58 /*
59     To reduce the buffer used in broadcast and distribute the load from
60   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
61   spanning tree broadcast algorithm.
62     This will use the fourth short in message as an indicator of spanning tree
63   root.
64 */
65 #if CMK_SMP
66 #define CMK_BROADCAST_SPANNING_TREE    0
67 #else
68 #define CMK_BROADCAST_SPANNING_TREE    1
69 #define CMK_BROADCAST_HYPERCUBE        0
70 #endif
71
72 #define BROADCAST_SPANNING_FACTOR      4
73
74 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
75 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
76
77 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
78 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
79
80 /* FIXME: need a random number that everyone agrees ! */
81 #define CHARM_MAGIC_NUMBER               126
82
83 #if !CMK_OPTIMIZE
84 static int checksum_flag = 0;
85 #define CMI_SET_CHECKSUM(msg, len)      \
86         if (checksum_flag)  {   \
87           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
88           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
89         }
90 #define CMI_CHECK_CHECKSUM(msg, len)    \
91         if (checksum_flag)      \
92           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
93             CmiAbort("Fatal error: checksum doesn't agree!\n");
94 #else
95 #define CMI_SET_CHECKSUM(msg, len)
96 #define CMI_CHECK_CHECKSUM(msg, len)
97 #endif
98
99 #if CMK_BROADCAST_SPANNING_TREE
100 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
101 #else
102 #  define CMI_SET_BROADCAST_ROOT(msg, root)
103 #endif
104
105 #if CMK_BROADCAST_HYPERCUBE
106 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
107 #else
108 #  define CMI_SET_CYCLE(msg, cycle)
109 #endif
110
111
112 /** 
113     If MPI_POST_RECV is defined, we provide default values for size 
114     and number of posted recieves. If MPI_POST_RECV_COUNT is set
115     then a default value for MPI_POST_RECV_SIZE is used if not specified
116     by the user.
117 */
118 #ifdef MPI_POST_RECV
119 #define MPI_POST_RECV_COUNT 10
120 #undef MPI_POST_RECV
121 #endif
122 #if MPI_POST_RECV_COUNT > 0
123 #warning "Using MPI posted receives which have not yet been tested"
124 #ifndef MPI_POST_RECV_SIZE
125 #define MPI_POST_RECV_SIZE 200
126 #endif
127 // #undef  MPI_POST_RECV_DEBUG 
128 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
129 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
130 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
131 CpvDeclare(char*,CmiPostedRecvBuffers);
132 #endif
133
134
135 /*
136  to avoid MPI's in order delivery, changing MPI Tag all the time
137 */
138 #define TAG     1375
139
140 #if MPI_POST_RECV_COUNT > 0
141 #define POST_RECV_TAG TAG+1
142 #define BARRIER_ZERO_TAG TAG
143 #else
144 #define BARRIER_ZERO_TAG     1375
145 #endif
146
147 /*
148 static int mpi_tag = TAG;
149 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
150 */
151
152 int               _Cmi_numpes;
153 int               _Cmi_mynode;    /* Which address space am I */
154 int               _Cmi_mynodesize;/* Number of processors in my address space */
155 int               _Cmi_numnodes;  /* Total number of address spaces */
156 int               _Cmi_numpes;    /* Total number of processors */
157 static int        Cmi_nodestart; /* First processor in this address space */
158 CpvDeclare(void*, CmiLocalQueue);
159
160 /*Network progress utility variables. Period controls the rate at
161   which the network poll is called */
162 CpvDeclare(unsigned , networkProgressCount);
163 int networkProgressPeriod;
164
165 int               idleblock = 0;
166
167 #define BLK_LEN  512
168
169 #if CMK_NODE_QUEUE_AVAILABLE
170 #define DGRAM_NODEMESSAGE   (0xFB)
171
172 #define NODE_BROADCAST_OTHERS (-1)
173 #define NODE_BROADCAST_ALL    (-2)
174 #endif
175
176 #if 0
177 static void **recdQueue_blk;
178 static unsigned int recdQueue_blk_len;
179 static unsigned int recdQueue_first;
180 static unsigned int recdQueue_len;
181 static void recdQueueInit(void);
182 static void recdQueueAddToBack(void *element);
183 static void *recdQueueRemoveFromFront(void);
184 #endif
185
186 static void ConverseRunPE(int everReturn);
187 static void CommunicationServer(int sleepTime);
188 static void CommunicationServerThread(int sleepTime);
189
190 typedef struct msg_list {
191      char *msg;
192      struct msg_list *next;
193      int size, destpe;
194      MPI_Request req;
195 } SMSG_LIST;
196
197 int MsgQueueLen=0;
198 static int request_max;
199
200 static SMSG_LIST *sent_msgs=0;
201 static SMSG_LIST *end_sent=0;
202
203 static int Cmi_dim;
204
205 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
206
207 #if NODE_0_IS_CONVHOST
208 int inside_comm = 0;
209 #endif
210
211 void CmiAbort(const char *message);
212 static void PerrorExit(const char *msg);
213
214 void SendSpanningChildren(int size, char *msg);
215 void SendHypercube(int size, char *msg);
216
217 static void PerrorExit(const char *msg)
218 {
219   perror(msg);
220   exit(1);
221 }
222
223 extern unsigned char computeCheckSum(unsigned char *data, int len);
224
225 /**************************  TIMER FUNCTIONS **************************/
226
227 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
228
229 /* MPI calls are not threadsafe, even the timer on some machines */
230 static CmiNodeLock  timerLock = 0;
231 static double starttimer = 0;
232 static int _is_global = 0;
233
234 int CmiTimerIsSynchronized()
235 {
236   int  flag;
237   void *v;
238
239   /*  check if it using synchronized timer */
240   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
241     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
242   if (flag) {
243     _is_global = *(int*)v;
244     if (_is_global && CmiMyPe() == 0)
245       printf("Charm++> MPI timer is synchronized!\n");
246   }
247   return _is_global;
248 }
249
250 void CmiTimerInit()
251 {
252   _is_global = CmiTimerIsSynchronized();
253
254   if (CmiMyRank() == 0) {
255     if (_is_global) {
256       double minTimer;
257 #if CMK_TIMER_USE_XT3_DCLOCK
258       starttimer = dclock();
259 #else
260       starttimer = MPI_Wtime();
261 #endif
262
263       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
264                                   MPI_COMM_WORLD );
265       starttimer = minTimer;
266     }
267     else {
268       /* we don't have a synchronous timer, set our own start time */
269       CmiBarrier();
270       CmiBarrier();
271       CmiBarrier();
272 #if CMK_TIMER_USE_XT3_DCLOCK
273       starttimer = dclock();
274 #else
275       starttimer = MPI_Wtime();
276 #endif
277     }
278   }
279   CmiNodeAllBarrier();          /* for smp */
280 /*  timerLock = CmiCreateLock(); */
281 }
282
283 double CmiTimer(void)
284 {
285   double t;
286 #if CMK_SMP
287   if (timerLock) CmiLock(timerLock);
288 #endif
289 #if CMK_TIMER_USE_XT3_DCLOCK
290   t = dclock() - starttimer;
291 #else
292   t = MPI_Wtime() - starttimer;
293 #endif
294 #if CMK_SMP
295   if (timerLock) CmiUnlock(timerLock);
296 #endif
297   return t;
298 }
299
300 double CmiWallTimer(void)
301 {
302   double t;
303 #if CMK_SMP
304   if (timerLock) CmiLock(timerLock);
305 #endif
306 #if CMK_TIMER_USE_XT3_DCLOCK
307   t = dclock() - starttimer;
308 #else
309   t = MPI_Wtime() - starttimer;
310 #endif
311 #if CMK_SMP
312   if (timerLock) CmiUnlock(timerLock);
313 #endif
314   return t;
315 }
316
317 double CmiCpuTimer(void)
318 {
319   double t;
320 #if CMK_SMP
321   if (timerLock) CmiLock(timerLock);
322 #endif
323 #if CMK_TIMER_USE_XT3_DCLOCK
324   t = dclock() - starttimer;
325 #else
326   t = MPI_Wtime() - starttimer;
327 #endif
328 #if CMK_SMP
329   if (timerLock) CmiUnlock(timerLock);
330 #endif
331   return t;
332 }
333
334 #endif
335
336 void CmiBarrier()
337 {
338   if (CmiMyRank() == 0) {
339
340 #if CMI_MPI_TRACE_USEREVENTS
341 #ifndef CMK_OPTIMIZE 
342 #if ! CMK_TRACE_IN_CHARM
343 #if CMK_TIMER_USE_XT3_DCLOCK
344   CpvAccess(projTraceStart) = dclock() - starttimer;
345 #else
346   CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
347 #endif
348 #endif
349 #endif
350 #endif
351
352     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
353         CmiAbort("Timernit: MPI_Barrier failed!\n");
354
355 #if CMI_MPI_TRACE_USEREVENTS
356 #ifndef CMK_OPTIMIZE 
357 #if ! CMK_TRACE_IN_CHARM
358 #if CMK_TIMER_USE_XT3_DCLOCK
359     traceUserBracketEvent(10, CpvAccess(projTraceStart), 
360                           (dclock() - starttimer));
361 #else
362     traceUserBracketEvent(10, CpvAccess(projTraceStart),
363                           (MPI_Wtime() - starttimer));
364 #endif
365 #endif
366 #endif
367 #endif
368
369   }
370 }
371
372 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
373 void CmiBarrierZero()
374 {
375   int i;
376   if (CmiMyRank() == 0) {
377     char msg[1];
378     MPI_Status sts;
379     if (CmiMyNode() == 0)  {
380       for (i=0; i<CmiNumNodes()-1; i++) {
381          CmiPrintf("CmiBarrierZero loop\n");
382
383 #if CMI_MPI_TRACE_USEREVENTS
384 #ifndef CMK_OPTIMIZE
385 #if ! CMK_TRACE_IN_CHARM
386 #if CMK_TIMER_USE_XT3_DCLOCK
387          CpvAccess(projTraceStart) = dclock() - starttimer;
388 #else
389          CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
390 #endif
391 #endif
392 #endif
393 #endif
394
395           if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
396             printf("MPI_Recv failed!\n");
397
398 #if CMI_MPI_TRACE_USEREVENTS
399 #ifndef CMK_OPTIMIZE 
400 #if ! CMK_TRACE_IN_CHARM
401 #if CMK_TIMER_USE_XT3_DCLOCK
402     traceUserBracketEvent(30, CpvAccess(projTraceStart), 
403                           (dclock() - starttimer));
404 #else
405     traceUserBracketEvent(30, CpvAccess(projTraceStart),
406                           (MPI_Wtime() - starttimer));
407 #endif
408 #endif
409 #endif
410 #endif
411
412       }
413     }
414     else {
415
416 #if CMI_MPI_TRACE_USEREVENTS
417 #ifndef CMK_OPTIMIZE
418 #if ! CMK_TRACE_IN_CHARM
419 #if CMK_TIMER_USE_XT3_DCLOCK
420       CpvAccess(projTraceStart) = dclock() - starttimer;
421 #else
422       CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
423 #endif
424 #endif
425 #endif
426 #endif
427
428       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
429          printf("MPI_Send failed!\n");
430
431 #if CMI_MPI_TRACE_USEREVENTS
432 #ifndef CMK_OPTIMIZE 
433 #if ! CMK_TRACE_IN_CHARM
434 #if CMK_TIMER_USE_XT3_DCLOCK
435     traceUserBracketEvent(20, CpvAccess(projTraceStart), 
436                           (dclock() - starttimer));
437 #else
438     traceUserBracketEvent(20, CpvAccess(projTraceStart),
439                           (MPI_Wtime() - starttimer));
440 #endif
441 #endif
442 #endif
443 #endif
444     }
445   }
446   CmiNodeAllBarrier();
447 }
448
449 typedef struct ProcState {
450 /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
451 CmiNodeLock  recvLock;              /* for cs->recv */
452 } ProcState;
453
454 static ProcState  *procState;
455
456 #if CMK_SMP
457
458 static PCQueue sendMsgBuf;
459 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
460
461 #endif
462
463 /************************************************************
464  *
465  * Processor state structure
466  *
467  ************************************************************/
468
469 /* fake Cmi_charmrun_fd */
470 static int Cmi_charmrun_fd = 0;
471 #include "machine-smp.c"
472
473 CsvDeclare(CmiNodeState, NodeState);
474
475 #include "immediate.c"
476
477 #if ! CMK_SMP
478 /************ non SMP **************/
479 static struct CmiStateStruct Cmi_state;
480 int _Cmi_mype;
481 int _Cmi_myrank;
482
483 void CmiMemLock() {}
484 void CmiMemUnlock() {}
485
486 #define CmiGetState() (&Cmi_state)
487 #define CmiGetStateN(n) (&Cmi_state)
488
489 void CmiYield(void) { sleep(0); }
490
491 static void CmiStartThreads(char **argv)
492 {
493   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
494   _Cmi_mype = Cmi_nodestart;
495   _Cmi_myrank = 0;
496 }
497 #endif  /* non smp */
498
499 /*Add a message to this processor's receive queue, pe is a rank */
500 static void CmiPushPE(int pe,void *msg)
501 {
502   CmiState cs = CmiGetStateN(pe);
503   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
504 #if CMK_IMMEDIATE_MSG
505   if (CmiIsImmediate(msg)) {
506 /*
507 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
508     CmiHandleMessage(msg);
509 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
510 */
511     /**(CmiUInt2 *)msg = pe;*/
512     CMI_DEST_RANK(msg) = pe;
513     CmiPushImmediateMsg(msg);
514     return;
515   }
516 #endif
517
518 #if CMK_SMP
519   CmiLock(procState[pe].recvLock);
520 #endif
521   PCQueuePush(cs->recv,msg);
522 #if CMK_SMP
523   CmiUnlock(procState[pe].recvLock);
524 #endif
525   CmiIdleLock_addMessage(&cs->idle);
526   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
527 }
528
529 #if CMK_NODE_QUEUE_AVAILABLE
530 /*Add a message to this processor's receive queue */
531 static void CmiPushNode(void *msg)
532 {
533   MACHSTATE(3,"Pushing message into NodeRecv queue");
534 #if CMK_IMMEDIATE_MSG
535   if (CmiIsImmediate(msg)) {
536     CMI_DEST_RANK(msg) = 0;
537     CmiPushImmediateMsg(msg);
538     return;
539   }
540 #endif
541   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
542   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
543   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
544   {
545   CmiState cs=CmiGetStateN(0);
546   CmiIdleLock_addMessage(&cs->idle);
547   }
548 }
549 #endif
550
551 #ifndef CmiMyPe
552 int CmiMyPe(void)
553 {
554   return CmiGetState()->pe;
555 }
556 #endif
557
558 #ifndef CmiMyRank
559 int CmiMyRank(void)
560 {
561   return CmiGetState()->rank;
562 }
563 #endif
564
565 #ifndef CmiNodeFirst
566 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
567 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
568 #endif
569
570 #ifndef CmiNodeOf
571 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
572 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
573 #endif
574
575 static int CmiAllAsyncMsgsSent(void)
576 {
577    SMSG_LIST *msg_tmp = sent_msgs;
578    MPI_Status sts;
579    int done;
580
581    while(msg_tmp!=0) {
582     done = 0;
583     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
584       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
585     if(!done)
586       return 0;
587     msg_tmp = msg_tmp->next;
588 /*    MsgQueueLen--; ????? */
589    }
590    return 1;
591 }
592
593 int CmiAsyncMsgSent(CmiCommHandle c) {
594
595   SMSG_LIST *msg_tmp = sent_msgs;
596   int done;
597   MPI_Status sts;
598
599   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
600     msg_tmp = msg_tmp->next;
601   if(msg_tmp) {
602     done = 0;
603     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
604       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
605     return ((done)?1:0);
606   } else {
607     return 1;
608   }
609 }
610
611 void CmiReleaseCommHandle(CmiCommHandle c)
612 {
613   return;
614 }
615
616 #if CMK_VERSION_BLUEGENE
617 extern void MPID_Progress_test();
618 #endif
619
620 void CmiReleaseSentMessages(void)
621 {
622   SMSG_LIST *msg_tmp=sent_msgs;
623   SMSG_LIST *prev=0;
624   SMSG_LIST *temp;
625   int done;
626   MPI_Status sts;
627
628
629 #if CMK_VERSION_BLUEGENE
630   MPID_Progress_test();
631 #endif
632
633   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
634   while(msg_tmp!=0) {
635     done =0;
636     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
637       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
638     if(done) {
639       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
640       MsgQueueLen--;
641       /* Release the message */
642       temp = msg_tmp->next;
643       if(prev==0)  /* first message */
644         sent_msgs = temp;
645       else
646         prev->next = temp;
647       CmiFree(msg_tmp->msg);
648       CmiFree(msg_tmp);
649       msg_tmp = temp;
650     } else {
651       prev = msg_tmp;
652       msg_tmp = msg_tmp->next;
653     }
654   }
655   end_sent = prev;
656   MACHSTATE(2,"} CmiReleaseSentMessages end");
657 }
658
659 int PumpMsgs(void)
660 {
661   int nbytes, flg, res;
662   char *msg;
663   MPI_Status sts;
664   int recd=0;
665
666 #if CMK_VERSION_BLUEGENE
667   MPID_Progress_test();
668 #endif
669
670   MACHSTATE(2,"PumpMsgs begin {");
671
672   while(1) {
673     /* First check posted recvs then do  probe unmatched outstanding messages */
674 #if MPI_POST_RECV_COUNT > 0 
675     int completed_index=-1;
676     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
677         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
678     if(flg){
679         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
680             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
681
682
683         //        CmiPrintf("Received a posted message of %d bytes\n",nbytes);
684
685         recd = 1;
686         msg = (char *) CmiAlloc(nbytes);
687         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
688         /* and repost the recv */
689
690 #if CMI_MPI_TRACE_USEREVENTS
691 #ifndef CMK_OPTIMIZE
692 #if ! CMK_TRACE_IN_CHARM
693 #if CMK_TIMER_USE_XT3_DCLOCK
694         CpvAccess(projTraceStart) = dclock() - starttimer;
695 #else
696         CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
697 #endif
698 #endif
699 #endif
700 #endif
701
702         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
703             MPI_POST_RECV_SIZE,
704             MPI_BYTE,
705             MPI_ANY_SOURCE,
706             POST_RECV_TAG,
707             MPI_COMM_WORLD,
708             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
709                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
710
711 #if CMI_MPI_TRACE_USEREVENTS
712 #ifndef CMK_OPTIMIZE 
713 #if ! CMK_TRACE_IN_CHARM
714 #if CMK_TIMER_USE_XT3_DCLOCK
715     traceUserBracketEvent(50, CpvAccess(projTraceStart), 
716                           (dclock() - starttimer));
717 #else
718     traceUserBracketEvent(50, CpvAccess(projTraceStart),
719                           (MPI_Wtime() - starttimer));
720 #endif
721 #endif
722 #endif
723 #endif
724
725         CpvAccess(Cmi_posted_recv_total)++;
726     }
727     else {
728         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
729         if(res != MPI_SUCCESS)
730         CmiAbort("MPI_Iprobe failed\n");
731         if(!flg) break;
732         recd = 1;
733         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
734         msg = (char *) CmiAlloc(nbytes);
735
736 #if CMI_MPI_TRACE_USEREVENTS
737 #ifndef CMK_OPTIMIZE
738 #if ! CMK_TRACE_IN_CHARM
739 #if CMK_TIMER_USE_XT3_DCLOCK
740         CpvAccess(projTraceStart) = dclock() - starttimer;
741 #else
742         CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
743 #endif
744 #endif
745 #endif
746 #endif
747
748         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
749             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
750
751 #if CMI_MPI_TRACE_USEREVENTS
752 #ifndef CMK_OPTIMIZE 
753 #if ! CMK_TRACE_IN_CHARM
754 #if CMK_TIMER_USE_XT3_DCLOCK
755     traceUserBracketEvent(30, CpvAccess(projTraceStart), 
756                           (dclock() - starttimer));
757 #else
758     traceUserBracketEvent(30, CpvAccess(projTraceStart),
759                           (MPI_Wtime() - starttimer));
760 #endif
761 #endif
762 #endif
763 #endif
764
765         CpvAccess(Cmi_unposted_recv_total)++;
766     }
767 #else
768     /* Original version */
769     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
770     if(res != MPI_SUCCESS)
771       CmiAbort("MPI_Iprobe failed\n");
772
773     if(!flg) break;
774     recd = 1;
775     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
776     msg = (char *) CmiAlloc(nbytes);
777
778 #if CMI_MPI_TRACE_USEREVENTS
779 #ifndef CMK_OPTIMIZE
780 #if ! CMK_TRACE_IN_CHARM
781 #if CMK_TIMER_USE_XT3_DCLOCK
782     CpvAccess(projTraceStart) = dclock() - starttimer;
783 #else
784     CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
785 #endif
786 #endif
787 #endif
788 #endif
789
790     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
791       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
792
793 #if CMI_MPI_TRACE_USEREVENTS
794 #ifndef CMK_OPTIMIZE 
795 #if ! CMK_TRACE_IN_CHARM
796 #if CMK_TIMER_USE_XT3_DCLOCK
797     traceUserBracketEvent(30, CpvAccess(projTraceStart), 
798                           (dclock() - starttimer));
799 #else
800     traceUserBracketEvent(30, CpvAccess(projTraceStart),
801                           (MPI_Wtime() - starttimer));
802 #endif
803 #endif
804 #endif
805 #endif
806
807
808 #endif
809
810
811
812     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
813     CMI_CHECK_CHECKSUM(msg, nbytes);
814     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
815       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
816       CmiFree(msg);
817       CmiAbort("Abort!\n");
818       continue;
819     }
820 #if CMK_NODE_QUEUE_AVAILABLE
821     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
822       CmiPushNode(msg);
823     else
824 #endif
825       CmiPushPE(CMI_DEST_RANK(msg), msg);
826
827 #if CMK_BROADCAST_SPANNING_TREE
828     if (CMI_BROADCAST_ROOT(msg))
829       SendSpanningChildren(nbytes, msg);
830 #elif CMK_BROADCAST_HYPERCUBE
831     if (CMI_GET_CYCLE(msg))
832       SendHypercube(nbytes, msg);
833 #endif
834   }
835 #if CMK_IMMEDIATE_MSG && !CMK_SMP
836   CmiHandleImmediate();
837 #endif
838   MACHSTATE(2,"} PumpMsgs end ");
839   return recd;
840 }
841
842 /* blocking version */
843 static void PumpMsgsBlocking(void)
844 {
845   static int maxbytes = 20000000;
846   static char *buf = NULL;
847   int nbytes, flg;
848   MPI_Status sts;
849   char *msg;
850   int recd=0;
851
852   if (!PCQueueEmpty(CmiGetState()->recv)) return;
853   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
854   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
855   if (sent_msgs)  return;
856
857 #if 0
858   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
859 #endif
860
861   if (buf == NULL) {
862     buf = (char *) CmiAlloc(maxbytes);
863     _MEMCHECK(buf);
864   }
865
866
867 #if MPI_POST_RECV_COUNT > 0
868 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
869 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
870 #endif
871
872 #if CMI_MPI_TRACE_USEREVENTS
873 #ifndef CMK_OPTIMIZE
874 #if ! CMK_TRACE_IN_CHARM
875 #if CMK_TIMER_USE_XT3_DCLOCK
876  CpvAccess(projTraceStart) = dclock() - starttimer;
877 #else
878  CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
879 #endif
880 #endif
881 #endif
882 #endif
883
884   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
885       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
886
887 #if CMI_MPI_TRACE_USEREVENTS
888 #ifndef CMK_OPTIMIZE 
889 #if ! CMK_TRACE_IN_CHARM
890 #if CMK_TIMER_USE_XT3_DCLOCK
891     traceUserBracketEvent(30, CpvAccess(projTraceStart), 
892                           (dclock() - starttimer));
893 #else
894     traceUserBracketEvent(30, CpvAccess(projTraceStart),
895                           (MPI_Wtime() - starttimer));
896 #endif
897 #endif
898 #endif
899 #endif
900
901    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
902    msg = (char *) CmiAlloc(nbytes);
903    memcpy(msg, buf, nbytes);
904
905 #if CMK_NODE_QUEUE_AVAILABLE
906    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
907       CmiPushNode(msg);
908    else
909 #endif
910       CmiPushPE(CMI_DEST_RANK(msg), msg);
911
912 #if CMK_BROADCAST_SPANNING_TREE
913    if (CMI_BROADCAST_ROOT(msg))
914       SendSpanningChildren(nbytes, msg);
915 #elif CMK_BROADCAST_HYPERCUBE
916    if (CMI_GET_CYCLE(msg))
917       SendHypercube(nbytes, msg);
918 #endif
919 }
920
921 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
922
923 #if CMK_SMP
924
925 static int inexit = 0;
926
927 static int MsgQueueEmpty()
928 {
929   int i;
930 #if 0
931   for (i=0; i<_Cmi_mynodesize; i++)
932     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
933 #else
934   return PCQueueEmpty(sendMsgBuf);
935 #endif
936   return 1;
937 }
938
939 static int SendMsgBuf();
940
941 /* test if all processors recv queues are empty */
942 static int RecvQueueEmpty()
943 {
944   int i;
945   for (i=0; i<_Cmi_mynodesize; i++) {
946     CmiState cs=CmiGetStateN(i);
947     if (!PCQueueEmpty(cs->recv)) return 0;
948   }
949   return 1;
950 }
951
952 /**
953 CommunicationServer calls MPI to send messages in the queues and probe message from network.
954 */
955 static void CommunicationServer(int sleepTime)
956 {
957   int static count=0;
958 /*
959   count ++;
960   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
961 */
962   PumpMsgs();
963   CmiReleaseSentMessages();
964   SendMsgBuf();
965 /*
966   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
967 */
968   if (inexit == CmiMyNodeSize()) {
969     MACHSTATE(2, "CommunicationServer exiting {");
970 #if 0
971     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
972 #endif
973     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
974       CmiReleaseSentMessages();
975       SendMsgBuf();
976       PumpMsgs();
977     }
978     MACHSTATE(2, "CommunicationServer barrier begin {");
979
980
981 #if CMI_MPI_TRACE_USEREVENTS
982 #ifndef CMK_OPTIMIZE 
983 #if ! CMK_TRACE_IN_CHARM
984 #if CMK_TIMER_USE_XT3_DCLOCK
985     CpvAccess(projTraceStart) = dclock() - starttimer;
986 #else
987     CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
988 #endif
989 #endif
990 #endif
991 #endif
992
993     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
994       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
995
996 #if CMI_MPI_TRACE_USEREVENTS
997 #ifndef CMK_OPTIMIZE 
998 #if ! CMK_TRACE_IN_CHARM
999 #if CMK_TIMER_USE_XT3_DCLOCK
1000     traceUserBracketEvent(10, CpvAccess(projTraceStart), 
1001                           (dclock() - starttimer));
1002 #else
1003     traceUserBracketEvent(10, CpvAccess(projTraceStart),
1004                           (MPI_Wtime() - starttimer));
1005 #endif
1006 #endif
1007 #endif
1008 #endif
1009
1010     MACHSTATE(2, "} CommunicationServer barrier end");
1011 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1012     if (CmiMyNode() == 0){
1013       CmiPrintf("End of program\n");
1014     }
1015 #endif
1016     MACHSTATE(2, "} CommunicationServer EXIT");
1017 #if CMK_MPI_VMI
1018     /* vmi mpi always return exit status 255 when calling MPI_Finalize() which broke Make */
1019   exit(0);
1020 #else
1021     MPI_Finalize();
1022 #endif
1023     exit(0);
1024   }
1025 }
1026
1027 #endif
1028
1029 static void CommunicationServerThread(int sleepTime)
1030 {
1031 #if CMK_SMP
1032   CommunicationServer(sleepTime);
1033 #endif
1034 #if CMK_IMMEDIATE_MSG
1035   CmiHandleImmediate();
1036 #endif
1037 }
1038
1039 #if CMK_NODE_QUEUE_AVAILABLE
1040 char *CmiGetNonLocalNodeQ(void)
1041 {
1042   CmiState cs = CmiGetState();
1043   char *result = 0;
1044   CmiIdleLock_checkMessage(&cs->idle);
1045 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
1046     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
1047     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1048     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
1049     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1050     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
1051 /*  }  */
1052   return result;
1053 }
1054 #endif
1055
1056 void *CmiGetNonLocal(void)
1057 {
1058   static int count=0;
1059   CmiState cs = CmiGetState();
1060   void *msg;
1061   CmiIdleLock_checkMessage(&cs->idle);
1062   /* although it seems that lock is not needed, I found it crashes very often
1063      on mpi-smp without lock */
1064
1065 #if ! CMK_SMP
1066   CmiReleaseSentMessages();
1067   PumpMsgs();
1068 #endif
1069
1070   CmiLock(procState[cs->rank].recvLock);
1071   msg =  PCQueuePop(cs->recv);
1072   CmiUnlock(procState[cs->rank].recvLock);
1073
1074 /*
1075   if (msg) {
1076     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
1077   else {
1078     count++;
1079     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
1080   }
1081 */
1082 #if ! CMK_SMP
1083   if (no_outstanding_sends) {
1084     while (MsgQueueLen>0) {
1085       CmiReleaseSentMessages();
1086       PumpMsgs();
1087     }
1088   }
1089
1090   if(!msg) {
1091     CmiReleaseSentMessages();
1092     if (PumpMsgs())
1093       return  PCQueuePop(cs->recv);
1094     else
1095       return 0;
1096   }
1097 #endif
1098   return msg;
1099 }
1100
1101 /* called in non-smp mode */
1102 void CmiNotifyIdle(void)
1103 {
1104   CmiReleaseSentMessages();
1105   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
1106 }
1107
1108
1109 /********************************************************
1110     The call to probe immediate messages has been renamed to
1111     CmiMachineProgressImpl
1112 ******************************************************/
1113 /* user call to handle immediate message, only useful in non SMP version
1114    using polling method to schedule message.
1115 */
1116 /*
1117 #if CMK_IMMEDIATE_MSG
1118 void CmiProbeImmediateMsg()
1119 {
1120 #if !CMK_SMP
1121   PumpMsgs();
1122   CmiHandleImmediate();
1123 #endif
1124 }
1125 #endif
1126 */
1127
1128 /* Network progress function is used to poll the network when for
1129    messages. This flushes receive buffers on some  implementations*/
1130 void CmiMachineProgressImpl()
1131 {
1132 #if !CMK_SMP
1133     PumpMsgs();
1134 #if CMK_IMMEDIATE_MSG
1135     CmiHandleImmediate();
1136 #endif
1137 #else
1138     /*Not implemented yet. Communication server does not seem to be
1139       thread safe */
1140     /* CommunicationServerThread(0); */
1141 #endif
1142 }
1143
1144 /********************* MESSAGE SEND FUNCTIONS ******************/
1145
1146 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
1147
1148 static void CmiSendSelf(char *msg)
1149 {
1150 #if CMK_IMMEDIATE_MSG
1151     if (CmiIsImmediate(msg)) {
1152       /* CmiBecomeNonImmediate(msg); */
1153       CmiPushImmediateMsg(msg);
1154       CmiHandleImmediate();
1155       return;
1156     }
1157 #endif
1158     CQdCreate(CpvAccess(cQdState), 1);
1159     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
1160 }
1161
1162 void CmiSyncSendFn(int destPE, int size, char *msg)
1163 {
1164   CmiState cs = CmiGetState();
1165   char *dupmsg = (char *) CmiAlloc(size);
1166   memcpy(dupmsg, msg, size);
1167
1168   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
1169
1170   if (cs->pe==destPE) {
1171     CmiSendSelf(dupmsg);
1172   }
1173   else
1174     CmiAsyncSendFn_(destPE, size, dupmsg);
1175 }
1176
1177 #if CMK_SMP
1178
1179 /* called by communication thread in SMP */
1180 static int SendMsgBuf()
1181 {
1182   SMSG_LIST *msg_tmp;
1183   char *msg;
1184   int node, rank, size;
1185   int i;
1186   int sent = 0;
1187
1188   MACHSTATE(2,"SendMsgBuf begin {");
1189 #if 0
1190   for (i=0; i<_Cmi_mynodesize; i++)
1191   {
1192     while (!PCQueueEmpty(procState[i].sendMsgBuf))
1193     {
1194       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1195 #else
1196     /* single message sending queue */
1197     CmiLock(sendMsgBufLock);
1198     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1199     CmiUnlock(sendMsgBufLock);
1200     while (NULL != msg_tmp)
1201     {
1202 #endif
1203       node = msg_tmp->destpe;
1204       size = msg_tmp->size;
1205       msg = msg_tmp->msg;
1206       msg_tmp->next = 0;
1207       while (MsgQueueLen > request_max) {
1208         CmiReleaseSentMessages();
1209         PumpMsgs();
1210       }
1211       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1212       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1213       CMI_SET_CHECKSUM(msg, size);
1214
1215 #if MPI_POST_RECV_COUNT > 0
1216         if(size <= MPI_POST_RECV_SIZE){
1217
1218 #if CMI_MPI_TRACE_USEREVENTS
1219 #ifndef CMK_OPTIMIZE
1220 #if ! CMK_TRACE_IN_CHARM
1221 #if CMK_TIMER_USE_XT3_DCLOCK
1222     CpvAccess(projTraceStart) = dclock() - starttimer;
1223 #else
1224     CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
1225 #endif
1226 #endif
1227 #endif
1228 #endif
1229             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1230                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1231
1232 #if CMI_MPI_TRACE_USEREVENTS
1233 #ifndef CMK_OPTIMIZE 
1234 #if ! CMK_TRACE_IN_CHARM
1235 #if CMK_TIMER_USE_XT3_DCLOCK
1236     traceUserBracketEvent(40, CpvAccess(projTraceStart), 
1237                           (dclock() - starttimer));
1238 #else
1239     traceUserBracketEvent(40, CpvAccess(projTraceStart),
1240                           (MPI_Wtime() - starttimer));
1241 #endif
1242 #endif
1243 #endif
1244 #endif
1245             }
1246         else {
1247
1248 #if CMI_MPI_TRACE_USEREVENTS
1249 #ifndef CMK_OPTIMIZE
1250 #if ! CMK_TRACE_IN_CHARM
1251 #if CMK_TIMER_USE_XT3_DCLOCK
1252     CpvAccess(projTraceStart) = dclock() - starttimer;
1253 #else
1254     CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
1255 #endif
1256 #endif
1257 #endif
1258 #endif
1259
1260             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1261                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1262
1263 #if CMI_MPI_TRACE_USEREVENTS
1264 #ifndef CMK_OPTIMIZE 
1265 #if ! CMK_TRACE_IN_CHARM
1266 #if CMK_TIMER_USE_XT3_DCLOCK
1267     traceUserBracketEvent(40, CpvAccess(projTraceStart), 
1268                           (dclock() - starttimer));
1269 #else
1270     traceUserBracketEvent(40, CpvAccess(projTraceStart),
1271                           (MPI_Wtime() - starttimer));
1272 #endif
1273 #endif
1274 #endif
1275 #endif
1276
1277         }
1278 #else
1279
1280 #if CMI_MPI_TRACE_USEREVENTS
1281 #ifndef CMK_OPTIMIZE
1282 #if ! CMK_TRACE_IN_CHARM
1283 #if CMK_TIMER_USE_XT3_DCLOCK
1284         CpvAccess(projTraceStart) = dclock() - starttimer;
1285 #else
1286         CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
1287 #endif
1288 #endif
1289 #endif
1290 #endif
1291
1292         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1293             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1294 #endif
1295
1296 #if CMI_MPI_TRACE_USEREVENTS
1297 #ifndef CMK_OPTIMIZE 
1298 #if ! CMK_TRACE_IN_CHARM
1299 #if CMK_TIMER_USE_XT3_DCLOCK
1300     traceUserBracketEvent(40, CpvAccess(projTraceStart), 
1301                           (dclock() - starttimer));
1302 #else
1303     traceUserBracketEvent(40, CpvAccess(projTraceStart),
1304                           (MPI_Wtime() - starttimer));
1305 #endif
1306 #endif
1307 #endif
1308 #endif
1309
1310       MACHSTATE(3,"}MPI_send end");
1311       MsgQueueLen++;
1312       if(sent_msgs==0)
1313         sent_msgs = msg_tmp;
1314       else
1315         end_sent->next = msg_tmp;
1316       end_sent = msg_tmp;
1317       sent=1;
1318       CmiLock(sendMsgBufLock);
1319       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1320       CmiUnlock(sendMsgBufLock);
1321     }
1322 #if 0
1323   }
1324 #endif
1325   MACHSTATE(2,"}SendMsgBuf end ");
1326   return sent;
1327 }
1328
1329 void EnqueueMsg(void *m, int size, int node)
1330 {
1331   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1332   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1333   msg_tmp->msg = m;
1334   msg_tmp->size = size;
1335   msg_tmp->destpe = node;
1336   CmiLock(sendMsgBufLock);
1337   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1338   CmiUnlock(sendMsgBufLock);
1339   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1340 }
1341
1342 #endif
1343
1344 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1345 {
1346   CmiState cs = CmiGetState();
1347   SMSG_LIST *msg_tmp;
1348   CmiUInt2  rank, node;
1349
1350   if(destPE == cs->pe) {
1351     char *dupmsg = (char *) CmiAlloc(size);
1352     memcpy(dupmsg, msg, size);
1353     CmiSendSelf(dupmsg);
1354     return 0;
1355   }
1356   CQdCreate(CpvAccess(cQdState), 1);
1357 #if CMK_SMP
1358   node = CmiNodeOf(destPE);
1359   rank = CmiRankOf(destPE);
1360   if (node == CmiMyNode())  {
1361     CmiPushPE(rank, msg);
1362     return 0;
1363   }
1364   CMI_DEST_RANK(msg) = rank;
1365   EnqueueMsg(msg, size, node);
1366   return 0;
1367 #else
1368   /* non smp */
1369   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1370   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1371   msg_tmp->msg = msg;
1372   msg_tmp->next = 0;
1373   while (MsgQueueLen > request_max) {
1374         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1375         CmiReleaseSentMessages();
1376         PumpMsgs();
1377   }
1378   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1379   CMI_SET_CHECKSUM(msg, size);
1380
1381 #if MPI_POST_RECV_COUNT > 0
1382         if(size <= MPI_POST_RECV_SIZE){
1383
1384 #if CMI_MPI_TRACE_USEREVENTS
1385 #ifndef CMK_OPTIMIZE
1386 #if ! CMK_TRACE_IN_CHARM
1387 #if CMK_TIMER_USE_XT3_DCLOCK
1388     CpvAccess(projTraceStart) = dclock() - starttimer;
1389 #else
1390     CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
1391 #endif
1392 #endif
1393 #endif
1394 #endif
1395
1396               if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1397                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1398
1399 #if CMI_MPI_TRACE_USEREVENTS
1400 #ifndef CMK_OPTIMIZE 
1401 #if ! CMK_TRACE_IN_CHARM
1402 #if CMK_TIMER_USE_XT3_DCLOCK
1403     traceUserBracketEvent(40, CpvAccess(projTraceStart), 
1404                           (dclock() - starttimer));
1405 #else
1406     traceUserBracketEvent(40, CpvAccess(projTraceStart),
1407                           (MPI_Wtime() - starttimer));
1408 #endif
1409 #endif
1410 #endif
1411 #endif
1412             }
1413         else {
1414
1415 #if CMI_MPI_TRACE_USEREVENTS
1416 #ifndef CMK_OPTIMIZE
1417 #if ! CMK_TRACE_IN_CHARM
1418 #if CMK_TIMER_USE_XT3_DCLOCK
1419           CpvAccess(projTraceStart) = dclock() - starttimer;
1420 #else
1421           CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
1422 #endif
1423 #endif
1424 #endif
1425 #endif
1426
1427              if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1428                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1429
1430 #if CMI_MPI_TRACE_USEREVENTS
1431 #ifndef CMK_OPTIMIZE 
1432 #if ! CMK_TRACE_IN_CHARM
1433 #if CMK_TIMER_USE_XT3_DCLOCK
1434     traceUserBracketEvent(40, CpvAccess(projTraceStart), 
1435                           (dclock() - starttimer));
1436 #else
1437     traceUserBracketEvent(40, CpvAccess(projTraceStart),
1438                           (MPI_Wtime() - starttimer));
1439 #endif
1440 #endif
1441 #endif
1442 #endif
1443
1444         }
1445 #else
1446
1447 #if CMI_MPI_TRACE_USEREVENTS
1448 #ifndef CMK_OPTIMIZE
1449 #if ! CMK_TRACE_IN_CHARM
1450 #if CMK_TIMER_USE_XT3_DCLOCK
1451         CpvAccess(projTraceStart) = dclock() - starttimer;
1452 #else
1453         CpvAccess(projTraceStart) = MPI_Wtime() - starttimer;
1454 #endif
1455 #endif
1456 #endif
1457 #endif
1458
1459   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1460     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1461
1462 #if CMI_MPI_TRACE_USEREVENTS
1463 #ifndef CMK_OPTIMIZE 
1464 #if ! CMK_TRACE_IN_CHARM
1465 #if CMK_TIMER_USE_XT3_DCLOCK
1466     traceUserBracketEvent(40, CpvAccess(projTraceStart), 
1467                           (dclock() - starttimer));
1468 #else
1469     traceUserBracketEvent(40, CpvAccess(projTraceStart),
1470                           (MPI_Wtime() - starttimer));
1471 #endif
1472 #endif
1473 #endif
1474 #endif
1475
1476 #endif
1477
1478   MsgQueueLen++;
1479   if(sent_msgs==0)
1480     sent_msgs = msg_tmp;
1481   else
1482     end_sent->next = msg_tmp;
1483   end_sent = msg_tmp;
1484   return (CmiCommHandle) &(msg_tmp->req);
1485 #endif
1486 }
1487
1488 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1489 {
1490   CMI_SET_BROADCAST_ROOT(msg, 0);
1491   CmiAsyncSendFn_(destPE, size, msg);
1492 }
1493
1494 void CmiFreeSendFn(int destPE, int size, char *msg)
1495 {
1496   CmiState cs = CmiGetState();
1497   CMI_SET_BROADCAST_ROOT(msg, 0);
1498
1499   if (cs->pe==destPE) {
1500     CmiSendSelf(msg);
1501   } else {
1502     CmiAsyncSendFn_(destPE, size, msg);
1503   }
1504 }
1505
1506 /*********************** BROADCAST FUNCTIONS **********************/
1507
1508 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1509 void CmiSyncSendFn1(int destPE, int size, char *msg)
1510 {
1511   CmiState cs = CmiGetState();
1512   char *dupmsg = (char *) CmiAlloc(size);
1513   memcpy(dupmsg, msg, size);
1514   if (cs->pe==destPE)
1515     CmiSendSelf(dupmsg);
1516   else
1517     CmiAsyncSendFn_(destPE, size, dupmsg);
1518 }
1519
1520 /* send msg to its spanning children in broadcast. G. Zheng */
1521 void SendSpanningChildren(int size, char *msg)
1522 {
1523   CmiState cs = CmiGetState();
1524   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1525   int i;
1526
1527   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1528
1529   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1530     int p = cs->pe-startpe;
1531     if (p<0) p+=_Cmi_numpes;
1532     p = BROADCAST_SPANNING_FACTOR*p + i;
1533     if (p > _Cmi_numpes - 1) break;
1534     p += startpe;
1535     p = p%_Cmi_numpes;
1536     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1537     CmiSyncSendFn1(p, size, msg);
1538   }
1539 }
1540
1541 #include <math.h>
1542
1543 /* send msg along the hypercube in broadcast. (Sameer) */
1544 void SendHypercube(int size, char *msg)
1545 {
1546   CmiState cs = CmiGetState();
1547   int curcycle = CMI_GET_CYCLE(msg);
1548   int i;
1549
1550   double logp = CmiNumPes();
1551   logp = log(logp)/log(2.0);
1552   logp = ceil(logp);
1553
1554   /*  CmiPrintf("In hypercube\n"); */
1555
1556   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1557
1558   for (i = curcycle; i < logp; i++) {
1559     int p = cs->pe ^ (1 << i);
1560
1561     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1562
1563     if(p < CmiNumPes()) {
1564       CMI_SET_CYCLE(msg, i + 1);
1565       CmiSyncSendFn1(p, size, msg);
1566     }
1567   }
1568 }
1569
1570 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1571 {
1572   CmiState cs = CmiGetState();
1573 #if CMK_BROADCAST_SPANNING_TREE
1574   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1575   SendSpanningChildren(size, msg);
1576
1577 #elif CMK_BROADCAST_HYPERCUBE
1578   CMI_SET_CYCLE(msg, 0);
1579   SendHypercube(size, msg);
1580
1581 #else
1582   int i;
1583
1584   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1585     CmiSyncSendFn(i, size,msg) ;
1586   for ( i=0; i<cs->pe; i++ )
1587     CmiSyncSendFn(i, size,msg) ;
1588 #endif
1589
1590   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1591 }
1592
1593
1594 /*  FIXME: luckily async is never used  G. Zheng */
1595 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1596 {
1597   CmiState cs = CmiGetState();
1598   int i ;
1599
1600   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1601     CmiAsyncSendFn(i,size,msg) ;
1602   for ( i=0; i<cs->pe; i++ )
1603     CmiAsyncSendFn(i,size,msg) ;
1604
1605   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1606 CmiAbort("CmiAsyncBroadcastFn should never be called");
1607   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1608 }
1609
1610 void CmiFreeBroadcastFn(int size, char *msg)
1611 {
1612    CmiSyncBroadcastFn(size,msg);
1613    CmiFree(msg);
1614 }
1615
1616 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1617 {
1618
1619 #if CMK_BROADCAST_SPANNING_TREE
1620   CmiState cs = CmiGetState();
1621   CmiSyncSendFn(cs->pe, size,msg) ;
1622   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1623   SendSpanningChildren(size, msg);
1624
1625 #elif CMK_BROADCAST_HYPERCUBE
1626   CmiState cs = CmiGetState();
1627   CmiSyncSendFn(cs->pe, size,msg) ;
1628   CMI_SET_CYCLE(msg, 0);
1629   SendHypercube(size, msg);
1630
1631 #else
1632     int i ;
1633
1634   for ( i=0; i<_Cmi_numpes; i++ )
1635     CmiSyncSendFn(i,size,msg) ;
1636 #endif
1637
1638   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1639 }
1640
1641 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1642 {
1643   int i ;
1644
1645   for ( i=1; i<_Cmi_numpes; i++ )
1646     CmiAsyncSendFn(i,size,msg) ;
1647
1648   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1649
1650   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1651 }
1652
1653 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1654 {
1655
1656 #if CMK_BROADCAST_SPANNING_TREE
1657   CmiState cs = CmiGetState();
1658   CmiSyncSendFn(cs->pe, size,msg) ;
1659   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1660   SendSpanningChildren(size, msg);
1661
1662 #elif CMK_BROADCAST_HYPERCUBE
1663   CmiState cs = CmiGetState();
1664   CmiSyncSendFn(cs->pe, size,msg) ;
1665   CMI_SET_CYCLE(msg, 0);
1666   SendHypercube(size, msg);
1667
1668 #else
1669   int i ;
1670
1671   for ( i=0; i<_Cmi_numpes; i++ )
1672     CmiSyncSendFn(i,size,msg) ;
1673 #endif
1674   CmiFree(msg) ;
1675   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1676 }
1677
1678 #if CMK_NODE_QUEUE_AVAILABLE
1679
1680 static void CmiSendNodeSelf(char *msg)
1681 {
1682 #if CMK_IMMEDIATE_MSG
1683 #if 0
1684     if (CmiIsImmediate(msg) && !_immRunning) {
1685       /*CmiHandleImmediateMessage(msg); */
1686       CmiPushImmediateMsg(msg);
1687       CmiHandleImmediate();
1688       return;
1689     }
1690 #endif
1691     if (CmiIsImmediate(msg))
1692     {
1693       CmiPushImmediateMsg(msg);
1694       if (!_immRunning) CmiHandleImmediate();
1695       return;
1696     }
1697 #endif
1698     CQdCreate(CpvAccess(cQdState), 1);
1699     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1700     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1701     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1702 }
1703
1704 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1705 {
1706   int i;
1707   SMSG_LIST *msg_tmp;
1708   char *dupmsg;
1709
1710   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1711   switch (dstNode) {
1712   case NODE_BROADCAST_ALL:
1713     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1714   case NODE_BROADCAST_OTHERS:
1715     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1716     for (i=0; i<_Cmi_numnodes; i++)
1717       if (i!=_Cmi_mynode) {
1718         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1719       }
1720     break;
1721   default:
1722     dupmsg = (char *)CmiCopyMsg(msg,size);
1723     if(dstNode == _Cmi_mynode) {
1724       CmiSendNodeSelf(dupmsg);
1725     }
1726     else {
1727       CQdCreate(CpvAccess(cQdState), 1);
1728       EnqueueMsg(dupmsg, size, dstNode);
1729     }
1730   }
1731   return 0;
1732 }
1733
1734 void CmiSyncNodeSendFn(int p, int s, char *m)
1735 {
1736   CmiAsyncNodeSendFn(p, s, m);
1737 }
1738
1739 /* need */
1740 void CmiFreeNodeSendFn(int p, int s, char *m)
1741 {
1742   CmiAsyncNodeSendFn(p, s, m);
1743   CmiFree(m);
1744 }
1745
1746 /* need */
1747 void CmiSyncNodeBroadcastFn(int s, char *m)
1748 {
1749   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1750 }
1751
1752 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1753 {
1754 }
1755
1756 /* need */
1757 void CmiFreeNodeBroadcastFn(int s, char *m)
1758 {
1759   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1760   CmiFree(m);
1761 }
1762
1763 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1764 {
1765   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1766 }
1767
1768 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1769 {
1770   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1771 }
1772
1773 /* need */
1774 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1775 {
1776   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1777   CmiFree(m);
1778 }
1779 #endif
1780
1781 /************************** MAIN ***********************************/
1782 #define MPI_REQUEST_MAX 16      //1024*10
1783
1784 void ConverseExit(void)
1785 {
1786 #if ! CMK_SMP
1787   while(!CmiAllAsyncMsgsSent()) {
1788     PumpMsgs();
1789     CmiReleaseSentMessages();
1790   }
1791   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1792     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1793
1794   ConverseCommonExit();
1795 #if CMK_MPI_VMI
1796     /* vmi mpi always return exit status 255 */
1797   exit(0);
1798 #else
1799   MPI_Finalize();
1800 #endif
1801 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1802   if (CmiMyPe() == 0){
1803     CmiPrintf("End of program\n");
1804 #if MPI_POST_RECV_COUNT > 0
1805     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1806 #endif
1807 }
1808 #endif
1809   exit(0);
1810 #else
1811
1812   /* SMP version, communication thread will exit */
1813   ConverseCommonExit();
1814   /* atomic increment */
1815   CmiCommLock();
1816   inexit++;
1817   CmiCommUnlock();
1818   while (1) CmiYield();
1819 #endif
1820 }
1821
1822 static void registerMPITraceEvents() {
1823 #if CMI_MPI_TRACE_USEREVENTS
1824 #ifndef CMK_OPTIMIZE
1825 #if ! CMK_TRACE_IN_CHARM
1826     traceRegisterUserEvent("MPI_Barrier", 10);
1827     traceRegisterUserEvent("MPI_Send", 20);
1828     traceRegisterUserEvent("MPI_Recv", 30);
1829     traceRegisterUserEvent("MPI_Isend", 40);
1830     traceRegisterUserEvent("MPI_Irecv", 50);
1831 #endif
1832 #endif
1833 #endif
1834 }
1835
1836
1837 static char     **Cmi_argv;
1838 static char     **Cmi_argvcopy;
1839 static CmiStartFn Cmi_startfn;   /* The start function */
1840 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1841
1842 typedef struct {
1843   int sleepMs; /*Milliseconds to sleep while idle*/
1844   int nIdles; /*Number of times we've been idle in a row*/
1845   CmiState cs; /*Machine state*/
1846 } CmiIdleState;
1847
1848 static CmiIdleState *CmiNotifyGetState(void)
1849 {
1850   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1851   s->sleepMs=0;
1852   s->nIdles=0;
1853   s->cs=CmiGetState();
1854   return s;
1855 }
1856
1857 static void CmiNotifyBeginIdle(CmiIdleState *s)
1858 {
1859   s->sleepMs=0;
1860   s->nIdles=0;
1861 }
1862
1863 static void CmiNotifyStillIdle(CmiIdleState *s)
1864 {
1865 #if ! CMK_SMP
1866   CmiReleaseSentMessages();
1867   PumpMsgs();
1868 #else
1869 /*  CmiYield();  */
1870 #endif
1871
1872 #if 1
1873   {
1874   int nSpins=20; /*Number of times to spin before sleeping*/
1875   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1876   s->nIdles++;
1877   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1878     s->sleepMs+=2;
1879     if (s->sleepMs>10) s->sleepMs=10;
1880   }
1881   /*Comm. thread will listen on sockets-- just sleep*/
1882   if (s->sleepMs>0) {
1883     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1884     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1885     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1886   }
1887   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1888   }
1889 #endif
1890 }
1891
1892 #if MACHINE_DEBUG_LOG
1893 FILE *debugLog = NULL;
1894 #endif
1895
1896 static void ConverseRunPE(int everReturn)
1897 {
1898   CmiIdleState *s=CmiNotifyGetState();
1899   CmiState cs;
1900   char** CmiMyArgv;
1901   CmiNodeAllBarrier();
1902   cs = CmiGetState();
1903   CpvInitialize(void *,CmiLocalQueue);
1904   CpvAccess(CmiLocalQueue) = cs->localqueue;
1905
1906   if (CmiMyRank())
1907     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1908   else
1909     CmiMyArgv=Cmi_argv;
1910
1911   CthInit(CmiMyArgv);
1912
1913   ConverseCommonInit(CmiMyArgv);
1914
1915   /* initialize the network progress counter*/
1916   /* Network progress function is used to poll the network when for
1917      messages. This flushes receive buffers on some  implementations*/
1918   CpvInitialize(int , networkProgressCount);
1919   CpvAccess(networkProgressCount) = 0;
1920
1921 #if CMK_SMP
1922   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1923   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1924 #else
1925   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1926 #endif
1927
1928 #if MACHINE_DEBUG_LOG
1929   if (CmiMyRank() == 0) {
1930     char ln[200];
1931     sprintf(ln,"debugLog.%d",CmiMyNode());
1932     debugLog=fopen(ln,"w");
1933   }
1934 #endif
1935
1936   /* Converse initialization finishes, immediate messages can be processed.
1937      node barrier previously should take care of the node synchronization */
1938   _immediateReady = 1;
1939
1940   /* communication thread */
1941   if (CmiMyRank() == CmiMyNodeSize()) {
1942     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1943     while (1) CommunicationServerThread(5);
1944   }
1945   else {  /* worker thread */
1946   if (!everReturn) {
1947     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1948     if (Cmi_usrsched==0) CsdScheduler(-1);
1949     ConverseExit();
1950   }
1951   }
1952 }
1953
1954 #if CMK_VERSION_BLUEGENE
1955 struct BGLTorusManager;
1956 CpvDeclare(struct BGLTorusManager *, tmanager);
1957 #endif
1958
1959 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1960 {
1961   int n,i;
1962
1963 #if MACHINE_DEBUG
1964   debugLog=NULL;
1965 #endif
1966 #if CMK_USE_HP_MAIN_FIX
1967 #if FOR_CPLUS
1968   _main(argc,argv);
1969 #endif
1970 #endif
1971
1972 #if CMI_MPI_TRACE_USEREVENTS
1973 #ifndef CMK_OPTIMIZE
1974 #if ! CMK_TRACE_IN_CHARM
1975   CpvInitialize(double, projTraceStart);
1976   /* only PE 0 needs to care about registration (to generate sts file). */
1977   if (CmiMyPe() == 0) {
1978     registerMachineUserEventsFunction(&registerMPITraceEvents);
1979   }
1980 #endif
1981 #endif
1982 #endif
1983
1984   MPI_Init(&argc, &argv);
1985   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
1986   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
1987   /* processor per node */
1988   _Cmi_mynodesize = 1;
1989   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
1990     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
1991 #if ! CMK_SMP
1992   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
1993     CmiAbort("+ppn cannot be used in non SMP version!\n");
1994 #endif
1995   idleblock = CmiGetArgFlag(argv, "+idleblocking");
1996   if (idleblock && _Cmi_mynode == 0) {
1997     printf("Charm++: Running in idle blocking mode.\n");
1998   }
1999
2000 #if CMK_NO_OUTSTANDING_SENDS
2001   no_outstanding_sends=1;
2002 #endif
2003   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
2004     no_outstanding_sends = 1;
2005     if (_Cmi_mynode == 0)
2006       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
2007         no_outstanding_sends?"":" not");
2008   }
2009   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
2010   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
2011   Cmi_argvcopy = CmiCopyArgs(argv);
2012   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
2013   /* find dim = log2(numpes), to pretend we are a hypercube */
2014   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
2015     Cmi_dim++ ;
2016  /* CmiSpanTreeInit();*/
2017   request_max=MAX_QLEN;
2018   CmiGetArgInt(argv,"+requestmax",&request_max);
2019   /*printf("request max=%d\n", request_max);*/
2020
2021   /* checksum flag */
2022   if (CmiGetArgFlag(argv,"+checksum")) {
2023 #if !CMK_OPTIMIZE
2024     checksum_flag = 1;
2025     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
2026 #else
2027     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
2028 #endif
2029   }
2030
2031   if (CmiGetArgFlag(argv,"++debug"))
2032   {   /*Pause so user has a chance to start and attach debugger*/
2033     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
2034     if (!CmiGetArgFlag(argv,"++debug-no-pause"))
2035       sleep(10);
2036   }
2037
2038
2039 #if MPI_POST_RECV_COUNT > 0
2040
2041     /* Post some extra recvs to help out with incoming messages */
2042     /* On some MPIs the messages are unexpected and thus slow */
2043
2044     /* An array of request handles for posted recvs */
2045     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
2046
2047     /* An array of buffers for posted recvs */
2048     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
2049
2050     /* Post Recvs */
2051     for(i=0; i<MPI_POST_RECV_COUNT; i++){
2052         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
2053                     MPI_POST_RECV_SIZE,
2054                     MPI_BYTE,
2055                     MPI_ANY_SOURCE,
2056                     POST_RECV_TAG,
2057                     MPI_COMM_WORLD,
2058                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
2059           CmiAbort("MPI_Irecv failed\n");
2060     }
2061
2062 #endif
2063
2064
2065
2066   /* CmiTimerInit(); */
2067
2068 #if 0
2069   CthInit(argv);
2070   ConverseCommonInit(argv);
2071
2072   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
2073   if (initret==0) {
2074     fn(CmiGetArgc(argv), argv);
2075     if (usched==0) CsdScheduler(-1);
2076     ConverseExit();
2077   }
2078 #endif
2079
2080   CsvInitialize(CmiNodeState, NodeState);
2081   CmiNodeStateInit(&CsvAccess(NodeState));
2082
2083   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
2084
2085   for (i=0; i<_Cmi_mynodesize+1; i++) {
2086     /*    procState[i].sendMsgBuf = PCQueueCreate();   */
2087     procState[i].recvLock = CmiCreateLock();
2088   }
2089 #if CMK_SMP
2090   sendMsgBuf = PCQueueCreate();
2091   sendMsgBufLock = CmiCreateLock();
2092 #endif
2093
2094   /* Network progress function is used to poll the network when for
2095      messages. This flushes receive buffers on some  implementations*/
2096   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
2097   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
2098
2099 #if CMK_VERSION_BLUEGENE
2100   CpvInitialize(struct BGLTorusManager*, tmanager);
2101   CpvAccess(tmanager) = NULL;
2102 #endif
2103
2104   CmiStartThreads(argv);
2105   ConverseRunPE(initret);
2106 }
2107
2108 /***********************************************************************
2109  *
2110  * Abort function:
2111  *
2112  ************************************************************************/
2113
2114 void CmiAbort(const char *message)
2115 {
2116   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
2117         "Reason: %s\n",CmiMyPe(),message);
2118  /*  CmiError(message); */
2119   CmiPrintStackTrace(0);
2120   MPI_Abort(MPI_COMM_WORLD, 1);
2121 }
2122
2123
2124 #if 0
2125
2126 /* ****************************************************************** */
2127 /*    The following internal functions implement recd msg queue       */
2128 /* ****************************************************************** */
2129
2130 static void ** AllocBlock(unsigned int len)
2131 {
2132   void ** blk;
2133
2134   blk=(void **)CmiAlloc(len*sizeof(void *));
2135   if(blk==(void **)0) {
2136     CmiError("Cannot Allocate Memory!\n");
2137     MPI_Abort(MPI_COMM_WORLD, 1);
2138   }
2139   return blk;
2140 }
2141
2142 static void
2143 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
2144 {
2145   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
2146   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
2147 }
2148
2149 void recdQueueInit(void)
2150 {
2151   recdQueue_blk = AllocBlock(BLK_LEN);
2152   recdQueue_blk_len = BLK_LEN;
2153   recdQueue_first = 0;
2154   recdQueue_len = 0;
2155 }
2156
2157 void recdQueueAddToBack(void *element)
2158 {
2159 #if NODE_0_IS_CONVHOST
2160   inside_comm = 1;
2161 #endif
2162   if(recdQueue_len==recdQueue_blk_len) {
2163     void **blk;
2164     recdQueue_blk_len *= 3;
2165     blk = AllocBlock(recdQueue_blk_len);
2166     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
2167     CmiFree(recdQueue_blk);
2168     recdQueue_blk = blk;
2169     recdQueue_first = 0;
2170   }
2171   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
2172 #if NODE_0_IS_CONVHOST
2173   inside_comm = 0;
2174 #endif
2175 }
2176
2177
2178 void * recdQueueRemoveFromFront(void)
2179 {
2180   if(recdQueue_len) {
2181     void *element;
2182     element = recdQueue_blk[recdQueue_first++];
2183     recdQueue_first %= recdQueue_blk_len;
2184     recdQueue_len--;
2185     return element;
2186   }
2187   return 0;
2188 }
2189
2190 #endif
2191
2192 /*@}*/