Added a new gcc4 configuration for the XT3 machine option only.
[charm.git] / src / arch / util / pcqueue.h
1 /** @file
2  * @brief Producer-Consumer Queues
3  * @ingroup Machine
4  *
5  * This queue implementation enables a producer and a consumer to
6  * communicate via a queue.  The queues are optimized for this situation,
7  * they don't require any operating system locks (they do require 32-bit
8  * reads and writes to be atomic.)  Cautions: there can only be one
9  * producer, and one consumer.  These queues cannot store null pointers.
10  *
11  ****************************************************************************/
12
13 /**
14  * \addtogroup Machine
15  * @{
16  */
17
18 #ifndef __PCQUEUE__
19 #define __PCQUEUE__
20
21 /*****************************************************************************
22  * #define PCQUEUE_LOCK
23  * PCQueue doesn't need any lock, the lock here is only 
24  * for debugging and testing purpose! it only make sense in smp version
25  ****************************************************************************/
26 #undef PCQUEUE_LOCK
27
28 #define PCQueueSize 0x100
29
30 typedef struct CircQueueStruct
31 {
32   struct CircQueueStruct *next;
33   int push;
34   int pull;
35   char *data[PCQueueSize];
36 }
37 *CircQueue;
38
39 typedef struct PCQueueStruct
40 {
41   CircQueue head;
42   CircQueue tail;
43   int  len;
44 #ifdef PCQUEUE_LOCK
45   CmiNodeLock  lock;
46 #endif
47 }
48 *PCQueue;
49
50 /* static CircQueue Cmi_freelist_circqueuestruct = 0;
51    static int freeCount = 0; */
52
53 #define FreeCircQueueStruct(dg) {\
54   CircQueue d;\
55   CmiMemLock();\
56   d=(dg);\
57   d->next = Cmi_freelist_circqueuestruct;\
58   Cmi_freelist_circqueuestruct = d;\
59   freeCount++;\
60   CmiMemUnlock();\
61 }
62
63 #if !CMK_XT3
64 #define MallocCircQueueStruct(dg) {\
65   CircQueue d;\
66   CmiMemLock();\
67   d = Cmi_freelist_circqueuestruct;\
68   if (d==(CircQueue)0){\
69     d = ((CircQueue)calloc(1, sizeof(struct CircQueueStruct))); \
70   }\
71   else{\
72     freeCount--;\
73     Cmi_freelist_circqueuestruct = d->next;\
74     }\
75   dg = d;\
76   CmiMemUnlock();\
77 }
78 #else
79 #define MallocCircQueueStruct(dg) {\
80   CircQueue d;\
81   CmiMemLock();\
82   d = Cmi_freelist_circqueuestruct;\
83   if (d==(CircQueue)0){\
84     d = ((CircQueue)malloc(sizeof(struct CircQueueStruct))); \
85     d = ((CircQueue)memset(d, 0, sizeof(struct CircQueueStruct))); \
86   }\
87   else{\
88     freeCount--;\
89     Cmi_freelist_circqueuestruct = d->next;\
90     }\
91   dg = d;\
92   CmiMemUnlock();\
93 }
94 #endif
95
96 PCQueue PCQueueCreate(void)
97 {
98   CircQueue circ;
99   PCQueue Q;
100
101   /* MallocCircQueueStruct(circ); */
102 #if !CMK_XT3
103   circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
104 #else
105   circ = (CircQueue)malloc(sizeof(struct CircQueueStruct));
106   circ = (CircQueue)memset(circ, 0, sizeof(struct CircQueueStruct));
107 #endif
108   Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
109   _MEMCHECK(Q);
110   Q->head = circ;
111   Q->tail = circ;
112   Q->len = 0;
113 #ifdef PCQUEUE_LOCK
114   Q->lock = CmiCreateLock();
115 #endif
116   return Q;
117 }
118
119 int PCQueueEmpty(PCQueue Q)
120 {
121   CircQueue circ = Q->head;
122   char *data = circ->data[circ->pull];
123   return (data == 0);
124 }
125
126 int PCQueueLength(PCQueue Q)
127 {
128   return Q->len;
129 }
130
131 char *PCQueuePop(PCQueue Q)
132 {
133   CircQueue circ; int pull; char *data;
134
135 #ifdef PCQUEUE_LOCK
136     CmiLock(Q->lock);
137 #endif
138     circ = Q->head;
139     pull = circ->pull;
140     data = circ->data[pull];
141 #if XT3_ONLY_PCQUEUE_WORKAROUND
142     if (data && (Q->len > 0)) {
143 #else
144     if (data) {
145 #endif
146       circ->pull = (pull + 1);
147       circ->data[pull] = 0;
148       if (pull == PCQueueSize - 1) { /* just pulled the data from the last slot
149                                      of this buffer */
150         Q->head = circ-> next; /* next buffer must exist, because "Push"  */
151         
152         /* FreeCircQueueStruct(circ); */
153         free(circ);
154         
155         /* links in the next buffer *before* filling */
156                                /* in the last slot. See below. */
157       }
158       Q->len --;
159 #ifdef PCQUEUE_LOCK
160       CmiUnlock(Q->lock);
161 #endif
162       return data;
163     }
164     else { /* queue seems to be empty. The producer may be adding something
165               to it, but its ok to report queue is empty. */
166 #ifdef PCQUEUE_LOCK
167       CmiUnlock(Q->lock);
168 #endif
169       return 0;
170     }
171 }
172
173 void PCQueuePush(PCQueue Q, char *data)
174 {
175   CircQueue circ, circ1; int push;
176   
177 #ifdef PCQUEUE_LOCK
178   CmiLock(Q->lock);
179 #endif
180   circ1 = Q->tail;
181   push = circ1->push;
182   if (push == (PCQueueSize -1)) { /* last slot is about to be filled */
183     /* this way, the next buffer is linked in before data is filled in 
184        in the last slot of this buffer */
185
186 #if CMK_XT3
187     circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
188 #else
189     circ = (CircQueue)malloc(sizeof(struct CircQueueStruct));
190     circ = (CircQueue)memset(circ, 0, sizeof(struct CircQueueStruct));
191 #endif
192     /* MallocCircQueueStruct(circ); */
193
194     Q->tail->next = circ;
195     Q->tail = circ;
196   }
197   circ1->data[push] = data;
198   circ1->push = (push + 1);
199   Q->len ++;
200 #ifdef PCQUEUE_LOCK
201   CmiUnlock(Q->lock);
202 #endif
203 }
204
205 #endif
206
207 /*@}*/