blob: 92321f1174f3820feb8a06a4f281f17898e9e9c8 [file] [log] [blame]
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#pragma once
#if !defined(RXCPP_RX_OBSERVER_HPP)
#define RXCPP_RX_OBSERVER_HPP
#include "rx-includes.hpp"
namespace rxcpp {
template<class T>
struct observer_base
{
typedef T value_type;
typedef tag_observer observer_tag;
};
namespace detail {
template<class T>
struct OnNextEmpty
{
void operator()(const T&) const {}
};
struct OnErrorEmpty
{
void operator()(rxu::error_ptr) const {
// error implicitly ignored, abort
std::terminate();
}
};
struct OnErrorIgnore
{
void operator()(rxu::error_ptr) const {
}
};
struct OnCompletedEmpty
{
void operator()() const {}
};
template<class T, class State, class OnNext>
struct OnNextForward
{
using state_t = rxu::decay_t<State>;
using onnext_t = rxu::decay_t<OnNext>;
OnNextForward() : onnext() {}
explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {}
onnext_t onnext;
void operator()(state_t& s, T& t) const {
onnext(s, t);
}
void operator()(state_t& s, T&& t) const {
onnext(s, t);
}
};
template<class T, class State>
struct OnNextForward<T, State, void>
{
using state_t = rxu::decay_t<State>;
OnNextForward() {}
void operator()(state_t& s, T& t) const {
s.on_next(t);
}
void operator()(state_t& s, T&& t) const {
s.on_next(t);
}
};
template<class State, class OnError>
struct OnErrorForward
{
using state_t = rxu::decay_t<State>;
using onerror_t = rxu::decay_t<OnError>;
OnErrorForward() : onerror() {}
explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {}
onerror_t onerror;
void operator()(state_t& s, rxu::error_ptr ep) const {
onerror(s, ep);
}
};
template<class State>
struct OnErrorForward<State, void>
{
using state_t = rxu::decay_t<State>;
OnErrorForward() {}
void operator()(state_t& s, rxu::error_ptr ep) const {
s.on_error(ep);
}
};
template<class State, class OnCompleted>
struct OnCompletedForward
{
using state_t = rxu::decay_t<State>;
using oncompleted_t = rxu::decay_t<OnCompleted>;
OnCompletedForward() : oncompleted() {}
explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {}
oncompleted_t oncompleted;
void operator()(state_t& s) const {
oncompleted(s);
}
};
template<class State>
struct OnCompletedForward<State, void>
{
OnCompletedForward() {}
void operator()(State& s) const {
s.on_completed();
}
};
template<class T, class F>
struct is_on_next_of
{
struct not_void {};
template<class CT, class CF>
static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr));
template<class CT, class CF>
static not_void check(...);
typedef decltype(check<T, rxu::decay_t<F>>(0)) detail_result;
static const bool value = std::is_same<detail_result, void>::value;
};
template<class F>
struct is_on_error
{
struct not_void {};
template<class CF>
static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr));
template<class CF>
static not_void check(...);
static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
};
template<class State, class F>
struct is_on_error_for
{
struct not_void {};
template<class CF>
static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr));
template<class CF>
static not_void check(...);
static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
};
template<class F>
struct is_on_completed
{
struct not_void {};
template<class CF>
static auto check(int) -> decltype((*(CF*)nullptr)());
template<class CF>
static not_void check(...);
static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
};
}
/*!
\brief consumes values from an observable using `State` that may implement on_next, on_error and on_completed with optional overrides of each function.
\tparam T - the type of value in the stream
\tparam State - the type of the stored state
\tparam OnNext - the type of a function that matches `void(State&, T)`. Called 0 or more times. If `void` State::on_next will be called.
\tparam OnError - the type of a function that matches `void(State&, rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called.
\tparam OnCompleted - the type of a function that matches `void(State&)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_completed will be called.
\ingroup group-core
*/
template<class T, class State, class OnNext, class OnError, class OnCompleted>
class observer : public observer_base<T>
{
public:
using this_type = observer<T, State, OnNext, OnError, OnCompleted>;
using state_t = rxu::decay_t<State>;
using on_next_t = typename std::conditional<
!std::is_same<void, OnNext>::value,
rxu::decay_t<OnNext>,
detail::OnNextForward<T, State, OnNext>>::type;
using on_error_t = typename std::conditional<
!std::is_same<void, OnError>::value,
rxu::decay_t<OnError>,
detail::OnErrorForward<State, OnError>>::type;
using on_completed_t = typename std::conditional<
!std::is_same<void, OnCompleted>::value,
rxu::decay_t<OnCompleted>,
detail::OnCompletedForward<State, OnCompleted>>::type;
private:
mutable state_t state;
on_next_t onnext;
on_error_t onerror;
on_completed_t oncompleted;
public:
explicit observer(state_t s, on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
: state(std::move(s))
, onnext(std::move(n))
, onerror(std::move(e))
, oncompleted(std::move(c))
{
}
explicit observer(state_t s, on_next_t n, on_completed_t c)
: state(std::move(s))
, onnext(std::move(n))
, onerror(on_error_t())
, oncompleted(std::move(c))
{
}
observer(const this_type& o)
: state(o.state)
, onnext(o.onnext)
, onerror(o.onerror)
, oncompleted(o.oncompleted)
{
}
observer(this_type&& o)
: state(std::move(o.state))
, onnext(std::move(o.onnext))
, onerror(std::move(o.onerror))
, oncompleted(std::move(o.oncompleted))
{
}
this_type& operator=(this_type o) {
state = std::move(o.state);
onnext = std::move(o.onnext);
onerror = std::move(o.onerror);
oncompleted = std::move(o.oncompleted);
return *this;
}
void on_next(T& t) const {
onnext(state, t);
}
void on_next(T&& t) const {
onnext(state, std::move(t));
}
void on_error(rxu::error_ptr e) const {
onerror(state, e);
}
void on_completed() const {
oncompleted(state);
}
observer<T> as_dynamic() const {
return observer<T>(*this);
}
};
/*!
\brief consumes values from an observable using default empty method implementations with optional overrides of each function.
\tparam T - the type of value in the stream
\tparam OnNext - the type of a function that matches `void(T)`. Called 0 or more times. If `void` OnNextEmpty<T> is used.
\tparam OnError - the type of a function that matches `void(rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used.
\tparam OnCompleted - the type of a function that matches `void()`. Called 0 or 1 times, no further calls will be made. If `void` OnCompletedEmpty is used.
\ingroup group-core
*/
template<class T, class OnNext, class OnError, class OnCompleted>
class observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> : public observer_base<T>
{
public:
using this_type = observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>;
using on_next_t = typename std::conditional<
!std::is_same<void, OnNext>::value,
rxu::decay_t<OnNext>,
detail::OnNextEmpty<T>>::type;
using on_error_t = typename std::conditional<
!std::is_same<void, OnError>::value,
rxu::decay_t<OnError>,
detail::OnErrorEmpty>::type;
using on_completed_t = typename std::conditional<
!std::is_same<void, OnCompleted>::value,
rxu::decay_t<OnCompleted>,
detail::OnCompletedEmpty>::type;
private:
on_next_t onnext;
on_error_t onerror;
on_completed_t oncompleted;
public:
static_assert(detail::is_on_next_of<T, on_next_t>::value, "Function supplied for on_next must be a function with the signature void(T);");
static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);");
static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
observer()
: onnext(on_next_t())
, onerror(on_error_t())
, oncompleted(on_completed_t())
{
}
explicit observer(on_next_t n, on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
: onnext(std::move(n))
, onerror(std::move(e))
, oncompleted(std::move(c))
{
}
observer(const this_type& o)
: onnext(o.onnext)
, onerror(o.onerror)
, oncompleted(o.oncompleted)
{
}
observer(this_type&& o)
: onnext(std::move(o.onnext))
, onerror(std::move(o.onerror))
, oncompleted(std::move(o.oncompleted))
{
}
this_type& operator=(this_type o) {
onnext = std::move(o.onnext);
onerror = std::move(o.onerror);
oncompleted = std::move(o.oncompleted);
return *this;
}
void on_next(T& t) const {
onnext(t);
}
void on_next(T&& t) const {
onnext(std::move(t));
}
void on_error(rxu::error_ptr e) const {
onerror(e);
}
void on_completed() const {
oncompleted();
}
observer<T> as_dynamic() const {
return observer<T>(*this);
}
};
namespace detail
{
template<class T>
struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T>>
{
virtual ~virtual_observer() {}
virtual void on_next(T&) const {};
virtual void on_next(T&&) const {};
virtual void on_error(rxu::error_ptr) const {};
virtual void on_completed() const {};
};
template<class T, class Observer>
struct specific_observer : public virtual_observer<T>
{
explicit specific_observer(Observer o)
: destination(std::move(o))
{
}
Observer destination;
virtual void on_next(T& t) const {
destination.on_next(t);
}
virtual void on_next(T&& t) const {
destination.on_next(std::move(t));
}
virtual void on_error(rxu::error_ptr e) const {
destination.on_error(e);
}
virtual void on_completed() const {
destination.on_completed();
}
};
}
/*!
\brief consumes values from an observable using type-forgetting (shared allocated state with virtual methods)
\tparam T - the type of value in the stream
\ingroup group-core
*/
template<class T>
class observer<T, void, void, void, void> : public observer_base<T>
{
public:
typedef tag_dynamic_observer dynamic_observer_tag;
private:
using this_type = observer<T, void, void, void, void>;
using base_type = observer_base<T>;
using virtual_observer = detail::virtual_observer<T>;
std::shared_ptr<virtual_observer> destination;
template<class Observer>
static auto make_destination(Observer o)
-> std::shared_ptr<virtual_observer> {
return std::make_shared<detail::specific_observer<T, Observer>>(std::move(o));
}
public:
observer()
{
}
observer(const this_type& o)
: destination(o.destination)
{
}
observer(this_type&& o)
: destination(std::move(o.destination))
{
}
template<class Observer>
explicit observer(Observer o)
: destination(make_destination(std::move(o)))
{
}
this_type& operator=(this_type o) {
destination = std::move(o.destination);
return *this;
}
// perfect forwarding delays the copy of the value.
template<class V>
void on_next(V&& v) const {
if (destination) {
destination->on_next(std::forward<V>(v));
}
}
void on_error(rxu::error_ptr e) const {
if (destination) {
destination->on_error(e);
}
}
void on_completed() const {
if (destination) {
destination->on_completed();
}
}
observer<T> as_dynamic() const {
return *this;
}
};
template<class T, class DefaultOnError = detail::OnErrorEmpty>
auto make_observer()
-> observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError> {
return observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError>();
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class U, class State, class OnNext, class OnError, class OnCompleted>
auto make_observer(observer<U, State, OnNext, OnError, OnCompleted> o)
-> observer<T, State, OnNext, OnError, OnCompleted> {
return observer<T, State, OnNext, OnError, OnCompleted>(std::move(o));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
auto make_observer(Observer ob)
-> typename std::enable_if<
!detail::is_on_next_of<T, Observer>::value &&
!detail::is_on_error<Observer>::value &&
is_observer<Observer>::value,
Observer>::type {
return std::move(ob);
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
auto make_observer(Observer ob)
-> typename std::enable_if<
!detail::is_on_next_of<T, Observer>::value &&
!detail::is_on_error<Observer>::value &&
!is_observer<Observer>::value,
observer<T, Observer>>::type {
return observer<T, Observer>(std::move(ob));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext>
auto make_observer(OnNext on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>>::type {
return observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>(
std::move(on));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnError>
auto make_observer(OnError oe)
-> typename std::enable_if<
!detail::is_on_next_of<T, OnError>::value &&
detail::is_on_error<OnError>::value,
observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>>::type {
return observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>(
detail::OnNextEmpty<T>(), std::move(oe));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError>
auto make_observer(OnNext on, OnError oe)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
observer<T, detail::stateless_observer_tag, OnNext, OnError>>::type {
return observer<T, detail::stateless_observer_tag, OnNext, OnError>(
std::move(on), std::move(oe));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnCompleted>
auto make_observer(OnNext on, OnCompleted oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>>::type {
return observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>(
std::move(on), DefaultOnError(), std::move(oc));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError, class OnCompleted>
auto make_observer(OnNext on, OnError oe, OnCompleted oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>::type {
return observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(
std::move(on), std::move(oe), std::move(oc));
}
template<class T, class State, class OnNext>
auto make_observer(State os, OnNext on)
-> typename std::enable_if<
!detail::is_on_next_of<T, State>::value &&
!detail::is_on_error<State>::value,
observer<T, State, OnNext>>::type {
return observer<T, State, OnNext>(
std::move(os), std::move(on));
}
template<class T, class State, class OnError>
auto make_observer(State os, OnError oe)
-> typename std::enable_if<
!detail::is_on_next_of<T, State>::value &&
!detail::is_on_error<State>::value &&
detail::is_on_error_for<State, OnError>::value,
observer<T, State, detail::OnNextEmpty<T>, OnError>>::type {
return observer<T, State, detail::OnNextEmpty<T>, OnError>(
std::move(os), detail::OnNextEmpty<T>(), std::move(oe));
}
template<class T, class State, class OnNext, class OnError>
auto make_observer(State os, OnNext on, OnError oe)
-> typename std::enable_if<
!detail::is_on_next_of<T, State>::value &&
!detail::is_on_error<State>::value &&
detail::is_on_error_for<State, OnError>::value,
observer<T, State, OnNext, OnError>>::type {
return observer<T, State, OnNext, OnError>(
std::move(os), std::move(on), std::move(oe));
}
template<class T, class State, class OnNext, class OnCompleted>
auto make_observer(State os, OnNext on, OnCompleted oc)
-> typename std::enable_if<
!detail::is_on_next_of<T, State>::value &&
!detail::is_on_error<State>::value,
observer<T, State, OnNext, void, OnCompleted>>::type {
return observer<T, State, OnNext, void, OnCompleted>(
std::move(os), std::move(on), std::move(oc));
}
template<class T, class State, class OnNext, class OnError, class OnCompleted>
auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc)
-> typename std::enable_if<
!detail::is_on_next_of<T, State>::value &&
!detail::is_on_error<State>::value &&
detail::is_on_error_for<State, OnError>::value,
observer<T, State, OnNext, OnError, OnCompleted>>::type {
return observer<T, State, OnNext, OnError, OnCompleted>(
std::move(os), std::move(on), std::move(oe), std::move(oc));
}
template<class T, class Observer>
auto make_observer_dynamic(Observer o)
-> typename std::enable_if<
!detail::is_on_next_of<T, Observer>::value,
observer<T>>::type {
return observer<T>(std::move(o));
}
template<class T, class OnNext>
auto make_observer_dynamic(OnNext&& on)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value,
observer<T>>::type {
return observer<T>(
make_observer<T>(std::forward<OnNext>(on)));
}
template<class T, class OnNext, class OnError>
auto make_observer_dynamic(OnNext&& on, OnError&& oe)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value,
observer<T>>::type {
return observer<T>(
make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe)));
}
template<class T, class OnNext, class OnCompleted>
auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_completed<OnCompleted>::value,
observer<T>>::type {
return observer<T>(
make_observer<T>(std::forward<OnNext>(on), std::forward<OnCompleted>(oc)));
}
template<class T, class OnNext, class OnError, class OnCompleted>
auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
-> typename std::enable_if<
detail::is_on_next_of<T, OnNext>::value &&
detail::is_on_error<OnError>::value &&
detail::is_on_completed<OnCompleted>::value,
observer<T>>::type {
return observer<T>(
make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc)));
}
namespace detail {
template<class F>
struct maybe_from_result
{
typedef decltype((*(F*)nullptr)()) decl_result_type;
typedef rxu::decay_t<decl_result_type> result_type;
typedef rxu::maybe<result_type> type;
};
}
template<class F, class OnError>
auto on_exception(const F& f, const OnError& c)
-> typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
typename detail::maybe_from_result<F>::type r;
RXCPP_TRY {
r.reset(f());
} RXCPP_CATCH(...) {
c(rxu::current_exception());
}
return r;
}
template<class F, class Subscriber>
auto on_exception(const F& f, const Subscriber& s)
-> typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
typename detail::maybe_from_result<F>::type r;
RXCPP_TRY {
r.reset(f());
} RXCPP_CATCH(...) {
s.on_error(rxu::current_exception());
}
return r;
}
}
#endif