| // Copyright (C) 2014 Ian Forbed |
| // Copyright (C) 2014-2017 Vicente J. Botet Escriba |
| // |
| // Distributed under the Boost Software License, Version 1.0. (See accompanying |
| // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
| // |
| |
| #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE |
| #define BOOST_THREAD_SYNC_PRIORITY_QUEUE |
| |
| #include <boost/thread/detail/config.hpp> |
| |
| #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp> |
| #include <boost/thread/concurrent_queues/queue_op_status.hpp> |
| #include <boost/thread/condition_variable.hpp> |
| #include <boost/thread/csbl/vector.hpp> |
| #include <boost/thread/detail/move.hpp> |
| #include <boost/thread/mutex.hpp> |
| |
| #include <boost/atomic.hpp> |
| #include <boost/chrono/duration.hpp> |
| #include <boost/chrono/time_point.hpp> |
| |
| #include <exception> |
| #include <queue> |
| #include <utility> |
| |
| #include <boost/config/abi_prefix.hpp> |
| |
| namespace boost |
| { |
| namespace detail { |
| |
| template < |
| class Type, |
| class Container = csbl::vector<Type>, |
| class Compare = std::less<Type> |
| > |
| class priority_queue |
| { |
| private: |
| Container _elements; |
| Compare _compare; |
| public: |
| typedef Type value_type; |
| typedef typename Container::size_type size_type; |
| |
| explicit priority_queue(const Compare& compare = Compare()) |
| : _elements(), _compare(compare) |
| { } |
| |
| size_type size() const |
| { |
| return _elements.size(); |
| } |
| |
| bool empty() const |
| { |
| return _elements.empty(); |
| } |
| |
| void push(Type const& element) |
| { |
| _elements.push_back(element); |
| std::push_heap(_elements.begin(), _elements.end(), _compare); |
| } |
| void push(BOOST_RV_REF(Type) element) |
| { |
| _elements.push_back(boost::move(element)); |
| std::push_heap(_elements.begin(), _elements.end(), _compare); |
| } |
| |
| void pop() |
| { |
| std::pop_heap(_elements.begin(), _elements.end(), _compare); |
| _elements.pop_back(); |
| } |
| Type pull() |
| { |
| Type result = boost::move(_elements.front()); |
| pop(); |
| return boost::move(result); |
| } |
| |
| Type const& top() const |
| { |
| return _elements.front(); |
| } |
| }; |
| } |
| |
| namespace concurrent |
| { |
| template <class ValueType, |
| class Container = csbl::vector<ValueType>, |
| class Compare = std::less<typename Container::value_type> > |
| class sync_priority_queue |
| : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > |
| { |
| typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super; |
| |
| public: |
| typedef ValueType value_type; |
| //typedef typename super::value_type value_type; // fixme |
| typedef typename super::underlying_queue_type underlying_queue_type; |
| typedef typename super::size_type size_type; |
| typedef typename super::op_status op_status; |
| |
| typedef chrono::steady_clock clock; |
| protected: |
| |
| public: |
| sync_priority_queue() {} |
| |
| ~sync_priority_queue() |
| { |
| if(!super::closed()) |
| { |
| super::close(); |
| } |
| } |
| |
| void push(const ValueType& elem); |
| void push(BOOST_THREAD_RV_REF(ValueType) elem); |
| |
| queue_op_status try_push(const ValueType& elem); |
| queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem); |
| |
| ValueType pull(); |
| |
| void pull(ValueType&); |
| |
| template <class WClock, class Duration> |
| queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&); |
| template <class Rep, class Period> |
| queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&); |
| |
| queue_op_status try_pull(ValueType& elem); |
| queue_op_status wait_pull(ValueType& elem); |
| queue_op_status nonblocking_pull(ValueType&); |
| |
| private: |
| void push(unique_lock<mutex>&, const ValueType& elem); |
| void push(lock_guard<mutex>&, const ValueType& elem); |
| void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); |
| void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); |
| |
| queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem); |
| queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem); |
| |
| ValueType pull(unique_lock<mutex>&); |
| ValueType pull(lock_guard<mutex>&); |
| |
| void pull(unique_lock<mutex>&, ValueType&); |
| void pull(lock_guard<mutex>&, ValueType&); |
| |
| queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem); |
| queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem); |
| |
| queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem); |
| |
| queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&); |
| |
| sync_priority_queue(const sync_priority_queue&); |
| sync_priority_queue& operator= (const sync_priority_queue&); |
| sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue)); |
| sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue)); |
| }; //end class |
| |
| |
| ////////////////////// |
| template <class T, class Container,class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem) |
| { |
| super::throw_if_closed(lk); |
| super::data_.push(elem); |
| super::notify_elem_added(lk); |
| } |
| template <class T, class Container,class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem) |
| { |
| super::throw_if_closed(lk); |
| super::data_.push(elem); |
| super::notify_elem_added(lk); |
| } |
| template <class T, class Container,class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::push(const T& elem) |
| { |
| lock_guard<mutex> lk(super::mtx_); |
| push(lk, elem); |
| } |
| |
| ////////////////////// |
| template <class T, class Container,class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem) |
| { |
| super::throw_if_closed(lk); |
| super::data_.push(boost::move(elem)); |
| super::notify_elem_added(lk); |
| } |
| template <class T, class Container,class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem) |
| { |
| super::throw_if_closed(lk); |
| super::data_.push(boost::move(elem)); |
| super::notify_elem_added(lk); |
| } |
| template <class T, class Container,class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem) |
| { |
| lock_guard<mutex> lk(super::mtx_); |
| push(lk, boost::move(elem)); |
| } |
| |
| ////////////////////// |
| template <class T, class Container,class Cmp> |
| queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem) |
| { |
| lock_guard<mutex> lk(super::mtx_); |
| if (super::closed(lk)) return queue_op_status::closed; |
| push(lk, elem); |
| return queue_op_status::success; |
| } |
| |
| ////////////////////// |
| template <class T, class Container,class Cmp> |
| queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem) |
| { |
| lock_guard<mutex> lk(super::mtx_); |
| if (super::closed(lk)) return queue_op_status::closed; |
| push(lk, boost::move(elem)); |
| |
| return queue_op_status::success; |
| } |
| |
| ////////////////////// |
| template <class T,class Container, class Cmp> |
| T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&) |
| { |
| return super::data_.pull(); |
| } |
| template <class T,class Container, class Cmp> |
| T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&) |
| { |
| return super::data_.pull(); |
| } |
| |
| template <class T,class Container, class Cmp> |
| T sync_priority_queue<T,Container,Cmp>::pull() |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); |
| if (has_been_closed) super::throw_if_closed(lk); |
| return pull(lk); |
| } |
| |
| ////////////////////// |
| template <class T,class Container, class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem) |
| { |
| elem = super::data_.pull(); |
| } |
| template <class T,class Container, class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem) |
| { |
| elem = super::data_.pull(); |
| } |
| |
| template <class T,class Container, class Cmp> |
| void sync_priority_queue<T,Container,Cmp>::pull(T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); |
| if (has_been_closed) super::throw_if_closed(lk); |
| pull(lk, elem); |
| } |
| |
| ////////////////////// |
| template <class T, class Cont,class Cmp> |
| template <class WClock, class Duration> |
| queue_op_status |
| sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp); |
| if (rc == queue_op_status::success) pull(lk, elem); |
| return rc; |
| } |
| |
| ////////////////////// |
| template <class T, class Cont,class Cmp> |
| template <class Rep, class Period> |
| queue_op_status |
| sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem) |
| { |
| return pull_until(chrono::steady_clock::now() + dura, elem); |
| } |
| |
| ////////////////////// |
| template <class T, class Container,class Cmp> |
| queue_op_status |
| sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem) |
| { |
| if (super::empty(lk)) |
| { |
| if (super::closed(lk)) return queue_op_status::closed; |
| return queue_op_status::empty; |
| } |
| pull(lk, elem); |
| return queue_op_status::success; |
| } |
| |
| template <class T, class Container,class Cmp> |
| queue_op_status |
| sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem) |
| { |
| if (super::empty(lk)) |
| { |
| if (super::closed(lk)) return queue_op_status::closed; |
| return queue_op_status::empty; |
| } |
| pull(lk, elem); |
| return queue_op_status::success; |
| } |
| |
| template <class T, class Container,class Cmp> |
| queue_op_status |
| sync_priority_queue<T,Container,Cmp>::try_pull(T& elem) |
| { |
| lock_guard<mutex> lk(super::mtx_); |
| return try_pull(lk, elem); |
| } |
| |
| ////////////////////// |
| template <class T,class Container, class Cmp> |
| queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem) |
| { |
| const bool has_been_closed = super::wait_until_not_empty_or_closed(lk); |
| if (has_been_closed) return queue_op_status::closed; |
| pull(lk, elem); |
| return queue_op_status::success; |
| } |
| |
| template <class T,class Container, class Cmp> |
| queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| return wait_pull(lk, elem); |
| } |
| |
| ////////////////////// |
| template <class T,class Container, class Cmp> |
| queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_, try_to_lock); |
| if (!lk.owns_lock()) return queue_op_status::busy; |
| return try_pull(lk, elem); |
| } |
| |
| |
| |
| } //end concurrent namespace |
| |
| using concurrent::sync_priority_queue; |
| |
| } //end boost namespace |
| #include <boost/config/abi_suffix.hpp> |
| |
| #endif |