| // Copyright (C) 2014 Ian Forbed |
| // Copyright (C) 2014-2017 Vicente J. Botet Escriba |
| // |
| // Distributed under the Boost Software License, Version 1.0. (See accompanying |
| // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
| // |
| |
| #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP |
| #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP |
| |
| #include <boost/thread/detail/config.hpp> |
| |
| #include <boost/thread/concurrent_queues/sync_priority_queue.hpp> |
| #include <boost/chrono/duration.hpp> |
| #include <boost/chrono/time_point.hpp> |
| #include <boost/chrono/system_clocks.hpp> |
| #include <boost/chrono/chrono_io.hpp> |
| |
| #include <algorithm> // std::min |
| |
| #include <boost/config/abi_prefix.hpp> |
| |
| namespace boost |
| { |
| namespace concurrent |
| { |
| namespace detail |
| { |
| // fixme: shouldn't the timepoint be configurable |
| template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point> |
| struct scheduled_type |
| { |
| typedef T value_type; |
| typedef Clock clock; |
| typedef TimePoint time_point; |
| T data; |
| time_point time; |
| |
| BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type) |
| |
| scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {} |
| scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {} |
| |
| scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {} |
| scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) { |
| data = other.data; |
| time = other.time; |
| return *this; |
| } |
| |
| scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {} |
| scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) { |
| data = boost::move(other.data); |
| time = other.time; |
| return *this; |
| } |
| |
| bool operator <(const scheduled_type & other) const |
| { |
| return this->time > other.time; |
| } |
| }; //end struct |
| |
| template <class Duration> |
| chrono::time_point<chrono::steady_clock,Duration> |
| limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) |
| { |
| // Clock == chrono::steady_clock |
| return tp; |
| } |
| |
| template <class Clock, class Duration> |
| chrono::time_point<Clock,Duration> |
| limit_timepoint(chrono::time_point<Clock,Duration> const& tp) |
| { |
| // Clock != chrono::steady_clock |
| // The system time may jump while wait_until() is waiting. To compensate for this and time out near |
| // the correct time, we limit how long wait_until() can wait before going around the loop again. |
| const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS))); |
| return (std::min)(tp, tpmax); |
| } |
| |
| template <class Duration> |
| chrono::steady_clock::time_point |
| convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp) |
| { |
| // Clock == chrono::steady_clock |
| return chrono::time_point_cast<chrono::steady_clock::duration>(tp); |
| } |
| |
| template <class Clock, class Duration> |
| chrono::steady_clock::time_point |
| convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp) |
| { |
| // Clock != chrono::steady_clock |
| // The system time may jump while wait_until() is waiting. To compensate for this and time out near |
| // the correct time, we limit how long wait_until() can wait before going around the loop again. |
| const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now())); |
| const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)); |
| return chrono::steady_clock::now() + (std::min)(dura, duramax); |
| } |
| |
| } //end detail namespace |
| |
| template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point> |
| class sync_timed_queue |
| : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> > |
| { |
| typedef detail::scheduled_type<T, Clock, TimePoint> stype; |
| typedef sync_priority_queue<stype> super; |
| public: |
| typedef T value_type; |
| typedef Clock clock; |
| typedef typename clock::duration duration; |
| typedef typename clock::time_point time_point; |
| typedef typename super::underlying_queue_type underlying_queue_type; |
| typedef typename super::size_type size_type; |
| typedef typename super::op_status op_status; |
| |
| sync_timed_queue() : super() {}; |
| ~sync_timed_queue() {} |
| |
| using super::size; |
| using super::empty; |
| using super::full; |
| using super::close; |
| using super::closed; |
| |
| T pull(); |
| void pull(T& elem); |
| |
| template <class Duration> |
| queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem); |
| template <class Rep, class Period> |
| queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem); |
| |
| queue_op_status try_pull(T& elem); |
| queue_op_status wait_pull(T& elem); |
| queue_op_status nonblocking_pull(T& elem); |
| |
| template <class Duration> |
| void push(const T& elem, chrono::time_point<clock,Duration> const& tp); |
| template <class Rep, class Period> |
| void push(const T& elem, chrono::duration<Rep,Period> const& dura); |
| |
| template <class Duration> |
| void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp); |
| template <class Rep, class Period> |
| void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura); |
| |
| template <class Duration> |
| queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp); |
| template <class Rep, class Period> |
| queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura); |
| |
| template <class Duration> |
| queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp); |
| template <class Rep, class Period> |
| queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura); |
| |
| private: |
| inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const; |
| inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const; |
| |
| bool wait_to_pull(unique_lock<mutex>&); |
| queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp); |
| template <class Rep, class Period> |
| queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura); |
| |
| T pull(unique_lock<mutex>&); |
| T pull(lock_guard<mutex>&); |
| |
| void pull(unique_lock<mutex>&, T& elem); |
| void pull(lock_guard<mutex>&, T& elem); |
| |
| queue_op_status try_pull(unique_lock<mutex>&, T& elem); |
| queue_op_status try_pull(lock_guard<mutex>&, T& elem); |
| |
| queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem); |
| |
| sync_timed_queue(const sync_timed_queue&); |
| sync_timed_queue& operator=(const sync_timed_queue&); |
| sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue)); |
| sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue)); |
| }; //end class |
| |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Duration> |
| void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp) |
| { |
| super::push(stype(elem,tp)); |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Rep, class Period> |
| void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura) |
| { |
| push(elem, clock::now() + dura); |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Duration> |
| void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp) |
| { |
| super::push(stype(boost::move(elem),tp)); |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Rep, class Period> |
| void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura) |
| { |
| push(boost::move(elem), clock::now() + dura); |
| } |
| |
| |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Duration> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp) |
| { |
| return super::try_push(stype(elem,tp)); |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Rep, class Period> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura) |
| { |
| return try_push(elem,clock::now() + dura); |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Duration> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp) |
| { |
| return super::try_push(stype(boost::move(elem), tp)); |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Rep, class Period> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura) |
| { |
| return try_push(boost::move(elem), clock::now() + dura); |
| } |
| |
| /////////////////////////// |
| template <class T, class Clock, class TimePoint> |
| bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const |
| { |
| return ! super::empty(lk) && clock::now() >= super::data_.top().time; |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const |
| { |
| return ! super::empty(lk) && clock::now() >= super::data_.top().time; |
| } |
| |
| /////////////////////////// |
| template <class T, class Clock, class TimePoint> |
| bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk) |
| { |
| for (;;) |
| { |
| if (not_empty_and_time_reached(lk)) return false; // success |
| if (super::closed(lk)) return true; // closed |
| |
| super::wait_until_not_empty_or_closed(lk); |
| |
| if (not_empty_and_time_reached(lk)) return false; // success |
| if (super::closed(lk)) return true; // closed |
| |
| const time_point tpmin(detail::limit_timepoint(super::data_.top().time)); |
| super::cond_.wait_until(lk, tpmin); |
| } |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp) |
| { |
| for (;;) |
| { |
| if (not_empty_and_time_reached(lk)) return queue_op_status::success; |
| if (super::closed(lk)) return queue_op_status::closed; |
| if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; |
| |
| super::wait_until_not_empty_or_closed_until(lk, tp); |
| |
| if (not_empty_and_time_reached(lk)) return queue_op_status::success; |
| if (super::closed(lk)) return queue_op_status::closed; |
| if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; |
| |
| const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time))); |
| super::cond_.wait_until(lk, tpmin); |
| } |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| template <class Rep, class Period> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura) |
| { |
| const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura)); |
| for (;;) |
| { |
| if (not_empty_and_time_reached(lk)) return queue_op_status::success; |
| if (super::closed(lk)) return queue_op_status::closed; |
| if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; |
| |
| super::wait_until_not_empty_or_closed_until(lk, tp); |
| |
| if (not_empty_and_time_reached(lk)) return queue_op_status::success; |
| if (super::closed(lk)) return queue_op_status::closed; |
| if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready; |
| |
| const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time))); |
| super::cond_.wait_until(lk, tpmin); |
| } |
| } |
| |
| /////////////////////////// |
| template <class T, class Clock, class TimePoint> |
| T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&) |
| { |
| #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES |
| return boost::move(super::data_.pull().data); |
| #else |
| return super::data_.pull().data; |
| #endif |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&) |
| { |
| #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES |
| return boost::move(super::data_.pull().data); |
| #else |
| return super::data_.pull().data; |
| #endif |
| } |
| template <class T, class Clock, class TimePoint> |
| T sync_timed_queue<T, Clock, TimePoint>::pull() |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| const bool has_been_closed = wait_to_pull(lk); |
| if (has_been_closed) super::throw_if_closed(lk); |
| return pull(lk); |
| } |
| |
| /////////////////////////// |
| template <class T, class Clock, class TimePoint> |
| void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem) |
| { |
| #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES |
| elem = boost::move(super::data_.pull().data); |
| #else |
| elem = super::data_.pull().data; |
| #endif |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem) |
| { |
| #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES |
| elem = boost::move(super::data_.pull().data); |
| #else |
| elem = super::data_.pull().data; |
| #endif |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| const bool has_been_closed = wait_to_pull(lk); |
| if (has_been_closed) super::throw_if_closed(lk); |
| pull(lk, elem); |
| } |
| |
| ////////////////////// |
| template <class T, class Clock, class TimePoint> |
| template <class Duration> |
| queue_op_status |
| sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp)); |
| if (rc == queue_op_status::success) pull(lk, elem); |
| return rc; |
| } |
| |
| ////////////////////// |
| template <class T, class Clock, class TimePoint> |
| template <class Rep, class Period> |
| queue_op_status |
| sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| const queue_op_status rc = wait_to_pull_for(lk, dura); |
| if (rc == queue_op_status::success) pull(lk, elem); |
| return rc; |
| } |
| |
| /////////////////////////// |
| template <class T, class Clock, class TimePoint> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem) |
| { |
| if (not_empty_and_time_reached(lk)) |
| { |
| pull(lk, elem); |
| return queue_op_status::success; |
| } |
| if (super::closed(lk)) return queue_op_status::closed; |
| if (super::empty(lk)) return queue_op_status::empty; |
| return queue_op_status::not_ready; |
| } |
| template <class T, class Clock, class TimePoint> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem) |
| { |
| if (not_empty_and_time_reached(lk)) |
| { |
| pull(lk, elem); |
| return queue_op_status::success; |
| } |
| if (super::closed(lk)) return queue_op_status::closed; |
| if (super::empty(lk)) return queue_op_status::empty; |
| return queue_op_status::not_ready; |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem) |
| { |
| lock_guard<mutex> lk(super::mtx_); |
| return try_pull(lk, elem); |
| } |
| |
| /////////////////////////// |
| template <class T, class Clock, class TimePoint> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem) |
| { |
| const bool has_been_closed = wait_to_pull(lk); |
| if (has_been_closed) return queue_op_status::closed; |
| pull(lk, elem); |
| return queue_op_status::success; |
| } |
| |
| template <class T, class Clock, class TimePoint> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_); |
| return wait_pull(lk, elem); |
| } |
| |
| /////////////////////////// |
| template <class T, class Clock, class TimePoint> |
| queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem) |
| { |
| unique_lock<mutex> lk(super::mtx_, try_to_lock); |
| if (! lk.owns_lock()) return queue_op_status::busy; |
| return try_pull(lk, elem); |
| } |
| |
| } //end concurrent namespace |
| |
| using concurrent::sync_timed_queue; |
| |
| } //end boost namespace |
| #include <boost/config/abi_suffix.hpp> |
| |
| #endif |