| #include "../test.h" |
| #include <rxcpp/operators/rx-reduce.hpp> |
| #include <rxcpp/operators/rx-map.hpp> |
| #include <rxcpp/operators/rx-subscribe_on.hpp> |
| #include <rxcpp/operators/rx-observe_on.hpp> |
| |
| #include <sstream> |
| |
namespace {
// Number of pipelines each perf scenario in this file builds and subscribes to
// inside one timed run (the scenarios bind it through a local const reference).
const int static_subscriptions = 50000;
}
| |
| SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){ |
| const int& subscriptions = static_subscriptions; |
| GIVEN("a for loop"){ |
| WHEN("subscribe 50K times"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| int runs = 10; |
| |
| for (;runs > 0; --runs) { |
| |
| int c = 0; |
| int n = 1; |
| auto start = clock::now(); |
| for (int i = 0; i < subscriptions; ++i) { |
| c += rx::observable<>::just(1) |
| .map([](int i) { |
| std::stringstream serializer; |
| serializer << i; |
| return serializer.str(); |
| }) |
| .map([](const std::string& s) { |
| int i; |
| std::stringstream(s) >> i; |
| return i; |
| }) |
| .subscribe_on(rx::observe_on_event_loop()) |
| .observe_on(rx::observe_on_event_loop()) |
| .as_blocking() |
| .count(); |
| } |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| REQUIRE(subscriptions == c); |
| std::cout << "loop subscribe map subscribe_on observe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| } |
| } |
| } |
| } |
| |
| SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){ |
| const int& subscriptions = static_subscriptions; |
| GIVEN("a for loop"){ |
| WHEN("subscribe 50K times"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| int runs = 10; |
| |
| for (;runs > 0; --runs) { |
| |
| int c = 0; |
| int n = 1; |
| auto start = clock::now(); |
| |
| for (int i = 0; i < subscriptions; ++i) { |
| c += rx::observable<>:: |
| just(1). |
| map([](int i) { |
| std::stringstream serializer; |
| serializer << i; |
| return serializer.str(); |
| }). |
| map([](const std::string& s) { |
| int i; |
| std::stringstream(s) >> i; |
| return i; |
| }). |
| subscribe_on(rx::observe_on_event_loop()). |
| as_blocking(). |
| count(); |
| } |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| REQUIRE(subscriptions == c); |
| std::cout << "loop subscribe map subscribe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| } |
| } |
| } |
| } |
| |
| SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){ |
| GIVEN("a source"){ |
| auto sc = rxsc::make_test(); |
| auto so = rx::synchronize_in_one_worker(sc); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.next(240, 3), |
| on.completed(300) |
| }); |
| |
| WHEN("subscribe_on is specified"){ |
| |
| auto res = w.start( |
| [so, xs]() { |
| return xs |
| .subscribe_on(so); |
| } |
| ); |
| |
| THEN("the output contains items sent while subscribed"){ |
| auto required = rxu::to_vector({ |
| on.next(210, 2), |
| on.next(240, 3), |
| on.completed(300) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was 1 subscription/unsubscription to the source"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(201, 300) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |
| |
| SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){ |
| GIVEN("a source"){ |
| auto sc = rxsc::make_test(); |
| auto so = rx::synchronize_in_one_worker(sc); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(150, 1), |
| on.next(210, 2), |
| on.next(240, 3), |
| on.completed(300) |
| }); |
| |
| WHEN("subscribe_on is specified"){ |
| |
| auto res = w.start( |
| [so, xs]() { |
| return xs |
| | rxo::subscribe_on(so); |
| } |
| ); |
| |
| THEN("the output contains items sent while subscribed"){ |
| auto required = rxu::to_vector({ |
| on.next(210, 2), |
| on.next(240, 3), |
| on.completed(300) |
| }); |
| auto actual = res.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("there was 1 subscription/unsubscription to the source"){ |
| auto required = rxu::to_vector({ |
| on.subscribe(201, 300) |
| }); |
| auto actual = xs.subscriptions(); |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |