45d94484ad1a68f7c2abdae84661b1a1a7be190e
[charm.git] / src / conv-core / msgq.h
1 #ifndef MSG_Q_H
2 #define MSG_Q_H
3
4 #include <deque>
5 #include <queue>
6 #include <ostream>
7
8 #if CMK_HAS_STD_UNORDERED_MAP
9 #include <unordered_map>
10 #else
11 #include <map>
12 #endif
13
14 namespace conv {
15
16 // Some day, messages may be handled as something other than void* within the runtime.
17 // Prepare for that day, while enhancing readability today.
18 typedef void msg_t;
19
20 /**
21  * Charm Message Queue: Holds msg pointers and returns the next message to execute on a PE
22  *
23  * Templated on the type of the priority field. Defaults to int priority.
24  * All scheduling policies are encapsulated behind this queue interface.
25  *
26  * All messages of a given priority p are stored in a single container. Since
27  * each message can be enqueued either to the front or back of this container,
28  * a dequeue is used. Each such dequeue is referred to as a bucket.
29  * The set of priority values of all the messages in the container is stored in
30  * a min-heap. A deq() operation simply peeks at the most important prio
31  * value, and finds the bucket associated with that value. It then dequeues the
32  * message at the front of this bucket.
33  * A mapping between the priority values and the corresponding buckets is
34  * maintained. enq() operations simply find the bucket corresponding to a prio
35  * value and place the msg into it.
36  */
37 template <typename P = int>
38 class msgQ
39 {
40     public:
41         /// The datatype for msg priorities
42         typedef P prio_t;
43
44         /// Hardly any initialization required
45         msgQ(): qSize(0) {}
46
47         /// Given a message (optionally with a priority and queuing policy), enqueue it for delivery
48         void enq(const msg_t *msg, const prio_t &prio = prio_t(), const bool isFifo = true);
49
50         /// Pop (and return) the next message to deliver
51         const msg_t* deq();
52
53         /// Return ptr to message that is next in line for delivery. Does not deq() the msg
54         const msg_t* front() const
55         {
56             if (prioQ.empty())
57                 return NULL;
58             return msgbuckets[prioQ.top().second].front();
59         }
60
61         /// Number of messages in the queue
62         inline size_t size() const { return qSize; }
63
64         /// Is the queue empty?
65         inline bool empty() const { return (0 == qSize); }
66
67         /// Returns the value of the highest priority amongst all the messages in the queue
68         inline prio_t top_priority() const { return prioQ.top().first; }
69
70         /// Just so that we can support CqsEnumerateQueue()
71         void enumerate(msg_t **first, msg_t **last) const;
72
73         /// An ostream operator overload, that currently just prints q size
74         friend std::ostream& operator<< (std::ostream &out, const msgQ &q)
75         {
76             out <<"\nmsgQ[" << q.qSize << "]:";
77             out<<"\n";
78             return out;
79         }
80
81     private:
82         /// The size of this message queue
83         size_t qSize;
84
85         /// Collection of msg buckets, each holding msgs of a given priority
86         std::vector< std::deque<const msg_t*> > msgbuckets;
87
88         /// The type of the index into the container of message buckets
89         typedef short bktidx_t;
90         /// A key-val pair of a priority value and the index to the bucket of msgs of that priority
91         typedef typename std::pair<prio_t, bktidx_t> prioidx_t;
92
93         /// A _min_ heap of distinct msg priorities along with the matching bucket indices
94         std::priority_queue<prioidx_t, std::vector<prioidx_t>, std::greater<prioidx_t> > prioQ;
95
96         /// A mapping between priority values and bucket indices, to locate buckets given a priority (used in enq)
97         #if CMK_HAS_STD_UNORDERED_MAP
98         std::unordered_map<prio_t, bktidx_t> prio2bktidx;
99         #else
100         std::map<prio_t, bktidx_t> prio2bktidx;
101         #endif
102 };
103
104
105
106 template <typename P>
107 void msgQ<P>::enq(const msg_t *msg, const prio_t &prio, const bool isFifo)
108 {
109     // Find index of / create the bucket holding msgs of this priority
110     #if CMK_HAS_STD_UNORDERED_MAP
111     typename std::unordered_map<prio_t, bktidx_t>::iterator
112     #else
113     typename std::map<prio_t, bktidx_t>::iterator
114     #endif
115     itr = prio2bktidx.find(prio);
116     bktidx_t bktidx;
117     if (prio2bktidx.end() != itr)
118         bktidx = itr->second;
119     else
120     {
121         msgbuckets.push_back( std::deque<const msg_t*>() );
122         bktidx = msgbuckets.size() - 1;
123         prio2bktidx[prio] = bktidx;
124     }
125
126     // Access deq holding msgs of this priority
127     std::deque<const msg_t*> &bkt = msgbuckets[bktidx];
128     // If this deq is empty, insert corresponding priority into prioQ
129     if (bkt.empty())
130         prioQ.push( std::make_pair(prio, bktidx) );
131
132     // Enq msg either at front or back of deq
133     isFifo ? bkt.push_back(msg) : bkt.push_front(msg);
134     // Increment the total number of msgs in this container
135     qSize++;
136 }
137
138
139
140 template <typename P>
141 const msg_t* msgQ<P>::deq()
142 {
143     if (prioQ.empty())
144         return NULL;
145
146     // Get the index of the bucket holding the highest priority msgs
147     const bktidx_t &bktidx = prioQ.top().second;
148     std::deque<const msg_t*> &bkt = msgbuckets[bktidx];
149
150     // Assert that there is at least one msg corresponding to this priority
151     //if (bkt.empty()) throw;
152
153     // Pop msg from the front of the deque
154     const msg_t *msg = bkt.front();
155     bkt.pop_front();
156     // If all msgs of the highest priority have been consumed, pop that priority from the priority Q
157     if (bkt.empty())
158         prioQ.pop();
159     // Decrement the total number of msgs in this container
160     qSize--;
161     return msg;
162 }
163
164
165
166 template <typename P>
167 void msgQ<P>::enumerate(msg_t **first, msg_t **last) const
168 {
169     if (first >= last)
170         return;
171
172     msg_t **ptr = first;
173     for (int i=0; ptr != last && i <= msgbuckets.size(); i++)
174     {
175         std::deque<const msg_t*>::const_iterator itr = msgbuckets[i].begin();
176         while (ptr != last && itr != msgbuckets[i].end())
177             *ptr++ = (msg_t*) *itr++;
178     }
179 }
180
181 } // end namespace conv
182
183 #endif // MSG_Q_H
184