| // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. |
| |
| #pragma once |
| |
| #if !defined(RXCPP_RX_NOTIFICATION_HPP) |
| #define RXCPP_RX_NOTIFICATION_HPP |
| |
| #include "rx-includes.hpp" |
| |
| namespace rxcpp { |
| |
| namespace notifications { |
| |
| class subscription |
| { |
| long s; |
| long u; |
| |
| public: |
| explicit inline subscription(long s) |
| : s(s), u(std::numeric_limits<long>::max()) { |
| } |
| inline subscription(long s, long u) |
| : s(s), u(u) { |
| } |
| inline long subscribe() const { |
| return s; |
| } |
| inline long unsubscribe() const { |
| return u; |
| } |
| }; |
| |
| inline bool operator == (subscription lhs, subscription rhs) { |
| return lhs.subscribe() == rhs.subscribe() && lhs.unsubscribe() == rhs.unsubscribe(); |
| } |
| |
| inline std::ostream& operator<< (std::ostream& out, const subscription& s) { |
| out << s.subscribe() << "-" << s.unsubscribe(); |
| return out; |
| } |
| |
| namespace detail { |
| |
| template<typename T> |
| struct notification_base |
| : public std::enable_shared_from_this<notification_base<T>> |
| { |
| typedef subscriber<T> observer_type; |
| typedef std::shared_ptr<notification_base<T>> type; |
| |
| virtual ~notification_base() {} |
| |
| virtual void out(std::ostream& out) const =0; |
| virtual bool equals(const type& other) const = 0; |
| virtual void accept(const observer_type& o) const =0; |
| }; |
| |
| template<class T> |
| std::ostream& operator<< (std::ostream& out, const std::vector<T>& v); |
| |
| template<class T> |
| auto to_stream(std::ostream& os, const T& t, int, int) |
| -> decltype(os << t) { |
| return os << t; |
| } |
| |
| #if RXCPP_USE_RTTI |
| template<class T> |
| std::ostream& to_stream(std::ostream& os, const T&, int, ...) { |
| return os << "< " << typeid(T).name() << " does not support ostream>"; |
| } |
| #endif |
| |
| template<class T> |
| std::ostream& to_stream(std::ostream& os, const T&, ...) { |
| return os << "<the value does not support ostream>"; |
| } |
| |
| template<class T> |
| inline std::ostream& ostreamvector (std::ostream& os, const std::vector<T>& v) { |
| os << "["; |
| bool doemit = false; |
| for(auto& i : v) { |
| if (doemit) { |
| os << ", "; |
| } else { |
| doemit = true; |
| } |
| to_stream(os, i, 0, 0); |
| } |
| os << "]"; |
| return os; |
| } |
| |
| template<class T> |
| inline std::ostream& operator<< (std::ostream& os, const std::vector<T>& v) { |
| return ostreamvector(os, v); |
| } |
| |
| template<class T> |
| auto equals(const T& lhs, const T& rhs, int) |
| -> decltype(bool(lhs == rhs)) { |
| return lhs == rhs; |
| } |
| |
| template<class T> |
| bool equals(const T&, const T&, ...) { |
| rxu::throw_exception(std::runtime_error("value does not support equality tests")); |
| return false; |
| } |
| |
| } |
| |
| template<typename T> |
| struct notification |
| { |
| typedef typename detail::notification_base<T>::type type; |
| typedef typename detail::notification_base<T>::observer_type observer_type; |
| |
| private: |
| typedef detail::notification_base<T> base; |
| |
| struct on_next_notification : public base { |
| on_next_notification(T value) : value(std::move(value)) { |
| } |
| on_next_notification(const on_next_notification& o) : value(o.value) {} |
| on_next_notification(const on_next_notification&& o) : value(std::move(o.value)) {} |
| on_next_notification& operator=(on_next_notification o) { value = std::move(o.value); return *this; } |
| virtual void out(std::ostream& os) const { |
| os << "on_next( "; |
| detail::to_stream(os, value, 0, 0); |
| os << ")"; |
| } |
| virtual bool equals(const typename base::type& other) const { |
| bool result = false; |
| other->accept(make_subscriber<T>(make_observer_dynamic<T>([this, &result](T v) { |
| result = detail::equals(this->value, v, 0); |
| }))); |
| return result; |
| } |
| virtual void accept(const typename base::observer_type& o) const { |
| o.on_next(value); |
| } |
| const T value; |
| }; |
| |
| struct on_error_notification : public base { |
| on_error_notification(rxu::error_ptr ep) : ep(ep) { |
| } |
| on_error_notification(const on_error_notification& o) : ep(o.ep) {} |
| on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {} |
| on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; } |
| virtual void out(std::ostream& os) const { |
| os << "on_error("; |
| os << rxu::what(ep); |
| os << ")"; |
| } |
| virtual bool equals(const typename base::type& other) const { |
| bool result = false; |
| // not trying to compare exceptions |
| other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](rxu::error_ptr){ |
| result = true; |
| }))); |
| return result; |
| } |
| virtual void accept(const typename base::observer_type& o) const { |
| o.on_error(ep); |
| } |
| const rxu::error_ptr ep; |
| }; |
| |
| struct on_completed_notification : public base { |
| on_completed_notification() { |
| } |
| virtual void out(std::ostream& os) const { |
| os << "on_completed()"; |
| } |
| virtual bool equals(const typename base::type& other) const { |
| bool result = false; |
| other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](){ |
| result = true; |
| }))); |
| return result; |
| } |
| virtual void accept(const typename base::observer_type& o) const { |
| o.on_completed(); |
| } |
| }; |
| |
| struct exception_tag {}; |
| |
| template<typename Exception> |
| static |
| type make_on_error(exception_tag&&, Exception&& e) { |
| rxu::error_ptr ep = rxu::make_error_ptr(std::forward<Exception>(e)); |
| return std::make_shared<on_error_notification>(ep); |
| } |
| |
| struct exception_ptr_tag {}; |
| |
| static |
| type make_on_error(exception_ptr_tag&&, rxu::error_ptr ep) { |
| return std::make_shared<on_error_notification>(ep); |
| } |
| |
| public: |
| template<typename U> |
| static type on_next(U value) { |
| return std::make_shared<on_next_notification>(std::move(value)); |
| } |
| |
| static type on_completed() { |
| return std::make_shared<on_completed_notification>(); |
| } |
| |
| template<typename Exception> |
| static type on_error(Exception&& e) { |
| return make_on_error(typename std::conditional< |
| std::is_same<rxu::decay_t<Exception>, rxu::error_ptr>::value, |
| exception_ptr_tag, exception_tag>::type(), |
| std::forward<Exception>(e)); |
| } |
| }; |
| |
| template<class T> |
| bool operator == (const std::shared_ptr<detail::notification_base<T>>& lhs, const std::shared_ptr<detail::notification_base<T>>& rhs) { |
| if (!lhs && !rhs) {return true;} |
| if (!lhs || !rhs) {return false;} |
| return lhs->equals(rhs); |
| } |
| |
| template<class T> |
| std::ostream& operator<< (std::ostream& os, const std::shared_ptr<detail::notification_base<T>>& n) { |
| n->out(os); |
| return os; |
| } |
| |
| |
| template<class T> |
| class recorded |
| { |
| long t; |
| T v; |
| public: |
| recorded(long t, T v) |
| : t(t), v(v) { |
| } |
| long time() const { |
| return t; |
| } |
| const T& value() const { |
| return v; |
| } |
| }; |
| |
| template<class T> |
| bool operator == (recorded<T> lhs, recorded<T> rhs) { |
| return lhs.time() == rhs.time() && lhs.value() == rhs.value(); |
| } |
| |
| template<class T> |
| std::ostream& operator<< (std::ostream& out, const recorded<T>& r) { |
| out << "@" << r.time() << "-" << r.value(); |
| return out; |
| } |
| |
| } |
| namespace rxn=notifications; |
| |
| inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::subscription>& vs) { |
| return rxcpp::notifications::detail::ostreamvector(out, vs); |
| } |
| template<class T> |
| inline std::ostream& operator<< (std::ostream& out, const std::vector<rxcpp::notifications::recorded<T>>& vr) { |
| return rxcpp::notifications::detail::ostreamvector(out, vr); |
| } |
| |
| } |
| |
| #endif |