| #include "../test.h" |
| |
| namespace detail { |
| |
| template<class Predicate> |
| struct liftfilter |
| { |
| typedef typename std::decay<Predicate>::type test_type; |
| test_type test; |
| |
| liftfilter(test_type t) |
| : test(t) |
| { |
| } |
| |
| template<class Subscriber> |
| struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type> |
| { |
| typedef filter_observer<Subscriber> this_type; |
| typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type; |
| typedef typename base_type::value_type value_type; |
| typedef typename std::decay<Subscriber>::type dest_type; |
| typedef rx::observer<value_type, this_type> observer_type; |
| dest_type dest; |
| test_type test; |
| |
| filter_observer(dest_type d, test_type t) |
| : dest(d) |
| , test(t) |
| { |
| } |
| void on_next(typename dest_type::value_type v) const { |
| bool filtered = false; |
| RXCPP_TRY { |
| filtered = !test(v); |
| } RXCPP_CATCH(...) { |
| dest.on_error(rxu::current_exception()); |
| return; |
| } |
| if (!filtered) { |
| dest.on_next(v); |
| } |
| } |
| void on_error(rxu::error_ptr e) const { |
| dest.on_error(e); |
| } |
| void on_completed() const { |
| dest.on_completed(); |
| } |
| |
| static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) { |
| return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t))); |
| } |
| }; |
| |
| template<class Subscriber> |
| auto operator()(const Subscriber& dest) const |
| -> decltype(filter_observer<Subscriber>::make(dest, test)) { |
| return filter_observer<Subscriber>::make(dest, test); |
| } |
| }; |
| |
| } |
| |
| namespace { |
| |
| template<class Predicate> |
| auto liftfilter(Predicate&& p) |
| -> detail::liftfilter<typename std::decay<Predicate>::type> { |
| return detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p)); |
| } |
| |
| bool IsPrime(int x) |
| { |
| if (x < 2) return false; |
| for (int i = 2; i <= x/2; ++i) |
| { |
| if (x % i == 0) |
| return false; |
| } |
| return true; |
| } |
| |
| } |
| |
| SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){ |
| GIVEN("a test hot observable of ints"){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| long invoked = 0; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(110, 1), |
| on.next(180, 2), |
| on.next(230, 3), |
| on.next(270, 4), |
| on.next(340, 5), |
| on.next(380, 6), |
| on.next(390, 7), |
| on.next(450, 8), |
| on.next(470, 9), |
| on.next(560, 10), |
| on.next(580, 11), |
| on.completed(600) |
| }); |
| |
| WHEN("filtered to ints that are primes"){ |
| |
| auto res = w.start( |
| [&xs, &invoked]() { |
| return xs |
| .lift<int>(liftfilter([&invoked](int x) { |
| invoked++; |
| return IsPrime(x); |
| })) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| }, |
| 400 |
| ); |
| |
| THEN("the output only contains primes that arrived before disposal"){ |
| auto required = rxu::to_vector({ |
| on.next(230, 3), |
| on.next(340, 5), |
| on.next(390, 7) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 400) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("where was called until disposed"){ |
| REQUIRE(5 == invoked); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){ |
| GIVEN("a test hot observable of ints"){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| long invoked = 0; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(110, 1), |
| on.next(180, 2), |
| on.next(230, 3), |
| on.next(270, 4), |
| on.next(340, 5), |
| on.next(380, 6), |
| on.next(390, 7), |
| on.next(450, 8), |
| on.next(470, 9), |
| on.next(560, 10), |
| on.next(580, 11), |
| on.completed(600) |
| }); |
| |
| WHEN("filtered to ints that are primes"){ |
| |
| auto res = w.start( |
| [&xs, &invoked]() { |
| return xs |
| >> rxo::lift<int>(liftfilter([&invoked](int x) { |
| invoked++; |
| return IsPrime(x); |
| })) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| >> rxo::as_dynamic(); |
| }, |
| 400 |
| ); |
| |
| THEN("the output only contains primes that arrived before disposal"){ |
| auto required = rxu::to_vector({ |
| on.next(230, 3), |
| on.next(340, 5), |
| on.next(390, 7) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 400) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("where was called until disposed"){ |
| REQUIRE(5 == invoked); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){ |
| GIVEN("a test hot observable of ints"){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| long invoked = 0; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(110, 1), |
| on.next(180, 2), |
| on.next(230, 3), |
| on.next(270, 4), |
| on.next(340, 5), |
| on.next(380, 6), |
| on.next(390, 7), |
| on.next(450, 8), |
| on.next(470, 9), |
| on.next(560, 10), |
| on.next(580, 11), |
| on.completed(600) |
| }); |
| |
| WHEN("filtered to ints that are primes"){ |
| |
| auto res = w.start( |
| [&xs, &invoked]() { |
| auto predicate = [&](int x){ |
| invoked++; |
| return IsPrime(x); |
| }; |
| return xs |
| .lift<int>([=](rx::subscriber<int> dest){ |
| // VS2013 deduction issue requires dynamic (type-forgetting) |
| return rx::make_subscriber<int>( |
| dest, |
| rx::make_observer_dynamic<int>( |
| [=](int n){ |
| bool pass = false; |
| RXCPP_TRY {pass = predicate(n);} RXCPP_CATCH(...){dest.on_error(rxu::current_exception());}; |
| if (pass) {dest.on_next(n);} |
| }, |
| [=](rxu::error_ptr e){dest.on_error(e);}, |
| [=](){dest.on_completed();})); |
| }) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| }, |
| 400 |
| ); |
| |
| THEN("the output only contains primes that arrived before disposal"){ |
| auto required = rxu::to_vector({ |
| on.next(230, 3), |
| on.next(340, 5), |
| on.next(390, 7) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 400) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("where was called until disposed"){ |
| REQUIRE(5 == invoked); |
| } |
| } |
| } |
| } |
| |