| #include "../test.h" |
| #include <rxcpp/operators/rx-take.hpp> |
| #include <rxcpp/operators/rx-map.hpp> |
| #include <rxcpp/operators/rx-observe_on.hpp> |
| |
// Number of on_next notifications pushed through observe_on in each pass of the
// throughput scenario below.
constexpr int static_onnextcalls = 100000;
| |
| SCENARIO("range observed on new_thread", "[!hide][range][observe_on_debug][observe_on][long][perf]"){ |
| const int& onnextcalls = static_onnextcalls; |
| GIVEN("a range"){ |
| WHEN("multicasting a million ints"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| auto el = rx::observe_on_new_thread(); |
| |
| for (int n = 0; n < 10; n++) |
| { |
| std::atomic_bool disposed; |
| std::atomic_bool done; |
| auto c = std::make_shared<int>(0); |
| |
| rx::composite_subscription cs; |
| cs.add([&](){ |
| if (!done) {abort();} |
| disposed = true; |
| }); |
| |
| auto start = clock::now(); |
| rxs::range<int>(1) |
| .take(onnextcalls) |
| .observe_on(el) |
| .as_blocking() |
| .subscribe( |
| cs, |
| [c](int){ |
| ++(*c); |
| }, |
| [&](){ |
| done = true; |
| }); |
| auto expected = onnextcalls; |
| REQUIRE(*c == expected); |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "range -> observe_on new_thread : " << (*c) << " on_next calls, " << msElapsed.count() << "ms elapsed, int-per-second " << *c / (msElapsed.count() / 1000.0) << std::endl; |
| } |
| } |
| } |
| } |
| |
| SCENARIO("observe_on", "[observe][observe_on]"){ |
| GIVEN("a source"){ |
| auto sc = rxsc::make_test(); |
| auto so = rx::synchronize_in_one_worker(sc); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.next(240, 3), |
| on.completed(300) |
| }); |
| |
| WHEN("subscribe_on is specified"){ |
| |
| auto res = w.start( |
| [so, xs]() { |
| return xs |
| .observe_on(so); |
| } |
| ); |
| |
| THEN("the output contains items sent while subscribed"){ |
| auto required = rxu::to_vector({ |
| on.next(211, 2), |
| on.next(241, 3), |
| on.completed(301) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was 1 subscription/unsubscription to the source"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 300) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |
| |
| SCENARIO("stream observe_on", "[observe][observe_on]"){ |
| GIVEN("a source"){ |
| auto sc = rxsc::make_test(); |
| auto so = rx::synchronize_in_one_worker(sc); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.next(240, 3), |
| on.completed(300) |
| }); |
| |
| WHEN("observe_on is specified"){ |
| |
| auto res = w.start( |
| [so, xs]() { |
| return xs |
| | rxo::observe_on(so); |
| } |
| ); |
| |
| THEN("the output contains items sent while subscribed"){ |
| auto required = rxu::to_vector({ |
| on.next(211, 2), |
| on.next(241, 3), |
| on.completed(301) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was 1 subscription/unsubscription to the source"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(200, 300) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |
| |
// Minimal payload type that deliberately defines no comparison operators; it
// carries a single public int.
struct nocompare {
    int v;
};
| |
| SCENARIO("observe_on no-comparison", "[observe][observe_on]"){ |
| GIVEN("a source"){ |
| auto sc = rxsc::make_test(); |
| auto so = rx::observe_on_one_worker(sc); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<nocompare> in; |
| const rxsc::test::messages<int> out; |
| |
| auto xs = sc.make_hot_observable({ |
| in.next(150, nocompare{1}), |
| in.next(210, nocompare{2}), |
| in.next(240, nocompare{3}), |
| in.completed(300) |
| }); |
| |
| WHEN("observe_on is specified"){ |
| |
| auto res = w.start( |
| [so, xs]() { |
| return xs |
| | rxo::observe_on(so) |
| | rxo::map([](nocompare v){ return v.v; }) |
| | rxo::as_dynamic(); |
| } |
| ); |
| |
| THEN("the output contains items sent while subscribed"){ |
| auto required = rxu::to_vector({ |
| out.next(211, 2), |
| out.next(241, 3), |
| out.completed(301) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was 1 subscription/unsubscription to the source"){ |
| auto required = rxu::to_vector({ |
| out.subscribe(200, 300) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |