| // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. |
| |
| #pragma once |
| |
| #if !defined(RXCPP_RX_OBSERVER_HPP) |
| #define RXCPP_RX_OBSERVER_HPP |
| |
| #include "rx-includes.hpp" |
| |
| namespace rxcpp { |
| |
| |
| template<class T> |
| struct observer_base |
| { |
| typedef T value_type; |
| typedef tag_observer observer_tag; |
| }; |
| |
| namespace detail { |
| template<class T> |
| struct OnNextEmpty |
| { |
| void operator()(const T&) const {} |
| }; |
| struct OnErrorEmpty |
| { |
| void operator()(rxu::error_ptr) const { |
| // error implicitly ignored, abort |
| std::terminate(); |
| } |
| }; |
| struct OnErrorIgnore |
| { |
| void operator()(rxu::error_ptr) const { |
| } |
| }; |
| struct OnCompletedEmpty |
| { |
| void operator()() const {} |
| }; |
| |
| template<class T, class State, class OnNext> |
| struct OnNextForward |
| { |
| using state_t = rxu::decay_t<State>; |
| using onnext_t = rxu::decay_t<OnNext>; |
| OnNextForward() : onnext() {} |
| explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {} |
| onnext_t onnext; |
| void operator()(state_t& s, T& t) const { |
| onnext(s, t); |
| } |
| void operator()(state_t& s, T&& t) const { |
| onnext(s, t); |
| } |
| }; |
| template<class T, class State> |
| struct OnNextForward<T, State, void> |
| { |
| using state_t = rxu::decay_t<State>; |
| OnNextForward() {} |
| void operator()(state_t& s, T& t) const { |
| s.on_next(t); |
| } |
| void operator()(state_t& s, T&& t) const { |
| s.on_next(t); |
| } |
| }; |
| |
| template<class State, class OnError> |
| struct OnErrorForward |
| { |
| using state_t = rxu::decay_t<State>; |
| using onerror_t = rxu::decay_t<OnError>; |
| OnErrorForward() : onerror() {} |
| explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {} |
| onerror_t onerror; |
| void operator()(state_t& s, rxu::error_ptr ep) const { |
| onerror(s, ep); |
| } |
| }; |
| template<class State> |
| struct OnErrorForward<State, void> |
| { |
| using state_t = rxu::decay_t<State>; |
| OnErrorForward() {} |
| void operator()(state_t& s, rxu::error_ptr ep) const { |
| s.on_error(ep); |
| } |
| }; |
| |
| template<class State, class OnCompleted> |
| struct OnCompletedForward |
| { |
| using state_t = rxu::decay_t<State>; |
| using oncompleted_t = rxu::decay_t<OnCompleted>; |
| OnCompletedForward() : oncompleted() {} |
| explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {} |
| oncompleted_t oncompleted; |
| void operator()(state_t& s) const { |
| oncompleted(s); |
| } |
| }; |
| template<class State> |
| struct OnCompletedForward<State, void> |
| { |
| OnCompletedForward() {} |
| void operator()(State& s) const { |
| s.on_completed(); |
| } |
| }; |
| |
| template<class T, class F> |
| struct is_on_next_of |
| { |
| struct not_void {}; |
| template<class CT, class CF> |
| static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr)); |
| template<class CT, class CF> |
| static not_void check(...); |
| |
| typedef decltype(check<T, rxu::decay_t<F>>(0)) detail_result; |
| static const bool value = std::is_same<detail_result, void>::value; |
| }; |
| |
| template<class F> |
| struct is_on_error |
| { |
| struct not_void {}; |
| template<class CF> |
| static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr)); |
| template<class CF> |
| static not_void check(...); |
| |
| static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value; |
| }; |
| |
| template<class State, class F> |
| struct is_on_error_for |
| { |
| struct not_void {}; |
| template<class CF> |
| static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr)); |
| template<class CF> |
| static not_void check(...); |
| |
| static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value; |
| }; |
| |
| template<class F> |
| struct is_on_completed |
| { |
| struct not_void {}; |
| template<class CF> |
| static auto check(int) -> decltype((*(CF*)nullptr)()); |
| template<class CF> |
| static not_void check(...); |
| |
| static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value; |
| }; |
| |
| } |
| |
| |
| /*! |
| \brief consumes values from an observable using `State` that may implement on_next, on_error and on_completed with optional overrides of each function. |
| |
| \tparam T - the type of value in the stream |
| \tparam State - the type of the stored state |
| \tparam OnNext - the type of a function that matches `void(State&, T)`. Called 0 or more times. If `void` State::on_next will be called. |
| \tparam OnError - the type of a function that matches `void(State&, rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called. |
| \tparam OnCompleted - the type of a function that matches `void(State&)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_completed will be called. |
| |
| \ingroup group-core |
| |
| */ |
| template<class T, class State, class OnNext, class OnError, class OnCompleted> |
| class observer : public observer_base<T> |
| { |
| public: |
| using this_type = observer<T, State, OnNext, OnError, OnCompleted>; |
| using state_t = rxu::decay_t<State>; |
| using on_next_t = typename std::conditional< |
| !std::is_same<void, OnNext>::value, |
| rxu::decay_t<OnNext>, |
| detail::OnNextForward<T, State, OnNext>>::type; |
| using on_error_t = typename std::conditional< |
| !std::is_same<void, OnError>::value, |
| rxu::decay_t<OnError>, |
| detail::OnErrorForward<State, OnError>>::type; |
| using on_completed_t = typename std::conditional< |
| !std::is_same<void, OnCompleted>::value, |
| rxu::decay_t<OnCompleted>, |
| detail::OnCompletedForward<State, OnCompleted>>::type; |
| |
| private: |
| mutable state_t state; |
| on_next_t onnext; |
| on_error_t onerror; |
| on_completed_t oncompleted; |
| |
| public: |
| |
| explicit observer(state_t s, on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t()) |
| : state(std::move(s)) |
| , onnext(std::move(n)) |
| , onerror(std::move(e)) |
| , oncompleted(std::move(c)) |
| { |
| } |
| explicit observer(state_t s, on_next_t n, on_completed_t c) |
| : state(std::move(s)) |
| , onnext(std::move(n)) |
| , onerror(on_error_t()) |
| , oncompleted(std::move(c)) |
| { |
| } |
| observer(const this_type& o) |
| : state(o.state) |
| , onnext(o.onnext) |
| , onerror(o.onerror) |
| , oncompleted(o.oncompleted) |
| { |
| } |
| observer(this_type&& o) |
| : state(std::move(o.state)) |
| , onnext(std::move(o.onnext)) |
| , onerror(std::move(o.onerror)) |
| , oncompleted(std::move(o.oncompleted)) |
| { |
| } |
| this_type& operator=(this_type o) { |
| state = std::move(o.state); |
| onnext = std::move(o.onnext); |
| onerror = std::move(o.onerror); |
| oncompleted = std::move(o.oncompleted); |
| return *this; |
| } |
| |
| void on_next(T& t) const { |
| onnext(state, t); |
| } |
| void on_next(T&& t) const { |
| onnext(state, std::move(t)); |
| } |
| void on_error(rxu::error_ptr e) const { |
| onerror(state, e); |
| } |
| void on_completed() const { |
| oncompleted(state); |
| } |
| observer<T> as_dynamic() const { |
| return observer<T>(*this); |
| } |
| }; |
| |
| /*! |
| \brief consumes values from an observable using default empty method implementations with optional overrides of each function. |
| |
| \tparam T - the type of value in the stream |
| \tparam OnNext - the type of a function that matches `void(T)`. Called 0 or more times. If `void` OnNextEmpty<T> is used. |
| \tparam OnError - the type of a function that matches `void(rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used. |
| \tparam OnCompleted - the type of a function that matches `void()`. Called 0 or 1 times, no further calls will be made. If `void` OnCompletedEmpty is used. |
| |
| \ingroup group-core |
| |
| */ |
| template<class T, class OnNext, class OnError, class OnCompleted> |
| class observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> : public observer_base<T> |
| { |
| public: |
| using this_type = observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>; |
| using on_next_t = typename std::conditional< |
| !std::is_same<void, OnNext>::value, |
| rxu::decay_t<OnNext>, |
| detail::OnNextEmpty<T>>::type; |
| using on_error_t = typename std::conditional< |
| !std::is_same<void, OnError>::value, |
| rxu::decay_t<OnError>, |
| detail::OnErrorEmpty>::type; |
| using on_completed_t = typename std::conditional< |
| !std::is_same<void, OnCompleted>::value, |
| rxu::decay_t<OnCompleted>, |
| detail::OnCompletedEmpty>::type; |
| |
| private: |
| on_next_t onnext; |
| on_error_t onerror; |
| on_completed_t oncompleted; |
| |
| public: |
| static_assert(detail::is_on_next_of<T, on_next_t>::value, "Function supplied for on_next must be a function with the signature void(T);"); |
| static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);"); |
| static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();"); |
| |
| observer() |
| : onnext(on_next_t()) |
| , onerror(on_error_t()) |
| , oncompleted(on_completed_t()) |
| { |
| } |
| |
| explicit observer(on_next_t n, on_error_t e = on_error_t(), on_completed_t c = on_completed_t()) |
| : onnext(std::move(n)) |
| , onerror(std::move(e)) |
| , oncompleted(std::move(c)) |
| { |
| } |
| observer(const this_type& o) |
| : onnext(o.onnext) |
| , onerror(o.onerror) |
| , oncompleted(o.oncompleted) |
| { |
| } |
| observer(this_type&& o) |
| : onnext(std::move(o.onnext)) |
| , onerror(std::move(o.onerror)) |
| , oncompleted(std::move(o.oncompleted)) |
| { |
| } |
| this_type& operator=(this_type o) { |
| onnext = std::move(o.onnext); |
| onerror = std::move(o.onerror); |
| oncompleted = std::move(o.oncompleted); |
| return *this; |
| } |
| |
| void on_next(T& t) const { |
| onnext(t); |
| } |
| void on_next(T&& t) const { |
| onnext(std::move(t)); |
| } |
| void on_error(rxu::error_ptr e) const { |
| onerror(e); |
| } |
| void on_completed() const { |
| oncompleted(); |
| } |
| observer<T> as_dynamic() const { |
| return observer<T>(*this); |
| } |
| }; |
| |
| namespace detail |
| { |
| |
| template<class T> |
| struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T>> |
| { |
| virtual ~virtual_observer() {} |
| virtual void on_next(T&) const {}; |
| virtual void on_next(T&&) const {}; |
| virtual void on_error(rxu::error_ptr) const {}; |
| virtual void on_completed() const {}; |
| }; |
| |
| template<class T, class Observer> |
| struct specific_observer : public virtual_observer<T> |
| { |
| explicit specific_observer(Observer o) |
| : destination(std::move(o)) |
| { |
| } |
| |
| Observer destination; |
| virtual void on_next(T& t) const { |
| destination.on_next(t); |
| } |
| virtual void on_next(T&& t) const { |
| destination.on_next(std::move(t)); |
| } |
| virtual void on_error(rxu::error_ptr e) const { |
| destination.on_error(e); |
| } |
| virtual void on_completed() const { |
| destination.on_completed(); |
| } |
| }; |
| |
| } |
| |
| /*! |
| \brief consumes values from an observable using type-forgetting (shared allocated state with virtual methods) |
| |
| \tparam T - the type of value in the stream |
| |
| \ingroup group-core |
| |
| */ |
| template<class T> |
| class observer<T, void, void, void, void> : public observer_base<T> |
| { |
| public: |
| typedef tag_dynamic_observer dynamic_observer_tag; |
| |
| private: |
| using this_type = observer<T, void, void, void, void>; |
| using base_type = observer_base<T>; |
| using virtual_observer = detail::virtual_observer<T>; |
| |
| std::shared_ptr<virtual_observer> destination; |
| |
| template<class Observer> |
| static auto make_destination(Observer o) |
| -> std::shared_ptr<virtual_observer> { |
| return std::make_shared<detail::specific_observer<T, Observer>>(std::move(o)); |
| } |
| |
| public: |
| observer() |
| { |
| } |
| observer(const this_type& o) |
| : destination(o.destination) |
| { |
| } |
| observer(this_type&& o) |
| : destination(std::move(o.destination)) |
| { |
| } |
| |
| template<class Observer> |
| explicit observer(Observer o) |
| : destination(make_destination(std::move(o))) |
| { |
| } |
| |
| this_type& operator=(this_type o) { |
| destination = std::move(o.destination); |
| return *this; |
| } |
| |
| // perfect forwarding delays the copy of the value. |
| template<class V> |
| void on_next(V&& v) const { |
| if (destination) { |
| destination->on_next(std::forward<V>(v)); |
| } |
| } |
| void on_error(rxu::error_ptr e) const { |
| if (destination) { |
| destination->on_error(e); |
| } |
| } |
| void on_completed() const { |
| if (destination) { |
| destination->on_completed(); |
| } |
| } |
| |
| observer<T> as_dynamic() const { |
| return *this; |
| } |
| }; |
| |
| template<class T, class DefaultOnError = detail::OnErrorEmpty> |
| auto make_observer() |
| -> observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError> { |
| return observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError>(); |
| } |
| |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class U, class State, class OnNext, class OnError, class OnCompleted> |
| auto make_observer(observer<U, State, OnNext, OnError, OnCompleted> o) |
| -> observer<T, State, OnNext, OnError, OnCompleted> { |
| return observer<T, State, OnNext, OnError, OnCompleted>(std::move(o)); |
| } |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer> |
| auto make_observer(Observer ob) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !detail::is_on_error<Observer>::value && |
| is_observer<Observer>::value, |
| Observer>::type { |
| return std::move(ob); |
| } |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer> |
| auto make_observer(Observer ob) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value && |
| !detail::is_on_error<Observer>::value && |
| !is_observer<Observer>::value, |
| observer<T, Observer>>::type { |
| return observer<T, Observer>(std::move(ob)); |
| } |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext> |
| auto make_observer(OnNext on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>>::type { |
| return observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>( |
| std::move(on)); |
| } |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnError> |
| auto make_observer(OnError oe) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, OnError>::value && |
| detail::is_on_error<OnError>::value, |
| observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>>::type { |
| return observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>( |
| detail::OnNextEmpty<T>(), std::move(oe)); |
| } |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError> |
| auto make_observer(OnNext on, OnError oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| observer<T, detail::stateless_observer_tag, OnNext, OnError>>::type { |
| return observer<T, detail::stateless_observer_tag, OnNext, OnError>( |
| std::move(on), std::move(oe)); |
| } |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnCompleted> |
| auto make_observer(OnNext on, OnCompleted oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>>::type { |
| return observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>( |
| std::move(on), DefaultOnError(), std::move(oc)); |
| } |
| template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError, class OnCompleted> |
| auto make_observer(OnNext on, OnError oe, OnCompleted oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>::type { |
| return observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>( |
| std::move(on), std::move(oe), std::move(oc)); |
| } |
| |
| |
| template<class T, class State, class OnNext> |
| auto make_observer(State os, OnNext on) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, State>::value && |
| !detail::is_on_error<State>::value, |
| observer<T, State, OnNext>>::type { |
| return observer<T, State, OnNext>( |
| std::move(os), std::move(on)); |
| } |
| template<class T, class State, class OnError> |
| auto make_observer(State os, OnError oe) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, State>::value && |
| !detail::is_on_error<State>::value && |
| detail::is_on_error_for<State, OnError>::value, |
| observer<T, State, detail::OnNextEmpty<T>, OnError>>::type { |
| return observer<T, State, detail::OnNextEmpty<T>, OnError>( |
| std::move(os), detail::OnNextEmpty<T>(), std::move(oe)); |
| } |
| template<class T, class State, class OnNext, class OnError> |
| auto make_observer(State os, OnNext on, OnError oe) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, State>::value && |
| !detail::is_on_error<State>::value && |
| detail::is_on_error_for<State, OnError>::value, |
| observer<T, State, OnNext, OnError>>::type { |
| return observer<T, State, OnNext, OnError>( |
| std::move(os), std::move(on), std::move(oe)); |
| } |
| template<class T, class State, class OnNext, class OnCompleted> |
| auto make_observer(State os, OnNext on, OnCompleted oc) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, State>::value && |
| !detail::is_on_error<State>::value, |
| observer<T, State, OnNext, void, OnCompleted>>::type { |
| return observer<T, State, OnNext, void, OnCompleted>( |
| std::move(os), std::move(on), std::move(oc)); |
| } |
| template<class T, class State, class OnNext, class OnError, class OnCompleted> |
| auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, State>::value && |
| !detail::is_on_error<State>::value && |
| detail::is_on_error_for<State, OnError>::value, |
| observer<T, State, OnNext, OnError, OnCompleted>>::type { |
| return observer<T, State, OnNext, OnError, OnCompleted>( |
| std::move(os), std::move(on), std::move(oe), std::move(oc)); |
| } |
| |
| template<class T, class Observer> |
| auto make_observer_dynamic(Observer o) |
| -> typename std::enable_if< |
| !detail::is_on_next_of<T, Observer>::value, |
| observer<T>>::type { |
| return observer<T>(std::move(o)); |
| } |
| template<class T, class OnNext> |
| auto make_observer_dynamic(OnNext&& on) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value, |
| observer<T>>::type { |
| return observer<T>( |
| make_observer<T>(std::forward<OnNext>(on))); |
| } |
| template<class T, class OnNext, class OnError> |
| auto make_observer_dynamic(OnNext&& on, OnError&& oe) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value, |
| observer<T>>::type { |
| return observer<T>( |
| make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe))); |
| } |
| template<class T, class OnNext, class OnCompleted> |
| auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| observer<T>>::type { |
| return observer<T>( |
| make_observer<T>(std::forward<OnNext>(on), std::forward<OnCompleted>(oc))); |
| } |
| template<class T, class OnNext, class OnError, class OnCompleted> |
| auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc) |
| -> typename std::enable_if< |
| detail::is_on_next_of<T, OnNext>::value && |
| detail::is_on_error<OnError>::value && |
| detail::is_on_completed<OnCompleted>::value, |
| observer<T>>::type { |
| return observer<T>( |
| make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc))); |
| } |
| |
| namespace detail { |
| |
| template<class F> |
| struct maybe_from_result |
| { |
| typedef decltype((*(F*)nullptr)()) decl_result_type; |
| typedef rxu::decay_t<decl_result_type> result_type; |
| typedef rxu::maybe<result_type> type; |
| }; |
| |
| } |
| |
| template<class F, class OnError> |
| auto on_exception(const F& f, const OnError& c) |
| -> typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type { |
| typename detail::maybe_from_result<F>::type r; |
| RXCPP_TRY { |
| r.reset(f()); |
| } RXCPP_CATCH(...) { |
| c(rxu::current_exception()); |
| } |
| return r; |
| } |
| |
| template<class F, class Subscriber> |
| auto on_exception(const F& f, const Subscriber& s) |
| -> typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type { |
| typename detail::maybe_from_result<F>::type r; |
| RXCPP_TRY { |
| r.reset(f()); |
| } RXCPP_CATCH(...) { |
| s.on_error(rxu::current_exception()); |
| } |
| return r; |
| } |
| |
| } |
| |
| #endif |