got rid of //
[charm.git] / src / arch / mpi / machine.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <sys/time.h>
10 #include <assert.h>
11 #include <errno.h>
12 #include "converse.h"
13 #include <mpi.h>
14
15 /*Support for ++debug: */
16 #include <unistd.h> /*For getpid()*/
17 #include <stdlib.h> /*For sleep()*/
18
19 #ifndef CMK_SMP
20 #if defined(CMK_SHARED_VARS_POSIX_THREADS_SMP)
21 # define CMK_SMP 1
22 #endif
23 #endif
24
25 #include "machine.h"
26
27 #include "pcqueue.h"
28
29 #define FLIPBIT(node,bitnumber) (node ^ (1 << bitnumber))
30 #define MAX_QLEN 200
31
32 /*
33     To reduce the buffer used in broadcast and distribute the load from 
34   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of 
35   spanning tree broadcast algorithm.
36     This will use the fourth short in message as an indicator of spanning tree
37   root.
38 */
39 #if CMK_SMP
40 #define CMK_BROADCAST_SPANNING_TREE    0
41 #else
42 #define CMK_BROADCAST_SPANNING_TREE    1
43 #define CMK_BROADCAST_HYPERCUBE        0
44 #endif
45
46 #define BROADCAST_SPANNING_FACTOR      4
47
48 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
49 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
50
51 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
52
53 #if CMK_BROADCAST_SPANNING_TREE
54 #  define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
55 #else
56 #  define CMI_SET_BROADCAST_ROOT(msg, root)
57 #endif
58
59 #if CMK_BROADCAST_HYPERCUBE
60 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
61 #else
62 #  define CMI_SET_CYCLE(msg, cycle)
63 #endif
64
65
66 #define TAG     1375
67
68 int Cmi_numpes;
69 int               Cmi_mynode;    /* Which address space am I */
70 int               Cmi_mynodesize;/* Number of processors in my address space */
71 int               Cmi_numnodes;  /* Total number of address spaces */
72 int               Cmi_numpes;    /* Total number of processors */
73 static int        Cmi_nodestart; /* First processor in this address space */ 
74 CpvDeclare(void*, CmiLocalQueue);
75
76 #define BLK_LEN  512
77
78 #if CMK_NODE_QUEUE_AVAILABLE
79 #define DGRAM_NODEMESSAGE   (0xFB)
80
81 #define NODE_BROADCAST_OTHERS (-1)
82 #define NODE_BROADCAST_ALL    (-2)
83 #endif
84
85 #if 0
86 static void **recdQueue_blk;
87 static unsigned int recdQueue_blk_len;
88 static unsigned int recdQueue_first;
89 static unsigned int recdQueue_len;
90 static void recdQueueInit(void);
91 static void recdQueueAddToBack(void *element);
92 static void *recdQueueRemoveFromFront(void);
93 #endif
94
95 static void ConverseRunPE(int everReturn);
96 static void CommunicationServer(int sleepTime);
97
98 typedef struct msg_list {
99      MPI_Request req;
100      char *msg;
101      struct msg_list *next;
102      int size, destpe;
103 } SMSG_LIST;
104
105 static int MsgQueueLen=0;
106 static int request_max;
107
108 static SMSG_LIST *sent_msgs=0;
109 static SMSG_LIST *end_sent=0;
110
111 static int Cmi_dim;
112
113 #if NODE_0_IS_CONVHOST
114 int inside_comm = 0;
115 #endif
116
117 double starttimer;
118
119 void CmiAbort(const char *message);
120 static void PerrorExit(const char *msg);
121
122 void SendSpanningChildren(int size, char *msg);
123 void SendHypercube(int size, char *msg);
124
125 static void PerrorExit(const char *msg)
126 {
127   perror(msg);
128   exit(1);
129 }
130
131
132 char *CopyMsg(char *msg, int len)
133 {
134   char *copy = (char *)CmiAlloc(len);
135   if (!copy)
136       fprintf(stderr, "Out of memory\n");
137   memcpy(copy, msg, len);
138   return copy;
139 }
140
141 /**************************  TIMER FUNCTIONS **************************/
142
143 void CmiTimerInit(void)
144 {
145   starttimer = PMPI_Wtime();
146 }
147
148 double CmiTimer(void)
149 {
150   return PMPI_Wtime() - starttimer;
151 }
152
153 double CmiWallTimer(void)
154 {
155   return PMPI_Wtime() - starttimer;
156 }
157
158 double CmiCpuTimer(void)
159 {
160   return PMPI_Wtime() - starttimer;
161 }
162
163 static PCQueue  *msgBuf;
164
165 /************************************************************
166  * 
167  * Processor state structure
168  *
169  ************************************************************/
170
171 #if CMK_NODE_QUEUE_AVAILABLE
172 CsvStaticDeclare(CmiNodeLock, CmiNodeRecvLock);
173 CsvStaticDeclare(PCQueue, NodeRecv);
174 #endif
175
176 #include "machine-smp.c"
177
178 #if ! CMK_SMP
179 /************ non SMP **************/
180 static struct CmiStateStruct Cmi_state;
181 int Cmi_mype;
182 int Cmi_myrank;
183
184 void CmiMemLock(void) {}
185 void CmiMemUnlock(void) {}
186
187 #define CmiGetState() (&Cmi_state)
188 #define CmiGetStateN(n) (&Cmi_state)
189
190 void CmiYield(void) { sleep(0); }
191
192 static void CmiStartThreads(char **argv)
193 {
194   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
195   Cmi_mype = Cmi_nodestart;
196   Cmi_myrank = 0;
197 }      
198 #endif
199
200 /*Add a message to this processor's receive queue */
201 static void CmiPushPE(int pe,void *msg)
202 {
203   CmiState cs=CmiGetStateN(pe);
204   MACHSTATE1(2,"Pushing message into %d's queue",pe);
205   CmiIdleLock_addMessage(&cs->idle); 
206   PCQueuePush(cs->recv,msg);
207 }
208
209 #ifndef CmiMyPe
210 int CmiMyPe(void)
211 {
212   return CmiGetState()->pe;
213 }
214 #endif
215
216 #ifndef CmiMyRank
217 int CmiMyRank(void)
218 {
219   return CmiGetState()->rank;
220 }
221 #endif
222
223 #ifndef CmiNodeFirst
224 int CmiNodeFirst(int node) { return node*Cmi_mynodesize; }
225 int CmiNodeSize(int node)  { return Cmi_mynodesize; }
226 #endif
227
228 #ifndef CmiNodeOf
229 int CmiNodeOf(int pe)      { return (pe/Cmi_mynodesize); }
230 int CmiRankOf(int pe)      { return pe%Cmi_mynodesize; }
231 #endif
232
233 static int CmiAllAsyncMsgsSent(void)
234 {
235    SMSG_LIST *msg_tmp = sent_msgs;
236    MPI_Status sts;
237    int done;
238      
239    while(msg_tmp!=0) {
240     done = 0;
241     if (MPI_SUCCESS != PMPI_Test(&(msg_tmp->req), &done, &sts)) 
242       CmiAbort("CmiAllAsyncMsgsSent: PMPI_Test failed!\n");
243     if(!done)
244       return 0;
245     msg_tmp = msg_tmp->next;
246     MsgQueueLen--;
247    }
248    return 1;
249 }
250
251 int CmiAsyncMsgSent(CmiCommHandle c) {
252      
253   SMSG_LIST *msg_tmp = sent_msgs;
254   int done;
255   MPI_Status sts;
256
257   while ((msg_tmp) && ((CmiCommHandle)&(msg_tmp->req) != c))
258     msg_tmp = msg_tmp->next;
259   if(msg_tmp) {
260     done = 0;
261     if (MPI_SUCCESS != PMPI_Test(&(msg_tmp->req), &done, &sts)) 
262       CmiAbort("CmiAsyncMsgSent: PMPI_Test failed!\n");
263     return ((done)?1:0);
264   } else {
265     return 1;
266   }
267 }
268
269 void CmiReleaseCommHandle(CmiCommHandle c)
270 {
271   return;
272 }
273
274
275 static void CmiReleaseSentMessages(void)
276 {
277   SMSG_LIST *msg_tmp=sent_msgs;
278   SMSG_LIST *prev=0;
279   SMSG_LIST *temp;
280   int done;
281   MPI_Status sts;
282   int locked = 0;
283      
284   while(msg_tmp!=0) {
285     done =0;
286     if(PMPI_Test(&(msg_tmp->req), &done, &sts) != MPI_SUCCESS)
287       CmiAbort("CmiReleaseSentMessages: PMPI_Test failed!\n");
288     if(done) {
289       MsgQueueLen--;
290       /* Release the message */
291       temp = msg_tmp->next;
292       if(prev==0)  /* first message */
293         sent_msgs = temp;
294       else
295         prev->next = temp;
296       CmiFree(msg_tmp->msg);
297       CmiFree(msg_tmp);
298       msg_tmp = temp;
299     } else {
300       prev = msg_tmp;
301       msg_tmp = msg_tmp->next;
302     }
303   }
304   end_sent = prev;
305 }
306
307 static int PumpMsgs(void)
308 {
309   int nbytes, flg, res;
310   char *msg;
311   MPI_Status sts;
312   int recd=0;
313
314   while(1) {
315     flg = 0;
316     res = PMPI_Iprobe(MPI_ANY_SOURCE, TAG, MPI_COMM_WORLD, &flg, &sts);
317     if(res != MPI_SUCCESS)
318       CmiAbort("PMPI_Iprobe failed\n");
319     if(!flg)
320       return recd;
321     recd = 1;
322     PMPI_Get_count(&sts, MPI_BYTE, &nbytes);
323     msg = (char *) CmiAlloc(nbytes);
324     if (MPI_SUCCESS != PMPI_Recv(msg,nbytes,MPI_BYTE,sts.MPI_SOURCE,TAG, MPI_COMM_WORLD,&sts)) 
325       CmiAbort("PumpMsgs: PMPI_Recv failed!\n");
326
327 #if CMK_NODE_QUEUE_AVAILABLE
328     if (CMI_DEST_RANK(msg)==DGRAM_NODEMESSAGE)
329       PCQueuePush(CsvAccess(NodeRecv), msg);
330     else
331 #endif
332       CmiPushPE(CMI_DEST_RANK(msg), msg);
333
334 #if CMK_BROADCAST_SPANNING_TREE
335     if (CMI_BROADCAST_ROOT(msg))
336       SendSpanningChildren(nbytes, msg);
337 #elif CMK_BROADCAST_HYPERCUBE
338     if (CMI_GET_CYCLE(msg))
339       SendHypercube(nbytes, msg);
340 #endif
341   }
342 }
343
344 /********************* MESSAGE RECEIVE FUNCTIONS ******************/
345
346 static int inexit = 0;
347
348 static int MsgQueueEmpty()
349 {
350   int i;
351   for (i=0; i<Cmi_mynodesize; i++)
352     if (!PCQueueEmpty(msgBuf[i])) return 0;
353   return 1;
354 }
355
356 #if CMK_SMP
357 /**
358 CommunicationServer calls MPI to send messages in the queues and probe message from network.
359 */
360 static void CommunicationServer(int sleepTime)
361 {
362   CmiReleaseSentMessages();
363   SendMsgBuf(); 
364   if (!PumpMsgs()) CmiYield(); 
365   if (inexit == 1) {
366     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
367       CmiReleaseSentMessages();
368       SendMsgBuf(); 
369       PumpMsgs();
370     }
371     if (MPI_SUCCESS != PMPI_Barrier(MPI_COMM_WORLD))
372       CmiAbort("ConverseExit: PMPI_Barrier failed!\n");
373     PMPI_Finalize();
374 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
375     if (CmiMyNode() == 0){
376       CmiPrintf("End of program\n");
377     }
378 #endif
379     exit(0);   
380   }
381 }
382 #endif
383
384 #if CMK_NODE_QUEUE_AVAILABLE
385 char *CmiGetNonLocalNodeQ(void)
386 {
387   char *result = 0;
388   if(!PCQueueEmpty(CsvAccess(NodeRecv))) {
389     CmiLock(CsvAccess(CmiNodeRecvLock));
390     result = (char *) PCQueuePop(CsvAccess(NodeRecv));
391     CmiUnlock(CsvAccess(CmiNodeRecvLock));
392   }
393   return result;
394 }
395 #endif
396
397 void *CmiGetNonLocal(void)
398 {
399   CmiState cs = CmiGetState();
400   void *msg;
401   CmiIdleLock_checkMessage(&cs->idle);
402   msg =  PCQueuePop(cs->recv); 
403 #if ! CMK_SMP
404   if(!msg) {
405     CmiReleaseSentMessages();
406     if (PumpMsgs())
407       return  PCQueuePop(cs->recv);
408     else
409       return 0;
410   }
411 #endif
412   return msg;
413 }
414
415 void CmiNotifyIdle(void)
416 {
417   CmiReleaseSentMessages();
418   PumpMsgs();
419 }
420  
421 /********************* MESSAGE SEND FUNCTIONS ******************/
422
423 void CmiSyncSendFn(int destPE, int size, char *msg)
424 {
425   CmiState cs = CmiGetState();
426   char *dupmsg = (char *) CmiAlloc(size);
427   memcpy(dupmsg, msg, size);
428
429   CMI_SET_BROADCAST_ROOT(dupmsg, 0);
430
431   if (cs->pe==destPE) {
432     CQdCreate(CpvAccess(cQdState), 1);
433     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
434   }
435   else
436     CmiAsyncSendFn(destPE, size, dupmsg);
437 }
438
439 static int SendMsgBuf()
440 {
441   SMSG_LIST *msg_tmp;
442   char *msg;
443   int node, rank, size;
444   int i;
445
446   for (i=0; i<Cmi_mynodesize; i++)
447   {
448     while (!PCQueueEmpty(msgBuf[i]))
449     {
450       msg_tmp = (SMSG_LIST *)PCQueuePop(msgBuf[i]);
451       node = msg_tmp->destpe;
452       size = msg_tmp->size;
453       msg = msg_tmp->msg;
454       msg_tmp->next = 0;
455       while (MsgQueueLen > request_max) {
456         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
457         CmiReleaseSentMessages();
458         PumpMsgs();
459       }
460       if (MPI_SUCCESS != PMPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(msg_tmp->req))) 
461         CmiAbort("CmiAsyncSendFn: PMPI_Isend failed!\n");
462       MsgQueueLen++;
463       if(sent_msgs==0)
464         sent_msgs = msg_tmp;
465       else
466         end_sent->next = msg_tmp;
467       end_sent = msg_tmp;
468     }
469   } 
470 }
471
472 #if CMK_SMP
473 #define EnqueueMsg(m, size, node)    {  \
474   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));  \
475   msg_tmp->msg = m;     \
476   msg_tmp->size = size; \
477   msg_tmp->destpe = node;       \
478   PCQueuePush(msgBuf[CmiMyRank()],(char *)msg_tmp);     \
479   }
480 #endif
481
482 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg)
483 {
484   CmiState cs = CmiGetState();
485   SMSG_LIST *msg_tmp;
486   CmiUInt2  rank, node;
487      
488   CQdCreate(CpvAccess(cQdState), 1);
489   if(destPE == cs->pe) {
490     char *dupmsg = (char *) CmiAlloc(size);
491     memcpy(dupmsg, msg, size);
492     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
493     return 0;
494   }
495 #if CMK_SMP
496   node = CmiNodeOf(destPE);
497   rank = CmiRankOf(destPE);
498   if (node == CmiMyNode())  {
499     CmiPushPE(rank, msg);
500     return 0;
501   }
502   CMI_DEST_RANK(msg) = rank;
503   EnqueueMsg(msg, size, node);
504   return 0;
505 #else
506   msg_tmp = (SMSG_LIST *) CmiAlloc(sizeof(SMSG_LIST));
507   msg_tmp->msg = msg;
508   msg_tmp->next = 0;
509   while (MsgQueueLen > request_max) {
510         /*printf("Waiting for %d messages to be sent\n", MsgQueueLen);*/
511         CmiReleaseSentMessages();
512         PumpMsgs();
513   }
514   if (MPI_SUCCESS != PMPI_Isend((void *)msg,size,MPI_BYTE,destPE,TAG,MPI_COMM_WORLD,&(msg_tmp->req))) 
515     CmiAbort("CmiAsyncSendFn: PMPI_Isend failed!\n");
516   MsgQueueLen++;
517   if(sent_msgs==0)
518     sent_msgs = msg_tmp;
519   else
520     end_sent->next = msg_tmp;
521   end_sent = msg_tmp;
522   return (CmiCommHandle) &(msg_tmp->req);
523 #endif
524 }
525
526 void CmiFreeSendFn(int destPE, int size, char *msg)
527 {
528   CmiState cs = CmiGetState();
529   CMI_SET_BROADCAST_ROOT(msg, 0);
530
531   if (cs->pe==destPE) {
532     CQdCreate(CpvAccess(cQdState), 1);
533     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
534   } else {
535     CmiAsyncSendFn(destPE, size, msg);
536   }
537 }
538
539 /*********************** BROADCAST FUNCTIONS **********************/
540
541 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
542 void CmiSyncSendFn1(int destPE, int size, char *msg)
543 {
544   CmiState cs = CmiGetState();
545   char *dupmsg = (char *) CmiAlloc(size);
546   memcpy(dupmsg, msg, size);
547   if (cs->pe==destPE) {
548     CQdCreate(CpvAccess(cQdState), 1);
549     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
550   }
551   else
552     CmiAsyncSendFn(destPE, size, dupmsg);
553 }
554
555 /* send msg to its spanning children in broadcast. G. Zheng */
556 void SendSpanningChildren(int size, char *msg)
557 {
558   CmiState cs = CmiGetState();
559   int startpe = CMI_BROADCAST_ROOT(msg)-1;
560   int i;
561
562   assert(startpe>=0 && startpe<Cmi_numpes);
563
564   for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
565     int p = cs->pe-startpe;
566     if (p<0) p+=Cmi_numpes;
567     p = BROADCAST_SPANNING_FACTOR*p + i;
568     if (p > Cmi_numpes - 1) break;
569     p += startpe;
570     p = p%Cmi_numpes;
571     assert(p>=0 && p<Cmi_numpes && p!=cs->pe);
572     CmiSyncSendFn1(p, size, msg);
573   }
574 }
575
576 #include <math.h>
577
578 /* send msg along the hypercube in broadcast. (Sameer) */
579 void SendHypercube(int size, char *msg)
580 {
581   CmiState cs = CmiGetState();
582   int curcycle = CMI_GET_CYCLE(msg);
583   int i;
584
585   double logp = CmiNumPes();
586   logp = log(logp)/log(2.0);
587   logp = ceil(logp);
588   
589   /*  CmiPrintf("In hypercube\n"); */
590
591   /* assert(startpe>=0 && startpe<Cmi_numpes); */
592
593   for (i = curcycle; i < logp; i++) {
594     int p = cs->pe ^ (1 << i);
595     
596     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
597
598     if(p < CmiNumPes()) {
599       CMI_SET_CYCLE(msg, i + 1);
600       CmiSyncSendFn1(p, size, msg);
601     }
602   }
603 }
604
605 void CmiSyncBroadcastFn(int size, char *msg)     /* ALL_EXCEPT_ME  */
606 {
607   CmiState cs = CmiGetState();
608 #if CMK_BROADCAST_SPANNING_TREE
609   CMI_SET_BROADCAST_ROOT(msg, Cmi_mype+1);
610   SendSpanningChildren(size, msg);
611   
612 #elif CMK_BROADCAST_HYPERCUBE
613   CMI_SET_CYCLE(msg, 0);
614   SendHypercube(size, msg);
615     
616 #else
617   int i;
618
619   for ( i=cs->pe+1; i<Cmi_numpes; i++ ) 
620     CmiSyncSendFn(i, size,msg) ;
621   for ( i=0; i<cs->pe; i++ ) 
622     CmiSyncSendFn(i, size,msg) ;
623 #endif
624
625   /*CmiPrintf("In  SyncBroadcast broadcast\n");*/
626 }
627
628
629 /*  FIXME: luckily async is never used  G. Zheng */
630 CmiCommHandle CmiAsyncBroadcastFn(int size, char *msg)  
631 {
632   CmiState cs = CmiGetState();
633   int i ;
634
635   for ( i=cs->pe+1; i<Cmi_numpes; i++ ) 
636     CmiAsyncSendFn(i,size,msg) ;
637   for ( i=0; i<cs->pe; i++ ) 
638     CmiAsyncSendFn(i,size,msg) ;
639
640   /*CmiPrintf("In  AsyncBroadcast broadcast\n");*/
641   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
642 }
643
644 void CmiFreeBroadcastFn(int size, char *msg)
645 {
646    CmiSyncBroadcastFn(size,msg);
647    CmiFree(msg);
648 }
649  
650 void CmiSyncBroadcastAllFn(int size, char *msg)        /* All including me */
651 {
652  
653 #if CMK_BROADCAST_SPANNING_TREE
654   CmiState cs = CmiGetState();
655   CmiSyncSendFn(cs->pe, size,msg) ;
656   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
657   SendSpanningChildren(size, msg);
658
659 #elif CMK_BROADCAST_HYPERCUBE
660   CmiState cs = CmiGetState();
661   CmiSyncSendFn(cs->pe, size,msg) ;
662   CMI_SET_CYCLE(msg, 0);
663   SendHypercube(size, msg);
664
665 #else
666     int i ;
667      
668   for ( i=0; i<Cmi_numpes; i++ ) 
669     CmiSyncSendFn(i,size,msg) ;
670 #endif
671
672   /*CmiPrintf("In  SyncBroadcastAll broadcast\n");*/
673 }
674
675 CmiCommHandle CmiAsyncBroadcastAllFn(int size, char *msg)  
676 {
677   int i ;
678
679   for ( i=1; i<Cmi_numpes; i++ ) 
680     CmiAsyncSendFn(i,size,msg) ;
681
682   /*CmiPrintf("In  AsyncBroadcastAll broadcast\n");*/
683     
684   return (CmiCommHandle) (CmiAllAsyncMsgsSent());
685 }
686
687 void CmiFreeBroadcastAllFn(int size, char *msg)  /* All including me */
688 {
689
690 #if CMK_BROADCAST_SPANNING_TREE
691   CmiState cs = CmiGetState();
692   CmiSyncSendFn(cs->pe, size,msg) ;
693   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
694   SendSpanningChildren(size, msg);
695
696 #elif CMK_BROADCAST_HYPERCUBE
697   CmiState cs = CmiGetState();
698   CmiSyncSendFn(cs->pe, size,msg) ;
699   CMI_SET_CYCLE(msg, 0);
700   SendHypercube(size, msg);
701
702 #else
703   int i ;
704      
705   for ( i=0; i<Cmi_numpes; i++ ) 
706     CmiSyncSendFn(i,size,msg) ;
707 #endif
708   CmiFree(msg) ;
709   /*CmiPrintf("In FreeBroadcastAll broadcast\n");*/
710 }
711
712 #if CMK_NODE_QUEUE_AVAILABLE
713
714 CmiCommHandle CmiAsyncNodeSendFn(int dstNode, int size, char *msg)
715 {
716   int i;
717   SMSG_LIST *msg_tmp;
718   char *dupmsg;
719      
720   CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
721   switch (dstNode) {
722   case NODE_BROADCAST_ALL:
723     CQdCreate(CpvAccess(cQdState), 1);
724     PCQueuePush(CsvAccess(NodeRecv),(char *)CopyMsg(msg,size));
725   case NODE_BROADCAST_OTHERS:
726     CQdCreate(CpvAccess(cQdState), Cmi_mynode-1);
727     for (i=0; i<Cmi_numnodes; i++)
728       if (i!=Cmi_mynode) {
729         EnqueueMsg((char *)CopyMsg(msg,size), size, i);
730       }
731     break;
732   default:
733     CQdCreate(CpvAccess(cQdState), 1);
734     dupmsg = (char *) CmiAlloc(size);
735     memcpy(dupmsg, msg, size);
736     if(dstNode == Cmi_mynode) {
737       PCQueuePush(CsvAccess(NodeRecv), dupmsg);
738       return 0;
739     }
740     else {
741       EnqueueMsg(dupmsg, size, dstNode);
742     }
743   }
744   return 0;
745 }
746
747 void CmiSyncNodeSendFn(int p, int s, char *m)
748 {
749 }
750
751 /* need */
752 void CmiFreeNodeSendFn(int p, int s, char *m)
753 {
754   CmiAsyncNodeSendFn(p, s, m);
755   CmiFree(m);
756 }
757
758 /* need */
759 void CmiSyncNodeBroadcastFn(int s, char *m)
760 {
761   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
762 }
763
764 CmiCommHandle CmiAsyncNodeBroadcastFn(int s, char *m)
765 {
766 }
767
768 /* need */
769 void CmiFreeNodeBroadcastFn(int s, char *m)
770 {
771   CmiAsyncNodeSendFn(NODE_BROADCAST_OTHERS, s, m);
772   CmiFree(m);
773 }
774
775 void CmiSyncNodeBroadcastAllFn(int s, char *m)
776 {
777   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
778 }
779
780 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int s, char *m)
781 {
782 }
783
784 /* need */
785 void CmiFreeNodeBroadcastAllFn(int s, char *m)
786 {
787   CmiAsyncNodeSendFn(NODE_BROADCAST_ALL, s, m);
788   CmiFree(m);
789 }
790 #endif
791
792 /************************** MAIN ***********************************/
793 #define MPI_REQUEST_MAX 1024*10 
794
795 void ConverseExit(void)
796 {
797 #if ! CMK_SMP
798   while(!CmiAllAsyncMsgsSent()) {
799     PumpMsgs();
800     CmiReleaseSentMessages();
801   }
802   if (MPI_SUCCESS != PMPI_Barrier(MPI_COMM_WORLD)) 
803     CmiAbort("ConverseExit: PMPI_Barrier failed!\n");
804   ConverseCommonExit();
805   PMPI_Finalize();
806 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
807   if (CmiMyPe() == 0){
808     CmiPrintf("End of program\n");
809   }
810 #endif
811   exit(0);
812 #else
813   /* SMP version, communication thread will exit */
814   ConverseCommonExit();
815   inexit = 1;
816   while (1) CmiYield();
817 #endif
818 }
819
820 static char     **Cmi_argv;
821 static CmiStartFn Cmi_startfn;   /* The start function */
822 static int        Cmi_usrsched;  /* Continue after start function finishes? */
823
824 typedef struct {
825   int sleepMs; /*Milliseconds to sleep while idle*/
826   int nIdles; /*Number of times we've been idle in a row*/
827   CmiState cs; /*Machine state*/
828 } CmiIdleState;
829
830 static CmiIdleState *CmiNotifyGetState(void)
831 {
832   CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
833   s->sleepMs=0;
834   s->nIdles=0;
835   s->cs=CmiGetState();
836   return s;
837 }
838
839 static void CmiNotifyBeginIdle(CmiIdleState *s)
840 {
841   s->sleepMs=0;
842   s->nIdles=0;
843 }
844     
845 static void CmiNotifyStillIdle(CmiIdleState *s)
846
847 #if ! CMK_SMP
848   CmiReleaseSentMessages();
849   PumpMsgs();
850 #else
851   CmiYield();
852 #endif
853
854 #if 0
855   int nSpins=20; /*Number of times to spin before sleeping*/
856   s->nIdles++;
857   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
858     s->sleepMs+=2;
859     if (s->sleepMs>10) s->sleepMs=10;
860   }
861   /*Comm. thread will listen on sockets-- just sleep*/
862   if (s->sleepMs>0) {
863     MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
864     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
865     MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
866   }       
867 #endif
868 }
869
870 static void ConverseRunPE(int everReturn)
871 {
872   CmiIdleState *s=CmiNotifyGetState();
873   CmiState cs;
874   char** CmiMyArgv;
875   CmiNodeBarrier();
876   cs = CmiGetState();
877   CpvInitialize(void *,CmiLocalQueue);
878   CpvAccess(CmiLocalQueue) = cs->localqueue;
879   CmiMyArgv=CmiCopyArgs(Cmi_argv);
880   CthInit(CmiMyArgv);
881 #if MACHINE_DEBUG_LOG
882   {
883     char ln[200];
884     sprintf(ln,"debugLog.%d",CmiMyPe());
885     debugLog=fopen(ln,"w");
886   }
887 #endif
888
889   ConverseCommonInit(CmiMyArgv);
890
891 #if CMK_SMP
892   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,CmiNotifyBeginIdle,(void *)s);
893   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyStillIdle,(void *)s);
894 #else
895   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
896 #endif
897
898   if (!everReturn) {
899     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
900     if (Cmi_usrsched==0) CsdScheduler(-1);
901     ConverseExit();
902   }
903 }
904
905 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret)
906 {
907   int n,i ;
908 #if CMK_USE_HP_MAIN_FIX
909 #if FOR_CPLUS
910   _main(argc,argv);
911 #endif
912 #endif
913   
914   PMPI_Init(&argc, &argv);
915   PMPI_Comm_size(MPI_COMM_WORLD, &Cmi_numnodes);
916   PMPI_Comm_rank(MPI_COMM_WORLD, &Cmi_mynode);
917   /* processor per node */
918   Cmi_mynodesize = 1;
919   CmiGetArgInt(argv,"+ppn", &Cmi_mynodesize);
920 #if ! CMK_SMP
921   if (Cmi_mynodesize > 1 && Cmi_mynode == 0) 
922     CmiAbort("+ppn cannot be used in non SMP version!\n");
923 #endif
924   Cmi_numpes = Cmi_numnodes * Cmi_mynodesize;
925   Cmi_nodestart = Cmi_mynode * Cmi_mynodesize;
926   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
927   /* find dim = log2(numpes), to pretend we are a hypercube */
928   for ( Cmi_dim=0,n=Cmi_numpes; n>1; n/=2 )
929     Cmi_dim++ ;
930  /* CmiSpanTreeInit();*/
931   i=0;
932   request_max=MAX_QLEN;
933   CmiGetArgInt(argv,"+requestmax",&request_max);
934   /*printf("request max=%d\n", request_max);*/
935   if (CmiGetArgFlag(argv,"++debug"))
936   {   /*Pause so user has a chance to start and attach debugger*/
937     printf("CHARMDEBUG> Processor %d has PID %d\n",CmiMyNode(),getpid());
938     if (!CmiGetArgFlag(argv,"++debug-no-pause"))
939       sleep(10);
940   }
941
942   CmiTimerInit();
943 #if CMK_SMP
944   msgBuf = (PCQueue *)malloc(Cmi_mynodesize * sizeof(PCQueue));
945   for (i=0; i<Cmi_mynodesize; i++)
946     msgBuf[i] = PCQueueCreate();
947 #endif
948
949 #if 0
950   CthInit(argv);
951   ConverseCommonInit(argv);
952   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,CmiNotifyIdle,NULL);
953   if (initret==0) {
954     fn(CmiGetArgc(argv), argv);
955     if (usched==0) CsdScheduler(-1);
956     ConverseExit();
957   }
958 #endif
959   CmiStartThreads(argv);
960   ConverseRunPE(initret);
961 }
962
963 /***********************************************************************
964  *
965  * Abort function:
966  *
967  ************************************************************************/
968
969 void CmiAbort(const char *message)
970 {
971   CmiError(message);
972   PMPI_Abort(MPI_COMM_WORLD, 1);
973 }
974
975
976 #if 0
977
978 /* ****************************************************************** */
979 /*    The following internal functions implement recd msg queue       */
980 /* ****************************************************************** */
981
982 static void ** AllocBlock(unsigned int len)
983 {
984   void ** blk;
985
986   blk=(void **)CmiAlloc(len*sizeof(void *));
987   if(blk==(void **)0) {
988     CmiError("Cannot Allocate Memory!\n");
989     PMPI_Abort(MPI_COMM_WORLD, 1);
990   }
991   return blk;
992 }
993
994 static void 
995 SpillBlock(void **srcblk, void **destblk, unsigned int first, unsigned int len)
996 {
997   memcpy(destblk, &srcblk[first], (len-first)*sizeof(void *));
998   memcpy(&destblk[len-first],srcblk,first*sizeof(void *));
999 }
1000
1001 void recdQueueInit(void)
1002 {
1003   recdQueue_blk = AllocBlock(BLK_LEN);
1004   recdQueue_blk_len = BLK_LEN;
1005   recdQueue_first = 0;
1006   recdQueue_len = 0;
1007 }
1008
1009 void recdQueueAddToBack(void *element)
1010 {
1011 #if NODE_0_IS_CONVHOST
1012   inside_comm = 1;
1013 #endif
1014   if(recdQueue_len==recdQueue_blk_len) {
1015     void **blk;
1016     recdQueue_blk_len *= 3;
1017     blk = AllocBlock(recdQueue_blk_len);
1018     SpillBlock(recdQueue_blk, blk, recdQueue_first, recdQueue_len);
1019     CmiFree(recdQueue_blk);
1020     recdQueue_blk = blk;
1021     recdQueue_first = 0;
1022   }
1023   recdQueue_blk[(recdQueue_first+recdQueue_len++)%recdQueue_blk_len] = element;
1024 #if NODE_0_IS_CONVHOST
1025   inside_comm = 0;
1026 #endif
1027 }
1028
1029
1030 void * recdQueueRemoveFromFront(void)
1031 {
1032   if(recdQueue_len) {
1033     void *element;
1034     element = recdQueue_blk[recdQueue_first++];
1035     recdQueue_first %= recdQueue_blk_len;
1036     recdQueue_len--;
1037     return element;
1038   }
1039   return 0;
1040 }
1041
1042 #endif