0eb0f3e9b244b54e4f69bb63ee3e19ff5724ec91
[charm.git] / src / arch / util / pcqueue.h
1 /** @file
2  * @brief Producer-Consumer Queues
3  * @ingroup Machine
4  *
5  * This queue implementation enables a producer and a consumer to
6  * communicate via a queue.  The queues are optimized for this situation,
7  * they don't require any operating system locks (they do require 32-bit
8  * reads and writes to be atomic.)  Cautions: there can only be one
9  * producer, and one consumer.  These queues cannot store null pointers.
10  *
11  ****************************************************************************/
12
13 /**
14  * \addtogroup Machine
15  * @{
16  */
17
18 #ifndef __PCQUEUE__
19 #define __PCQUEUE__
20
21 /*****************************************************************************
22  * #define PCQUEUE_LOCK
23  * PCQueue doesn't need any lock, the lock here is only 
24  * for debugging and testing purpose! it only make sense in smp version
25  ****************************************************************************/
26 #undef PCQUEUE_LOCK
27
28 #define PCQueueSize 0x100
29
30 typedef struct CircQueueStruct
31 {
32   struct CircQueueStruct *next;
33   int push;
34   int pull;
35   char *data[PCQueueSize];
36 }
37 *CircQueue;
38
39 typedef struct PCQueueStruct
40 {
41   CircQueue head;
42   CircQueue tail;
43   int  len;
44 #ifdef PCQUEUE_LOCK
45   CmiNodeLock  lock;
46 #endif
47 }
48 *PCQueue;
49
50 /* static CircQueue Cmi_freelist_circqueuestruct = 0;
51    static int freeCount = 0; */
52
53 #define FreeCircQueueStruct(dg) {\
54   CircQueue d;\
55   CmiMemLock();\
56   d=(dg);\
57   d->next = Cmi_freelist_circqueuestruct;\
58   Cmi_freelist_circqueuestruct = d;\
59   freeCount++;\
60   CmiMemUnlock();\
61 }
62
63 #define MallocCircQueueStruct(dg) {\
64   CircQueue d;\
65   CmiMemLock();\
66   d = Cmi_freelist_circqueuestruct;\
67   if (d==(CircQueue)0){\
68     d = ((CircQueue)calloc(1, sizeof(struct CircQueueStruct)));\
69   }\
70   else{\
71     freeCount--;\
72     Cmi_freelist_circqueuestruct = d->next;\
73     }\
74   dg = d;\
75   CmiMemUnlock();\
76 }
77
78 PCQueue PCQueueCreate(void)
79 {
80   CircQueue circ;
81   PCQueue Q;
82
83   /* MallocCircQueueStruct(circ); */
84   circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
85
86   Q = (PCQueue)malloc(sizeof(struct PCQueueStruct));
87   _MEMCHECK(Q);
88   Q->head = circ;
89   Q->tail = circ;
90   Q->len = 0;
91 #ifdef PCQUEUE_LOCK
92   Q->lock = CmiCreateLock();
93 #endif
94   return Q;
95 }
96
97 int PCQueueEmpty(PCQueue Q)
98 {
99   CircQueue circ = Q->head;
100   char *data = circ->data[circ->pull];
101   return (data == 0);
102 }
103
104 int PCQueueLength(PCQueue Q)
105 {
106   return Q->len;
107 }
108
109 char *PCQueuePop(PCQueue Q)
110 {
111   CircQueue circ; int pull; char *data;
112
113 #ifdef PCQUEUE_LOCK
114     CmiLock(Q->lock);
115 #endif
116     circ = Q->head;
117     pull = circ->pull;
118     data = circ->data[pull];
119 #if XT3_ONLY_PCQUEUE_WORKAROUND
120     if (data && (Q->len > 0)) {
121 #else
122     if (data) {
123 #endif
124       circ->pull = (pull + 1);
125       circ->data[pull] = 0;
126       if (pull == PCQueueSize - 1) { /* just pulled the data from the last slot
127                                      of this buffer */
128         Q->head = circ-> next; /* next buffer must exist, because "Push"  */
129         
130         /* FreeCircQueueStruct(circ); */
131         free(circ);
132         
133         /* links in the next buffer *before* filling */
134                                /* in the last slot. See below. */
135       }
136       Q->len --;
137 #ifdef PCQUEUE_LOCK
138       CmiUnlock(Q->lock);
139 #endif
140       return data;
141     }
142     else { /* queue seems to be empty. The producer may be adding something
143               to it, but its ok to report queue is empty. */
144 #ifdef PCQUEUE_LOCK
145       CmiUnlock(Q->lock);
146 #endif
147       return 0;
148     }
149 }
150
151 void PCQueuePush(PCQueue Q, char *data)
152 {
153   CircQueue circ, circ1; int push;
154   
155 #ifdef PCQUEUE_LOCK
156   CmiLock(Q->lock);
157 #endif
158   circ1 = Q->tail;
159   push = circ1->push;
160   if (push == (PCQueueSize -1)) { /* last slot is about to be filled */
161     /* this way, the next buffer is linked in before data is filled in 
162        in the last slot of this buffer */
163
164     circ = (CircQueue)calloc(1, sizeof(struct CircQueueStruct));
165     /* MallocCircQueueStruct(circ); */
166
167     Q->tail->next = circ;
168     Q->tail = circ;
169   }
170   circ1->data[push] = data;
171   circ1->push = (push + 1);
172   Q->len ++;
173 #ifdef PCQUEUE_LOCK
174   CmiUnlock(Q->lock);
175 #endif
176 }
177
178 #endif
179
180 /*@}*/