8fa3304d9a0487c276dd30830dfc8b4a9fa3dce5
[charm.git] / src / conv-core / msgq.h
1 #ifndef MSG_Q_H
2 #define MSG_Q_H
3
4 #include <deque>
5 #include <queue>
6 #include <ostream>
7
8 #if CMK_HAS_STD_UNORDERED_MAP
9 #include <unordered_map>
10 #else
11 #include <map>
12 #endif
13
14 namespace conv {
15
16 typedef void msg_t;
17
18 template <typename P>
19 inline P defaultprio(P *dummy_tag) { return 0; }
20
21
22 template <typename P = int>
23 class msgQ
24 {
25     public:
26         /// The datatype for msg priorities
27         typedef P prio_t;
28         /// The datatype of the index of the container storing msgs of a given priority
29         typedef short bktidx_t;
30         /// Yet another typedef. Just for terseness
31         typedef typename std::pair<prio_t, bktidx_t> prioidx_t;
32
33         ///
34         msgQ(): qSize(0) {}
35         ///
36         void enq(const msg_t *msg
37                 ,const prio_t &prio = defaultprio<prio_t>(0)
38                 ,const bool isFifo = true
39                 );
40         ///
41         const msg_t* deq();
42         ///
43         const msg_t* front() const;
44         ///
45         inline size_t size() const { return qSize; }
46         ///
47         inline bool empty() const { return (0 == qSize); }
48         ///
49         inline prio_t top_priority() const { return prios.top().first; }
50
51         ///
52         void enumerate(msg_t **first, msg_t **last) const;
53         ///
54         friend std::ostream& operator<< (std::ostream &out, const msgQ &q)
55         {
56             out <<"\nmsgQ[" << q.qSize << "]:";
57             out<<"\n";
58             return out;
59         }
60
61     private:
62         ///
63         size_t qSize;
64         /// Vector of msg buckets (each of them a deq)
65         std::vector< std::deque<const msg_t*> > msgbuckets;
66         /// A heap of distinct msg priorities
67         std::priority_queue<prioidx_t, std::vector<prioidx_t>, std::greater<prioidx_t> > prios;
68         /// A mapping between priority values and the bucket indices
69         #if CMK_HAS_STD_UNORDERED_MAP
70         std::unordered_map<prio_t, int> prio2bktidx;
71         #else
72         std::map<prio_t, int> prio2bktidx;
73         #endif
74 };
75
76
77
78 template <typename P>
79 void msgQ<P>::enq(const msg_t *msg
80                  ,const prio_t &prio
81                  ,const bool isFifo
82                  )
83 {
84     // Find index of / create the bucket holding msgs of this priority
85     typename std::map<prio_t, int>::iterator itr = prio2bktidx.find(prio);
86     bktidx_t bktidx;
87     if (prio2bktidx.end() != itr)
88         bktidx = itr->second;
89     else
90     {
91         msgbuckets.push_back( std::deque<const msg_t*>() );
92         bktidx = msgbuckets.size() - 1;
93         prio2bktidx[prio] = bktidx;
94     }
95
96     // Access deq holding msgs of this priority
97     std::deque<const msg_t*> &q = msgbuckets[bktidx];
98     // If this deq is empty, insert corresponding priority into prioQ
99     if (q.empty())
100         prios.push( std::make_pair(prio, bktidx) );
101
102     // Enq msg either at front or back of deq
103     if (isFifo)
104         q.push_back(msg);
105     else
106         q.push_front(msg);
107     // Increment the total number of msgs in this container
108     qSize++;
109 }
110
111
112
113 template <typename P>
114 const msg_t* msgQ<P>::deq()
115 {
116     if (prios.empty())
117         return NULL;
118
119     // Get the index of the bucket holding the highest priority msgs
120     const bktidx_t &bktidx = prios.top().second;
121     std::deque<const msg_t*> &q = msgbuckets[bktidx];
122
123     // Assert that there is at least one msg corresponding to this priority
124     if (q.empty()) throw;
125     const msg_t *msg = q.front();
126     q.pop_front();
127     // If all msgs of the highest priority have been consumed, pop that priority from the priority Q
128     if (q.empty())
129         prios.pop();
130     // Decrement the total number of msgs in this container
131     qSize--;
132     return msg;
133 }
134
135
136
137 template <typename P>
138 const msg_t* msgQ<P>::front() const
139 {
140     if (prios.empty())
141         return NULL;
142     return msgbuckets[prios.top().second].front();
143 }
144
145
146
147 template <typename P>
148 void msgQ<P>::enumerate(msg_t **first, msg_t **last) const
149 {
150     if (first >= last)
151         return;
152
153     msg_t **ptr = first;
154     for (int i=0; ptr != last && i <= msgbuckets.size(); i++)
155     {
156         std::deque<const msg_t*>::const_iterator itr = msgbuckets[i].begin();
157         while (ptr != last && itr != msgbuckets[i].end())
158             *ptr++ = (msg_t*) *itr++;
159     }
160 }
161
162 } // end namespace conv
163
164 #endif // MSG_Q_H
165