msgQ: user iterator for appropriate container (map/unordered_map)
[charm.git] / src / conv-core / msgq.h
1 #ifndef MSG_Q_H
2 #define MSG_Q_H
3
4 #include <deque>
5 #include <queue>
6 #include <ostream>
7
8 #if CMK_HAS_STD_UNORDERED_MAP
9 #include <unordered_map>
10 #else
11 #include <map>
12 #endif
13
14 namespace conv {
15
16 // Some day, messages may be handled as something other than void* within the runtime.
17 // Prepare for that day.This also enhances readability
18 typedef void msg_t;
19
20 /**
21  * Charm Message Queue: Holds msg pointers and returns the next message to execute on a PE
22  *
23  * Templated on the type of the priority field. Defaults to int priority.
24  * All scheduling policies are encapsulated behind this queue interface.
25  */
26 template <typename P = int>
27 class msgQ
28 {
29     public:
30         /// The datatype for msg priorities
31         typedef P prio_t;
32
33         /// Hardly any initialization required
34         msgQ(): qSize(0) {}
35
36         /// Given a message (optionally with a priority and queuing policy), enqueue it for delivery
37         void enq(const msg_t *msg
38                 ,const prio_t &prio = prio_t()
39                 ,const bool isFifo = true
40                 );
41
42         /// Pop (and return) the next message to deliver
43         const msg_t* deq();
44
45         /// Return ptr to message that is next in line for delivery. Does not deq() the msg
46         const msg_t* front() const
47         {
48             if (prioQ.empty())
49                 return NULL;
50             return msgbuckets[prioQ.top().second].front();
51         }
52
53         /// Number of messages in the queue
54         inline size_t size() const { return qSize; }
55
56         /// Is the queue empty?
57         inline bool empty() const { return (0 == qSize); }
58
59         /** Returns the value of the highest priority amongst all the messages in the queue
60          *
61          * @note: Depending on scheduling policy, this may or may not be the priority of the
62          * next msg in line delivery. However, the default scheduling policy does return a msg
63          * of this priority.
64          */
65         inline prio_t top_priority() const { return prioQ.top().first; }
66
67         /// Just so that we can support CqsEnumerateQueue()
68         void enumerate(msg_t **first, msg_t **last) const;
69
70         /// An ostream operator overload, that currently just prints q size
71         friend std::ostream& operator<< (std::ostream &out, const msgQ &q)
72         {
73             out <<"\nmsgQ[" << q.qSize << "]:";
74             out<<"\n";
75             return out;
76         }
77
78     private:
79         /// The size of this message queue
80         size_t qSize;
81
82         /// Collection of msg buckets, each holding msgs of a given priority
83         std::vector< std::deque<const msg_t*> > msgbuckets;
84
85         /// The type of the index into the container of message buckets
86         typedef short bktidx_t;
87         /// A key-val pair of a priority value and the index to the bucket of msgs of that priority
88         typedef typename std::pair<prio_t, bktidx_t> prioidx_t;
89
90         /// A _min_ heap of distinct msg priorities along with the matching bucket indices
91         std::priority_queue<prioidx_t, std::vector<prioidx_t>, std::greater<prioidx_t> > prioQ;
92
93         /// A mapping between priority values and bucket indices, to locate buckets given a priority (used in enq)
94         #if CMK_HAS_STD_UNORDERED_MAP
95         std::unordered_map<prio_t, bktidx_t> prio2bktidx;
96         #else
97         std::map<prio_t, bktidx_t> prio2bktidx;
98         #endif
99 };
100
101
102
103 template <typename P>
104 void msgQ<P>::enq(const msg_t *msg
105                  ,const prio_t &prio
106                  ,const bool isFifo
107                  )
108 {
109     // Find index of / create the bucket holding msgs of this priority
110     #if CMK_HAS_STD_UNORDERED_MAP
111     typename std::unordered_map<prio_t, bktidx_t>::iterator
112     #else
113     typename std::map<prio_t, bktidx_t>::iterator
114     #endif
115     itr = prio2bktidx.find(prio);
116     bktidx_t bktidx;
117     if (prio2bktidx.end() != itr)
118         bktidx = itr->second;
119     else
120     {
121         msgbuckets.push_back( std::deque<const msg_t*>() );
122         bktidx = msgbuckets.size() - 1;
123         prio2bktidx[prio] = bktidx;
124     }
125
126     // Access deq holding msgs of this priority
127     std::deque<const msg_t*> &bkt = msgbuckets[bktidx];
128     // If this deq is empty, insert corresponding priority into prioQ
129     if (bkt.empty())
130         prioQ.push( std::make_pair(prio, bktidx) );
131
132     // Enq msg either at front or back of deq
133     if (isFifo)
134         bkt.push_back(msg);
135     else
136         bkt.push_front(msg);
137     // Increment the total number of msgs in this container
138     qSize++;
139 }
140
141
142
143 template <typename P>
144 const msg_t* msgQ<P>::deq()
145 {
146     if (prioQ.empty())
147         return NULL;
148
149     // Get the index of the bucket holding the highest priority msgs
150     const bktidx_t &bktidx = prioQ.top().second;
151     std::deque<const msg_t*> &bkt = msgbuckets[bktidx];
152
153     // Assert that there is at least one msg corresponding to this priority
154     //if (bkt.empty()) throw;
155
156     // Pop msg from the front of the deque
157     const msg_t *msg = bkt.front();
158     bkt.pop_front();
159     // If all msgs of the highest priority have been consumed, pop that priority from the priority Q
160     if (bkt.empty())
161         prioQ.pop();
162     // Decrement the total number of msgs in this container
163     qSize--;
164     return msg;
165 }
166
167
168
169 template <typename P>
170 void msgQ<P>::enumerate(msg_t **first, msg_t **last) const
171 {
172     if (first >= last)
173         return;
174
175     msg_t **ptr = first;
176     for (int i=0; ptr != last && i <= msgbuckets.size(); i++)
177     {
178         std::deque<const msg_t*>::const_iterator itr = msgbuckets[i].begin();
179         while (ptr != last && itr != msgbuckets[i].end())
180             *ptr++ = (msg_t*) *itr++;
181     }
182 }
183
184 } // end namespace conv
185
186 #endif // MSG_Q_H
187