| #include "../test.h" |
| #include <rxcpp/operators/rx-zip.hpp> |
| |
| SCENARIO("zip never/never", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto n1 = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| auto n2 = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return n1 |
| | rxo::zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| n2 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| | rxo::as_dynamic(); |
| } |
| ); |
| |
| THEN("the output is empty"){ |
| auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 1000) |
| }); |
| auto actual = n1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 1000) |
| }); |
| auto actual = n2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip never N", "[zip][join][operators]"){ |
| GIVEN("N never completed hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| const std::size_t N = 4; |
| |
| std::vector<rxcpp::test::testable_observable<int>> n; |
| for (std::size_t i = 0; i < N; ++i) { |
| n.push_back( |
| sc.make_hot_observable({ |
| on.next(150, 1) |
| }) |
| ); |
| } |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return n[0] |
| | rxo::zip( |
| [](int v0, int v1, int v2, int v3){ |
| return v0 + v1 + v2 + v3; |
| }, |
| n[1], n[2], n[3] |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| | rxo::as_dynamic(); |
| } |
| ); |
| |
| THEN("the output is empty"){ |
| auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to each observable"){ |
| |
| std::for_each(n.begin(), n.end(), [&](rxcpp::test::testable_observable<int> &s){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 1000) |
| }); |
| auto actual = s.subscriptions(); |
| REQUIRE(required == actual); |
| }); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip never/empty", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto n = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| auto e = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(210) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return n |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| e |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output is empty"){ |
| auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 1000) |
| }); |
| auto actual = n.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the e"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 210) |
| }); |
| auto actual = e.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip empty/never", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto e = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(210) |
| }); |
| |
| auto n = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return e |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| n |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output is empty"){ |
| auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the e"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 210) |
| }); |
| auto actual = e.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 1000) |
| }); |
| auto actual = n.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip empty/empty", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto e1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(210) |
| }); |
| |
| auto e2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(210) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return e1 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| e2 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only complete message"){ |
| auto required = rxu::to_vector({ |
| on.completed(210) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the e"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 210) |
| }); |
| auto actual = e1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 210) |
| }); |
| auto actual = e2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip empty N", "[zip][join][operators]"){ |
| GIVEN("N empty hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| const int N = 4; |
| |
| std::vector<rxcpp::test::testable_observable<int>> e; |
| for (int i = 0; i < N; ++i) { |
| e.push_back( |
| sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(210 + 10 * i) |
| }) |
| ); |
| } |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return e[0] |
| .zip( |
| [](int v0, int v1, int v2, int v3){ |
| return v0 + v1 + v2 + v3; |
| }, |
| e[1], e[2], e[3] |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only complete message"){ |
| auto required = rxu::to_vector({ |
| on.completed(200 + 10 * N) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to each observable"){ |
| |
| int i = 0; |
| std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 200 + 10 * ++i) |
| }); |
| auto actual = s.subscriptions(); |
| REQUIRE(required == actual); |
| }); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip empty/return", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto e = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(210) |
| }); |
| |
| auto o = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.completed(220) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return e |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only complete message"){ |
| auto required = rxu::to_vector({ |
| on.completed(215) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the e"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 210) |
| }); |
| auto actual = e.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 215) |
| }); |
| auto actual = o.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip return/empty", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto o = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.completed(220) |
| }); |
| |
| auto e = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(210) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| e |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only complete message"){ |
| auto required = rxu::to_vector({ |
| on.completed(215) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 215) |
| }); |
| auto actual = o.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the e"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 210) |
| }); |
| auto actual = e.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip never/return", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto n = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| auto o = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.completed(220) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return n |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output is empty"){ |
| auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 1000) |
| }); |
| auto actual = n.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = o.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip return/never", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto o = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.completed(220) |
| }); |
| |
| auto n = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| n |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output is empty"){ |
| auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 1000) |
| }); |
| auto actual = n.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = o.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip return/return", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.completed(230) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(220, 3), |
| on.completed(240) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o1 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o2 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains combined ints"){ |
| auto required = rxu::to_vector({ |
| on.next(220, 2 + 3), |
| on.completed(240) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 230) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 240) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip empty/error", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto emp = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(230) |
| }); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return emp |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| err |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the emp"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = emp.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip error/empty", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| auto emp = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.completed(230) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return err |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| emp |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the emp"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = emp.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip never/error", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto n = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return n |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| err |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = n.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip error/never", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| auto n = sc.make_hot_observable({ |
| on.next(150, 1) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return err |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| n |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the n"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = n.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip error/error", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex1("zip on_error from source 1"); |
| std::runtime_error ex2("zip on_error from source 2"); |
| |
| auto err1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex1) |
| }); |
| |
| auto err2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(230, ex2) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return err1 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| err2 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex1) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip return/error", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto o = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.completed(230) |
| }); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| err |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the ret"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = o.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip error/return", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| auto ret = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.completed(230) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return err |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| ret |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the ret"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = ret.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip left completes first", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.completed(220) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 4), |
| on.completed(225) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o2 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o1 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains combined ints"){ |
| auto required = rxu::to_vector({ |
| on.next(215, 2 + 4), |
| on.completed(225) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 225) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip right completes first", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 4), |
| on.completed(225) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.completed(220) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o2 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o1 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains combined ints"){ |
| auto required = rxu::to_vector({ |
| on.next(215, 2 + 4), |
| on.completed(225) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 225) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip selector throws", "[zip][join][operators][!throws]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.completed(230) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(220, 3), |
| on.completed(240) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o1 |
| .zip( |
| [&ex](int, int) -> int { |
| rxu::throw_exception(ex); |
| }, |
| o2 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip selector throws N", "[zip][join][operators][!throws]"){ |
| GIVEN("N hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| const int N = 4; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| std::vector<rxcpp::test::testable_observable<int>> e; |
| for (int i = 0; i < N; ++i) { |
| e.push_back( |
| sc.make_hot_observable({ |
| on.next(210 + 10 * i, 1), |
| on.completed(500) |
| }) |
| ); |
| } |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return e[0] |
| .zip( |
| [&ex](int, int, int, int) -> int { |
| rxu::throw_exception(ex); |
| }, |
| e[1], e[2], e[3] |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error"){ |
| auto required = rxu::to_vector({ |
| on.error(200 + 10 * N, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to each observable"){ |
| |
| std::for_each(e.begin(), e.end(), [&](rxcpp::test::testable_observable<int> &s){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 200 + 10 * N) |
| }); |
| auto actual = s.subscriptions(); |
| REQUIRE(required == actual); |
| }); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip typical N", "[zip][join][operators]"){ |
| GIVEN("N hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| const int N = 4; |
| |
| std::vector<rxcpp::test::testable_observable<int>> o; |
| for (int i = 0; i < N; ++i) { |
| o.push_back( |
| sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210 + 10 * i, i + 1), |
| on.next(410 + 10 * i, i + N + 1), |
| on.completed(800) |
| }) |
| ); |
| } |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o[0] |
| .zip( |
| [](int v0, int v1, int v2, int v3) { |
| return v0 + v1 + v2 + v3; |
| }, |
| o[1], o[2], o[3] |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains combined ints"){ |
| auto required = rxu::to_vector({ |
| on.next(200 + 10 * N, N * (N + 1) / 2), |
| on.next(400 + 10 * N, N * (3 * N + 1) / 2) |
| }); |
| required.push_back(on.completed(800)); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to each observable"){ |
| |
| std::for_each(o.begin(), o.end(), [&](rxcpp::test::testable_observable<int> &s){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 800) |
| }); |
| auto actual = s.subscriptions(); |
| REQUIRE(required == actual); |
| }); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip interleaved with tail", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.next(225, 4), |
| on.completed(230) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(220, 3), |
| on.next(230, 5), |
| on.next(235, 6), |
| on.next(240, 7), |
| on.completed(250) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o2 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o1 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains combined ints"){ |
| auto required = rxu::to_vector({ |
| on.next(220, 2 + 3), |
| on.next(230, 4 + 5), |
| on.completed(230) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 230) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 230) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip consecutive", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.next(225, 4), |
| on.completed(230) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(235, 6), |
| on.next(240, 7), |
| on.completed(250) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o2 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o1 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains combined ints"){ |
| auto required = rxu::to_vector({ |
| on.next(235, 2 + 6), |
| on.next(240, 4 + 7), |
| on.completed(240) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 230) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 240) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip consecutive ends with error left", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.next(225, 4), |
| on.error(230, ex) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(235, 6), |
| on.next(240, 7), |
| on.completed(250) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o2 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o1 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only an error"){ |
| auto required = rxu::to_vector({ |
| on.error(230, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 230) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 230) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip consecutive ends with error right", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto o1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(215, 2), |
| on.next(225, 4), |
| on.completed(250) |
| }); |
| |
| auto o2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(235, 6), |
| on.next(240, 7), |
| on.error(245, ex) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return o2 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| o1 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains combined ints followed by an error"){ |
| auto required = rxu::to_vector({ |
| on.next(235, 2 + 6), |
| on.next(240, 4 + 7), |
| on.error(245, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 245) |
| }); |
| auto actual = o1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the o2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 245) |
| }); |
| auto actual = o2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip next+error/error", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex1("zip on_error from source 1"); |
| std::runtime_error ex2("zip on_error from source 2"); |
| |
| auto err1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.error(220, ex1) |
| }); |
| |
| auto err2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(230, ex2) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return err1 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| err2 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex1) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip error/next+error", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex1("zip on_error from source 1"); |
| std::runtime_error ex2("zip on_error from source 2"); |
| |
| auto err1 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(230, ex1) |
| }); |
| |
| auto err2 = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.error(220, ex2) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return err1 |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| err2 |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex2) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err1"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err1.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err2"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err2.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip error after completed left", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto ret = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.completed(215) |
| }); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return ret |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| err |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the ret"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 215) |
| }); |
| auto actual = ret.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |
| |
| SCENARIO("zip error after completed right", "[zip][join][operators]"){ |
| GIVEN("2 hot observables of ints."){ |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("zip on_error from source"); |
| |
| auto err = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.error(220, ex) |
| }); |
| |
| auto ret = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.completed(215) |
| }); |
| |
| WHEN("each int is combined with the latest from the other source"){ |
| |
| auto res = w.start( |
| [&]() { |
| return err |
| .zip( |
| [](int v2, int v1){ |
| return v2 + v1; |
| }, |
| ret |
| ) |
| // forget type to workaround lambda deduction bug on msvc 2013 |
| .as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains only error message"){ |
| auto required = rxu::to_vector({ |
| on.error(220, ex) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the ret"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 215) |
| }); |
| auto actual = ret.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was one subscription and one unsubscription to the err"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 220) |
| }); |
| auto actual = err.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| } |
| } |
| } |