minor
[charm.git] / src / arch / pami-bluegeneq / L2AtomicQueue.h
1
2 #ifndef __L2_ATOMIC_QUEUE__
3 #define __L2_ATOMIC_QUEUE__
4
5 #include <pthread.h>
6 #include <stdio.h>
7 #include <assert.h>
8 #include <stdint.h>
9 #include "spi/include/l2/atomic.h"
10 #include "spi/include/l1p/flush.h"
11 #include "pcqueue.h"
12
13 #define DEFAULT_SIZE         1024
14 #define L2_ATOMIC_FULL        0x8000000000000000UL
15 #define L2_ATOMIC_EMPTY       0x8000000000000000UL
16
17 #define L2A_SUCCESS  0
18 #define L2A_EAGAIN  -1
19 #define L2A_FAIL    -2
20
21 typedef  void* L2AtomicQueueElement;
22
23 typedef struct _l2atomicstate {
24   volatile uint64_t Consumer;   // not used atomically
25   volatile uint64_t Producer;
26   volatile uint64_t UpperBound;
27   volatile uint64_t Flush;      // contents not used
28 } L2AtomicState;
29
30 typedef struct _l2atomicq {
31   L2AtomicState               * _l2state;
32   volatile void * volatile    * _array;
33   int                           _useOverflowQ;
34   int                           _qsize;
35   PCQueue                       _overflowQ;
36   pthread_mutex_t               _overflowMutex;
37 } L2AtomicQueue;
38
39 void L2AtomicQueueInit      (void           * l2mem, 
40                              size_t           l2memsize, 
41                              L2AtomicQueue  * queue,
42                              int              use_overflow,
43                              int              nelem) 
44 {
45   pami_result_t rc;
46   
47   //Verify counter array is 64-byte aligned 
48   assert( (((uintptr_t) l2mem) & (0x1F)) == 0 );  
49   assert (sizeof(L2AtomicState) <= l2memsize);
50   
51   queue->_useOverflowQ = use_overflow;
52
53   int qsize = 2;
54   while (qsize < nelem) 
55     qsize *= 2;
56   queue->_qsize = qsize;
57
58   queue->_l2state = (L2AtomicState *)l2mem;
59   pthread_mutex_init(&queue->_overflowMutex, NULL);
60   queue->_overflowQ = PCQueueCreate();
61   L2_AtomicStore(&queue->_l2state->Consumer, 0);
62   L2_AtomicStore(&queue->_l2state->Producer, 0);
63   L2_AtomicStore(&queue->_l2state->UpperBound, qsize);
64   
65   rc = posix_memalign ((void **)&queue->_array,
66                        64, /*L1 line size for BG/Q */
67                        sizeof(L2AtomicQueueElement) * qsize);
68
69   assert(rc == PAMI_SUCCESS);
70   memset((void*)queue->_array, 0, sizeof(L2AtomicQueueElement)*qsize);
71 }
72
73 int L2AtomicEnqueue (L2AtomicQueue          * queue,
74                      void                   * element) 
75 {
76   //fprintf(stderr,"Insert message %p\n", element);
77
78   register int qsize_1 = queue->_qsize - 1;
79   uint64_t index = L2_AtomicLoadIncrementBounded(&queue->_l2state->Producer);
80   L1P_FlushRequests();
81   if (index != L2_ATOMIC_FULL) {
82     queue->_array[index & qsize_1] = element;
83     return L2A_SUCCESS;
84   }
85   
86   //We dont want to use the overflow queue
87   if (!queue->_useOverflowQ)
88     return L2A_EAGAIN; //Q is full, try later
89   
90   //No ordering is guaranteed if there is overflow
91   pthread_mutex_lock(&queue->_overflowMutex);
92   PCQueuePush(queue->_overflowQ, element);
93   pthread_mutex_unlock(&queue->_overflowMutex);
94   
95   return L2A_SUCCESS;
96 }
97
98 void * L2AtomicDequeue (L2AtomicQueue    *queue)
99 {
100   uint64_t head, tail;
101   tail = queue->_l2state->Producer;
102   head = queue->_l2state->Consumer;
103   register int qsize_1 = queue->_qsize-1;
104
105   volatile void *e = NULL;
106   if (head < tail) {    
107     e = queue->_array[head & qsize_1];
108     while (e == NULL) 
109       e = queue->_array[head & qsize_1];
110
111     //fprintf(stderr,"Found message %p\n", e);
112
113     queue->_array[head & qsize_1] = NULL;
114     ppc_msync();
115
116     head ++;
117     queue->_l2state->Consumer = head;    
118     
119     //Charm++ does not require message ordering
120     //So we dont acquire overflow mutex here
121     uint64_t n = head + queue->_qsize;
122     // is atomic-store needed?
123     L2_AtomicStore(&queue->_l2state->UpperBound, n);
124     return (void*) e;
125   }
126
127   //We dont have an overflowQ
128   if (!queue->_useOverflowQ)
129     return NULL;
130   
131   /* head == tail (head cannot be greater than tail) */
132   if (PCQueueLength(queue->_overflowQ) > 0) {
133     pthread_mutex_lock(&queue->_overflowMutex);      
134     e = PCQueuePop (queue->_overflowQ);    
135     pthread_mutex_unlock(&queue->_overflowMutex);      
136     
137     return (void *) e;
138   }
139
140   return (void *) e;
141 }
142
143 int L2AtomicQueueEmpty (L2AtomicQueue *queue) {
144   return ( (PCQueueLength(queue->_overflowQ) == 0) &&
145            (queue->_l2state->Producer == queue->_l2state->Consumer) );
146 }
147
148 //spin block in the L2 atomic queue till there is a message. fail and
149 //return after n iterations
150 int L2AtomicQueueSpinWait (L2AtomicQueue    * queue,
151                            int                n)
152 {
153   if (!L2AtomicQueueEmpty(queue))
154     return 0;  //queue is not empty so return
155   
156   uint64_t head, tail;
157   head = queue->_l2state->Consumer;
158   
159   size_t i = n;
160   do {
161     tail = queue->_l2state->Producer;    
162     i--;
163   }
164   //While the queue is empty and i < n
165   while (head == tail && i != 0);
166   
167   return 0; //fail queue is empty
168 }
169
170 //spin block in the L2 atomic queue till there is a message. fail and
171 //return after n iterations
172 int L2AtomicQueue2QSpinWait (L2AtomicQueue    * queue0,
173                              L2AtomicQueue    * queue1,
174                              int                n)
175 {
176   if (!L2AtomicQueueEmpty(queue0))
177     return 0;  //queue0 is not empty so return
178   
179   if (!L2AtomicQueueEmpty(queue1))
180     return 0;  //queue is not empty so return  
181
182   uint64_t head0, tail0;
183   uint64_t head1, tail1;
184   
185   head0 = queue0->_l2state->Consumer;  
186   head1 = queue1->_l2state->Consumer;
187   
188   size_t i = n;
189   do {
190     tail0 = queue0->_l2state->Producer;    
191     tail1 = queue1->_l2state->Producer;    
192     i --;
193   } while (head0==tail0 && head1==tail1 && i!=0);   
194  
195   return 0; 
196 }
197
198
199
200 #endif