msgQ: simplify enq and deq by eliminating unnecessary vector of msg buckets
authorRamprasad Venkataraman <ramv@illinois.edu>
Sun, 28 Oct 2012 23:19:50 +0000 (18:19 -0500)
committerRamprasad Venkataraman <ramv@illinois.edu>
Mon, 29 Oct 2012 00:08:37 +0000 (19:08 -0500)
The original intent of a vector of msg buckets was to permit constant-time deq()
and to permit features such as a randomized message queue. The implementation
now eschews this additional container in favor of simply storing pointers to the
buckets directly. The constant-time deq() is still retained, while randomized queues
will have to jump through a few additional hoops.

src/conv-core/msgq.h

index 45d94484ad1a68f7c2abdae84661b1a1a7be190e..762abb1890b49e7765eaff07bd723fbfb852fc23 100644 (file)
@@ -51,12 +51,7 @@ class msgQ
         const msg_t* deq();
 
         /// Return ptr to message that is next in line for delivery. Does not deq() the msg
-        const msg_t* front() const
-        {
-            if (prioQ.empty())
-                return NULL;
-            return msgbuckets[prioQ.top().second].front();
-        }
+        inline const msg_t* front() const { return empty() ? NULL : prioQ.top().second->front(); }
 
         /// Number of messages in the queue
         inline size_t size() const { return qSize; }
@@ -79,26 +74,23 @@ class msgQ
         }
 
     private:
-        /// The size of this message queue
-        size_t qSize;
-
-        /// Collection of msg buckets, each holding msgs of a given priority
-        std::vector< std::deque<const msg_t*> > msgbuckets;
-
-        /// The type of the index into the container of message buckets
-        typedef short bktidx_t;
-        /// A key-val pair of a priority value and the index to the bucket of msgs of that priority
-        typedef typename std::pair<prio_t, bktidx_t> prioidx_t;
-
-        /// A _min_ heap of distinct msg priorities along with the matching bucket indices
-        std::priority_queue<prioidx_t, std::vector<prioidx_t>, std::greater<prioidx_t> > prioQ;
-
-        /// A mapping between priority values and bucket indices, to locate buckets given a priority (used in enq)
+        /// Message bucket type
+        typedef std::deque<const msg_t*> bkt_t;
+        /// A key-val pair of a priority value and a handle to the bucket of msgs of that priority
+        typedef typename std::pair<prio_t, bkt_t*> prioidx_t;
+        /// A type for mapping between priority values and msg buckets
         #if CMK_HAS_STD_UNORDERED_MAP
-        std::unordered_map<prio_t, bktidx_t> prio2bktidx;
+        typedef typename std::unordered_map<prio_t, bkt_t> bktmap_t;
         #else
-        std::map<prio_t, bktidx_t> prio2bktidx;
+        typedef typename std::map<prio_t, bkt_t> bktmap_t;
         #endif
+
+        /// The size of this message queue
+        size_t qSize;
+        /// Collection of msg buckets, each holding msgs of a given priority (maps priorities to buckets)
+        bktmap_t msgbuckets;
+        /// A _min_ heap of distinct msg priorities along with handles to matching buckets
+        std::priority_queue<prioidx_t, std::vector<prioidx_t>, std::greater<prioidx_t> > prioQ;
 };
 
 
@@ -106,29 +98,11 @@ class msgQ
 template <typename P>
 void msgQ<P>::enq(const msg_t *msg, const prio_t &prio, const bool isFifo)
 {
-    // Find index of / create the bucket holding msgs of this priority
-    #if CMK_HAS_STD_UNORDERED_MAP
-    typename std::unordered_map<prio_t, bktidx_t>::iterator
-    #else
-    typename std::map<prio_t, bktidx_t>::iterator
-    #endif
-    itr = prio2bktidx.find(prio);
-    bktidx_t bktidx;
-    if (prio2bktidx.end() != itr)
-        bktidx = itr->second;
-    else
-    {
-        msgbuckets.push_back( std::deque<const msg_t*>() );
-        bktidx = msgbuckets.size() - 1;
-        prio2bktidx[prio] = bktidx;
-    }
-
-    // Access deq holding msgs of this priority
-    std::deque<const msg_t*> &bkt = msgbuckets[bktidx];
+    // Find / create the bucket holding msgs of this priority
+    bkt_t &bkt = msgbuckets[prio];
     // If this deq is empty, insert corresponding priority into prioQ
     if (bkt.empty())
-        prioQ.push( std::make_pair(prio, bktidx) );
-
+        prioQ.push( std::make_pair(prio, &bkt) );
     // Enq msg either at front or back of deq
     isFifo ? bkt.push_back(msg) : bkt.push_front(msg);
     // Increment the total number of msgs in this container
@@ -140,20 +114,14 @@ void msgQ<P>::enq(const msg_t *msg, const prio_t &prio, const bool isFifo)
 template <typename P>
 const msg_t* msgQ<P>::deq()
 {
-    if (prioQ.empty())
+    if (empty())
         return NULL;
-
-    // Get the index of the bucket holding the highest priority msgs
-    const bktidx_t &bktidx = prioQ.top().second;
-    std::deque<const msg_t*> &bkt = msgbuckets[bktidx];
-
-    // Assert that there is at least one msg corresponding to this priority
-    //if (bkt.empty()) throw;
-
+    // Find the bucket holding the highest priority msgs
+    bkt_t &bkt = * prioQ.top().second;
     // Pop msg from the front of the deque
     const msg_t *msg = bkt.front();
     bkt.pop_front();
-    // If all msgs of the highest priority have been consumed, pop that priority from the priority Q
+    // If all msgs in the bucket have been consumed, pop that priority from the priority Q
     if (bkt.empty())
         prioQ.pop();
     // Decrement the total number of msgs in this container
@@ -170,11 +138,12 @@ void msgQ<P>::enumerate(msg_t **first, msg_t **last) const
         return;
 
     msg_t **ptr = first;
-    for (int i=0; ptr != last && i <= msgbuckets.size(); i++)
+    for (typename bktmap_t::const_iterator bktitr = msgbuckets.begin();
+            ptr != last && bktitr != msgbuckets.end(); bktitr++)
     {
-        std::deque<const msg_t*>::const_iterator itr = msgbuckets[i].begin();
-        while (ptr != last && itr != msgbuckets[i].end())
-            *ptr++ = (msg_t*) *itr++;
+        bkt_t::const_iterator msgitr = bktitr->second.begin();
+        while (ptr != last && msgitr != bktitr->second.end())
+            *ptr++ = (msg_t*) *msgitr++;
     }
 }