Merge optimizations/fixes/changes from pami-bgq branch.
[charm.git] / src / arch / pami-bluegeneq / L2AtomicQueue.h
1
2 #ifndef __L2_ATOMIC_QUEUE__
3 #define __L2_ATOMIC_QUEUE__
4
5 #include <pthread.h>
6 #include <stdio.h>
7 #include <assert.h>
8 #include <stdint.h>
9 #include "spi/include/l2/atomic.h"
10 #include "spi/include/l1p/flush.h"
11 #include "pcqueue.h"
12
13 #define DEFAULT_SIZE         1024
14 #define L2_ATOMIC_FULL        0x8000000000000000UL
15 #define L2_ATOMIC_EMPTY       0x8000000000000000UL
16
17 typedef  void* L2AtomicQueueElement;
18
19 typedef struct _l2atomicstate {
20   volatile uint64_t Consumer;   // not used atomically
21   volatile uint64_t Producer;
22   volatile uint64_t UpperBound;
23   volatile uint64_t Flush;      // contents not used
24 } L2AtomicState;
25
26 typedef struct _l2atomicq {
27   L2AtomicState               * _l2state;
28   volatile void * volatile    * _array;
29   PCQueue                       _overflowQ;
30   pthread_mutex_t               _overflowMutex;
31 } L2AtomicQueue;
32
33 void L2AtomicQueueInit(void *l2mem, size_t l2memsize, L2AtomicQueue *queue) {
34   pami_result_t rc;
35   
36   //Verify counter array is 64-byte aligned 
37   assert( (((uintptr_t) l2mem) & (0x1F)) == 0 );  
38   assert (sizeof(L2AtomicState) <= l2memsize);
39   
40   queue->_l2state = (L2AtomicState *)l2mem;
41   pthread_mutex_init(&queue->_overflowMutex, NULL);
42   queue->_overflowQ = PCQueueCreate();
43   L2_AtomicStore(&queue->_l2state->Consumer, 0);
44   L2_AtomicStore(&queue->_l2state->Producer, 0);
45   L2_AtomicStore(&queue->_l2state->UpperBound, DEFAULT_SIZE);
46   
47   rc = posix_memalign ((void **)&queue->_array,
48                        64, /*L1 line size for BG/Q */
49                        sizeof(L2AtomicQueueElement) * DEFAULT_SIZE);
50
51   assert(rc == PAMI_SUCCESS);
52   memset((void*)queue->_array, 0, sizeof(L2AtomicQueueElement)*DEFAULT_SIZE);
53 }
54
55 void L2AtomicEnqueue (L2AtomicQueue          * queue,
56                       void                   * element) 
57 {
58   //fprintf(stderr,"Insert message %p\n", element);
59
60   uint64_t index = L2_AtomicLoadIncrementBounded(&queue->_l2state->Producer);
61   ppc_msync();
62   if (index != L2_ATOMIC_FULL) {
63     queue->_array[index & (DEFAULT_SIZE-1)] = element;
64     return;
65   }
66   
67   pthread_mutex_lock(&queue->_overflowMutex);
68   // must check again to avoid race
69   if ((index = L2_AtomicLoadIncrementBounded(&queue->_l2state->Producer)) != L2_ATOMIC_FULL) {
70     queue->_array[index & (DEFAULT_SIZE-1)] = element;
71   } else {
72     PCQueuePush(queue->_overflowQ, element);
73   }
74   pthread_mutex_unlock(&queue->_overflowMutex);
75 }
76
77 void * L2AtomicDequeue (L2AtomicQueue    *queue)
78 {
79   uint64_t head, tail;
80   tail = queue->_l2state->Producer;
81   head = queue->_l2state->Consumer;
82
83   volatile void *e = NULL;
84   if (head < tail) {    
85     e = queue->_array[head & (DEFAULT_SIZE-1)];
86     while (e == NULL) 
87       e = queue->_array[head & (DEFAULT_SIZE-1)];
88
89     //fprintf(stderr,"Found message %p\n", e);
90
91     queue->_array[head & (DEFAULT_SIZE-1)] = NULL;
92     ppc_msync();
93
94     head ++;
95     queue->_l2state->Consumer = head;    
96     
97     if (head == tail) {
98       pthread_mutex_lock(&queue->_overflowMutex);      
99       if (PCQueueLength(queue->_overflowQ) == 0) {
100         uint64_t n = head + DEFAULT_SIZE;
101         // is atomic-store needed?
102         L2_AtomicStore(&queue->_l2state->UpperBound, n);
103       }
104       pthread_mutex_unlock(&queue->_overflowMutex);
105     }
106     return (void*) e;
107   }
108   
109   /* head == tail (head cannot be greater than tail) */
110   if (PCQueueLength(queue->_overflowQ) > 0) {
111     pthread_mutex_lock(&queue->_overflowMutex);      
112     e = PCQueuePop (queue->_overflowQ);    
113     if (PCQueueLength(queue->_overflowQ) == 0) {
114       uint64_t n = head + DEFAULT_SIZE;
115       // is atomic-store needed?
116       L2_AtomicStore(&queue->_l2state->UpperBound, n);
117     }
118     pthread_mutex_unlock(&queue->_overflowMutex);      
119     
120     return (void *) e;
121   }
122
123   return (void *) e;
124 }
125
126 int L2AtomicQueueEmpty (L2AtomicQueue *queue) {
127   return ( (PCQueueLength(queue->_overflowQ) == 0) &&
128            (queue->_l2state->Producer == queue->_l2state->Consumer) );
129 }
130
131 #endif