compressed more than 300 lines of Cheewai's event trace code, which otherwise made...
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 /** @file
9  * MPI based machine layer
10  * @ingroup Machine
11  */
12 /*@{*/
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include "converse.h"
17 #include <mpi.h>
18 #if CMK_TIMER_USE_XT3_DCLOCK
19 #include <catamount/dclock.h>
20 #endif
21
22 #ifdef AMPI
23 #  warning "We got the AMPI version of mpi.h, instead of the system version--"
24 #  warning "   Try doing an 'rm charm/include/mpi.h' and building again."
25 #  error "Can't build Charm++ using AMPI version of mpi.h header"
26 #endif
27
28 /*Support for ++debug: */
29 #include <unistd.h> /*For getpid()*/
30 #include <stdlib.h> /*For sleep()*/
31
32 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
33 #define CMK_SMP 1
34 #endif
35
36 #include "machine.h"
37
38 #include "pcqueue.h"
39
40 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
41
42 #if CMK_VERSION_BLUEGENE
43 #define MAX_QLEN 8
44 #define NETWORK_PROGRESS_PERIOD_DEFAULT 16
45 #else
46 #define NETWORK_PROGRESS_PERIOD_DEFAULT 0
47 #define MAX_QLEN 200
48 #endif
49
50 #if CMI_MPI_TRACE_USEREVENTS && !defined(CMK_OPTIMIZE) && ! CMK_TRACE_IN_CHARM
51 CpvStaticDeclare(double, projTraceStart);
52 # define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
53 # define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
54 #else
55 # define  START_EVENT()
56 # define  END_EVENT(x)
57 #endif
58
59 /*
60     To reduce the buffer used in broadcast and distribute the load from
61   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
62   spanning tree broadcast algorithm.
63     This will use the fourth short in message as an indicator of spanning tree
64   root.
65 */
66 #if CMK_SMP
67 #define CMK_BROADCAST_SPANNING_TREE    0
68 #else
69 #define CMK_BROADCAST_SPANNING_TREE    1
70 #define CMK_BROADCAST_HYPERCUBE        0
71 #endif
72
73 #define BROADCAST_SPANNING_FACTOR      4
74
75 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
76 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
77
78 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
79 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
80
81 /* FIXME: need a random number that everyone agrees ! */
82 #define CHARM_MAGIC_NUMBER               126
83
84 #if !CMK_OPTIMIZE
85 static int checksum_flag = 0;
86 #define CMI_SET_CHECKSUM(msg, len)      \
87         if (checksum_flag)  {   \
88           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
89           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
90         }
91 #define CMI_CHECK_CHECKSUM(msg, len)    \
92         if (checksum_flag)      \
93           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
94             CmiAbort("Fatal error: checksum doesn't agree!\n");
95 #else
96 #define CMI_SET_CHECKSUM(msg, len)
97 #define CMI_CHECK_CHECKSUM(msg, len)
98 #endif
99
100 #if CMK_BROADCAST_SPANNING_TREE
101 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
102 #else
103 #  define CMI_SET_BROADCAST_ROOT(msg, root)
104 #endif
105
106 #if CMK_BROADCAST_HYPERCUBE
107 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
108 #else
109 #  define CMI_SET_CYCLE(msg, cycle)
110 #endif
111
112
113 /** 
114     If MPI_POST_RECV is defined, we provide default values for size 
115     and number of posted recieves. If MPI_POST_RECV_COUNT is set
116     then a default value for MPI_POST_RECV_SIZE is used if not specified
117     by the user.
118 */
119 #ifdef MPI_POST_RECV
120 #define MPI_POST_RECV_COUNT 10
121 #undef MPI_POST_RECV
122 #endif
123 #if MPI_POST_RECV_COUNT > 0
124 #warning "Using MPI posted receives which have not yet been tested"
125 #ifndef MPI_POST_RECV_SIZE
126 #define MPI_POST_RECV_SIZE 200
127 #endif
128 // #undef  MPI_POST_RECV_DEBUG 
129 CpvDeclare(unsigned long long, Cmi_posted_recv_total);
130 CpvDeclare(unsigned long long, Cmi_unposted_recv_total);
131 CpvDeclare(MPI_Request*, CmiPostedRecvRequests); /* An array of request handles for posted recvs */
132 CpvDeclare(char*,CmiPostedRecvBuffers);
133 #endif
134
135
136 /*
137  to avoid MPI's in order delivery, changing MPI Tag all the time
138 */
139 #define TAG     1375
140
141 #if MPI_POST_RECV_COUNT > 0
142 #define POST_RECV_TAG TAG+1
143 #define BARRIER_ZERO_TAG TAG
144 #else
145 #define BARRIER_ZERO_TAG     1375
146 #endif
147
148 /*
149 static int mpi_tag = TAG;
150 #define NEW_MPI_TAG     mpi_tag++; if (mpi_tag == MPI_TAG_UB) mpi_tag=TAG;
151 */
152
153 int               _Cmi_numpes;
154 int               _Cmi_mynode;    /* Which address space am I */
155 int               _Cmi_mynodesize;/* Number of processors in my address space */
156 int               _Cmi_numnodes;  /* Total number of address spaces */
157 int               _Cmi_numpes;    /* Total number of processors */
158 static int        Cmi_nodestart; /* First processor in this address space */
159 CpvDeclare(void*, CmiLocalQueue);
160
161 /*Network progress utility variables. Period controls the rate at
162   which the network poll is called */
163 CpvDeclare(unsigned , networkProgressCount);
164 int networkProgressPeriod;
165
166 int               idleblock = 0;
167
168 #define BLK_LEN  512
169
170 #if CMK_NODE_QUEUE_AVAILABLE
171 #define DGRAM_NODEMESSAGE   (0xFB)
172
173 #define NODE_BROADCAST_OTHERS (-1)
174 #define NODE_BROADCAST_ALL    (-2)
175 #endif
176
177 #if 0
178 static void **recdQueue_blk;
179 static unsigned int recdQueue_blk_len;
180 static unsigned int recdQueue_first;
181 static unsigned int recdQueue_len;
182 static void recdQueueInit(void);
183 static void recdQueueAddToBack(void *element);
184 static void *recdQueueRemoveFromFront(void);
185 #endif
186
187 static void ConverseRunPE(int everReturn);
188 static void CommunicationServer(int sleepTime);
189 static void CommunicationServerThread(int sleepTime);
190
191 typedef struct msg_list {
192      char *msg;
193      struct msg_list *next;
194      int size, destpe;
195      MPI_Request req;
196 } SMSG_LIST;
197
198 int MsgQueueLen=0;
199 static int request_max;
200
201 static SMSG_LIST *sent_msgs=0;
202 static SMSG_LIST *end_sent=0;
203
204 static int Cmi_dim;
205
206 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
207
208 #if NODE_0_IS_CONVHOST
209 int inside_comm = 0;
210 #endif
211
212 void CmiAbort(const char *message);
213 static void PerrorExit(const char *msg);
214
215 void SendSpanningChildren(int size, char *msg);
216 void SendHypercube(int size, char *msg);
217
218 static void PerrorExit(const char *msg)
219 {
220   perror(msg);
221   exit(1);
222 }
223
224 extern unsigned char computeCheckSum(unsigned char *data, int len);
225
226 /**************************  TIMER FUNCTIONS **************************/
227
228 #if CMK_TIMER_USE_SPECIAL || CMK_TIMER_USE_XT3_DCLOCK
229
230 /* MPI calls are not threadsafe, even the timer on some machines */
231 static CmiNodeLock  timerLock = 0;
232 static double starttimer = 0;
233 static int _is_global = 0;
234
235 int CmiTimerIsSynchronized()
236 {
237   int  flag;
238   void *v;
239
240   /*  check if it using synchronized timer */
241   if (MPI_SUCCESS != MPI_Attr_get(MPI_COMM_WORLD, MPI_WTIME_IS_GLOBAL, &v, &flag))
242     printf("MPI_WTIME_IS_GLOBAL not valid!\n");
243   if (flag) {
244     _is_global = *(int*)v;
245     if (_is_global && CmiMyPe() == 0)
246       printf("Charm++> MPI timer is synchronized!\n");
247   }
248   return _is_global;
249 }
250
251 void CmiTimerInit()
252 {
253   _is_global = CmiTimerIsSynchronized();
254
255   if (CmiMyRank() == 0) {
256     if (_is_global) {
257       double minTimer;
258 #if CMK_TIMER_USE_XT3_DCLOCK
259       starttimer = dclock();
260 #else
261       starttimer = MPI_Wtime();
262 #endif
263
264       MPI_Allreduce(&starttimer, &minTimer, 1, MPI_DOUBLE, MPI_MIN,
265                                   MPI_COMM_WORLD );
266       starttimer = minTimer;
267     }
268     else {
269       /* we don't have a synchronous timer, set our own start time */
270       CmiBarrier();
271       CmiBarrier();
272       CmiBarrier();
273 #if CMK_TIMER_USE_XT3_DCLOCK
274       starttimer = dclock();
275 #else
276       starttimer = MPI_Wtime();
277 #endif
278     }
279   }
280   CmiNodeAllBarrier();          /* for smp */
281 /*  timerLock = CmiCreateLock(); */
282 }
283
284 double CmiTimer(void)
285 {
286   double t;
287 #if CMK_SMP
288   if (timerLock) CmiLock(timerLock);
289 #endif
290 #if CMK_TIMER_USE_XT3_DCLOCK
291   t = dclock() - starttimer;
292 #else
293   t = MPI_Wtime() - starttimer;
294 #endif
295 #if CMK_SMP
296   if (timerLock) CmiUnlock(timerLock);
297 #endif
298   return t;
299 }
300
301 double CmiWallTimer(void)
302 {
303   double t;
304 #if CMK_SMP
305   if (timerLock) CmiLock(timerLock);
306 #endif
307 #if CMK_TIMER_USE_XT3_DCLOCK
308   t = dclock() - starttimer;
309 #else
310   t = MPI_Wtime() - starttimer;
311 #endif
312 #if CMK_SMP
313   if (timerLock) CmiUnlock(timerLock);
314 #endif
315   return t;
316 }
317
318 double CmiCpuTimer(void)
319 {
320   double t;
321 #if CMK_SMP
322   if (timerLock) CmiLock(timerLock);
323 #endif
324 #if CMK_TIMER_USE_XT3_DCLOCK
325   t = dclock() - starttimer;
326 #else
327   t = MPI_Wtime() - starttimer;
328 #endif
329 #if CMK_SMP
330   if (timerLock) CmiUnlock(timerLock);
331 #endif
332   return t;
333 }
334
335 #endif
336
337 void CmiBarrier()
338 {
339   if (CmiMyRank() == 0) {
340
341     START_EVENT();
342
343     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
344         CmiAbort("Timernit: MPI_Barrier failed!\n");
345
346     END_EVENT(10);
347   }
348 }
349
350 /* CmiBarrierZero make sure node 0 is the last one exiting the barrier */
351 void CmiBarrierZero()
352 {
353   int i;
354   if (CmiMyRank() == 0) {
355     char msg[1];
356     MPI_Status sts;
357     if (CmiMyNode() == 0)  {
358       for (i=0; i<CmiNumNodes()-1; i++) {
359          START_EVENT();
360
361          if (MPI_SUCCESS != MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,BARRIER_ZERO_TAG, MPI_COMM_WORLD,&sts))
362             CmiPrintf("MPI_Recv failed!\n");
363
364          END_EVENT(30);
365       }
366     }
367     else {
368       START_EVENT();
369
370       if (MPI_SUCCESS != MPI_Send((void *)msg,1,MPI_BYTE,0,BARRIER_ZERO_TAG,MPI_COMM_WORLD))
371          printf("MPI_Send failed!\n");
372
373       END_EVENT(20);
374     }
375   }
376   CmiNodeAllBarrier();
377 }
378
379 typedef struct ProcState {
380 /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
381 CmiNodeLock  recvLock;              /* for cs->recv */
382 } ProcState;
383
384 static ProcState  *procState;
385
386 #if CMK_SMP
387
388 static PCQueue sendMsgBuf;
389 static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
390
391 #endif
392
393 /************************************************************
394  *
395  * Processor state structure
396  *
397  ************************************************************/
398
399 /* fake Cmi_charmrun_fd */
400 static int Cmi_charmrun_fd = 0;
401 #include "machine-smp.c"
402
403 CsvDeclare(CmiNodeState, NodeState);
404
405 #include "immediate.c"
406
407 #if ! CMK_SMP
408 /************ non SMP **************/
409 static struct CmiStateStruct Cmi_state;
410 int _Cmi_mype;
411 int _Cmi_myrank;
412
413 void CmiMemLock() {}
414 void CmiMemUnlock() {}
415
416 #define CmiGetState() (&Cmi_state)
417 #define CmiGetStateN(n) (&Cmi_state)
418
419 void CmiYield(void) { sleep(0); }
420
421 static void CmiStartThreads(char **argv)
422 {
423   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
424   _Cmi_mype = Cmi_nodestart;
425   _Cmi_myrank = 0;
426 }
427 #endif  /* non smp */
428
429 /*Add a message to this processor's receive queue, pe is a rank */
430 static void CmiPushPE(int pe,void *msg)
431 {
432   CmiState cs = CmiGetStateN(pe);
433   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
434 #if CMK_IMMEDIATE_MSG
435   if (CmiIsImmediate(msg)) {
436 /*
437 CmiPrintf("[node %d] Immediate Message hdl: %d rank: %d {{. \n", CmiMyNode(), CmiGetHandler(msg), pe);
438     CmiHandleMessage(msg);
439 CmiPrintf("[node %d] Immediate Message done.}} \n", CmiMyNode());
440 */
441     /**(CmiUInt2 *)msg = pe;*/
442     CMI_DEST_RANK(msg) = pe;
443     CmiPushImmediateMsg(msg);
444     return;
445   }
446 #endif
447
448 #if CMK_SMP
449   CmiLock(procState[pe].recvLock);
450 #endif
451   PCQueuePush(cs->recv,msg);
452 #if CMK_SMP
453   CmiUnlock(procState[pe].recvLock);
454 #endif
455   CmiIdleLock_addMessage(&cs->idle);
456   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
457 }
458
459 #if CMK_NODE_QUEUE_AVAILABLE
460 /*Add a message to this processor's receive queue */
461 static void CmiPushNode(void *msg)
462 {
463   MACHSTATE(3,"Pushing message into NodeRecv queue");
464 #if CMK_IMMEDIATE_MSG
465   if (CmiIsImmediate(msg)) {
466     CMI_DEST_RANK(msg) = 0;
467     CmiPushImmediateMsg(msg);
468     return;
469   }
470 #endif
471   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
472   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
473   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
474   {
475   CmiState cs=CmiGetStateN(0);
476   CmiIdleLock_addMessage(&cs->idle);
477   }
478 }
479 #endif
480
481 #ifndef CmiMyPe
482 int CmiMyPe(void)
483 {
484   return CmiGetState()->pe;
485 }
486 #endif
487
488 #ifndef CmiMyRank
489 int CmiMyRank(void)
490 {
491   return CmiGetState()->rank;
492 }
493 #endif
494
495 #ifndef CmiNodeFirst
496 int CmiNodeFirst(int node) { return node*_Cmi_mynodesize; }
497 int CmiNodeSize(int node)  { return _Cmi_mynodesize; }
498 #endif
499
500 #ifndef CmiNodeOf
501 int CmiNodeOf(int pe)      { return (pe/_Cmi_mynodesize); }
502 int CmiRankOf(int pe)      { return pe%_Cmi_mynodesize; }
503 #endif
504
505 static size_t CmiAllAsyncMsgsSent(void)
506 {
507    SMSG_LIST *msg_tmp = sent_msgs;
508    MPI_Status sts;
509    int done;
510
511    while(msg_tmp!=0) {
512     done = 0;
513     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
514       CmiAbort("CmiAllAsyncMsgsSent: MPI_Test failed!\n");
515     if(!done)
516       return 0;
517     msg_tmp = msg_tmp->next;
518 /*    MsgQueueLen--; ????? */
519    }
520    return 1;
521 }
522
523 int CmiAsyncMsgSent(CmiCommHandle c) {
524
525   SMSG_LIST *msg_tmp = sent_msgs;
526   int done;
527   MPI_Status sts;
528
529   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
530     msg_tmp = msg_tmp->next;
531   if(msg_tmp) {
532     done = 0;
533     if (MPI_SUCCESS != MPI_Test(&(msg_tmp->req), &done, &sts))
534       CmiAbort("CmiAsyncMsgSent: MPI_Test failed!\n");
535     return ((done)?1:0);
536   } else {
537     return 1;
538   }
539 }
540
541 void CmiReleaseCommHandle(CmiCommHandle c)
542 {
543   return;
544 }
545
546 #if CMK_VERSION_BLUEGENE
547 extern void MPID_Progress_test();
548 #endif
549
550 void CmiReleaseSentMessages(void)
551 {
552   SMSG_LIST *msg_tmp=sent_msgs;
553   SMSG_LIST *prev=0;
554   SMSG_LIST *temp;
555   int done;
556   MPI_Status sts;
557
558
559 #if CMK_VERSION_BLUEGENE
560   MPID_Progress_test();
561 #endif
562
563   MACHSTATE1(2,"CmiReleaseSentMessages begin on %d {", CmiMyPe());
564   while(msg_tmp!=0) {
565     done =0;
566     if(MPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
567       CmiAbort("CmiReleaseSentMessages: MPI_Test failed!\n");
568     if(done) {
569       MACHSTATE2(3,"CmiReleaseSentMessages release one %d to %d", CmiMyPe(), msg_tmp->destpe);
570       MsgQueueLen--;
571       /* Release the message */
572       temp = msg_tmp->next;
573       if(prev==0)  /* first message */
574         sent_msgs = temp;
575       else
576         prev->next = temp;
577       CmiFree(msg_tmp->msg);
578       CmiFree(msg_tmp);
579       msg_tmp = temp;
580     } else {
581       prev = msg_tmp;
582       msg_tmp = msg_tmp->next;
583     }
584   }
585   end_sent = prev;
586   MACHSTATE(2,"} CmiReleaseSentMessages end");
587 }
588
589 int PumpMsgs(void)
590 {
591   int nbytes, flg, res;
592   char *msg;
593   MPI_Status sts;
594   int recd=0;
595
596 #if CMK_VERSION_BLUEGENE
597   MPID_Progress_test();
598 #endif
599
600   MACHSTATE(2,"PumpMsgs begin {");
601
602   while(1) {
603     /* First check posted recvs then do  probe unmatched outstanding messages */
604 #if MPI_POST_RECV_COUNT > 0 
605     int completed_index=-1;
606     if(MPI_SUCCESS != MPI_Testany(MPI_POST_RECV_COUNT, CpvAccess(CmiPostedRecvRequests), &completed_index, &flg, &sts))
607         CmiAbort("PumpMsgs: MPI_Testany failed!\n");
608     if(flg){
609         if (MPI_SUCCESS != MPI_Get_count(&sts, MPI_BYTE, &nbytes))
610             CmiAbort("PumpMsgs: MPI_Get_count failed!\n");
611
612
613         //        CmiPrintf("Received a posted message of %d bytes\n",nbytes);
614
615         recd = 1;
616         msg = (char *) CmiAlloc(nbytes);
617         memcpy(msg,&(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE]),nbytes);
618         /* and repost the recv */
619
620         START_EVENT();
621
622         if (MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[completed_index*MPI_POST_RECV_SIZE])   ,
623             MPI_POST_RECV_SIZE,
624             MPI_BYTE,
625             MPI_ANY_SOURCE,
626             POST_RECV_TAG,
627             MPI_COMM_WORLD,
628             &(CpvAccess(CmiPostedRecvRequests)[completed_index])  ))
629                 CmiAbort("PumpMsgs: MPI_Irecv failed!\n");
630
631         END_EVENT(50);
632
633         CpvAccess(Cmi_posted_recv_total)++;
634     }
635     else {
636         res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
637         if(res != MPI_SUCCESS)
638         CmiAbort("MPI_Iprobe failed\n");
639         if(!flg) break;
640         recd = 1;
641         MPI_Get_count(&sts, MPI_BYTE, &nbytes);
642         msg = (char *) CmiAlloc(nbytes);
643
644         START_EVENT();
645
646         if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
647             CmiAbort("PumpMsgs: MPI_Recv failed!\n");
648
649         END_EVENT(30);
650
651         CpvAccess(Cmi_unposted_recv_total)++;
652     }
653 #else
654     /* Original version */
655     res = MPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
656     if(res != MPI_SUCCESS)
657       CmiAbort("MPI_Iprobe failed\n");
658
659     if(!flg) break;
660     recd = 1;
661     MPI_Get_count(&sts, MPI_BYTE, &nbytes);
662     msg = (char *) CmiAlloc(nbytes);
663
664     START_EVENT();
665
666     if (MPI_SUCCESS != MPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,sts.MPI_TAG, MPI_COMM_WORLD,&sts))
667       CmiAbort("PumpMsgs: MPI_Recv failed!\n");
668
669     END_EVENT(30);
670 #endif
671
672     MACHSTATE2(3,"PumpMsgs recv one from node:%d to rank:%d", sts.MPI_SOURCE, CMI_DEST_RANK(msg));
673     CMI_CHECK_CHECKSUM(msg, nbytes);
674     if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
675       CmiPrintf("Charm++ Abort: Non Charm++ Message Received of size %d. \n", nbytes);
676       CmiFree(msg);
677       CmiAbort("Abort!\n");
678       continue;
679     }
680 #if CMK_NODE_QUEUE_AVAILABLE
681     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
682       CmiPushNode(msg);
683     else
684 #endif
685       CmiPushPE(CMI_DEST_RANK(msg), msg);
686
687 #if CMK_BROADCAST_SPANNING_TREE
688     if (CMI_BROADCAST_ROOT(msg))
689       SendSpanningChildren(nbytes, msg);
690 #elif CMK_BROADCAST_HYPERCUBE
691     if (CMI_GET_CYCLE(msg))
692       SendHypercube(nbytes, msg);
693 #endif
694   }
695 #if CMK_IMMEDIATE_MSG && !CMK_SMP
696   CmiHandleImmediate();
697 #endif
698   MACHSTATE(2,"} PumpMsgs end ");
699   return recd;
700 }
701
702 /* blocking version */
703 static void PumpMsgsBlocking(void)
704 {
705   static int maxbytes = 20000000;
706   static char *buf = NULL;
707   int nbytes, flg;
708   MPI_Status sts;
709   char *msg;
710   int recd=0;
711
712   if (!PCQueueEmpty(CmiGetState()->recv)) return;
713   if (!CdsFifo_Empty(CpvAccess(CmiLocalQueue))) return;
714   if (!CqsEmpty(CpvAccess(CsdSchedQueue))) return;
715   if (sent_msgs)  return;
716
717 #if 0
718   CmiPrintf("[%d] PumpMsgsBlocking. \n", CmiMyPe());
719 #endif
720
721   if (buf == NULL) {
722     buf = (char *) CmiAlloc(maxbytes);
723     _MEMCHECK(buf);
724   }
725
726
727 #if MPI_POST_RECV_COUNT > 0
728 #warning "Using MPI posted receives and PumpMsgsBlocking() will break"
729 CmiAbort("Unsupported use of PumpMsgsBlocking. This call should be extended to check posted recvs, cancel them all, and then wait on any incoming message, and then re-post the recvs");
730 #endif
731
732   START_EVENT();
733
734   if (MPI_SUCCESS != MPI_Recv(buf,maxbytes,MPI_BYTE,MPI_ANY_SOURCE,TAG, MPI_COMM_WORLD,&sts))
735       CmiAbort("PumpMsgs: PMP_Recv failed!\n");
736
737   END_EVENT(30);
738
739    MPI_Get_count(&sts, MPI_BYTE, &nbytes);
740    msg = (char *) CmiAlloc(nbytes);
741    memcpy(msg, buf, nbytes);
742
743 #if CMK_NODE_QUEUE_AVAILABLE
744    if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
745       CmiPushNode(msg);
746    else
747 #endif
748       CmiPushPE(CMI_DEST_RANK(msg), msg);
749
750 #if CMK_BROADCAST_SPANNING_TREE
751    if (CMI_BROADCAST_ROOT(msg))
752       SendSpanningChildren(nbytes, msg);
753 #elif CMK_BROADCAST_HYPERCUBE
754    if (CMI_GET_CYCLE(msg))
755       SendHypercube(nbytes, msg);
756 #endif
757 }
758
759 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
760
761 #if CMK_SMP
762
763 static int inexit = 0;
764
765 static int MsgQueueEmpty()
766 {
767   int i;
768 #if 0
769   for (i=0; i<_Cmi_mynodesize; i++)
770     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
771 #else
772   return PCQueueEmpty(sendMsgBuf);
773 #endif
774   return 1;
775 }
776
777 static int SendMsgBuf();
778
779 /* test if all processors recv queues are empty */
780 static int RecvQueueEmpty()
781 {
782   int i;
783   for (i=0; i<_Cmi_mynodesize; i++) {
784     CmiState cs=CmiGetStateN(i);
785     if (!PCQueueEmpty(cs->recv)) return 0;
786   }
787   return 1;
788 }
789
790 /**
791 CommunicationServer calls MPI to send messages in the queues and probe message from network.
792 */
793 static void CommunicationServer(int sleepTime)
794 {
795   int static count=0;
796 /*
797   count ++;
798   if (count % 10000000==0) MACHSTATE(3, "Entering CommunicationServer {");
799 */
800   PumpMsgs();
801   CmiReleaseSentMessages();
802   SendMsgBuf();
803 /*
804   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
805 */
806   if (inexit == CmiMyNodeSize()) {
807     MACHSTATE(2, "CommunicationServer exiting {");
808 #if 0
809     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
810 #endif
811     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
812       CmiReleaseSentMessages();
813       SendMsgBuf();
814       PumpMsgs();
815     }
816     MACHSTATE(2, "CommunicationServer barrier begin {");
817
818     START_EVENT();
819
820     if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
821       CmiAbort("ConverseExit: MPI_Barrier failed!\n");
822
823     END_EVENT(10);
824
825     MACHSTATE(2, "} CommunicationServer barrier end");
826 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
827     if (CmiMyNode() == 0){
828       CmiPrintf("End of program\n");
829     }
830 #endif
831     MACHSTATE(2, "} CommunicationServer EXIT");
832 #if CMK_MPI_VMI
833     /* vmi mpi always return exit status 255 when calling MPI_Finalize() which broke Make */
834   exit(0);
835 #else
836     MPI_Finalize();
837 #endif
838     exit(0);
839   }
840 }
841
842 #endif
843
844 static void CommunicationServerThread(int sleepTime)
845 {
846 #if CMK_SMP
847   CommunicationServer(sleepTime);
848 #endif
849 #if CMK_IMMEDIATE_MSG
850   CmiHandleImmediate();
851 #endif
852 }
853
854 #if CMK_NODE_QUEUE_AVAILABLE
855 char *CmiGetNonLocalNodeQ(void)
856 {
857   CmiState cs = CmiGetState();
858   char *result = 0;
859   CmiIdleLock_checkMessage(&cs->idle);
860 /*  if(!PCQueueEmpty(CsvAccess(NodeState).NodeRecv)) {  */
861     MACHSTATE1(3,"CmiGetNonLocalNodeQ begin %d {", CmiMyPe());
862     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
863     result = (char *) PCQueuePop(CsvAccess(NodeState).NodeRecv);
864     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
865     MACHSTATE1(3,"} CmiGetNonLocalNodeQ end %d ", CmiMyPe());
866 /*  }  */
867   return result;
868 }
869 #endif
870
871 void *CmiGetNonLocal(void)
872 {
873   static int count=0;
874   CmiState cs = CmiGetState();
875   void *msg;
876   CmiIdleLock_checkMessage(&cs->idle);
877   /* although it seems that lock is not needed, I found it crashes very often
878      on mpi-smp without lock */
879
880 #if ! CMK_SMP
881   CmiReleaseSentMessages();
882   PumpMsgs();
883 #endif
884
885   CmiLock(procState[cs->rank].recvLock);
886   msg =  PCQueuePop(cs->recv);
887   CmiUnlock(procState[cs->rank].recvLock);
888
889 /*
890   if (msg) {
891     MACHSTATE2(3,"CmiGetNonLocal done on pe %d for queue %p", CmiMyPe(), cs->recv); }
892   else {
893     count++;
894     if (count%1000000==0) MACHSTATE2(3,"CmiGetNonLocal empty on pe %d for queue %p", CmiMyPe(), cs->recv);
895   }
896 */
897 #if ! CMK_SMP
898   if (no_outstanding_sends) {
899     while (MsgQueueLen>0) {
900       CmiReleaseSentMessages();
901       PumpMsgs();
902     }
903   }
904
905   if(!msg) {
906     CmiReleaseSentMessages();
907     if (PumpMsgs())
908       return  PCQueuePop(cs->recv);
909     else
910       return 0;
911   }
912 #endif
913   return msg;
914 }
915
916 /* called in non-smp mode */
917 void CmiNotifyIdle(void)
918 {
919   CmiReleaseSentMessages();
920   if (!PumpMsgs() && idleblock) PumpMsgsBlocking();
921 }
922
923
924 /********************************************************
925     The call to probe immediate messages has been renamed to
926     CmiMachineProgressImpl
927 ******************************************************/
928 /* user call to handle immediate message, only useful in non SMP version
929    using polling method to schedule message.
930 */
931 /*
932 #if CMK_IMMEDIATE_MSG
933 void CmiProbeImmediateMsg()
934 {
935 #if !CMK_SMP
936   PumpMsgs();
937   CmiHandleImmediate();
938 #endif
939 }
940 #endif
941 */
942
943 /* Network progress function is used to poll the network when for
944    messages. This flushes receive buffers on some  implementations*/
945 void CmiMachineProgressImpl()
946 {
947 #if !CMK_SMP
948     PumpMsgs();
949 #if CMK_IMMEDIATE_MSG
950     CmiHandleImmediate();
951 #endif
952 #else
953     /*Not implemented yet. Communication server does not seem to be
954       thread safe */
955     /* CommunicationServerThread(0); */
956 #endif
957 }
958
959 /********************* MESSAGE SEND FUNCTIONS ******************/
960
961 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg);
962
963 static void CmiSendSelf(char *msg)
964 {
965 #if CMK_IMMEDIATE_MSG
966     if (CmiIsImmediate(msg)) {
967       /* CmiBecomeNonImmediate(msg); */
968       CmiPushImmediateMsg(msg);
969       CmiHandleImmediate();
970       return;
971     }
972 #endif
973     CQdCreate(CpvAccess(cQdState), 1);
974     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
975 }
976
977 void CmiSyncSendFn(int destPE, int size, char *msg)
978 {
979   CmiState cs = CmiGetState();
980   char *dupmsg = (char *) CmiAlloc(size);
981   memcpy(dupmsg, msg, size);
982
983   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
984
985   if (cs->pe==destPE) {
986     CmiSendSelf(dupmsg);
987   }
988   else
989     CmiAsyncSendFn_(destPE, size, dupmsg);
990 }
991
992 #if CMK_SMP
993
994 /* called by communication thread in SMP */
995 static int SendMsgBuf()
996 {
997   SMSG_LIST *msg_tmp;
998   char *msg;
999   int node, rank, size;
1000   int i;
1001   int sent = 0;
1002
1003   MACHSTATE(2,"SendMsgBuf begin {");
1004 #if 0
1005   for (i=0; i<_Cmi_mynodesize; i++)
1006   {
1007     while (!PCQueueEmpty(procState[i].sendMsgBuf))
1008     {
1009       msg_tmp = (SMSG_LIST *)PCQueuePop(procState[i].sendMsgBuf);
1010 #else
1011     /* single message sending queue */
1012     CmiLock(sendMsgBufLock);
1013     msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1014     CmiUnlock(sendMsgBufLock);
1015     while (NULL != msg_tmp)
1016     {
1017 #endif
1018       node = msg_tmp->destpe;
1019       size = msg_tmp->size;
1020       msg = msg_tmp->msg;
1021       msg_tmp->next = 0;
1022       while (MsgQueueLen > request_max) {
1023         CmiReleaseSentMessages();
1024         PumpMsgs();
1025       }
1026       MACHSTATE2(3,"MPI_send to node %d rank: %d{", node, CMI_DEST_RANK(msg));
1027       CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1028       CMI_SET_CHECKSUM(msg, size);
1029
1030 #if MPI_POST_RECV_COUNT > 0
1031         if(size <= MPI_POST_RECV_SIZE){
1032
1033           START_EVENT();
1034           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1035                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1036
1037           STOP_EVENT(40);
1038         }
1039         else {
1040             START_EVENT();
1041             if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1042                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1043             STOP_EVENT(40);
1044         }
1045 #else
1046         START_EVENT();
1047         if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1048             CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1049         END_EVENT(40);
1050 #endif
1051
1052       MACHSTATE(3,"}MPI_send end");
1053       MsgQueueLen++;
1054       if(sent_msgs==0)
1055         sent_msgs = msg_tmp;
1056       else
1057         end_sent->next = msg_tmp;
1058       end_sent = msg_tmp;
1059       sent=1;
1060       CmiLock(sendMsgBufLock);
1061       msg_tmp = (SMSG_LIST *)PCQueuePop(sendMsgBuf);
1062       CmiUnlock(sendMsgBufLock);
1063     }
1064 #if 0
1065   }
1066 #endif
1067   MACHSTATE(2,"}SendMsgBuf end ");
1068   return sent;
1069 }
1070
1071 void EnqueueMsg(void *m, int size, int node)
1072 {
1073   SMSG_LIST *msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1074   MACHSTATE1(3,"EnqueueMsg to node %d {{ ", node);
1075   msg_tmp->msg = m;
1076   msg_tmp->size = size;
1077   msg_tmp->destpe = node;
1078   CmiLock(sendMsgBufLock);
1079   PCQueuePush(sendMsgBuf,(char *)msg_tmp);
1080   CmiUnlock(sendMsgBufLock);
1081   MACHSTATE3(3,"}} EnqueueMsg to %d finish with queue %p len: %d", node, sendMsgBuf, PCQueueLength(sendMsgBuf));
1082 }
1083
1084 #endif
1085
1086 CmiCommHandle CmiAsyncSendFn_(int destPE, int size, char *msg)
1087 {
1088   CmiState cs = CmiGetState();
1089   SMSG_LIST *msg_tmp;
1090   CmiUInt2  rank, node;
1091
1092   if(destPE == cs->pe) {
1093     char *dupmsg = (char *) CmiAlloc(size);
1094     memcpy(dupmsg, msg, size);
1095     CmiSendSelf(dupmsg);
1096     return 0;
1097   }
1098   CQdCreate(CpvAccess(cQdState), 1);
1099 #if CMK_SMP
1100   node = CmiNodeOf(destPE);
1101   rank = CmiRankOf(destPE);
1102   if (node == CmiMyNode())  {
1103     CmiPushPE(rank, msg);
1104     return 0;
1105   }
1106   CMI_DEST_RANK(msg) = rank;
1107   EnqueueMsg(msg, size, node);
1108   return 0;
1109 #else
1110   /* non smp */
1111   CMI_DEST_RANK(msg) = 0;       /* rank is always 0 */
1112   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
1113   msg_tmp->msg = msg;
1114   msg_tmp->next = 0;
1115   while (MsgQueueLen > request_max) {
1116         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
1117         CmiReleaseSentMessages();
1118         PumpMsgs();
1119   }
1120   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1121   CMI_SET_CHECKSUM(msg, size);
1122
1123 #if MPI_POST_RECV_COUNT > 0
1124         if(size <= MPI_POST_RECV_SIZE){
1125
1126           START_EVENT();
1127           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,POST_RECV_TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1128                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1129           END_EVENT(40);
1130         }
1131         else {
1132           START_EVENT();
1133           if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1134                 CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1135           END_EVENT(40);
1136         }
1137 #else
1138   START_EVENT();
1139   if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req)))
1140     CmiAbort("CmiAsyncSendFn: MPI_Isend failed!\n");
1141   END_EVENT(40);
1142 #endif
1143
1144   MsgQueueLen++;
1145   if(sent_msgs==0)
1146     sent_msgs = msg_tmp;
1147   else
1148     end_sent->next = msg_tmp;
1149   end_sent = msg_tmp;
1150   return (CmiCommHandle) &(msg_tmp->req);
1151 #endif              /* non-smp */
1152 }
1153
1154 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
1155 {
1156   CMI_SET_BROADCAST_ROOT(msg, 0);
1157   CmiAsyncSendFn_(destPE, size, msg);
1158 }
1159
1160 void CmiFreeSendFn(int destPE, int size, char *msg)
1161 {
1162   CmiState cs = CmiGetState();
1163   CMI_SET_BROADCAST_ROOT(msg, 0);
1164
1165   if (cs->pe==destPE) {
1166     CmiSendSelf(msg);
1167   } else {
1168     CmiAsyncSendFn_(destPE, size, msg);
1169   }
1170 }
1171
1172 /*********************** BROADCAST FUNCTIONS **********************/
1173
1174 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
1175 void CmiSyncSendFn1(int destPE, int size, char *msg)
1176 {
1177   CmiState cs = CmiGetState();
1178   char *dupmsg = (char *) CmiAlloc(size);
1179   memcpy(dupmsg, msg, size);
1180   if (cs->pe==destPE)
1181     CmiSendSelf(dupmsg);
1182   else
1183     CmiAsyncSendFn_(destPE, size, dupmsg);
1184 }
1185
1186 /* send msg to its spanning children in broadcast. G. Zheng */
1187 void SendSpanningChildren(int size, char *msg)
1188 {
1189   CmiState cs = CmiGetState();
1190   int startpe = CMI_BROADCAST_ROOT(msg)-1;
1191   int i;
1192
1193   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
1194
1195   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1196     int p = cs->pe-startpe;
1197     if (p<0) p+=_Cmi_numpes;
1198     p = BROADCAST_SPANNING_FACTOR*p + i;
1199     if (p > _Cmi_numpes - 1) break;
1200     p += startpe;
1201     p = p%_Cmi_numpes;
1202     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1203     CmiSyncSendFn1(p, size, msg);
1204   }
1205 }
1206
1207 #include <math.h>
1208
1209 /* send msg along the hypercube in broadcast. (Sameer) */
1210 void SendHypercube(int size, char *msg)
1211 {
1212   CmiState cs = CmiGetState();
1213   int curcycle = CMI_GET_CYCLE(msg);
1214   int i;
1215
1216   double logp = CmiNumPes();
1217   logp = log(logp)/log(2.0);
1218   logp = ceil(logp);
1219
1220   /*  CmiPrintf("In hypercube\n"); */
1221
1222   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1223
1224   for (i = curcycle; i < logp; i++) {
1225     int p = cs->pe ^ (1 << i);
1226
1227     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1228
1229     if(p < CmiNumPes()) {
1230       CMI_SET_CYCLE(msg, i + 1);
1231       CmiSyncSendFn1(p, size, msg);
1232     }
1233   }
1234 }
1235
1236 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
1237 {
1238   CmiState cs = CmiGetState();
1239 #if CMK_BROADCAST_SPANNING_TREE
1240   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1241   SendSpanningChildren(size, msg);
1242
1243 #elif CMK_BROADCAST_HYPERCUBE
1244   CMI_SET_CYCLE(msg, 0);
1245   SendHypercube(size, msg);
1246
1247 #else
1248   int i;
1249
1250   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1251     CmiSyncSendFn(i, size,msg) ;
1252   for ( i=0; i<cs->pe; i++ )
1253     CmiSyncSendFn(i, size,msg) ;
1254 #endif
1255
1256   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
1257 }
1258
1259
1260 /*  FIXME: luckily async is never used  G. Zheng */
1261 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)
1262 {
1263   CmiState cs = CmiGetState();
1264   int i ;
1265
1266   for ( i=cs->pe+1; i<_Cmi_numpes; i++ )
1267     CmiAsyncSendFn(i,size,msg) ;
1268   for ( i=0; i<cs->pe; i++ )
1269     CmiAsyncSendFn(i,size,msg) ;
1270
1271   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
1272   CmiAbort("CmiAsyncBroadcastFn should never be called");
1273   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1274 }
1275
1276 void CmiFreeBroadcastFn(int size, char *msg)
1277 {
1278    CmiSyncBroadcastFn(size,msg);
1279    CmiFree(msg);
1280 }
1281
1282 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
1283 {
1284
1285 #if CMK_BROADCAST_SPANNING_TREE
1286   CmiState cs = CmiGetState();
1287   CmiSyncSendFn(cs->pe, size,msg) ;
1288   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1289   SendSpanningChildren(size, msg);
1290
1291 #elif CMK_BROADCAST_HYPERCUBE
1292   CmiState cs = CmiGetState();
1293   CmiSyncSendFn(cs->pe, size,msg) ;
1294   CMI_SET_CYCLE(msg, 0);
1295   SendHypercube(size, msg);
1296
1297 #else
1298     int i ;
1299
1300   for ( i=0; i<_Cmi_numpes; i++ )
1301     CmiSyncSendFn(i,size,msg) ;
1302 #endif
1303
1304   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
1305 }
1306
1307 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)
1308 {
1309   int i ;
1310
1311   for ( i=1; i<_Cmi_numpes; i++ )
1312     CmiAsyncSendFn(i,size,msg) ;
1313
1314   CmiAbort("In  AsyncBroadcastAll broadcast\n");
1315
1316   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
1317 }
1318
1319 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
1320 {
1321
1322 #if CMK_BROADCAST_SPANNING_TREE
1323   CmiState cs = CmiGetState();
1324   CmiSyncSendFn(cs->pe, size,msg) ;
1325   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1326   SendSpanningChildren(size, msg);
1327
1328 #elif CMK_BROADCAST_HYPERCUBE
1329   CmiState cs = CmiGetState();
1330   CmiSyncSendFn(cs->pe, size,msg) ;
1331   CMI_SET_CYCLE(msg, 0);
1332   SendHypercube(size, msg);
1333
1334 #else
1335   int i ;
1336
1337   for ( i=0; i<_Cmi_numpes; i++ )
1338     CmiSyncSendFn(i,size,msg) ;
1339 #endif
1340   CmiFree(msg) ;
1341   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
1342 }
1343
1344 #if CMK_NODE_QUEUE_AVAILABLE
1345
1346 static void CmiSendNodeSelf(char *msg)
1347 {
1348 #if CMK_IMMEDIATE_MSG
1349 #if 0
1350     if (CmiIsImmediate(msg) && !_immRunning) {
1351       /*CmiHandleImmediateMessage(msg); */
1352       CmiPushImmediateMsg(msg);
1353       CmiHandleImmediate();
1354       return;
1355     }
1356 #endif
1357     if (CmiIsImmediate(msg))
1358     {
1359       CmiPushImmediateMsg(msg);
1360       if (!_immRunning) CmiHandleImmediate();
1361       return;
1362     }
1363 #endif
1364     CQdCreate(CpvAccess(cQdState), 1);
1365     CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
1366     PCQueuePush(CsvAccess(NodeState).NodeRecv, msg);
1367     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
1368 }
1369
1370 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
1371 {
1372   int i;
1373   SMSG_LIST *msg_tmp;
1374   char *dupmsg;
1375
1376   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
1377   switch (dstNode) {
1378   case NODE_BROADCAST_ALL:
1379     CmiSendNodeSelf((char *)CmiCopyMsg(msg,size));
1380   case NODE_BROADCAST_OTHERS:
1381     CQdCreate(CpvAccess(cQdState), _Cmi_numnodes-1);
1382     for (i=0; i<_Cmi_numnodes; i++)
1383       if (i!=_Cmi_mynode) {
1384         EnqueueMsg((char *)CmiCopyMsg(msg,size), size, i);
1385       }
1386     break;
1387   default:
1388     dupmsg = (char *)CmiCopyMsg(msg,size);
1389     if(dstNode == _Cmi_mynode) {
1390       CmiSendNodeSelf(dupmsg);
1391     }
1392     else {
1393       CQdCreate(CpvAccess(cQdState), 1);
1394       EnqueueMsg(dupmsg, size, dstNode);
1395     }
1396   }
1397   return 0;
1398 }
1399
1400 void CmiSyncNodeSendFn(int p, int s, char *m)
1401 {
1402   CmiAsyncNodeSendFn(p, s, m);
1403 }
1404
1405 /* need */
1406 void CmiFreeNodeSendFn(int p, int s, char *m)
1407 {
1408   CmiAsyncNodeSendFn(p, s, m);
1409   CmiFree(m);
1410 }
1411
1412 /* need */
1413 void CmiSyncNodeBroadcastFn(int s, char *m)
1414 {
1415   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1416 }
1417
1418 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
1419 {
1420 }
1421
1422 /* need */
1423 void CmiFreeNodeBroadcastFn(int s, char *m)
1424 {
1425   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
1426   CmiFree(m);
1427 }
1428
1429 void CmiSyncNodeBroadcastAllFn(int s, char *m)
1430 {
1431   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1432 }
1433
1434 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
1435 {
1436   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1437 }
1438
1439 /* need */
1440 void CmiFreeNodeBroadcastAllFn(int s, char *m)
1441 {
1442   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
1443   CmiFree(m);
1444 }
1445 #endif
1446
1447 /************************** MAIN ***********************************/
1448 #define MPI_REQUEST_MAX 16      //1024*10
1449
1450 void ConverseExit(void)
1451 {
1452 #if ! CMK_SMP
1453   while(!CmiAllAsyncMsgsSent()) {
1454     PumpMsgs();
1455     CmiReleaseSentMessages();
1456   }
1457   if (MPI_SUCCESS != MPI_Barrier(MPI_COMM_WORLD))
1458     CmiAbort("ConverseExit: MPI_Barrier failed!\n");
1459
1460   ConverseCommonExit();
1461 #if CMK_MPI_VMI
1462     /* vmi mpi always return exit status 255 */
1463   exit(0);
1464 #else
1465   MPI_Finalize();
1466 #endif
1467 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
1468   if (CmiMyPe() == 0){
1469     CmiPrintf("End of program\n");
1470 #if MPI_POST_RECV_COUNT > 0
1471     CmiPrintf("%llu posted receives,  %llu unposted receives\n", CpvAccess(Cmi_posted_recv_total), CpvAccess(Cmi_unposted_recv_total));
1472 #endif
1473 }
1474 #endif
1475   exit(0);
1476 #else
1477
1478   /* SMP version, communication thread will exit */
1479   ConverseCommonExit();
1480   /* atomic increment */
1481   CmiCommLock();
1482   inexit++;
1483   CmiCommUnlock();
1484   while (1) CmiYield();
1485 #endif
1486 }
1487
1488 static void registerMPITraceEvents() {
1489 #if CMI_MPI_TRACE_USEREVENTS
1490 #ifndef CMK_OPTIMIZE
1491 #if ! CMK_TRACE_IN_CHARM
1492     traceRegisterUserEvent("MPI_Barrier", 10);
1493     traceRegisterUserEvent("MPI_Send", 20);
1494     traceRegisterUserEvent("MPI_Recv", 30);
1495     traceRegisterUserEvent("MPI_Isend", 40);
1496     traceRegisterUserEvent("MPI_Irecv", 50);
1497 #endif
1498 #endif
1499 #endif
1500 }
1501
1502
1503 static char     **Cmi_argv;
1504 static char     **Cmi_argvcopy;
1505 static CmiStartFn Cmi_startfn;   /* The start function */
1506 static int        Cmi_usrsched;  /* Continue after start function finishes? */
1507
1508 typedef struct {
1509   int sleepMs; /*Milliseconds to sleep while idle*/
1510   int nIdles; /*Number of times we've been idle in a row*/
1511   CmiState cs; /*Machine state*/
1512 } CmiIdleState;
1513
1514 static CmiIdleState *CmiNotifyGetState(void)
1515 {
1516   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
1517   s->sleepMs=0;
1518   s->nIdles=0;
1519   s->cs=CmiGetState();
1520   return s;
1521 }
1522
1523 static void CmiNotifyBeginIdle(CmiIdleState *s)
1524 {
1525   s->sleepMs=0;
1526   s->nIdles=0;
1527 }
1528
1529 static void CmiNotifyStillIdle(CmiIdleState *s)
1530 {
1531 #if ! CMK_SMP
1532   CmiReleaseSentMessages();
1533   PumpMsgs();
1534 #else
1535 /*  CmiYield();  */
1536 #endif
1537
1538 #if 1
1539   {
1540   int nSpins=20; /*Number of times to spin before sleeping*/
1541   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1542   s->nIdles++;
1543   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1544     s->sleepMs+=2;
1545     if (s->sleepMs>10) s->sleepMs=10;
1546   }
1547   /*Comm. thread will listen on sockets-- just sleep*/
1548   if (s->sleepMs>0) {
1549     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1550     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1551     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1552   }
1553   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1554   }
1555 #endif
1556 }
1557
1558 #if MACHINE_DEBUG_LOG
1559 FILE *debugLog = NULL;
1560 #endif
1561
1562 static void ConverseRunPE(int everReturn)
1563 {
1564   CmiIdleState *s=CmiNotifyGetState();
1565   CmiState cs;
1566   char** CmiMyArgv;
1567
1568   CmiNodeAllBarrier();
1569
1570   cs = CmiGetState();
1571   CpvInitialize(void *,CmiLocalQueue);
1572   CpvAccess(CmiLocalQueue) = cs->localqueue;
1573
1574   if (CmiMyRank())
1575     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
1576   else
1577     CmiMyArgv=Cmi_argv;
1578
1579   CthInit(CmiMyArgv);
1580
1581   ConverseCommonInit(CmiMyArgv);
1582
1583   /* initialize the network progress counter*/
1584   /* Network progress function is used to poll the network when for
1585      messages. This flushes receive buffers on some  implementations*/
1586   CpvInitialize(int , networkProgressCount);
1587   CpvAccess(networkProgressCount) = 0;
1588
1589 #if CMK_SMP
1590   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1591   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1592 #else
1593   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
1594 #endif
1595
1596 #if MACHINE_DEBUG_LOG
1597   if (CmiMyRank() == 0) {
1598     char ln[200];
1599     sprintf(ln,"debugLog.%d",CmiMyNode());
1600     debugLog=fopen(ln,"w");
1601   }
1602 #endif
1603
1604   /* Converse initialization finishes, immediate messages can be processed.
1605      node barrier previously should take care of the node synchronization */
1606   _immediateReady = 1;
1607
1608   /* communication thread */
1609   if (CmiMyRank() == CmiMyNodeSize()) {
1610     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1611     while (1) CommunicationServerThread(5);
1612   }
1613   else {  /* worker thread */
1614   if (!everReturn) {
1615     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
1616     if (Cmi_usrsched==0) CsdScheduler(-1);
1617     ConverseExit();
1618   }
1619   }
1620 }
1621
1622 #if CMK_VERSION_BLUEGENE
1623 struct BGLTorusManager;
1624 CpvDeclare(struct BGLTorusManager *, tmanager);
1625 #endif
1626
1627 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
1628 {
1629   int n,i;
1630
1631 #if MACHINE_DEBUG
1632   debugLog=NULL;
1633 #endif
1634 #if CMK_USE_HP_MAIN_FIX
1635 #if FOR_CPLUS
1636   _main(argc,argv);
1637 #endif
1638 #endif
1639
1640 #if CMI_MPI_TRACE_USEREVENTS
1641 #ifndef CMK_OPTIMIZE
1642 #if ! CMK_TRACE_IN_CHARM
1643   CpvInitialize(double, projTraceStart);
1644   /* only PE 0 needs to care about registration (to generate sts file). */
1645   if (CmiMyPe() == 0) {
1646     registerMachineUserEventsFunction(&registerMPITraceEvents);
1647   }
1648 #endif
1649 #endif
1650 #endif
1651
1652   MPI_Init(&argc, &argv);
1653   MPI_Comm_size(MPI_COMM_WORLD, &_Cmi_numnodes);
1654   MPI_Comm_rank(MPI_COMM_WORLD, &_Cmi_mynode);
1655   /* processor per node */
1656   _Cmi_mynodesize = 1;
1657   if (!CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize))
1658     CmiGetArgInt(argv,"++ppn", &_Cmi_mynodesize);
1659 #if ! CMK_SMP
1660   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
1661     CmiAbort("+ppn cannot be used in non SMP version!\n");
1662 #endif
1663   idleblock = CmiGetArgFlag(argv, "+idleblocking");
1664   if (idleblock && _Cmi_mynode == 0) {
1665     printf("Charm++: Running in idle blocking mode.\n");
1666   }
1667
1668 #if CMK_NO_OUTSTANDING_SENDS
1669   no_outstanding_sends=1;
1670 #endif
1671   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
1672     no_outstanding_sends = 1;
1673     if (_Cmi_mynode == 0)
1674       printf("Charm++: Will%s consume outstanding sends in scheduler loop\n",
1675         no_outstanding_sends?"":" not");
1676   }
1677   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
1678   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
1679   Cmi_argvcopy = CmiCopyArgs(argv);
1680   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
1681   /* find dim = log2(numpes), to pretend we are a hypercube */
1682   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
1683     Cmi_dim++ ;
1684  /* CmiSpanTreeInit();*/
1685   request_max=MAX_QLEN;
1686   CmiGetArgInt(argv,"+requestmax",&request_max);
1687   /*printf("request max=%d\n", request_max);*/
1688
1689   /* checksum flag */
1690   if (CmiGetArgFlag(argv,"+checksum")) {
1691 #if !CMK_OPTIMIZE
1692     checksum_flag = 1;
1693     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
1694 #else
1695     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
1696 #endif
1697   }
1698
1699   if (CmiGetArgFlag(argv,"++debug"))
1700   {   /*Pause so user has a chance to start and attach debugger*/
1701     printf("CHARMDEBUG> Processor %d has PID %d\n",_Cmi_mynode,getpid());
1702     fflush(stdout);
1703     if (!CmiGetArgFlag(argv,"++debug-no-pause"))
1704       sleep(10);
1705   }
1706
1707 #if MPI_POST_RECV_COUNT > 0
1708
1709     /* Post some extra recvs to help out with incoming messages */
1710     /* On some MPIs the messages are unexpected and thus slow */
1711
1712     /* An array of request handles for posted recvs */
1713     CpvAccess(CmiPostedRecvRequests) = (MPI_Request*)malloc(sizeof(MPI_Request)*MPI_POST_RECV_COUNT);
1714
1715     /* An array of buffers for posted recvs */
1716     CpvAccess(CmiPostedRecvBuffers) = (char*)malloc(MPI_POST_RECV_COUNT*MPI_POST_RECV_SIZE);
1717
1718     /* Post Recvs */
1719     for(i=0; i<MPI_POST_RECV_COUNT; i++){
1720         if(MPI_SUCCESS != MPI_Irecv(  &(CpvAccess(CmiPostedRecvBuffers)[i*MPI_POST_RECV_SIZE])  ,
1721                     MPI_POST_RECV_SIZE,
1722                     MPI_BYTE,
1723                     MPI_ANY_SOURCE,
1724                     POST_RECV_TAG,
1725                     MPI_COMM_WORLD,
1726                     &(CpvAccess(CmiPostedRecvRequests)[i])  ))
1727           CmiAbort("MPI_Irecv failed\n");
1728     }
1729
1730 #endif
1731
1732
1733
1734   /* CmiTimerInit(); */
1735
1736 #if 0
1737   CthInit(argv);
1738   ConverseCommonInit(argv);
1739
1740   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
1741   if (initret==0) {
1742     fn(CmiGetArgc(argv), argv);
1743     if (usched==0) CsdScheduler(-1);
1744     ConverseExit();
1745   }
1746 #endif
1747
1748   CsvInitialize(CmiNodeState, NodeState);
1749   CmiNodeStateInit(&CsvAccess(NodeState));
1750
1751   procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
1752
1753   for (i=0; i<_Cmi_mynodesize+1; i++) {
1754     /*    procState[i].sendMsgBuf = PCQueueCreate();   */
1755     procState[i].recvLock = CmiCreateLock();
1756   }
1757 #if CMK_SMP
1758   sendMsgBuf = PCQueueCreate();
1759   sendMsgBufLock = CmiCreateLock();
1760 #endif
1761
1762   /* Network progress function is used to poll the network when for
1763      messages. This flushes receive buffers on some  implementations*/
1764   networkProgressPeriod = NETWORK_PROGRESS_PERIOD_DEFAULT;
1765   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
1766
1767 #if CMK_VERSION_BLUEGENE
1768   CpvInitialize(struct BGLTorusManager*, tmanager);
1769   CpvAccess(tmanager) = NULL;
1770 #endif
1771
1772   CmiStartThreads(argv);
1773   ConverseRunPE(initret);
1774 }
1775
1776 /***********************************************************************
1777  *
1778  * Abort function:
1779  *
1780  ************************************************************************/
1781
1782 void CmiAbort(const char *message)
1783 {
1784   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
1785         "Reason: %s\n",CmiMyPe(),message);
1786  /*  CmiError(message); */
1787   CmiPrintStackTrace(0);
1788   MPI_Abort(MPI_COMM_WORLD, 1);
1789 }
1790
1791
1792 #if 0
1793
1794 /* ****************************************************************** */
1795 /*    The following internal functions implement recd msg queue       */
1796 /* ****************************************************************** */
1797
1798 static void ** AllocBlock(unsigned int len)
1799 {
1800   void ** blk;
1801
1802   blk=(void **)CmiAlloc(len*sizeof(void *));
1803   if(blk==(void **)0) {
1804     CmiError("Cannot Allocate Memory!\n");
1805     MPI_Abort(MPI_COMM_WORLD, 1);
1806   }
1807   return blk;
1808 }
1809
1810 static void
1811 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
1812 {
1813   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
1814   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
1815 }
1816
1817 void recdQueueInit(void)
1818 {
1819   recdQueue_blk = AllocBlock(BLK_LEN);
1820   recdQueue_blk_len = BLK_LEN;
1821   recdQueue_first = 0;
1822   recdQueue_len = 0;
1823 }
1824
1825 void recdQueueAddToBack(void *element)
1826 {
1827 #if NODE_0_IS_CONVHOST
1828   inside_comm = 1;
1829 #endif
1830   if(recdQueue_len==recdQueue_blk_len) {
1831     void **blk;
1832     recdQueue_blk_len *= 3;
1833     blk = AllocBlock(recdQueue_blk_len);
1834     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
1835     CmiFree(recdQueue_blk);
1836     recdQueue_blk = blk;
1837     recdQueue_first = 0;
1838   }
1839   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
1840 #if NODE_0_IS_CONVHOST
1841   inside_comm = 0;
1842 #endif
1843 }
1844
1845
1846 void * recdQueueRemoveFromFront(void)
1847 {
1848   if(recdQueue_len) {
1849     void *element;
1850     element = recdQueue_blk[recdQueue_first++];
1851     recdQueue_first %= recdQueue_blk_len;
1852     recdQueue_len--;
1853     return element;
1854   }
1855   return 0;
1856 }
1857
1858 #endif
1859
1860 /*@}*/