Changes for rectangular broadcast.
[charm.git] / src / arch / bluegenel / bglmachine.C
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include <stack>
12
13 #include "rts.h"
14 #include "bgml.h"
15
16 #if CMK_PERSISTENT_COMM
17 #include "persist_impl.h"
18 #endif
19
20 #if 0
21 #define BGL_DEBUG CmiPrintf
22 #else
23 #define BGL_DEBUG // CmiPrintf
24 #endif
25
26 extern "C"
27 void        BG2S_UnOrdered_Send     (BG2S_t               * request,
28                                      const BGML_Callback_t* cb_info,
29                                      const BGQuad         * msginfo,
30                                      const char           * sndbuf,
31                                      unsigned               sndlen,
32                                      unsigned               destrank);
33
34 int phscount;
35 int vnpeer = -1;
36
37 inline char *ALIGN_16(char *p){
38   return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
39 }
40
41 PCQueue message_q;                   //queue to receive incoming messages
42 PCQueue broadcast_q;                 //queue to send broadcast messages
43
44 #define PROGRESS_PERIOD 8
45 #define PROGRESS_CYCLES 4000           //10k cycles
46
47 /*
48     To reduce the buffer used in broadcast and distribute the load from
49   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
50   spanning tree broadcast algorithm.
51     This will use the fourth short in message as an indicator of spanning tree
52   root.
53 */
54 #if CMK_SMP
55 #define CMK_BROADCAST_SPANNING_TREE    0
56 #else
57 #define CMK_BROADCAST_SPANNING_TREE    1
58 #define CMK_BROADCAST_HYPERCUBE        0
59 #endif /* CMK_SMP */
60
61 #define BROADCAST_SPANNING_FACTOR      4
62
63 #define MAX_OUTSTANDING  1024
64 #define MAX_POSTED 64
65 #define MAX_QLEN 1024
66 #define MAX_BYTES 1000000
67
68 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
69 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
70
71 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
72 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
73
74 /* FIXME: need a random number that everyone agrees ! */
75 #define CHARM_MAGIC_NUMBER               126
76
77 #if !CMK_OPTIMIZE
78 static int checksum_flag = 0;
79 extern "C" unsigned char computeCheckSum(unsigned char *data, int len);
80
81 #define CMI_SET_CHECKSUM(msg, len)      \
82         if (checksum_flag)  {   \
83           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
84           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
85         }
86
87 #define CMI_CHECK_CHECKSUM(msg, len)    \
88         if (checksum_flag)      \
89           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
90             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \
91             for(int count = 0; count < len; count++) { \
92                 printf("%2x", msg[count]);                 \
93             }                                             \
94             printf("------------------------------\n\n"); \
95             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
96           }
97 #else
98 #define CMI_SET_CHECKSUM(msg, len)
99 #define CMI_CHECK_CHECKSUM(msg, len)
100 #endif
101
102 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
103
104 #if CMK_BROADCAST_HYPERCUBE
105 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
106 #else
107 #  define CMI_SET_CYCLE(msg, cycle)
108 #endif
109
110 int               _Cmi_numpes;
111 int               _Cmi_mynode;    /* Which address space am I */
112 int               _Cmi_mynodesize;/* Number of processors in my address space */
113 int               _Cmi_numnodes;  /* Total number of address spaces */
114 static int        Cmi_nodestart; /* First processor in this address space */
115 CpvDeclare(void*, CmiLocalQueue);
116
117 int               idleblock = 0;
118
119 #include "machine-smp.c"
120 CsvDeclare(CmiNodeState, NodeState);
121 #include "immediate.c"
122
123 static inline void AdvanceCommunications(int max_out = MAX_OUTSTANDING);
124 static int outstanding_recvs = 0;
125
126
127 #if !CMK_SMP
128 /************ non SMP **************/
129 static struct CmiStateStruct Cmi_state;
130 int _Cmi_mype;
131 int _Cmi_myrank;
132
133 void CmiMemLock(void) {}
134 void CmiMemUnlock(void) {}
135
136 #define CmiGetState() (&Cmi_state)
137 #define CmiGetStateN(n) (&Cmi_state)
138
139 void CmiYield(void) { sleep(0); }
140
141 static void CmiStartThreads(char **argv)
142 {
143   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
144   _Cmi_mype = Cmi_nodestart;
145   _Cmi_myrank = 0;
146 }
147 #endif  /* !CMK_SMP */
148
149 int received_immediate = 0;
150
151 /*Add a message to this processor's receive queue, pe is a rank */
152 static void CmiPushPE(int pe,void *msg)
153 {
154   CmiState cs = CmiGetStateN(pe);
155   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
156 #if CMK_IMMEDIATE_MSG
157   if (CmiIsImmediate(msg)) {
158     /**(CmiUInt2 *)msg = pe;*/
159     received_immediate = 1;
160     CMI_DEST_RANK(msg) = pe;
161     CmiPushImmediateMsg(msg);
162     return;
163   }
164 #endif
165 #if CMK_SMP
166   CmiLock(procState[pe].recvLock);
167 #endif
168
169   PCQueuePush(cs->recv,(char *)msg);
170   //printf("%d: {%d} PCQueue length = %d, msg = %x\n", CmiMyPe(), bgx_in_interrupt, PCQueueLength(cs->recv), msg);
171   
172 #if CMK_SMP
173   CmiUnlock(procState[pe].recvLock);
174 #endif
175   CmiIdleLock_addMessage(&cs->idle);
176   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
177 }
178
179 #if CMK_NODE_QUEUE_AVAILABLE
180 /*Add a message to this processor's receive queue */
181 static void CmiPushNode(void *msg)
182 {
183   MACHSTATE(3,"Pushing message into NodeRecv queue");
184 #if CMK_IMMEDIATE_MSG
185   if (CmiIsImmediate(msg)) {
186     CMI_DEST_RANK(msg) = 0;
187     CmiPushImmediateMsg(msg);
188     return;
189   }
190 #endif
191   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
192   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
193   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
194   {
195   CmiState cs=CmiGetStateN(0);
196   CmiIdleLock_addMessage(&cs->idle);
197   }
198 }
199 #endif /* CMK_NODE_QUEUE_AVAILABLE */
200
201 #ifndef CmiMyPe
202 int CmiMyPe(void)
203 {
204   return CmiGetState()->pe;
205 }
206 #endif
207
208 #ifndef CmiMyRank
209 int CmiMyRank(void)
210 {
211   return CmiGetState()->rank;
212 }
213 #endif
214
215
216 static int msgQueueLen = 0;
217 static int msgQBytes = 0;
218 static int numPosted = 0;
219
220 static int request_max;
221 static int maxMessages;
222 static int maxBytes;
223 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
224 static int progress_cycles;
225
226 static int Cmi_dim;     /* hypercube dim of network */
227
228 static char     **Cmi_argv;
229 static char     **Cmi_argvcopy;
230 static CmiStartFn Cmi_startfn;   /* The start function */
231 static int        Cmi_usrsched;  /* Continue after start function finishes? */
232
233 extern "C" void ConverseCommonInit(char **argv);
234 extern "C" void ConverseCommonExit(void);
235 extern "C" void CthInit(char **argv);
236
237 static inline void SendMsgsUntil(int, int);
238 static void ConverseRunPE(int everReturn);
239 static void CommunicationServer(int sleepTime);
240 static void CommunicationServerThread(int sleepTime);
241
242 void SendSpanningChildren(int size, char *msg);
243 void SendHypercube(int size, char *msg);
244
245 typedef struct msg_list {
246   BG2S_t             send;
247   BGQuad             info;
248   BGML_Callback_t    cb;
249   char              *msg;
250   int                size;
251   int                destpe;
252   int               *pelist;
253
254 #if CMK_PERSISTENT_COMM
255   PersistentHandle phs;
256   int phscount;
257   int phsSize;
258 #endif
259 } SMSG_LIST;
260
261
262 typedef struct {
263   int sleepMs; /*Milliseconds to sleep while idle*/
264   int nIdles; /*Number of times we've been idle in a row*/
265   CmiState cs; /*Machine state*/
266 } CmiIdleState;
267
268 static CmiIdleState *CmiNotifyGetState(void)
269 {
270   CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
271   s->sleepMs=0;
272   s->nIdles=0;
273   s->cs=CmiGetState();
274   return s;
275 }
276
277 typedef struct ProcState {
278 /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
279 CmiNodeLock  recvLock;              /* for cs->recv */
280 } ProcState;
281
282 static ProcState  *procState;
283
284 /* send done callback: sets the smsg entry to done */
285 static void send_done(void *data){
286   SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
287   CmiFree(msg_tmp->msg);
288   msgQBytes -= msg_tmp->size;
289   free(data);
290
291   msgQueueLen--;
292   numPosted --;
293 }
294
295 /* send done callback: sets the smsg entry to done */
296 static void send_multi_done(void *data){
297   SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
298   CmiFree(msg_tmp->msg);
299   msgQBytes -= msg_tmp->size;
300   CmiFree(msg_tmp->pelist);
301
302   free(data);
303
304   msgQueueLen--;
305   numPosted --;
306 }
307
308
309 //Called on receiving a persistent message
310 void persist_recv_done(void *clientdata) {
311
312   char *msg = (char *) clientdata;
313   int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
314
315   ///  printf("[%d] persistent receive of size %d\n", CmiMyPe(), sndlen);
316   
317   //Cannot broadcast Persistent message
318
319   CMI_CHECK_CHECKSUM(msg, sndlen);
320   if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
321     CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
322     return;
323   }
324   
325   CmiReference(msg);
326
327 #if CMK_NODE_QUEUE_AVAILABLE
328   if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE)
329     CmiPushNode(msg);
330   else
331 #endif
332     CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
333 }
334
335 /* recv done callback: push the recved msg to recv queue */
336 static void recv_done(void *clientdata){
337
338   char *msg = (char *) clientdata;
339   int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
340   
341   /* then we do what PumpMsgs used to do:
342    * push msg to recv queue */
343   
344   CMI_CHECK_CHECKSUM(msg, sndlen);
345   if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
346     CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
347     return;
348   }
349
350 #if CMK_NODE_QUEUE_AVAILABLE
351   if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE)
352     CmiPushNode(msg);
353   else
354 #endif
355     CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
356
357 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
358   if(CMI_BROADCAST_ROOT(msg) != 0) {
359     //printf ("%d: Receiving bcast message %d bytes\n", CmiMyPe(), sndlen);
360     PCQueuePush(broadcast_q, msg);
361   }
362 #endif
363
364
365   //rts_dcache_evict_normal();
366
367   outstanding_recvs --;
368 }
369
370
371 void     short_pkt_recv (const BGQuad     * info,
372                          unsigned           senderrank,
373                          const char       * buffer,
374                          const unsigned     sndlen)
375 {
376   outstanding_recvs ++;
377   int alloc_size = sndlen;
378   
379   //printf ("%d: Receiving short message %d bytes\n", CmiMyPe(), sndlen);
380   
381   char * new_buffer = (char *)CmiAlloc(alloc_size);
382   memcpy (new_buffer, buffer, sndlen);
383   recv_done (new_buffer);
384 }
385
386
387 BG2S_t * first_pkt_recv_done (const BGQuad      * info,
388                               unsigned            senderrank,
389                               const unsigned      sndlen,
390                               unsigned          * rcvlen,
391                               char             ** buffer,
392                               BGML_Callback_t   * cb 
393                               )
394 {
395   outstanding_recvs ++;
396   int alloc_size = sndlen + sizeof(BG2S_t) + 16;
397   
398   //printf ("%d: {%d} Receiving message %d bytes\n", CmiMyPe(), bgx_in_interrupt, sndlen);
399
400   /* printf ("Receiving %d bytes\n", sndlen); */
401   *rcvlen = sndlen>0?sndlen:1;  /* to avoid malloc(0) which might
402                                    return NULL */
403
404   *buffer = (char *)CmiAlloc(alloc_size);
405   cb->function = recv_done;
406   cb->clientdata = *buffer;
407
408   return (BG2S_t *) ALIGN_16(*buffer + sndlen);
409 }
410
411
412 inline void sendBroadcastMessages() {
413   while(!PCQueueEmpty(broadcast_q)) {
414     char *msg = (char *) PCQueuePop(broadcast_q);
415
416 #if CMK_BROADCAST_SPANNING_TREE
417     SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
418 #elif CMK_BROADCAST_HYPERCUBE
419     SendHypercube(((CmiMsgHeaderBasic *) msg)->size, msg);
420 #endif
421     
422     //CMI_CHECK_CHECKSUM(msg, (((CmiMsgHeaderBasic *) msg)->size));
423   }
424 }
425
426 unsigned int *ranklist;
427
428 #include "bgltorus.h"
429 CpvDeclare(BGLTorusManager*, tmanager);
430
431 BGTsC_t        barrier;
432
433 CpvDeclare(unsigned, networkProgressCount);
434 int  networkProgressPeriod;
435
436 // -----------------------------------------
437 // Rectangular broadcast implementation 
438 // -----------------------------------------
439
440 #define MAX_COMM  256
441 static void * comm_table [MAX_COMM];
442
443 typedef struct rectbcast_msg {
444   BGTsRC_t           request;
445   BGML_Callback_t    cb;
446   char              *msg;
447 } RectBcastInfo;
448
449
450 static void bcast_done (void *data) {  
451   RectBcastInfo *rinfo = (RectBcastInfo *) data;  
452   CmiFree (rinfo->msg);
453   free (rinfo);
454 }
455
456 static  void *   getRectBcastRequest (unsigned comm) {
457   return comm_table [comm];
458 }
459
460
461 static  void *  bcast_recv     (unsigned               root,
462                                 unsigned               comm,
463                                 const unsigned         sndlen,
464                                 unsigned             * rcvlen,
465                                 char                ** rcvbuf,
466                                 BGML_Callback_t      * const cb) {
467   
468   int alloc_size = sndlen + sizeof(BGTsRC_t) + 16;
469   
470   *rcvlen = sndlen>0?sndlen:1;  /* to avoid malloc(0) which might
471                                    return NULL */
472   
473   *rcvbuf       =  (char *)CmiAlloc(alloc_size);
474   cb->function  =   recv_done;
475   cb->clientdata = *rcvbuf;
476
477   return (BGTsRC_t *) ALIGN_16 (*rcvbuf + sndlen);
478   
479 }
480
481
482 extern "C"  
483 void bgl_machine_RectBcast (unsigned                 commid,
484                             const char             * sndbuf,
485                             unsigned                 sndlen) 
486 {
487   RectBcastInfo *rinfo  =   (RectBcastInfo *) malloc (sizeof(RectBcastInfo));   
488   rinfo->cb.function    =   bcast_done;
489   rinfo->cb.clientdata  =   rinfo;
490   
491   BGTsRC_AsyncBcast_start (commid, &rinfo->request, &rinfo->cb, sndbuf, sndlen);
492   
493 }
494
495 extern "C" 
496 void        bgl_machine_RectBcastInit  (unsigned               commID,
497                                         const BGTsRC_Geometry_t* geometry) {
498   
499   CmiAssert (commID < 256);
500   CmiAssert (comm_table [commID] == NULL);
501   
502   BGTsRC_t *request =  (BGTsRC_t *) malloc (sizeof (BGTsRC_t));
503   comm_table [commID] = request;
504   
505   BGTsRC_AsyncBcast_init  (request, commID,  geometry);
506 }
507
508
509
510
511 //--------------------------------------------------------------
512 //----- End Rectangular Broadcast Implementation ---------------
513 //--------------------------------------------------------------
514
515
516
517 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret){
518   int n, i;
519   
520   BGML_Messager_Init();
521   BG2S_Configure (short_pkt_recv, first_pkt_recv_done, NULL);
522   
523   //BGTr_Configure ( 1 /* use_coprocessor  */, 
524   //               1 /* protocolTreshold */, 
525   //               1 /* highPrecision    */); 
526   
527   BGTsC_Configure (NULL, getRectBcastRequest, bcast_recv);
528
529   _Cmi_numnodes = BGML_Messager_size();
530   _Cmi_mynode = BGML_Messager_rank();
531   
532   message_q = PCQueueCreate();
533   broadcast_q = PCQueueCreate();
534
535   unsigned rank = BGML_Messager_rank();
536   unsigned size = BGML_Messager_size();
537   
538   //vnpeer = BGML_Messager_vnpeerrank();
539   
540 #if 1
541   ranklist = (unsigned int *)malloc (size * sizeof(int));
542   for(int count = 0; count < size; count ++)
543     ranklist[count] = count;
544   
545   /* global barrier initialize */
546   BGTsC_Barrier_init (&barrier, 0, size, ranklist);
547 #endif
548   
549   CmiBarrier();    
550   CmiBarrier();    
551   CmiBarrier();    
552
553   /* processor per node */
554   _Cmi_mynodesize = 1;
555   CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
556 #if ! CMK_SMP
557   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
558     CmiAbort("+ppn cannot be used in non SMP version!\n");
559 #endif
560
561   idleblock = CmiGetArgFlag(argv, "+idleblocking");
562   if (idleblock && _Cmi_mynode == 0) {
563     CmiPrintf("Charm++: Running in idle blocking mode.\n");
564   }
565
566 #if CMK_NO_OUTSTANDING_SENDS
567   no_outstanding_sends=1;
568 #endif
569   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
570     no_outstanding_sends = 1;
571     if (_Cmi_mynode == 0)
572       CmiPrintf("Charm++: Will%s consume outstanding sends in scheduler loop\n",        no_outstanding_sends?"":" not");
573   }
574   
575   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
576   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
577   Cmi_argvcopy = CmiCopyArgs(argv);
578   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
579
580   CpvInitialize(BGLTorusManager*, tmanager);
581   CpvAccess(tmanager) = new BGLTorusManager();
582
583
584   /* find dim = log2(numpes), to pretend we are a hypercube */
585   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
586     Cmi_dim++ ;
587
588   request_max=MAX_POSTED;
589   CmiGetArgInt(argv,"+requestmax",&request_max);
590  
591   maxMessages = MAX_QLEN;
592   maxBytes = MAX_BYTES;
593
594   /* checksum flag */
595   if (CmiGetArgFlag(argv,"+checksum")) {
596 #if !CMK_OPTIMIZE
597     checksum_flag = 1;
598     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
599 #else
600     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
601 #endif
602   }
603
604   CsvInitialize(CmiNodeState, NodeState);
605   CmiNodeStateInit(&CsvAccess(NodeState));
606
607   procState = (ProcState *)CmiAlloc((_Cmi_mynodesize+1) * sizeof(ProcState));
608   for (i=0; i<_Cmi_mynodesize+1; i++) {
609 /*    procState[i].sendMsgBuf = PCQueueCreate();   */
610     procState[i].recvLock = CmiCreateLock();
611   }
612
613   /* Network progress function is used to poll the network when for
614      messages. This flushes receive buffers on some  implementations*/
615   networkProgressPeriod = PROGRESS_PERIOD;
616   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
617   
618   progress_cycles = PROGRESS_CYCLES;
619   CmiGetArgInt(argv, "+progressCycles", &progress_cycles);
620
621   CmiStartThreads(argv);
622   ConverseRunPE(initret);
623 }
624
625 static void ConverseRunPE(int everReturn)
626 {
627   CmiIdleState *s=CmiNotifyGetState();
628   CmiState cs;
629   char** CmiMyArgv;
630   CmiNodeAllBarrier();
631   cs = CmiGetState();
632   CpvInitialize(void *,CmiLocalQueue);
633   CpvAccess(CmiLocalQueue) = cs->localqueue;
634
635   if (CmiMyRank())
636     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
637   else   
638     CmiMyArgv=Cmi_argv;
639     
640   CthInit(CmiMyArgv);
641
642   ConverseCommonInit(CmiMyArgv);
643
644   /* initialize the network progress counter*/
645   /* Network progress function is used to poll the network when for
646      messages. This flushes receive buffers on some  implementations*/
647   CpvInitialize(int , networkProgressCount);
648   CpvAccess(networkProgressCount) = 0;
649
650 #if CMK_SMP
651   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
652   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
653 #else
654   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
655 #endif
656
657 #if MACHINE_DEBUG_LOG
658   if (CmiMyRank() == 0) {
659     char ln[200];
660     sprintf(ln,"debugLog.%d",CmiMyNode());
661     debugLog=fopen(ln,"w");
662   }
663 #endif
664
665   CmiBarrier();
666
667   /* Converse initialization finishes, immediate messages can be processed.
668      node barrier previously should take care of the node synchronization */
669   _immediateReady = 1;
670
671   /* communication thread */
672   if (CmiMyRank() == CmiMyNodeSize()) {
673     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
674     while (1) CommunicationServerThread(5);
675   }
676   else {  /* worker thread */
677     if (!everReturn) {
678       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
679       if (Cmi_usrsched==0) CsdScheduler(-1);
680       ConverseExit();
681     }
682   }
683 }
684
685 #if CMK_SMP
686 static int inexit = 0;
687
688 static int MsgQueueEmpty()
689 {
690   int i;
691 #if 0
692   for (i=0; i<_Cmi_mynodesize; i++)
693     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
694 #else
695   return PCQueueEmpty(sendMsgBuf);
696 #endif
697   return 1;
698 }
699
700 static int SendMsgBuf();
701
702 /* test if all processors recv queues are empty */
703 static int RecvQueueEmpty()
704 {
705   int i;
706   for (i=0; i<_Cmi_mynodesize; i++) {
707     CmiState cs=CmiGetStateN(i);
708     if (!PCQueueEmpty(cs->recv)) return 0;
709   }
710   return 1;
711 }
712
713 static void CommunicationServer(int sleepTime){
714   int static count=0;
715
716   SendMsgBuf(); 
717   AdvanceCommunications();
718 /*
719   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
720 */
721   if (inexit == CmiMyNodeSize()) {
722     MACHSTATE(2, "CommunicationServer exiting {");
723 #if 0
724     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
725 #endif
726     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
727       SendMsgBuf(); 
728       AdvanceCommunications();
729     }
730 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
731     if (CmiMyNode() == 0){
732       CmiPrintf("End of program\n");
733     }
734 #endif
735     MACHSTATE(2, "} CommunicationServer EXIT");
736     exit(0);   
737   }
738 }
739 #endif /* CMK_SMP */
740
741 static void CommunicationServerThread(int sleepTime){
742 #if CMK_SMP
743   CommunicationServer(sleepTime);
744 #endif
745 #if CMK_IMMEDIATE_MSG
746   CmiHandleImmediate();
747 #endif
748 }
749
750 extern "C" void BGML_Messager_dumpTimers();
751
752 void ConverseExit(void){
753   /* #if ! CMK_SMP */
754
755   while(msgQueueLen > 0 || outstanding_recvs > 0) {
756     AdvanceCommunications();
757   }
758
759   CmiBarrier();
760
761   ConverseCommonExit();  
762
763   if(CmiMyPe()%101 == 0)
764     BGML_Messager_dumpTimers();
765   
766   CmiFree(procState);
767   
768 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
769   if (CmiMyPe() == 0){
770     CmiPrintf("End of program\n");
771   }
772 #endif
773   
774   exit(0);
775 }
776
777 /* exit() called on any node would abort the whole program */
778 void CmiAbort(const char * message){
779   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
780         "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),msgQueueLen,outstanding_recvs,message);
781   CmiPrintStackTrace(0);
782
783   while(msgQueueLen > 0 || outstanding_recvs > 0) {
784     AdvanceCommunications();
785   }
786   
787   CmiBarrier(); 
788   exit(-1);
789 }
790
791 void *CmiGetNonLocal(){
792   static int count=0;
793   CmiState cs = CmiGetState();
794
795   void *msg = NULL;
796   CmiIdleLock_checkMessage(&cs->idle);
797   /* although it seems that lock is not needed, I found it crashes very often
798      on mpi-smp without lock */
799
800   AdvanceCommunications();
801   
802   //int length = PCQueueLength(cs->recv);
803   //if(length != 0)
804   //printf("%d: {%d} PCQueue length = %d\n", CmiMyPe(), bgx_in_interrupt, length);
805   
806   CmiLock(procState[cs->rank].recvLock);
807
808   //if(length > 0)
809   msg =  PCQueuePop(cs->recv); 
810   CmiUnlock(procState[cs->rank].recvLock);
811
812 #if !CMK_SMP
813   if (no_outstanding_sends) {
814     SendMsgsUntil(0, 0);
815   }
816   
817   if(!msg) {
818     AdvanceCommunications();
819     
820     //int length = PCQueueLength(cs->recv);
821     //if(length != 0)
822     //    printf("%d: {%d} PCQueue length = %d\n", CmiMyPe(), bgx_in_interrupt, length);
823
824     //if(length > 0)
825     return PCQueuePop(cs->recv);
826   }
827 #endif /* !CMK_SMP */
828   return msg;
829 }
830
831 static void CmiSendSelf(char *msg){
832 #if CMK_IMMEDIATE_MSG
833     if (CmiIsImmediate(msg)) {
834       /* CmiBecomeNonImmediate(msg); */
835       CmiPushImmediateMsg(msg);
836       CmiHandleImmediate();
837       return;
838     }
839 #endif
840     CQdCreate(CpvAccess(cQdState), 1);
841     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
842 }
843
844
845 #if CMK_PERSISTENT_COMM
846 #include "persistent.C"
847 #endif
848
849 inline void machineSend(SMSG_LIST *msg_tmp) {
850   
851   CQdCreate(CpvAccess(cQdState), 1);
852   numPosted ++;
853
854   if(msg_tmp->destpe == CmiMyPe())
855     CmiAbort("Sending to self\n");
856   
857 #if CMK_PERSISTENT_COMM
858   if(msg_tmp->phs) {
859     if(machineSendPersistentMsg(msg_tmp))
860       return;
861   }
862 #endif
863   
864   CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumPes());
865   msg_tmp->cb.function     =   send_done;
866   msg_tmp->cb.clientdata   =   msg_tmp;
867
868   BG2S_UnOrdered_Send (&msg_tmp->send, &msg_tmp->cb, &msg_tmp->info, 
869                        msg_tmp->msg, msg_tmp->size, msg_tmp->destpe);    
870 }
871
872 static inline void sendQueuedMessages() {
873   while(numPosted <= request_max && !PCQueueEmpty(message_q)) {
874     SMSG_LIST *msg_tmp = (SMSG_LIST *)PCQueuePop(message_q);    
875     machineSend(msg_tmp);
876   }
877 }
878
879
880 static int enableBatchSends = 1;
881
882 /* The general free send function
883  * Send is synchronous, and free msg after posted
884  */
885 inline void  CmiGeneralFreeSend(int destPE, int size, char* msg){
886   
887   CmiState cs = CmiGetState();
888   if(destPE==cs->pe){
889     CmiSendSelf(msg);
890     return;
891   }
892   
893   if(numPosted >= 16 /*|| !enableBatchSends*/)
894     SendMsgsUntil(maxMessages, maxBytes);
895   
896   SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
897   msg_tmp->destpe = destPE;
898   msg_tmp->size = size;
899   msg_tmp->msg = msg;
900   
901 #if CMK_PERSISTENT_COMM
902   msg_tmp->phs = phs;
903   msg_tmp->phscount = phscount;
904   msg_tmp->phsSize = phsSize;
905 #endif
906
907   sendQueuedMessages();
908   
909   if(numPosted > request_max) {
910     PCQueuePush(message_q, (char *)msg_tmp);
911   }    
912   else 
913     machineSend(msg_tmp);    
914   
915   msgQBytes += size;
916   msgQueueLen++;
917   
918 }
919
920 extern "C"
921 void BG2S_AEMulticast (BG2S_t                 * sender,
922                        const BGML_Callback_t  * cb_info,
923                        const BGQuad           * msginfo,
924                        const char             * sndbuf,
925                        unsigned                 sndlen,
926                        unsigned               * destranks,
927                        unsigned                 nranks);
928
929
930 /* Multicast message to a list of destinations
931  */
932 inline void  machineMulticast(int npes, int *pelist, int size, char* msg){
933   
934   SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
935   msg_tmp->destpe    = -1;      //multicast operation
936   msg_tmp->size      = size * npes;
937   msg_tmp->msg       = msg;
938   msg_tmp->pelist    = pelist;
939   msgQBytes         += size * npes;   
940   msgQueueLen ++;
941   
942   //Interrupt code to be implemented later
943   
944   msg_tmp->cb.function     =   send_multi_done;
945   msg_tmp->cb.clientdata  =   msg_tmp;
946   
947   BG2S_AEMulticast (&msg_tmp->send, &msg_tmp->cb, &msg_tmp->info, 
948                     msg_tmp->msg, msg_tmp->size, (unsigned *)pelist, npes); 
949 }
950
951 void CmiSyncSendFn(int destPE, int size, char *msg){
952   char *copymsg;
953   copymsg = (char *)CmiAlloc(size);
954   memcpy(copymsg,msg,size);
955   CmiFreeSendFn(destPE,size,copymsg);
956 }
957
958 void CmiFreeSendFn(int destPE, int size, char *msg){
959   CMI_SET_BROADCAST_ROOT(msg,0);
960   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
961   ((CmiMsgHeaderBasic *)msg)->size = size;  
962   CMI_SET_CHECKSUM(msg, size);
963
964   CmiGeneralFreeSend(destPE,size,msg);
965 }
966
967 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
968 void CmiSyncSendFn1(int destPE, int size, char *msg)
969 {
970   char *copymsg;
971   copymsg = (char *)CmiAlloc(size);
972   memcpy(copymsg, msg, size);
973
974   //  asm volatile("sync" ::: "memory");
975
976   CMI_MAGIC(copymsg) = CHARM_MAGIC_NUMBER;
977   ((CmiMsgHeaderBasic *)copymsg)->size = size;  
978   CMI_SET_CHECKSUM(copymsg, size);
979   
980   CmiGeneralFreeSend(destPE,size,copymsg);
981 }
982
983 /* send msg to its spanning children in broadcast. G. Zheng */
984 void SendSpanningChildren(int size, char *msg)
985 {
986   CmiState cs = CmiGetState();
987   int startpe = CMI_BROADCAST_ROOT(msg)-1;
988   int i;
989
990   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
991   int dist = cs->pe-startpe;
992   if(dist<0) dist+=_Cmi_numpes;
993   for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
994     int p = BROADCAST_SPANNING_FACTOR*dist + i;
995     if (p > _Cmi_numpes - 1) break;
996     p += startpe;
997     p = p%_Cmi_numpes;
998     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
999     CmiSyncSendFn1(p, size, msg);
1000   }
1001 }
1002
1003 /* send msg along the hypercube in broadcast. (Sameer) */
1004 void SendHypercube(int size, char *msg)
1005 {
1006   CmiState cs = CmiGetState();
1007   int curcycle = CMI_GET_CYCLE(msg);
1008   int i;
1009
1010   double logp = CmiNumPes();
1011   logp = log(logp)/log(2.0);
1012   logp = ceil(logp);
1013   
1014   /*  CmiPrintf("In hypercube\n"); */
1015   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
1016
1017   for (i = curcycle; i < logp; i++) {
1018     int p = cs->pe ^ (1 << i);
1019     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
1020     if(p < CmiNumPes()) {
1021       CMI_SET_CYCLE(msg, i + 1);
1022
1023       CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1024       CmiSyncSendFn1(p, size, msg);
1025     }
1026   }
1027 }
1028
1029 void CmiSyncBroadcastFn(int size, char *msg){
1030   char *copymsg;
1031   copymsg = (char *)CmiAlloc(size);
1032   memcpy(copymsg,msg,size);
1033   CmiFreeBroadcastFn(size,copymsg);
1034 }
1035
1036 void CmiFreeBroadcastFn(int size, char *msg){
1037   
1038   //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1039   /*
1040   printf("------------------------------\n\nSender %d :", CmiMyPe());
1041   
1042   for(int count = 0; count < size; count++) {
1043     printf("%2x", msg[count]);
1044   } 
1045   
1046   printf("------------------------------\n\n");
1047   */
1048
1049
1050   CmiState cs = CmiGetState();
1051 #if CMK_BROADCAST_SPANNING_TREE
1052   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1053   SendSpanningChildren(size, msg);
1054   CmiFree(msg);
1055 #elif CMK_BROADCAST_HYPERCUBE
1056   CMI_SET_CYCLE(msg, 0);
1057   SendHypercube(size, msg);
1058   CmiFree(msg);
1059 #else
1060   int i;
1061
1062   for ( i=cs->pe+1; i<_Cmi_numpes; i++ ) 
1063     CmiSyncSendFn(i,size,msg);
1064   
1065   for ( i=0; i<cs->pe; i++ ) 
1066     CmiSyncSendFn(i,size,msg);
1067
1068   CmiFree(msg);
1069 #endif
1070   //SendMsgsUntil(0,0);
1071 }
1072
1073 void CmiSyncBroadcastAllFn(int size, char *msg){
1074   char *copymsg;
1075   copymsg = (char *)CmiAlloc(size);
1076   memcpy(copymsg,msg,size);
1077   CmiFreeBroadcastAllFn(size,copymsg);
1078 }
1079
1080 void CmiFreeBroadcastAllFn(int size, char *msg){
1081   
1082   //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1083
1084   CmiState cs = CmiGetState();
1085 #if CMK_BROADCAST_SPANNING_TREE
1086   CmiSyncSendFn(cs->pe,size,msg);
1087   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1088   SendSpanningChildren(size, msg);
1089   CmiFree(msg);
1090 #elif CMK_BROADCAST_HYPERCUBE
1091   CmiSyncSendFn(cs->pe,size,msg);
1092   CMI_SET_CYCLE(msg, 0);
1093   SendHypercube(size, msg);
1094   CmiFree(msg);
1095 #else
1096   int i ;
1097
1098   CmiSyncSendFn(CmiMyPe(), size, msg);
1099   
1100   for ( i=0; i<_Cmi_numpes; i++ ) {
1101     if(i== CmiMyPe())
1102       continue;
1103
1104     CmiSyncSendFn(i,size,msg);
1105   }
1106   CmiFree(msg);
1107 #endif
1108   
1109   //SendMsgsUntil(0,0);
1110 }
1111
1112 static unsigned long long lastProgress = 0;
1113
1114 static inline void AdvanceCommunications(int max_out){
1115   sendBroadcastMessages();
1116
1117   while(msgQueueLen > maxMessages && msgQBytes > maxBytes){
1118     while(BGML_Messager_advance()>0) ;
1119     sendQueuedMessages();
1120   }
1121   
1122   int target = outstanding_recvs - max_out;
1123   while(target > 0){
1124     while(BGML_Messager_advance()>0) ;
1125     target = outstanding_recvs - max_out;
1126   }
1127   
1128   while(BGML_Messager_advance()>0);
1129
1130   sendBroadcastMessages();
1131
1132 #if CMK_IMMEDIATE_MSG
1133   if(received_immediate) {
1134     CmiHandleImmediate();
1135   }
1136   received_immediate = 0;
1137 #endif
1138   
1139   sendQueuedMessages();
1140   //while(BGML_Messager_advance()>0);
1141 }
1142
1143 static inline void SendMsgsUntil(int targetm, int targetb){
1144
1145   sendBroadcastMessages();
1146   
1147   while(msgQueueLen>targetm && msgQBytes > targetb){
1148     while(BGML_Messager_advance()>0) ;
1149     sendQueuedMessages();
1150   }
1151   
1152   while ( BGML_Messager_advance() > 0 );
1153   
1154   sendBroadcastMessages();
1155 }
1156
1157 void CmiNotifyIdle(){  
1158   AdvanceCommunications();
1159
1160   CmiState cs = CmiGetStateN(pe);
1161   //int length = PCQueueLength(cs->recv);
1162   //if(length != 0)
1163   //printf("%d: {%d} PCQueue length = %d\n", CmiMyPe(), bgx_in_interrupt, length);
1164 }
1165
1166 static void CmiNotifyBeginIdle(CmiIdleState *s)
1167 {
1168   s->sleepMs=0;
1169   s->nIdles=0;
1170 }
1171
1172 static void CmiNotifyStillIdle(CmiIdleState *s)
1173
1174 #if ! CMK_SMP
1175   AdvanceCommunications();
1176 #else
1177 /*  CmiYield();  */
1178 #endif
1179
1180 #if 1
1181   {
1182   int nSpins=20; /*Number of times to spin before sleeping*/
1183   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1184   s->nIdles++;
1185   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1186     s->sleepMs+=2;
1187     if (s->sleepMs>10) s->sleepMs=10;
1188   }
1189   /*Comm. thread will listen on sockets-- just sleep*/
1190   if (s->sleepMs>0) {
1191     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1192     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1193     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1194   }       
1195   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1196   }
1197 #endif
1198 }
1199
1200 /*==========================================================*/
1201 /*==========================================================*/
1202 /*==========================================================*/
1203
1204 /************ Recommended routines ***********************/
1205 /************ You dont have to implement these but they are supported
1206  in the converse syntax and some rare programs may crash. But most
1207  programs dont need them. *************/
1208
1209 CmiCommHandle CmiAsyncSendFn(int, int, char *){
1210   CmiAbort("CmiAsyncSendFn not implemented.");
1211   return (CmiCommHandle) 0;
1212 }
1213 CmiCommHandle CmiAsyncBroadcastFn(int, char *){
1214   CmiAbort("CmiAsyncBroadcastFn not implemented.");
1215   return (CmiCommHandle) 0;
1216 }
1217 CmiCommHandle CmiAsyncBroadcastAllFn(int, char *){
1218   CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1219   return (CmiCommHandle) 0;
1220 }
1221
1222 int           CmiAsyncMsgSent(CmiCommHandle handle){
1223   CmiAbort("CmiAsyncMsgSent not implemented.");
1224   return 0;
1225 }
1226 void          CmiReleaseCommHandle(CmiCommHandle handle){
1227   CmiAbort("CmiReleaseCommHandle not implemented.");
1228 }
1229
1230
1231 /*==========================================================*/
1232 /*==========================================================*/
1233 /*==========================================================*/
1234
1235 /* Optional routines which could use common code which is shared with
1236    other machine layer implementations. */
1237
1238 /* MULTICAST/VECTOR SENDING FUNCTIONS
1239
1240  * In relations to some flags, some other delivery functions may be needed.
1241  */
1242
1243 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1244
1245 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg){
1246   char *copymsg;
1247   copymsg = (char *)CmiAlloc(size);
1248   memcpy(copymsg,msg,size);
1249   CmiFreeListSendFn(npes, pes, size, msg);
1250 }
1251
1252 static int iteration_count = 0;
1253
1254 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1255   CMI_SET_BROADCAST_ROOT(msg,0);  
1256   CMI_MAGIC(msg) = CHARM_MAGIC_NUMBER;
1257   ((CmiMsgHeaderBasic *)msg)->size = size;  
1258   CMI_SET_CHECKSUM(msg, size);
1259   
1260   //printf("%d: In Free List Send Fn\n", CmiMyPe());
1261   
1262   CmiBecomeImmediate(msg);
1263   iteration_count ++;
1264   /*
1265   if(iteration_count == 9500) {
1266     
1267     char pbuf[10000];
1268     char tbuf[64];
1269
1270     sprintf(pbuf, "PE %d sending %d messages of size %d to :", CmiMyPe(), npes, size);
1271     for(int pcount = 0; pcount < npes; pcount++) {
1272       sprintf(tbuf," %d,", pes[pcount]);
1273       strcat(pbuf, tbuf);
1274     }
1275
1276     printf("\n%s\n", pbuf);
1277   }
1278   */
1279
1280   int new_npes = 0;
1281
1282   int i, count = 0, my_loc = -1;
1283   for(i=0; i<npes; i++) {
1284     if(pes[i] == CmiMyPe() || pes[i] == vnpeer) {
1285       CmiSyncSend(pes[i], size, msg);
1286       my_loc = i;
1287     }
1288   }
1289   
1290 #if 0
1291   int *pelist  = (int *) CmiAlloc (sizeof(int) * npes);
1292   if(iteration_count > 700) {
1293
1294     CmiAssert (vnpeer < 0);
1295     
1296     if(my_loc >= 0) {
1297       for(i=0; i<my_loc; i++) {
1298         int loc = (19*my_loc + 7 * (i+1))% (npes-1);
1299         pelist[loc] = pes[i];
1300       }
1301       
1302       for(i ++; i < npes; i ++) {
1303         int loc = (19*my_loc + 7 * i) % (npes-1);
1304         pelist[loc] = pes[i];
1305       }
1306
1307       new_npes = npes - 1;
1308     }
1309     else {
1310       for(i=0; i<npes; i++) {
1311         int loc = (19 + 7 * (i+1)) % npes;
1312         pelist[i] = pes[i];
1313       }
1314       new_npes = npes;
1315     }
1316     
1317     if(new_npes <= request_max) {
1318       machineMulticast (new_npes, pelist, size, msg);
1319       //SendMsgsUntil(0,0);
1320       AdvanceCommunications();
1321       return;
1322     }
1323   }
1324   CmiFree (pelist);
1325 #endif  
1326
1327
1328   for(i=0;i<npes;i++) {
1329     if(pes[i] == CmiMyPe() || pes[i] == vnpeer);
1330     else if(i < npes - 1){
1331       CmiReference(msg);
1332       CmiGeneralFreeSend(pes[i], size, msg);
1333       
1334       //CmiSyncSend(pes[i], size, msg);
1335     }
1336     
1337 #if CMK_PERSISTENT_COMM
1338     if(phs) 
1339       phscount ++;
1340 #endif
1341   }
1342   
1343   if (npes  && (pes[npes-1] != CmiMyPe() && pes[npes-1] != vnpeer))
1344     CmiSyncSendAndFree(pes[npes-1], size, msg);
1345   else 
1346     CmiFree(msg);
1347   
1348   phscount = 0;
1349   
1350   //  SendMsgsUntil(0,0);
1351   SendMsgsUntil(maxMessages, maxBytes);
1352   AdvanceCommunications();
1353 }
1354
1355 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg){
1356   CmiAbort("CmiAsyncListSendFn not implemented.");
1357   return (CmiCommHandle) 0;
1358 }
1359 #endif
1360
1361 /** NODE SENDING FUNCTIONS
1362
1363  * If there is a node queue, and we consider also nodes as entity (tipically in
1364  * SMP versions), these functions are needed.
1365  */
1366
1367 #if CMK_NODE_QUEUE_AVAILABLE
1368
1369 void          CmiSyncNodeSendFn(int, int, char *);
1370 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1371 void          CmiFreeNodeSendFn(int, int, char *);
1372
1373 void          CmiSyncNodeBroadcastFn(int, char *);
1374 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1375 void          CmiFreeNodeBroadcastFn(int, char *);
1376
1377 void          CmiSyncNodeBroadcastAllFn(int, char *);
1378 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1379 void          CmiFreeNodeBroadcastAllFn(int, char *);
1380
1381 #endif
1382
1383
1384 #if CMK_SHARED_VARS_POSIX_THREADS_SMP /*Used by the net-*-smp versions*/
1385
1386 int _Cmi_numpes;
1387 int _Cmi_mynodesize;
1388 int _Cmi_mynode;
1389 int _Cmi_numnodes;
1390
1391 int CmiMyPe();
1392 int CmiMyRank();
1393 int CmiNodeFirst(int node);
1394 int CmiNodeSize(int node);
1395 int CmiNodeOf(int pe);
1396 int CmiRankOf(int pe);
1397
1398 /* optional, these functions are implemented in "machine-smp.c", so including
1399    this file avoid the necessity to reimplement them.
1400  */
1401 void CmiNodeBarrier(void);
1402 void CmiNodeAllBarrier(void);
1403 CmiNodeLock CmiCreateLock();
1404 void CmiDestroyLock(CmiNodeLock lock);
1405
1406 #endif
1407
1408 /** IMMEDIATE MESSAGES
1409
1410  * If immediate messages are supported, the following function is needed. There
1411  * is an exeption if the machine progress is also defined (see later for this).
1412
1413  * Moreover, the file "immediate.c" should be included, otherwise all its
1414  * functions and variables have to be redefined.
1415 */
1416
1417 #if CMK_CCS_AVAILABLE
1418
1419 #include "immediate.c"
1420
1421 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1422 void CmiProbeImmediateMsg();
1423 #endif
1424
1425 #endif
1426
1427
1428 /** MACHINE PROGRESS DEFINED
1429
1430  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1431  * to be pulled out of the network manually. For this reason the following
1432  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1433  * not be defined anymore.
1434  */
1435
1436 #if CMK_MACHINE_PROGRESS_DEFINED
1437
1438
1439
1440 void CmiMachineProgressImpl()
1441 {
1442
1443   unsigned long long new_time = rts_get_timebase();  
1444   if(new_time < lastProgress + progress_cycles) {
1445     return;
1446   }
1447   lastProgress = new_time;
1448
1449 #if !CMK_SMP
1450   AdvanceCommunications();
1451 #else
1452   /*Not implemented yet. Communication server does not seem to be
1453     thread safe */
1454   /* CommunicationServerThread(0); */
1455 #endif  
1456 }
1457
1458 #endif
1459
1460 /* Dummy implementation */
1461 extern "C" void CmiBarrier()
1462 {
1463   BGTsC_Barrier(0);
1464   //BGTr_Barrier(1);
1465 }
1466