| #define RXCPP_SUBJECT_TEST_ASYNC 1 |
| |
| #include "../test.h" |
| |
| #include <rxcpp/operators/rx-finally.hpp> |
| |
| #include <future> |
| |
| |
// Loop bound shared by the perf scenarios in this file (ten million).
const int static_onnextcalls = 10 * 1000 * 1000;
// File-local counter; the perf scenarios reset it to zero and then count
// on_next calls through a pointer or reference to it.
static int aliased{0};
| |
| SCENARIO("for loop locks mutex", "[hide][for][mutex][long][perf]"){ |
| const int& onnextcalls = static_onnextcalls; |
| GIVEN("a for loop"){ |
| WHEN("locking mutex 100 million times"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| int c = 0; |
| int n = 1; |
| auto start = clock::now(); |
| std::mutex m; |
| for (int i = 0; i < onnextcalls; i++) { |
| std::unique_lock<std::mutex> guard(m); |
| ++c; |
| } |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "loop mutex : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| |
| } |
| } |
| } |
| |
namespace syncwithvoid {
// Minimal synchronous subscriber used as the baseline in the perf scenarios.
//   T      - value type accepted by on_next
//   OnNext - callable invoked as onnext(T) for every value
template<class T, class OnNext>
class sync_subscriber
{
public:
    OnNext onnext;      // callback that on_next forwards to
    bool issubscribed;  // true until unsubscribe() is called

    explicit sync_subscriber(OnNext on)
        : onnext(on)
        , issubscribed(true)
    {
    }
    // Pure query: const-qualified so it can be used through a const reference.
    bool is_subscribed() const {return issubscribed;}
    // Clears the subscribed flag; on_next itself does not consult the flag.
    void unsubscribe() {issubscribed = false;}
    // Forwards v to the stored callback.
    void on_next(T v) {
        onnext(v);
    }
};
}
| SCENARIO("for loop calls void on_next(int)", "[hide][for][asyncobserver][baseline][perf]"){ |
| const int& onnextcalls = static_onnextcalls; |
| GIVEN("a for loop"){ |
| WHEN("calling on_next 100 million times"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| auto c = std::addressof(aliased); |
| *c = 0; |
| int n = 1; |
| auto start = clock::now(); |
| auto onnext = [c](int){++*c;}; |
| syncwithvoid::sync_subscriber<int, decltype(onnext)> scbr(onnext); |
| for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { |
| scbr.on_next(i); |
| } |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "loop void : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| |
| } |
| } |
| } |
| |
namespace asyncwithready {
// ready is an immutable class.
// A default-constructed ready reports is_ready() == true. A ready built from a
// callback reports false, and then() hands the continuation to that callback.
// Both queries are const so that a const ready can be used.
class ready
{
public:
    typedef std::function<void()> onthen_type;
private:
    // empty when the value is "ready"; otherwise receives the continuation
    std::function<void(onthen_type)> setthen;
public:
    ready() {}
    ready(std::function<void(onthen_type)> st) : setthen(st) {}
    bool is_ready() const {return !setthen;}
    // Passes ot to the stored callback. Calling this on a ready instance is a
    // usage error and aborts the process.
    void then(onthen_type ot) const {
        if (is_ready()) {
            abort();
        }
        setthen(ot);
    }
};
// Demo subscriber whose on_next returns a ready value that tells the producer
// whether it may continue immediately.
//   T      - value type accepted by on_next
//   OnNext - callable invoked as onnext(T) for every value
template<class T, class OnNext>
class async_subscriber
{
public:
    OnNext onnext;
    bool issubscribed;
    // NOTE(review): nothing in this class increments count, so the
    // 'queue is full' branch in on_next is only taken if outside code sets
    // count to 100000 - confirm whether an increment is missing.
    int count;
    explicit async_subscriber(OnNext on)
        : onnext(on)
        , issubscribed(true)
        , count(0)
    {
    }
    // Pure query: const-qualified so it can be used through a const reference.
    bool is_subscribed() const {return issubscribed;}
    void unsubscribe() {issubscribed = false;}
    // Forwards v to onnext and returns a ready value. The returned value is
    // "not ready" only when count == 100000 on entry; that value captures this,
    // so it must not be used after the subscriber is gone.
    ready on_next(T v) {
        // push v onto queue

        // under some condition pop v off of queue and pass it on
        onnext(v);

        // for demo purposes
        // simulate queue full every 100000 items
        if (count == 100000) {
            // 'queue is full'
            ready no([this](ready::onthen_type ot){
                // full version will sync producer and consumer (in producer push and consumer pop)
                // and decide when to restart the producer
                if (!this->count) {
                    ot();
                }
            });
            // set queue empty since the demo has no separate consumer thread
            count = 0;
            // 'queue is empty'
            return no;
        }
        static const ready yes;
        return yes;
    }
};
}
| SCENARIO("for loop calls ready on_next(int)", "[hide][for][asyncobserver][ready][perf]"){ |
| static const int& onnextcalls = static_onnextcalls; |
| GIVEN("a for loop"){ |
| WHEN("calling on_next 100 million times"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| auto c = std::addressof(aliased); |
| *c = 0; |
| int n = 1; |
| auto start = clock::now(); |
| auto onnext = [&c](int){++*c;}; |
| asyncwithready::async_subscriber<int, decltype(onnext)> scbr(onnext); |
| asyncwithready::ready::onthen_type chunk; |
| int i = 0; |
| chunk = [&chunk, scbr, i]() mutable { |
| for (; i < onnextcalls && scbr.is_subscribed(); i++) { |
| auto controller = scbr.on_next(i); |
| if (!controller.is_ready()) { |
| controller.then(chunk); |
| return; |
| } |
| } |
| }; |
| chunk(); |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "loop ready : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| |
| } |
| } |
| } |
| |
namespace asyncwithfuture {
// Empty payload type carried by the futures that on_next returns.
class unit {};
// Demo subscriber whose on_next reports readiness through a std::future<unit>.
//   T      - value type accepted by on_next
//   OnNext - callable invoked as onnext(T) for every value
template<class T, class OnNext>
class async_subscriber
{
public:
    OnNext onnext;      // callback that on_next forwards to
    bool issubscribed;  // true until unsubscribe() is called
    explicit async_subscriber(OnNext on)
        : onnext(on)
        , issubscribed(true)
    {
    }
    // Pure query: const-qualified so it can be used through a const reference.
    bool is_subscribed() const {return issubscribed;}
    void unsubscribe() {issubscribed = false;}
    // Fulfils a fresh promise, forwards v to onnext, then returns the
    // promise's future, so the returned future already holds a value.
    std::future<unit> on_next(T v) {
        std::promise<unit> ready;
        ready.set_value(unit());
        onnext(v);
        return ready.get_future();
    }
};
}
| SCENARIO("for loop calls std::future<unit> on_next(int)", "[hide][for][asyncobserver][future][long][perf]"){ |
| const int& onnextcalls = static_onnextcalls; |
| GIVEN("a for loop"){ |
| WHEN("calling on_next 100 million times"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| auto c = std::addressof(aliased); |
| *c = 0; |
| int n = 1; |
| auto start = clock::now(); |
| auto onnext = [&c](int){++*c;}; |
| asyncwithfuture::async_subscriber<int, decltype(onnext)> scbr(onnext); |
| for (int i = 0; i < onnextcalls && scbr.is_subscribed(); i++) { |
| auto isready = scbr.on_next(i); |
| if (isready.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) { |
| isready.wait(); |
| } |
| } |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "loop future<unit> : " << n << " subscribed, " << *c << " on_next calls, " << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| |
| } |
| } |
| } |
| |
| SCENARIO("for loop calls observer", "[hide][for][observer][perf]"){ |
| const int& onnextcalls = static_onnextcalls; |
| GIVEN("a for loop"){ |
| WHEN("observing 100 million ints"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| static int& c = aliased; |
| int n = 1; |
| |
| c = 0; |
| auto start = clock::now(); |
| auto o = rx::make_observer<int>( |
| [](int){++c;}, |
| [](rxu::error_ptr){abort();}); |
| for (int i = 0; i < onnextcalls; i++) { |
| o.on_next(i); |
| } |
| o.on_completed(); |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "loop -> observer : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; |
| } |
| } |
| } |
| |
| SCENARIO("for loop calls subscriber", "[hide][for][subscriber][perf]"){ |
| const int& onnextcalls = static_onnextcalls; |
| GIVEN("a for loop"){ |
| WHEN("observing 100 million ints"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| static int& c = aliased; |
| int n = 1; |
| |
| c = 0; |
| auto start = clock::now(); |
| auto o = rx::make_subscriber<int>( |
| [](int){++c;}, |
| [](rxu::error_ptr){abort();}); |
| for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { |
| o.on_next(i); |
| } |
| o.on_completed(); |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "loop -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| } |
| } |
| } |
| |
| SCENARIO("range calls subscriber", "[hide][range][subscriber][perf]"){ |
| const int& onnextcalls = static_onnextcalls; |
| GIVEN("a range"){ |
| WHEN("observing 100 million ints"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| static int& c = aliased; |
| int n = 1; |
| |
| c = 0; |
| auto start = clock::now(); |
| |
| rxs::range<int>(1, onnextcalls).subscribe( |
| [](int){ |
| ++c; |
| }, |
| [](rxu::error_ptr){abort();}); |
| |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "range -> subscriber : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| } |
| } |
| } |
| |
| SCENARIO("for loop calls subject", "[hide][for][subject][subjects][long][perf]"){ |
| static const int& onnextcalls = static_onnextcalls; |
| GIVEN("a for loop and a subject"){ |
| WHEN("multicasting a million ints"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| |
| for (int n = 0; n < 10; n++) |
| { |
| auto p = std::make_shared<int>(0); |
| auto c = std::make_shared<int>(0); |
| rxsub::subject<int> sub; |
| |
| #if RXCPP_SUBJECT_TEST_ASYNC |
| std::vector<std::future<int>> f(n); |
| std::atomic<int> asyncUnsubscriptions{0}; |
| #endif |
| |
| auto o = sub.get_subscriber(); |
| |
| o.add(rx::make_subscription([c, n](){ |
| auto expected = n * onnextcalls; |
| REQUIRE(*c == expected); |
| })); |
| |
| for (int i = 0; i < n; i++) { |
| #if RXCPP_SUBJECT_TEST_ASYNC |
| f[i] = std::async([sub, o, &asyncUnsubscriptions]() { |
| auto source = sub.get_observable(); |
| while(o.is_subscribed()) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| rx::composite_subscription cs; |
| source |
| .finally([&asyncUnsubscriptions](){ |
| ++asyncUnsubscriptions;}) |
| .subscribe( |
| rx::make_subscriber<int>( |
| cs, |
| [cs](int){ |
| cs.unsubscribe(); |
| }, |
| [](rxu::error_ptr){abort();})); |
| } |
| return 0; |
| }); |
| #endif |
| sub.get_observable().subscribe( |
| [c, p](int){ |
| ++(*c); |
| }, |
| [](rxu::error_ptr){abort();}); |
| } |
| |
| auto start = clock::now(); |
| for (int i = 0; i < onnextcalls && o.is_subscribed(); i++) { |
| #if RXCPP_DEBUG_SUBJECT_RACE |
| if (*p != *c) abort(); |
| (*p) += n; |
| #endif |
| o.on_next(i); |
| } |
| o.on_completed(); |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "loop -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; |
| #if RXCPP_SUBJECT_TEST_ASYNC |
| std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; |
| #endif |
| std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl; |
| } |
| } |
| } |
| } |
| |
| SCENARIO("range calls subject", "[hide][range][subject][subjects][long][perf]"){ |
| static const int& onnextcalls = static_onnextcalls; |
| GIVEN("a range and a subject"){ |
| WHEN("multicasting a million ints"){ |
| using namespace std::chrono; |
| typedef steady_clock clock; |
| for (int n = 0; n < 10; n++) |
| { |
| auto p = std::make_shared<int>(0); |
| auto c = std::make_shared<int>(0); |
| rxsub::subject<int> sub; |
| |
| #if RXCPP_SUBJECT_TEST_ASYNC |
| std::vector<std::future<int>> f(n); |
| std::atomic<int> asyncUnsubscriptions{0}; |
| #endif |
| |
| auto o = sub.get_subscriber(); |
| |
| o.add(rx::make_subscription([c, n](){ |
| auto expected = n * onnextcalls; |
| REQUIRE(*c == expected); |
| })); |
| |
| for (int i = 0; i < n; i++) { |
| #if RXCPP_SUBJECT_TEST_ASYNC |
| f[i] = std::async([sub, o, &asyncUnsubscriptions]() { |
| while(o.is_subscribed()) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
| rx::composite_subscription cs; |
| sub.get_observable() |
| .finally([&asyncUnsubscriptions](){ |
| ++asyncUnsubscriptions;}) |
| .subscribe(cs, |
| [cs](int){ |
| cs.unsubscribe(); |
| }, |
| [](rxu::error_ptr){abort();}); |
| } |
| return 0; |
| }); |
| #endif |
| sub.get_observable() |
| .subscribe( |
| [c, p](int){ |
| ++(*c); |
| }, |
| [](rxu::error_ptr){abort();} |
| ); |
| } |
| |
| auto start = clock::now(); |
| rxs::range<int>(1, onnextcalls) |
| #if RXCPP_DEBUG_SUBJECT_RACE |
| .filter([c, p, n](int){ |
| if (*p != *c) abort(); |
| (*p) += n; |
| return true; |
| }) |
| #endif |
| .subscribe(o); |
| auto finish = clock::now(); |
| auto msElapsed = duration_cast<milliseconds>(finish-start); |
| std::cout << "range -> subject : " << n << " subscribed, " << std::setw(9) << (*c) << " on_next calls, "; |
| #if RXCPP_SUBJECT_TEST_ASYNC |
| std::cout << std::setw(4) << asyncUnsubscriptions << " async, "; |
| #endif |
| std::cout << msElapsed.count() << "ms elapsed " << *c / (msElapsed.count() / 1000.0) << " ops/sec"<< std::endl; |
| } |
| } |
| } |
| } |
| |
| |
| SCENARIO("subject - infinite source", "[subject][subjects]"){ |
| GIVEN("a subject and an infinite source"){ |
| |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| const rxsc::test::messages<bool> check; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(70, 1), |
| on.next(110, 2), |
| on.next(220, 3), |
| on.next(270, 4), |
| on.next(340, 5), |
| on.next(410, 6), |
| on.next(520, 7), |
| on.next(630, 8), |
| on.next(710, 9), |
| on.next(870, 10), |
| on.next(940, 11), |
| on.next(1020, 12) |
| }); |
| |
| rxsub::subject<int> s; |
| |
| auto results1 = w.make_subscriber<int>(); |
| |
| auto results2 = w.make_subscriber<int>(); |
| |
| auto results3 = w.make_subscriber<int>(); |
| |
| WHEN("multicasting an infinite source"){ |
| |
| auto checks = rxu::to_vector({ |
| check.next(0, false) |
| }); |
| |
| auto record = [&s, &check, &checks](long at) -> void { |
| checks.push_back(check.next(at, s.has_observers())); |
| }; |
| |
| auto o = s.get_subscriber(); |
| |
| w.schedule_absolute(100, [&s, &o, &checks, &record](const rxsc::schedulable&){ |
| s = rxsub::subject<int>(); o = s.get_subscriber(); checks.clear(); record(100);}); |
| w.schedule_absolute(200, [&xs, &o, &record](const rxsc::schedulable&){ |
| xs.subscribe(o); record(200);}); |
| w.schedule_absolute(1000, [&o, &record](const rxsc::schedulable&){ |
| o.unsubscribe(); record(1000);}); |
| |
| w.schedule_absolute(300, [&s, &results1, &record](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results1); record(300);}); |
| w.schedule_absolute(400, [&s, &results2, &record](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results2); record(400);}); |
| w.schedule_absolute(900, [&s, &results3, &record](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results3); record(900);}); |
| |
| w.schedule_absolute(600, [&results1, &record](const rxsc::schedulable&){ |
| results1.unsubscribe(); record(600);}); |
| w.schedule_absolute(700, [&results2, &record](const rxsc::schedulable&){ |
| results2.unsubscribe(); record(700);}); |
| w.schedule_absolute(800, [&results1, &record](const rxsc::schedulable&){ |
| results1.unsubscribe(); record(800);}); |
| w.schedule_absolute(950, [&results3, &record](const rxsc::schedulable&){ |
| results3.unsubscribe(); record(950);}); |
| |
| w.start(); |
| |
| THEN("result1 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.next(340, 5), |
| on.next(410, 6), |
| on.next(520, 7) |
| }); |
| auto actual = results1.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("result2 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.next(410, 6), |
| on.next(520, 7), |
| on.next(630, 8) |
| }); |
| auto actual = results2.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("result3 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.next(940, 11) |
| }); |
| auto actual = results3.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("checks contains expected messages"){ |
| auto required = rxu::to_vector({ |
| check.next(100, false), |
| check.next(200, false), |
| check.next(300, true), |
| check.next(400, true), |
| check.next(600, true), |
| check.next(700, false), |
| check.next(800, false), |
| check.next(900, true), |
| check.next(950, false), |
| check.next(1000, false) |
| }); |
| auto actual = checks; |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |
| |
| SCENARIO("subject - finite source", "[subject][subjects]"){ |
| GIVEN("a subject and an finite source"){ |
| |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(70, 1), |
| on.next(110, 2), |
| on.next(220, 3), |
| on.next(270, 4), |
| on.next(340, 5), |
| on.next(410, 6), |
| on.next(520, 7), |
| on.completed(630), |
| on.next(640, 9), |
| on.completed(650), |
| on.error(660, std::runtime_error("error on unsubscribed stream")) |
| }); |
| |
| rxsub::subject<int> s; |
| |
| auto results1 = w.make_subscriber<int>(); |
| |
| auto results2 = w.make_subscriber<int>(); |
| |
| auto results3 = w.make_subscriber<int>(); |
| |
| WHEN("multicasting an infinite source"){ |
| |
| auto o = s.get_subscriber(); |
| |
| w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ |
| s = rxsub::subject<int>(); o = s.get_subscriber();}); |
| w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ |
| xs.subscribe(o);}); |
| w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ |
| o.unsubscribe();}); |
| |
| w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results1);}); |
| w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results2);}); |
| w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results3);}); |
| |
| w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ |
| results1.unsubscribe();}); |
| w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ |
| results2.unsubscribe();}); |
| w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ |
| results1.unsubscribe();}); |
| w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ |
| results3.unsubscribe();}); |
| |
| w.start(); |
| |
| THEN("result1 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.next(340, 5), |
| on.next(410, 6), |
| on.next(520, 7) |
| }); |
| auto actual = results1.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("result2 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.next(410, 6), |
| on.next(520, 7), |
| on.completed(630) |
| }); |
| auto actual = results2.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("result3 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.completed(900) |
| }); |
| auto actual = results3.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |
| |
| |
| SCENARIO("subject - on_error in source", "[subject][subjects]"){ |
| GIVEN("a subject and a source with an error"){ |
| |
| auto sc = rxsc::make_test(); |
| auto w = sc.create_worker(); |
| const rxsc::test::messages<int> on; |
| |
| std::runtime_error ex("subject on_error in stream"); |
| |
| auto xs = sc.make_hot_observable({ |
| on.next(70, 1), |
| on.next(110, 2), |
| on.next(220, 3), |
| on.next(270, 4), |
| on.next(340, 5), |
| on.next(410, 6), |
| on.next(520, 7), |
| on.error(630, ex), |
| on.next(640, 9), |
| on.completed(650), |
| on.error(660, std::runtime_error("error on unsubscribed stream")) |
| }); |
| |
| rxsub::subject<int> s; |
| |
| auto results1 = w.make_subscriber<int>(); |
| |
| auto results2 = w.make_subscriber<int>(); |
| |
| auto results3 = w.make_subscriber<int>(); |
| |
| WHEN("multicasting an infinite source"){ |
| |
| auto o = s.get_subscriber(); |
| |
| w.schedule_absolute(100, [&s, &o](const rxsc::schedulable&){ |
| s = rxsub::subject<int>(); o = s.get_subscriber();}); |
| w.schedule_absolute(200, [&xs, &o](const rxsc::schedulable&){ |
| xs.subscribe(o);}); |
| w.schedule_absolute(1000, [&o](const rxsc::schedulable&){ |
| o.unsubscribe();}); |
| |
| w.schedule_absolute(300, [&s, &results1](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results1);}); |
| w.schedule_absolute(400, [&s, &results2](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results2);}); |
| w.schedule_absolute(900, [&s, &results3](const rxsc::schedulable&){ |
| s.get_observable().subscribe(results3);}); |
| |
| w.schedule_absolute(600, [&results1](const rxsc::schedulable&){ |
| results1.unsubscribe();}); |
| w.schedule_absolute(700, [&results2](const rxsc::schedulable&){ |
| results2.unsubscribe();}); |
| w.schedule_absolute(800, [&results1](const rxsc::schedulable&){ |
| results1.unsubscribe();}); |
| w.schedule_absolute(950, [&results3](const rxsc::schedulable&){ |
| results3.unsubscribe();}); |
| |
| w.start(); |
| |
| THEN("result1 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.next(340, 5), |
| on.next(410, 6), |
| on.next(520, 7) |
| }); |
| auto actual = results1.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("result2 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.next(410, 6), |
| on.next(520, 7), |
| on.error(630, ex) |
| }); |
| auto actual = results2.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| THEN("result3 contains expected messages"){ |
| auto required = rxu::to_vector({ |
| on.error(900, ex) |
| }); |
| auto actual = results3.get_observer().messages(); |
| REQUIRE(required == actual); |
| } |
| |
| } |
| } |
| } |