New version with interrupt/thread support.
[charm.git] / src / arch / bluegenel / bglmachine.C
1
2 #include <stdio.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <math.h>
7 #include <string.h>
8 #include "machine.h"
9 #include "converse.h"
10 #include "pcqueue.h"
11 #include <stack>
12
13 #define EAGER_MESSAGE_SIZE 100
14
15 #include "rts.h"
16 #include "BLMPI_EagerProtocol.h"
17 #include "BLMPI_RzvProtocol.h"
18 #include "BGLML_Torus.h"
19 #include "BGLML_Tree.h"
20
21 #if CMK_PERSISTENT_COMM
22 #include "BLRMA_Put.h"
23 #endif
24
25 #if CMK_PERSISTENT_COMM
26 #include "persist_impl.h"
27 #endif
28
29 #if 0
30 #define BGL_DEBUG CmiPrintf
31 #else
32 #define BGL_DEBUG // CmiPrintf
33 #endif
34
35 int phscount;
36
37 inline char *ALIGN_16(char *p){
38   return((char *)((((unsigned long)p)+0xf)&0xfffffff0));
39 }
40
41 static char *_msgr_buf = NULL;
42 static BGLML_Messager_t* _msgr = NULL;
43 static void ** _recvArray;
44
45 PCQueue message_q;                   //queue to receive incoming messages
46 PCQueue broadcast_q;                 //queue to send broadcast messages
47
48 #define PROGRESS_PERIOD 8
49 #define PROGRESS_CYCLES 4000           //10k cycles
50
51 /*
52     To reduce the buffer used in broadcast and distribute the load from
53   broadcasting node, define CMK_BROADCAST_SPANNING_TREE enforce the use of
54   spanning tree broadcast algorithm.
55     This will use the fourth short in message as an indicator of spanning tree
56   root.
57 */
58 #if CMK_SMP
59 #define CMK_BROADCAST_SPANNING_TREE    0
60 #else
61 #define CMK_BROADCAST_SPANNING_TREE    1
62 #define CMK_BROADCAST_HYPERCUBE        0
63 #endif /* CMK_SMP */
64
65 #define BROADCAST_SPANNING_FACTOR      4
66
67 #define MAX_OUTSTANDING 1    //1024    
68 #define MAX_POSTED 8
69 #define MAX_QLEN 128
70 #define MAX_BYTES 2000000
71
72
73 #define CMI_BROADCAST_ROOT(msg)          ((CmiMsgHeaderBasic *)msg)->root
74 #define CMI_GET_CYCLE(msg)               ((CmiMsgHeaderBasic *)msg)->root
75
76 #define CMI_DEST_RANK(msg)               ((CmiMsgHeaderBasic *)msg)->rank
77 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
78
79 /* FIXME: need a random number that everyone agrees ! */
80 #define CHARM_MAGIC_NUMBER               126
81
82 #if !CMK_OPTIMIZE
83 static int checksum_flag = 0;
84 extern "C" unsigned char computeCheckSum(unsigned char *data, int len);
85 #define CMI_SET_CHECKSUM(msg, len)      \
86         if (checksum_flag)  {   \
87           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
88           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
89         }
90 #define CMI_CHECK_CHECKSUM(msg, len)    \
91         if (checksum_flag)      \
92           if (computeCheckSum((unsigned char*)msg, len) != 0)  { \
93             printf("\n\n------------------------------\n\nReceiver %d size %d:", CmiMyPe(), len); \    
94             for(int count = 0; count < len; count++) { \
95                 printf("%2x", msg[count]);                 \
96             }                                             \    
97             printf("------------------------------\n\n"); \
98             CmiAbort("Fatal error: checksum doesn't agree!\n"); \
99           }
100 #else
101 #define CMI_SET_CHECKSUM(msg, len)
102 #define CMI_CHECK_CHECKSUM(msg, len)
103 #endif
104
105 #define CMI_SET_BROADCAST_ROOT(msg, root)  CMI_BROADCAST_ROOT(msg) = (root);
106
107 #if CMK_BROADCAST_HYPERCUBE
108 #  define CMI_SET_CYCLE(msg, cycle)  CMI_GET_CYCLE(msg) = (cycle);
109 #else
110 #  define CMI_SET_CYCLE(msg, cycle)
111 #endif
112
113 int               _Cmi_numpes;
114 int               _Cmi_mynode;    /* Which address space am I */
115 int               _Cmi_mynodesize;/* Number of processors in my address space */
116 int               _Cmi_numnodes;  /* Total number of address spaces */
117 static int        Cmi_nodestart; /* First processor in this address space */
118 CpvDeclare(void*, CmiLocalQueue);
119
120 int               idleblock = 0;
121
122 #include "machine-smp.c"
123 CsvDeclare(CmiNodeState, NodeState);
124 #include "immediate.c"
125
126 static inline void AdvanceCommunications(int max_out = MAX_OUTSTANDING);
127 static int outstanding_recvs = 0;
128
129
130 #if !CMK_SMP
131 /************ non SMP **************/
132 static struct CmiStateStruct Cmi_state;
133 int _Cmi_mype;
134 int _Cmi_myrank;
135
136 void CmiMemLock(void) {}
137 void CmiMemUnlock(void) {}
138
139 #if BGL_VERSION_CONTROLX
140 /* Code to support interrupts in BGL */
141
142 volatile int bgx_in_interrupt;
143 volatile int bgx_lock;
144 rts_mutex_t torus_lock = RTS_MUTEX_INITIALIZER;
145
146 static int interrupts_enabled;
147
148 extern "C" void * BGX_InterruptThread(void *) {
149   
150   while (1) {
151     rts_mutex_lock(&torus_lock);
152     
153     //printf("Here Goes\n");
154
155     bgx_in_interrupt = 1;
156     /*
157     int target = outstanding_recvs - MAX_OUTSTANDING;
158     while(target > 0){
159       while(BGLML_Messager_advance(_msgr)>0) ;
160       target = outstanding_recvs - MAX_OUTSTANDING;
161     }
162     */
163
164     //while(BGLML_Messager_advance(_msgr) > 0);
165     AdvanceCommunications();
166     bgx_in_interrupt = 0;
167     
168     rts_mutex_unlock(&torus_lock);    
169     rts_rearm_torus_device();
170   }
171   
172   return NULL;
173 }
174
175 extern "C" void BGX_BeginCriticalSection() {
176   //CmiAssert(bgx_in_interrupt == 0);
177
178   if(bgx_in_interrupt)
179     return;
180
181   CmiAssert(bgx_lock == 0);
182
183   rts_mutex_lock(&torus_lock);
184   bgx_lock = 1;  
185 }
186
187 extern "C" void BGX_EndCriticalSection() {
188   //CmiAssert(bgx_in_interrupt == 0);
189   
190   if(bgx_in_interrupt)
191     return;
192   
193   CmiAssert(bgx_lock == 1);
194   
195   rts_mutex_unlock(&torus_lock);
196   bgx_lock = 0;  
197 }
198
199 #else
200
201 #define   BGX_BeginCriticalSection()
202 #define   BGX_EndCriticalSection()
203
204 #endif
205
206
207 #define CmiGetState() (&Cmi_state)
208 #define CmiGetStateN(n) (&Cmi_state)
209
210 void CmiYield(void) { sleep(0); }
211
212 static void CmiStartThreads(char **argv)
213 {
214   CmiStateInit(Cmi_nodestart, 0, &Cmi_state);
215   _Cmi_mype = Cmi_nodestart;
216   _Cmi_myrank = 0;
217 }
218 #endif  /* !CMK_SMP */
219
220 int received_immediate = 0;
221
222 /*Add a message to this processor's receive queue, pe is a rank */
223 static void CmiPushPE(int pe,void *msg)
224 {
225   CmiState cs = CmiGetStateN(pe);
226   MACHSTATE2(3,"Pushing message into rank %d's queue %p{",pe, cs->recv);
227 #if CMK_IMMEDIATE_MSG
228   if (CmiIsImmediate(msg)) {
229     /**(CmiUInt2 *)msg = pe;*/
230     received_immediate = 1;
231     CMI_DEST_RANK(msg) = pe;
232     CmiPushImmediateMsg(msg);
233     return;
234   }
235 #endif
236 #if CMK_SMP
237   CmiLock(procState[pe].recvLock);
238 #endif
239
240   PCQueuePush(cs->recv,(char *)msg);
241   //printf("%d: {%d} PCQueue length = %d, msg = %x\n", CmiMyPe(), bgx_in_interrupt, PCQueueLength(cs->recv), msg);
242   
243 #if CMK_SMP
244   CmiUnlock(procState[pe].recvLock);
245 #endif
246   CmiIdleLock_addMessage(&cs->idle);
247   MACHSTATE1(3,"} Pushing message into rank %d's queue done",pe);
248 }
249
250 #if CMK_NODE_QUEUE_AVAILABLE
251 /*Add a message to this processor's receive queue */
252 static void CmiPushNode(void *msg)
253 {
254   MACHSTATE(3,"Pushing message into NodeRecv queue");
255 #if CMK_IMMEDIATE_MSG
256   if (CmiIsImmediate(msg)) {
257     CMI_DEST_RANK(msg) = 0;
258     CmiPushImmediateMsg(msg);
259     return;
260   }
261 #endif
262   CmiLock(CsvAccess(NodeState).CmiNodeRecvLock);
263   PCQueuePush(CsvAccess(NodeState).NodeRecv,msg);
264   CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
265   {
266   CmiState cs=CmiGetStateN(0);
267   CmiIdleLock_addMessage(&cs->idle);
268   }
269 }
270 #endif /* CMK_NODE_QUEUE_AVAILABLE */
271
272 #ifndef CmiMyPe
273 int CmiMyPe(void)
274 {
275   return CmiGetState()->pe;
276 }
277 #endif
278
279 #ifndef CmiMyRank
280 int CmiMyRank(void)
281 {
282   return CmiGetState()->rank;
283 }
284 #endif
285
286
287 static int msgQueueLen = 0;
288 static int msgQBytes = 0;
289 static int numPosted = 0;
290
291 static int request_max;
292 static int maxMessages;
293 static int maxBytes;
294 static int no_outstanding_sends=0; /*FLAG: consume outstanding Isends in scheduler loop*/
295 static int progress_cycles;
296
297 static int Cmi_dim;     /* hypercube dim of network */
298
299 static char     **Cmi_argv;
300 static char     **Cmi_argvcopy;
301 static CmiStartFn Cmi_startfn;   /* The start function */
302 static int        Cmi_usrsched;  /* Continue after start function finishes? */
303
304 extern "C" void ConverseCommonInit(char **argv);
305 extern "C" void ConverseCommonExit(void);
306 extern "C" void CthInit(char **argv);
307
308 static inline void SendMsgsUntil(int, int);
309 static void ConverseRunPE(int everReturn);
310 static void CommunicationServer(int sleepTime);
311 static void CommunicationServerThread(int sleepTime);
312
313 void SendSpanningChildren(int size, char *msg);
314 void SendHypercube(int size, char *msg);
315
316 typedef struct msg_list {
317   BGLQuad info;
318   char *msg;
319   char *send_buf;
320   int size;
321   int destpe;
322   //struct msg_list *next;
323 #if CMK_PERSISTENT_COMM
324   PersistentHandle phs;
325   int phscount;
326   int phsSize;
327 #endif
328 } SMSG_LIST;
329
330
331 typedef struct {
332   int sleepMs; /*Milliseconds to sleep while idle*/
333   int nIdles; /*Number of times we've been idle in a row*/
334   CmiState cs; /*Machine state*/
335 } CmiIdleState;
336
337 static CmiIdleState *CmiNotifyGetState(void)
338 {
339   CmiIdleState *s=(CmiIdleState *)CmiAlloc(sizeof(CmiIdleState));
340   s->sleepMs=0;
341   s->nIdles=0;
342   s->cs=CmiGetState();
343   return s;
344 }
345
346 typedef struct ProcState {
347 /* PCQueue      sendMsgBuf; */      /* per processor message sending queue */
348 CmiNodeLock  recvLock;              /* for cs->recv */
349 } ProcState;
350
351 static ProcState  *procState;
352
353 /* send done callback: sets the smsg entry to done */
354 static void send_done(void *data){
355   SMSG_LIST *msg_tmp = (SMSG_LIST *)(data);
356   CmiFree(msg_tmp->msg);
357
358   msgQBytes -= msg_tmp->size;
359
360   if(msg_tmp->send_buf)
361     CmiFree(msg_tmp->send_buf);
362
363   free(data);
364   data=NULL;
365
366   msgQueueLen--;
367   numPosted --;
368 }
369
370 //Called on receiving a persistent message
371 void persist_recv_done(void *clientdata) {
372
373   char *msg = (char *) clientdata;
374   int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
375
376   ///  printf("[%d] persistent receive of size %d\n", CmiMyPe(), sndlen);
377   
378   //Cannot broadcast Persistent message
379
380   CMI_CHECK_CHECKSUM(msg, sndlen);
381   if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
382     CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
383     return;
384   }
385   
386   CmiReference(msg);
387
388 #if CMK_NODE_QUEUE_AVAILABLE
389   if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE)
390     CmiPushNode(msg);
391   else
392 #endif
393     CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
394 }
395
396 /* recv done callback: push the recved msg to recv queue */
397 static void recv_done(void *clientdata){
398
399   char *msg = (char *) clientdata;
400   int sndlen = ((CmiMsgHeaderBasic *) msg)->size;
401   
402   /* then we do what PumpMsgs used to do:
403    * push msg to recv queue */
404   
405   CMI_CHECK_CHECKSUM(msg, sndlen);
406   if (CMI_MAGIC(msg) != CHARM_MAGIC_NUMBER) { /* received a non-charm msg */
407     CmiAbort("Charm++ Warning: Non Charm++ Message Received. \n");
408     return;
409   }
410
411 #if CMK_NODE_QUEUE_AVAILABLE
412   if (CMI_DEST_RANK(msg) == DGRAM_NODEMESSAGE)
413     CmiPushNode(msg);
414   else
415 #endif
416     CmiPushPE(CMI_DEST_RANK(msg), (void *)msg);
417
418 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
419   if(CMI_BROADCAST_ROOT(msg) != 0) {
420     //printf ("%d: Receiving bcast message %d bytes\n", CmiMyPe(), sndlen);
421     PCQueuePush(broadcast_q, msg);
422   }
423 #endif
424
425
426   //rts_dcache_evict_normal();
427
428   outstanding_recvs --;
429 }
430
431 /* first packet recv callback, gets recv_done for the whole msg */
432 BLMPI_Eager_Recv_t * first_pkt_eager_recv_done (
433                                           const BGLQuad    * msginfo,
434                                           unsigned           senderrank,
435                                           const unsigned     sndlen,
436                                           unsigned         * rcvlen,
437                                           char            ** buffer,
438                                           void           (** cb_done)(void *),
439                                           void            ** clientdata
440                                          )
441 {
442   outstanding_recvs ++;
443
444   int alloc_size = sndlen + sizeof(BLMPI_Eager_Recv_t) + 16;
445   
446   //printf ("%d: {%d} Receiving message %d bytes\n", CmiMyPe(), bgx_in_interrupt, sndlen);
447
448   /* printf ("Receiving %d bytes\n", sndlen); */
449   *rcvlen = sndlen>0?sndlen:1;  /* to avoid malloc(0) which might
450                                    return NULL */
451
452   *buffer = (char *)CmiAlloc(alloc_size);
453   *cb_done = recv_done;
454   *clientdata = *buffer;
455
456   return (BLMPI_Eager_Recv_t *)ALIGN_16(*buffer + sndlen);
457 }
458
459 BLMPI_Rzv_Recv_t * first_pkt_rzv_recv_done   (
460                                           const BGLQuad    * msginfo,
461                                           unsigned           senderrank,
462                                           const unsigned     sndlen,
463                                           unsigned         * rcvlen,
464                                           char            ** buffer,
465                                           void           (** cb_done)(void *),
466                                           void            ** clientdata
467                                          )
468 {
469   outstanding_recvs++;
470   /* printf ("Receiving %d bytes\n", sndlen); */
471   *rcvlen = sndlen>0?sndlen:1;  /* to avoid malloc(0) which might
472                                    return NULL */
473
474   int alloc_size = sndlen + sizeof(BLMPI_Rzv_Recv_t) + 16;
475
476   *buffer = (char *)CmiAlloc(alloc_size);
477   *cb_done = recv_done;
478   *clientdata = *buffer;
479   return (BLMPI_Rzv_Recv_t*) ALIGN_16(*buffer + sndlen);
480 }
481
482
483 inline void sendBroadcastMessages() {
484   while(!PCQueueEmpty(broadcast_q)) {
485     char *msg = (char *) PCQueuePop(broadcast_q);
486
487 #if CMK_BROADCAST_SPANNING_TREE
488     SendSpanningChildren(((CmiMsgHeaderBasic *) msg)->size, msg);
489 #elif CMK_BROADCAST_HYPERCUBE
490     SendHypercube(((CmiMsgHeaderBasic *) msg)->size, msg);
491 #endif
492     
493     //CMI_CHECK_CHECKSUM(msg, (((CmiMsgHeaderBasic *) msg)->size));
494   }
495 }
496
497 #include "bgltorus.h"
498 CpvDeclare(BGLTorusManager*, tmanager);
499
500 void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched, int initret){
501   int n, i;
502
503   _msgr_buf = (char *)CmiAlloc(sizeof(BGLML_Messager_t)+16);
504   _msgr = (BGLML_Messager_t *)ALIGN_16(_msgr_buf);
505   BGLML_Messager_Init(_msgr, BGL_AppMutexs, BGL_AppBarriers);
506
507   _Cmi_numnodes = BGLML_Messager_size(_msgr);
508   _Cmi_mynode = BGLML_Messager_rank(_msgr);
509
510   message_q = PCQueueCreate();
511   broadcast_q = PCQueueCreate();
512
513   unsigned rank = BGLML_Messager_rank(_msgr);
514   unsigned size = BGLML_Messager_size(_msgr);
515
516   unsigned char * actual = (unsigned char *)malloc (size);
517   unsigned char * expect = (unsigned char *)malloc (size);
518   BGLML_Torus_SimpleBarrierInit(_msgr, 20, expect, actual);
519   _recvArray = (void **) malloc (sizeof(void *) * _Cmi_numnodes);
520   BLMPI_Eager_Init(_msgr, first_pkt_eager_recv_done, _recvArray, 11, 12);
521   BLMPI_Rzv_Init(_msgr, first_pkt_rzv_recv_done, 14, 15, 3, 13);
522   
523 #if CMK_PERSISTENT_COMM
524   BLRMA_Put_Init(_msgr, 1, 2);
525   phs =  NULL;
526 #endif
527
528   CmiBarrier();    
529
530   /* processor per node */
531   _Cmi_mynodesize = 1;
532   CmiGetArgInt(argv,"+ppn", &_Cmi_mynodesize);
533 #if ! CMK_SMP
534   if (_Cmi_mynodesize > 1 && _Cmi_mynode == 0)
535     CmiAbort("+ppn cannot be used in non SMP version!\n");
536 #endif
537
538   idleblock = CmiGetArgFlag(argv, "+idleblocking");
539   if (idleblock && _Cmi_mynode == 0) {
540     CmiPrintf("Charm++: Running in idle blocking mode.\n");
541   }
542
543 #if CMK_NO_OUTSTANDING_SENDS
544   no_outstanding_sends=1;
545 #endif
546   if (CmiGetArgFlag(argv,"+no_outstanding_sends")) {
547     no_outstanding_sends = 1;
548     if (_Cmi_mynode == 0)
549       CmiPrintf("Charm++: Will%s consume outstanding sends in scheduler loop\n",        no_outstanding_sends?"":" not");
550   }
551   
552
553   if (CmiGetArgFlag(argv,"+enable_interrupts")) 
554     interrupts_enabled = 1;
555
556   _Cmi_numpes = _Cmi_numnodes * _Cmi_mynodesize;
557   Cmi_nodestart = _Cmi_mynode * _Cmi_mynodesize;
558   Cmi_argvcopy = CmiCopyArgs(argv);
559   Cmi_argv = argv; Cmi_startfn = fn; Cmi_usrsched = usched;
560
561   CpvInitialize(BGLTorusManager*, tmanager);
562   CpvAccess(tmanager) = new BGLTorusManager();
563
564
565   /* find dim = log2(numpes), to pretend we are a hypercube */
566   for ( Cmi_dim=0,n=_Cmi_numpes; n>1; n/=2 )
567     Cmi_dim++ ;
568
569   request_max=MAX_POSTED;
570   CmiGetArgInt(argv,"+requestmax",&request_max);
571  
572   maxMessages = MAX_QLEN;
573   maxBytes = MAX_BYTES;
574
575   /* checksum flag */
576   if (CmiGetArgFlag(argv,"+checksum")) {
577 #if !CMK_OPTIMIZE
578     checksum_flag = 1;
579     if (_Cmi_mynode == 0) CmiPrintf("Charm++: CheckSum checking enabled! \n");
580 #else
581     if (_Cmi_mynode == 0) CmiPrintf("Charm++: +checksum ignored in optimized version! \n");
582 #endif
583   }
584
585   CsvInitialize(CmiNodeState, NodeState);
586   CmiNodeStateInit(&CsvAccess(NodeState));
587
588   procState = (ProcState *)CmiAlloc((_Cmi_mynodesize+1) * sizeof(ProcState));
589   for (i=0; i<_Cmi_mynodesize+1; i++) {
590 /*    procState[i].sendMsgBuf = PCQueueCreate();   */
591     procState[i].recvLock = CmiCreateLock();
592   }
593
594   /* Network progress function is used to poll the network when for
595      messages. This flushes receive buffers on some  implementations*/
596   networkProgressPeriod = PROGRESS_PERIOD;
597   CmiGetArgInt(argv, "+networkProgressPeriod", &networkProgressPeriod);
598   
599   progress_cycles = PROGRESS_CYCLES;
600   CmiGetArgInt(argv, "+progressCycles", &progress_cycles);
601
602   CmiStartThreads(argv);
603   ConverseRunPE(initret);
604 }
605
606 static void ConverseRunPE(int everReturn)
607 {
608   CmiIdleState *s=CmiNotifyGetState();
609   CmiState cs;
610   char** CmiMyArgv;
611   CmiNodeAllBarrier();
612   cs = CmiGetState();
613   CpvInitialize(void *,CmiLocalQueue);
614   CpvAccess(CmiLocalQueue) = cs->localqueue;
615
616   if (CmiMyRank())
617     CmiMyArgv=CmiCopyArgs(Cmi_argvcopy);
618   else   
619     CmiMyArgv=Cmi_argv;
620     
621   CthInit(CmiMyArgv);
622
623   ConverseCommonInit(CmiMyArgv);
624
625   /* initialize the network progress counter*/
626   /* Network progress function is used to poll the network when for
627      messages. This flushes receive buffers on some  implementations*/
628   CpvInitialize(int , networkProgressCount);
629   CpvAccess(networkProgressCount) = 0;
630
631 #if CMK_SMP
632   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
633   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
634 #else
635   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyIdle,NULL);
636 #endif
637
638 #if MACHINE_DEBUG_LOG
639   if (CmiMyRank() == 0) {
640     char ln[200];
641     sprintf(ln,"debugLog.%d",CmiMyNode());
642     debugLog=fopen(ln,"w");
643   }
644 #endif
645
646 #if BGL_VERSION_CONTROLX
647   if(interrupts_enabled)
648     rts_thread_create(&BGX_InterruptThread, 0);
649 #endif
650
651   CmiBarrier();
652
653   /* Converse initialization finishes, immediate messages can be processed.
654      node barrier previously should take care of the node synchronization */
655   _immediateReady = 1;
656
657   /* communication thread */
658   if (CmiMyRank() == CmiMyNodeSize()) {
659     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
660     while (1) CommunicationServerThread(5);
661   }
662   else {  /* worker thread */
663     if (!everReturn) {
664       Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
665       if (Cmi_usrsched==0) CsdScheduler(-1);
666       ConverseExit();
667     }
668   }
669 }
670
671 #if CMK_SMP
672 static int inexit = 0;
673
674 static int MsgQueueEmpty()
675 {
676   int i;
677 #if 0
678   for (i=0; i<_Cmi_mynodesize; i++)
679     if (!PCQueueEmpty(procState[i].sendMsgBuf)) return 0;
680 #else
681   return PCQueueEmpty(sendMsgBuf);
682 #endif
683   return 1;
684 }
685
686 static int SendMsgBuf();
687
688 /* test if all processors recv queues are empty */
689 static int RecvQueueEmpty()
690 {
691   int i;
692   for (i=0; i<_Cmi_mynodesize; i++) {
693     CmiState cs=CmiGetStateN(i);
694     if (!PCQueueEmpty(cs->recv)) return 0;
695   }
696   return 1;
697 }
698
699 static void CommunicationServer(int sleepTime){
700   int static count=0;
701
702   SendMsgBuf(); 
703   AdvanceCommunications();
704 /*
705   if (count % 10000000==0) MACHSTATE(3, "} Exiting CommunicationServer.");
706 */
707   if (inexit == CmiMyNodeSize()) {
708     MACHSTATE(2, "CommunicationServer exiting {");
709 #if 0
710     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent() || !RecvQueueEmpty()) {
711 #endif
712     while(!MsgQueueEmpty() || !CmiAllAsyncMsgsSent()) {
713       SendMsgBuf(); 
714       AdvanceCommunications();
715     }
716 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
717     if (CmiMyNode() == 0){
718       CmiPrintf("End of program\n");
719     }
720 #endif
721     MACHSTATE(2, "} CommunicationServer EXIT");
722     exit(0);   
723   }
724 }
725 #endif /* CMK_SMP */
726
727 static void CommunicationServerThread(int sleepTime){
728 #if CMK_SMP
729   CommunicationServer(sleepTime);
730 #endif
731 #if CMK_IMMEDIATE_MSG
732   CmiHandleImmediate();
733 #endif
734 }
735
736 void ConverseExit(void){
737 /* #if ! CMK_SMP */
738 /* we don't have async send yet
739   while(!CmiAllAsyncMsgsSent()) {
740     AdvanceCommunications();
741   }
742 */
743   while(msgQueueLen) {
744     AdvanceCommunications(outstanding_recvs);
745   }
746
747   ConverseCommonExit();
748
749   CmiFree(procState);
750   CmiFree(_msgr_buf);
751   free(_recvArray); 
752 #if (CMK_DEBUG_MODE || CMK_WEB_MODE || NODE_0_IS_CONVHOST)
753   if (CmiMyPe() == 0){
754     CmiPrintf("End of program\n");
755   }
756 #endif
757
758   exit(0);
759 }
760
761 /* exit() called on any node would abort the whole program */
762 void CmiAbort(const char * message){
763   CmiError("------------- Processor %d Exiting: Called CmiAbort ------------\n"
764         "{snd:%d,rcv:%d} Reason: %s\n",CmiMyPe(),msgQueueLen,outstanding_recvs,message);
765   CmiPrintStackTrace(0);
766   exit(-1);
767 }
768
769 void *CmiGetNonLocal(){
770   static int count=0;
771   CmiState cs = CmiGetState();
772
773   void *msg = NULL;
774   CmiIdleLock_checkMessage(&cs->idle);
775   /* although it seems that lock is not needed, I found it crashes very often
776      on mpi-smp without lock */
777
778   AdvanceCommunications();
779   
780   //int length = PCQueueLength(cs->recv);
781   //if(length != 0)
782   //printf("%d: {%d} PCQueue length = %d\n", CmiMyPe(), bgx_in_interrupt, length);
783   
784   CmiLock(procState[cs->rank].recvLock);
785
786   //if(length > 0)
787   msg =  PCQueuePop(cs->recv); 
788   CmiUnlock(procState[cs->rank].recvLock);
789
790 #if !CMK_SMP
791   if (no_outstanding_sends) {
792     SendMsgsUntil(0, 0);
793   }
794   
795   if(!msg) {
796     AdvanceCommunications();
797     
798     //int length = PCQueueLength(cs->recv);
799     //if(length != 0)
800     //    printf("%d: {%d} PCQueue length = %d\n", CmiMyPe(), bgx_in_interrupt, length);
801
802     //if(length > 0)
803     return PCQueuePop(cs->recv);
804   }
805 #endif /* !CMK_SMP */
806   return msg;
807 }
808
809 static void CmiSendSelf(char *msg){
810 #if CMK_IMMEDIATE_MSG
811     if (CmiIsImmediate(msg)) {
812       /* CmiBecomeNonImmediate(msg); */
813       CmiPushImmediateMsg(msg);
814       CmiHandleImmediate();
815       return;
816     }
817 #endif
818     CQdCreate(CpvAccess(cQdState), 1);
819     CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),msg);
820 }
821
822
823 inline void machineSend(SMSG_LIST *msg_tmp);
824
825 #if CMK_PERSISTENT_COMM
826 #include "persistent.C"
827 #endif
828
829 inline void machineSend(SMSG_LIST *msg_tmp) {
830   
831   CMI_MAGIC(msg_tmp->msg) = CHARM_MAGIC_NUMBER;
832   CMI_SET_CHECKSUM(msg_tmp->msg, msg_tmp->size);
833   CQdCreate(CpvAccess(cQdState), 1);
834   /*
835   printf("------------------------------\n\nSender %d Receiver %d size %d:", 
836          CmiMyPe(), msg_tmp->destpe, msg_tmp->size);
837   
838   for(int count = 0; count < msg_tmp->size; count++) {
839     printf("%2x", msg_tmp->msg[count]);
840   } 
841   
842   printf("------------------------------\n\n");
843   */
844
845   numPosted ++;
846
847   if(msg_tmp->destpe == CmiMyPe())
848     CmiAbort("Sending to self\n");
849   
850   //printf("%d : Sending message to %d of size %d\n", CmiMyPe(), msg_tmp->destpe, msg_tmp->size);
851
852 #if CMK_PERSISTENT_COMM
853   if(msg_tmp->phs) {
854     if(machineSendPersistentMsg(msg_tmp))
855       return;
856   }
857 #endif
858
859   CmiAssert(msg_tmp->destpe >= 0 && msg_tmp->destpe < CmiNumPes());
860
861   if(msg_tmp->size < EAGER_MESSAGE_SIZE) {
862     //msg_tmp->send_buf = (char *) malloc (sizeof(BLMPI_Eager_Send_t));
863     msg_tmp->send_buf = (char *) CmiAlloc (sizeof(BLMPI_Eager_Send_t));    
864
865     BLMPI_Eager_Send_t *send = (BLMPI_Eager_Send_t *)
866       ALIGN_16(msg_tmp->send_buf);
867     BLMPI_Eager_Send(send, _msgr, &(msg_tmp->info), msg_tmp->msg, 
868                      msg_tmp->size, msg_tmp->destpe, send_done, 
869                      (void *)msg_tmp);
870   }
871   else {
872     //printf("%d : Sending rzv message to %d of size %d\n", CmiMyPe(), msg_tmp->destpe, msg_tmp->size);
873     //    msg_tmp->send_buf = (char *) malloc (sizeof(BLMPI_Rzv_Send_t));
874     
875     msg_tmp->send_buf = (char *) CmiAlloc (sizeof(BLMPI_Rzv_Send_t));
876
877     //CmiAssert((unsigned long)msg_tmp->send_buf % 16 == 0);
878     //CmiAssert((unsigned long)msg_tmp % 16 == 0);
879     
880     BLMPI_Rzv_Send_t *send = (BLMPI_Rzv_Send_t *)ALIGN_16(msg_tmp->send_buf);
881     BLMPI_Rzv_Send(send, _msgr, &(msg_tmp->info), msg_tmp->msg, msg_tmp->size, 
882                    msg_tmp->destpe, send_done, (void *)msg_tmp);
883   }
884 }
885
886 static inline void sendQueuedMessages() {
887   while(numPosted <= request_max && !PCQueueEmpty(message_q)) {
888     SMSG_LIST *msg_tmp = (SMSG_LIST *)PCQueuePop(message_q);    
889     machineSend(msg_tmp);
890   }
891 }
892
893 /* The general free send function
894  * Send is synchronous, and free msg after posted
895  */
896 inline void  CmiGeneralFreeSend(int destPE, int size, char* msg){
897
898   ((CmiMsgHeaderBasic *)msg)->size = size;
899   
900   CmiState cs = CmiGetState();
901   if(destPE==cs->pe){
902     CmiSendSelf(msg);
903     return;
904   }
905   
906   SendMsgsUntil(maxMessages, maxBytes);
907
908   SMSG_LIST *msg_tmp = (SMSG_LIST *) malloc(sizeof(SMSG_LIST));
909   
910   msg_tmp->destpe = destPE;
911   msg_tmp->size = size;
912   msg_tmp->msg = msg;
913   msg_tmp->send_buf = NULL;
914
915 #if CMK_PERSISTENT_COMM
916   msg_tmp->phs = phs;
917   msg_tmp->phscount = phscount;
918   msg_tmp->phsSize = phsSize;
919 #endif
920
921   BGX_BeginCriticalSection();
922   
923   sendQueuedMessages();
924   
925   if(numPosted > request_max) {
926     PCQueuePush(message_q, (char *)msg_tmp);
927   }    
928   else 
929     machineSend(msg_tmp);    
930   
931   msgQBytes += size;
932   msgQueueLen++;
933   
934   BGX_EndCriticalSection();
935 }
936
937 void CmiSyncSendFn(int destPE, int size, char *msg){
938   char *copymsg;
939   copymsg = (char *)CmiAlloc(size);
940   memcpy(copymsg,msg,size);
941   CmiFreeSendFn(destPE,size,copymsg);
942 }
943
944 void CmiFreeSendFn(int destPE, int size, char *msg){
945   CMI_SET_BROADCAST_ROOT(msg,0);
946   CmiGeneralFreeSend(destPE,size,msg);
947 }
948
949 /* same as CmiSyncSendFn, but don't set broadcast root in msg header */
950 void CmiSyncSendFn1(int destPE, int size, char *msg)
951 {
952   char *copymsg;
953   copymsg = (char *)CmiAlloc(size);
954   memcpy(copymsg, msg, size);
955
956   //  asm volatile("sync" ::: "memory");
957
958   CmiGeneralFreeSend(destPE,size,copymsg);
959 }
960
961 /* send msg to its spanning children in broadcast. G. Zheng */
962 void SendSpanningChildren(int size, char *msg)
963 {
964   CmiState cs = CmiGetState();
965   int startpe = CMI_BROADCAST_ROOT(msg)-1;
966   int i;
967
968   CmiAssert(startpe>=0 && startpe<_Cmi_numpes);
969   int dist = cs->pe-startpe;
970   if(dist<0) dist+=_Cmi_numpes;
971   for (i=1; i <= BROADCAST_SPANNING_FACTOR; i++) {
972     int p = BROADCAST_SPANNING_FACTOR*dist + i;
973     if (p > _Cmi_numpes - 1) break;
974     p += startpe;
975     p = p%_Cmi_numpes;
976     CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
977     CmiSyncSendFn1(p, size, msg);
978   }
979 }
980
981 /* send msg along the hypercube in broadcast. (Sameer) */
982 void SendHypercube(int size, char *msg)
983 {
984   CmiState cs = CmiGetState();
985   int curcycle = CMI_GET_CYCLE(msg);
986   int i;
987
988   double logp = CmiNumPes();
989   logp = log(logp)/log(2.0);
990   logp = ceil(logp);
991   
992   /*  CmiPrintf("In hypercube\n"); */
993   /* assert(startpe>=0 && startpe<_Cmi_numpes); */
994
995   for (i = curcycle; i < logp; i++) {
996     int p = cs->pe ^ (1 << i);
997     /*   CmiPrintf("p = %d, logp = %5.1f\n", p, logp);*/
998     if(p < CmiNumPes()) {
999       CMI_SET_CYCLE(msg, i + 1);
1000
1001       CmiAssert(p>=0 && p<_Cmi_numpes && p!=cs->pe);
1002       CmiSyncSendFn1(p, size, msg);
1003     }
1004   }
1005 }
1006
1007 void CmiSyncBroadcastFn(int size, char *msg){
1008   char *copymsg;
1009   copymsg = (char *)CmiAlloc(size);
1010   memcpy(copymsg,msg,size);
1011   CmiFreeBroadcastFn(size,copymsg);
1012 }
1013
1014 void CmiFreeBroadcastFn(int size, char *msg){
1015   
1016   //printf("%d: Calling Broadcast %d\n", CmiMyPe(), size);
1017   /*
1018   printf("------------------------------\n\nSender %d :", CmiMyPe());
1019   
1020   for(int count = 0; count < size; count++) {
1021     printf("%2x", msg[count]);
1022   } 
1023   
1024   printf("------------------------------\n\n");
1025   */
1026
1027
1028   CmiState cs = CmiGetState();
1029 #if CMK_BROADCAST_SPANNING_TREE
1030   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1031   SendSpanningChildren(size, msg);
1032   CmiFree(msg);
1033 #elif CMK_BROADCAST_HYPERCUBE
1034   CMI_SET_CYCLE(msg, 0);
1035   SendHypercube(size, msg);
1036   CmiFree(msg);
1037 #else
1038   int i;
1039
1040   for ( i=cs->pe+1; i<_Cmi_numpes; i++ ) 
1041     CmiSyncSendFn(i,size,msg);
1042   
1043   for ( i=0; i<cs->pe; i++ ) 
1044     CmiSyncSendFn(i,size,msg);
1045
1046   CmiFree(msg);
1047 #endif
1048   //SendMsgsUntil(0,0);
1049 }
1050
1051 void CmiSyncBroadcastAllFn(int size, char *msg){
1052   char *copymsg;
1053   copymsg = (char *)CmiAlloc(size);
1054   memcpy(copymsg,msg,size);
1055   CmiFreeBroadcastAllFn(size,copymsg);
1056 }
1057
1058 void CmiFreeBroadcastAllFn(int size, char *msg){
1059   
1060   //printf("%d: Calling All Broadcast %d\n", CmiMyPe(), size);
1061
1062   CmiState cs = CmiGetState();
1063 #if CMK_BROADCAST_SPANNING_TREE
1064   CmiSyncSendFn(cs->pe,size,msg);
1065   CMI_SET_BROADCAST_ROOT(msg, cs->pe+1);
1066   SendSpanningChildren(size, msg);
1067   CmiFree(msg);
1068 #elif CMK_BROADCAST_HYPERCUBE
1069   CmiSyncSendFn(cs->pe,size,msg);
1070   CMI_SET_CYCLE(msg, 0);
1071   SendHypercube(size, msg);
1072   CmiFree(msg);
1073 #else
1074   int i ;
1075
1076   CmiSyncSendFn(CmiMyPe(), size, msg);
1077   
1078   for ( i=0; i<_Cmi_numpes; i++ ) {
1079     if(i== CmiMyPe())
1080       continue;
1081
1082     CmiSyncSendFn(i,size,msg);
1083   }
1084   CmiFree(msg);
1085 #endif
1086
1087   //SendMsgsUntil(0,0);
1088 }
1089
1090 static inline void AdvanceCommunications(int max_out){
1091
1092   sendBroadcastMessages();
1093
1094   BGX_BeginCriticalSection();
1095
1096   while(msgQueueLen > maxMessages && msgQBytes > maxBytes){
1097     while(BGLML_Messager_advance(_msgr)>0) ;
1098     sendQueuedMessages();
1099   }
1100   
1101   int target = outstanding_recvs - max_out;
1102   while(target > 0){
1103     while(BGLML_Messager_advance(_msgr)>0) ;
1104     target = outstanding_recvs - max_out;
1105   }
1106   
1107   while(BGLML_Messager_advance(_msgr)>0);
1108   BGX_EndCriticalSection();
1109
1110   sendBroadcastMessages();
1111
1112 #if CMK_IMMEDIATE_MSG
1113   if(received_immediate)
1114     CmiHandleImmediate();
1115   received_immediate = 0;
1116 #endif
1117   
1118   BGX_BeginCriticalSection();  
1119   sendQueuedMessages();
1120   BGX_EndCriticalSection();  
1121 }
1122
1123 static inline void SendMsgsUntil(int targetm, int targetb){
1124
1125   sendBroadcastMessages();
1126   BGX_BeginCriticalSection();
1127   
1128   while(msgQueueLen>targetm && msgQBytes > targetb){
1129     while(BGLML_Messager_advance(_msgr)>0) ;
1130     sendQueuedMessages();
1131   }
1132   
1133   while(BGLML_Messager_advance(_msgr)>0) ;
1134   
1135   BGX_EndCriticalSection();
1136
1137   sendBroadcastMessages();
1138 }
1139
1140 void CmiNotifyIdle(){  
1141   AdvanceCommunications();
1142
1143   CmiState cs = CmiGetStateN(pe);
1144   //int length = PCQueueLength(cs->recv);
1145   //if(length != 0)
1146   //printf("%d: {%d} PCQueue length = %d\n", CmiMyPe(), bgx_in_interrupt, length);
1147 }
1148
1149 static void CmiNotifyBeginIdle(CmiIdleState *s)
1150 {
1151   s->sleepMs=0;
1152   s->nIdles=0;
1153 }
1154
1155 static void CmiNotifyStillIdle(CmiIdleState *s)
1156
1157 #if ! CMK_SMP
1158   AdvanceCommunications();
1159 #else
1160 /*  CmiYield();  */
1161 #endif
1162
1163 #if 1
1164   {
1165   int nSpins=20; /*Number of times to spin before sleeping*/
1166   MACHSTATE1(2,"still idle (%d) begin {",CmiMyPe())
1167   s->nIdles++;
1168   if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
1169     s->sleepMs+=2;
1170     if (s->sleepMs>10) s->sleepMs=10;
1171   }
1172   /*Comm. thread will listen on sockets-- just sleep*/
1173   if (s->sleepMs>0) {
1174     MACHSTATE1(2,"idle lock(%d) {",CmiMyPe())
1175     CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
1176     MACHSTATE1(2,"} idle lock(%d)",CmiMyPe())
1177   }       
1178   MACHSTATE1(2,"still idle (%d) end {",CmiMyPe())
1179   }
1180 #endif
1181 }
1182
1183 /*==========================================================*/
1184 /*==========================================================*/
1185 /*==========================================================*/
1186
1187 /************ Recommended routines ***********************/
1188 /************ You dont have to implement these but they are supported
1189  in the converse syntax and some rare programs may crash. But most
1190  programs dont need them. *************/
1191
1192 CmiCommHandle CmiAsyncSendFn(int, int, char *){
1193   CmiAbort("CmiAsyncSendFn not implemented.");
1194   return (CmiCommHandle) 0;
1195 }
1196 CmiCommHandle CmiAsyncBroadcastFn(int, char *){
1197   CmiAbort("CmiAsyncBroadcastFn not implemented.");
1198   return (CmiCommHandle) 0;
1199 }
1200 CmiCommHandle CmiAsyncBroadcastAllFn(int, char *){
1201   CmiAbort("CmiAsyncBroadcastAllFn not implemented.");
1202   return (CmiCommHandle) 0;
1203 }
1204
1205 int           CmiAsyncMsgSent(CmiCommHandle handle){
1206   CmiAbort("CmiAsyncMsgSent not implemented.");
1207   return 0;
1208 }
1209 void          CmiReleaseCommHandle(CmiCommHandle handle){
1210   CmiAbort("CmiReleaseCommHandle not implemented.");
1211 }
1212
1213
1214 /*==========================================================*/
1215 /*==========================================================*/
1216 /*==========================================================*/
1217
1218 /* Optional routines which could use common code which is shared with
1219    other machine layer implementations. */
1220
1221 /* MULTICAST/VECTOR SENDING FUNCTIONS
1222
1223  * In relations to some flags, some other delivery functions may be needed.
1224  */
1225
1226 #if ! CMK_MULTICAST_LIST_USE_COMMON_CODE
1227
1228 void CmiSyncListSendFn(int npes, int *pes, int size, char *msg){
1229   char *copymsg;
1230   copymsg = (char *)CmiAlloc(size);
1231   memcpy(copymsg,msg,size);
1232   CmiFreeListSendFn(npes, pes, size, msg);
1233 }
1234
1235 void CmiFreeListSendFn(int npes, int *pes, int size, char *msg) {
1236   CMI_SET_BROADCAST_ROOT(msg,0);
1237   
1238   //printf("%d: In Free List Send Fn\n", CmiMyPe());
1239
1240   //CmiBecomeImmediate(msg);
1241
1242   int i;
1243   for(i=0; i<npes; i++) {
1244     if(pes[i] == CmiMyPe())
1245       CmiSyncSend(pes[i], size, msg);
1246   }
1247   
1248   for(i=0;i<npes;i++) {
1249     if(pes[i] == CmiMyPe());
1250     else if(i < npes - 1){
1251       CmiReference(msg);
1252       CmiSyncSendAndFree(pes[i], size, msg);
1253       //CmiSyncSend(pes[i], size, msg);
1254     }
1255 #if CMK_PERSISTENT_COMM
1256     if(phs) 
1257       phscount ++;
1258 #endif
1259   }
1260   
1261   if (npes  && (pes[npes-1] != CmiMyPe()))
1262     CmiSyncSendAndFree(pes[npes-1], size, msg);
1263   else 
1264     CmiFree(msg);
1265   
1266   phscount = 0;
1267   
1268   //  SendMsgsUntil(0,0);
1269   SendMsgsUntil(maxMessages, maxBytes);
1270 }
1271
1272 CmiCommHandle CmiAsyncListSendFn(int npes, int *pes, int size, char *msg){
1273   CmiAbort("CmiAsyncListSendFn not implemented.");
1274   return (CmiCommHandle) 0;
1275 }
1276 #endif
1277
1278 /** NODE SENDING FUNCTIONS
1279
1280  * If there is a node queue, and we consider also nodes as entity (tipically in
1281  * SMP versions), these functions are needed.
1282  */
1283
1284 #if CMK_NODE_QUEUE_AVAILABLE
1285
1286 void          CmiSyncNodeSendFn(int, int, char *);
1287 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
1288 void          CmiFreeNodeSendFn(int, int, char *);
1289
1290 void          CmiSyncNodeBroadcastFn(int, char *);
1291 CmiCommHandle CmiAsyncNodeBroadcastFn(int, char *);
1292 void          CmiFreeNodeBroadcastFn(int, char *);
1293
1294 void          CmiSyncNodeBroadcastAllFn(int, char *);
1295 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
1296 void          CmiFreeNodeBroadcastAllFn(int, char *);
1297
1298 #endif
1299
1300
1301 #if CMK_SHARED_VARS_POSIX_THREADS_SMP /*Used by the net-*-smp versions*/
1302
1303 int _Cmi_numpes;
1304 int _Cmi_mynodesize;
1305 int _Cmi_mynode;
1306 int _Cmi_numnodes;
1307
1308 int CmiMyPe();
1309 int CmiMyRank();
1310 int CmiNodeFirst(int node);
1311 int CmiNodeSize(int node);
1312 int CmiNodeOf(int pe);
1313 int CmiRankOf(int pe);
1314
1315 /* optional, these functions are implemented in "machine-smp.c", so including
1316    this file avoid the necessity to reimplement them.
1317  */
1318 void CmiNodeBarrier(void);
1319 void CmiNodeAllBarrier(void);
1320 CmiNodeLock CmiCreateLock();
1321 void CmiDestroyLock(CmiNodeLock lock);
1322
1323 #endif
1324
1325 /** IMMEDIATE MESSAGES
1326
1327  * If immediate messages are supported, the following function is needed. There
1328  * is an exeption if the machine progress is also defined (see later for this).
1329
1330  * Moreover, the file "immediate.c" should be included, otherwise all its
1331  * functions and variables have to be redefined.
1332 */
1333
1334 #if CMK_CCS_AVAILABLE
1335
1336 #include "immediate.c"
1337
1338 #if ! CMK_MACHINE_PROGRESS_DEFINED /* Hack for some machines */
1339 void CmiProbeImmediateMsg();
1340 #endif
1341
1342 #endif
1343
1344
1345 /** MACHINE PROGRESS DEFINED
1346
1347  * Some machines (like BlueGene/L) do not have coprocessors, and messages need
1348  * to be pulled out of the network manually. For this reason the following
1349  * functions are needed. Notice that the function "CmiProbeImmediateMsg" must
1350  * not be defined anymore.
1351  */
1352
1353 #if CMK_MACHINE_PROGRESS_DEFINED
1354
1355 CpvDeclare(int, networkProgressCount);
1356 int  networkProgressPeriod;
1357
1358 static unsigned long long lastProgress = 0;
1359
1360 void CmiMachineProgressImpl()
1361 {
1362   unsigned long long new_time = rts_get_timebase();
1363   
1364   if(new_time < lastProgress + progress_cycles) {
1365     lastProgress = new_time;
1366     return;
1367   }
1368
1369   lastProgress = new_time;
1370   
1371 #if !CMK_SMP
1372   AdvanceCommunications();
1373 #else
1374     /*Not implemented yet. Communication server does not seem to be
1375       thread safe */
1376     /* CommunicationServerThread(0); */
1377 #endif
1378 }
1379
1380 #endif
1381 /* Dummy implementation */
1382 extern "C" void CmiBarrier()
1383 {
1384   //BGLML_Torus_SimpleBarrier(_msgr);
1385   if(interrupts_enabled)
1386     BGLML_Tree_Barrier(_msgr, 1);
1387 }
1388