| #include "rxcpp/rx.hpp" |
| |
| #include "rxcpp/rx-test.hpp" |
| #include "catch.hpp" |
| |
| #include <atomic> |
| #include <array> |
| |
| SCENARIO("publish_synchronized sample"){ |
| printf("//! [publish_synchronized sample]\n"); |
| auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)). |
| take(5). |
| publish_synchronized(rxcpp::observe_on_new_thread()); |
| |
| // Subscribe from the beginning |
| values.subscribe( |
| [](long v){printf("[1] OnNext: %ld\n", v);}, |
| [](){printf("[1] OnCompleted\n");}); |
| |
| // Another subscription from the beginning |
| values.subscribe( |
| [](long v){printf("[2] OnNext: %ld\n", v);}, |
| [](){printf("[2] OnCompleted\n");}); |
| |
| // Start emitting |
| values.connect(); |
| |
| // Wait before subscribing |
| rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ |
| values.subscribe( |
| [](long v){printf("[3] OnNext: %ld\n", v);}, |
| [](){printf("[3] OnCompleted\n");}); |
| }); |
| |
| // Add blocking subscription to see results |
| values.as_blocking().subscribe(); |
| printf("//! [publish_synchronized sample]\n"); |
| } |
| |
| SCENARIO("publish subject sample"){ |
| printf("//! [publish subject sample]\n"); |
| auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). |
| take(5). |
| publish(); |
| |
| // Subscribe from the beginning |
| values.subscribe( |
| [](long v){printf("[1] OnNext: %ld\n", v);}, |
| [](){printf("[1] OnCompleted\n");}); |
| |
| // Another subscription from the beginning |
| values.subscribe( |
| [](long v){printf("[2] OnNext: %ld\n", v);}, |
| [](){printf("[2] OnCompleted\n");}); |
| |
| // Start emitting |
| values.connect(); |
| |
| // Wait before subscribing |
| rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ |
| values.subscribe( |
| [](long v){printf("[3] OnNext: %ld\n", v);}, |
| [](){printf("[3] OnCompleted\n");}); |
| }); |
| |
| // Add blocking subscription to see results |
| values.as_blocking().subscribe(); |
| printf("//! [publish subject sample]\n"); |
| } |
| |
| SCENARIO("publish behavior sample"){ |
| printf("//! [publish behavior sample]\n"); |
| auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). |
| take(5). |
| publish(0L); |
| |
| // Subscribe from the beginning |
| values.subscribe( |
| [](long v){printf("[1] OnNext: %ld\n", v);}, |
| [](){printf("[1] OnCompleted\n");}); |
| |
| // Another subscription from the beginning |
| values.subscribe( |
| [](long v){printf("[2] OnNext: %ld\n", v);}, |
| [](){printf("[2] OnCompleted\n");}); |
| |
| // Start emitting |
| values.connect(); |
| |
| // Wait before subscribing |
| rxcpp::observable<>::timer(std::chrono::milliseconds(75)).subscribe([&](long){ |
| values.subscribe( |
| [](long v){printf("[3] OnNext: %ld\n", v);}, |
| [](){printf("[3] OnCompleted\n");}); |
| }); |
| |
| // Add blocking subscription to see results |
| values.as_blocking().subscribe(); |
| printf("//! [publish behavior sample]\n"); |
| } |
| |
| SCENARIO("publish diamond bgthread sample"){ |
| printf("//! [publish diamond bgthread sample]\n"); |
| |
| /* |
| * Implements the following diamond graph chain with publish+connect on a background thread. |
| * |
| * Values |
| * / \ |
| * *2 *100 |
| * \ / |
| * Merge |
| */ |
| auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()). |
| take(5). |
| publish(); |
| |
| // Left side multiplies by 2. |
| auto left = values.map( |
| [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} ); |
| |
| // Right side multiplies by 100. |
| auto right = values.map( |
| [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; }); |
| |
| // Merge the left,right sides together. |
| // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...]. |
| auto merged = left.merge(right); |
| |
| std::atomic<bool> completed{false}; |
| |
| // Add subscription to see results |
| merged.subscribe( |
| [](long v) { printf("[3] OnNext: %ld\n", v); }, |
| [&]() { printf("[3] OnCompleted:\n"); completed = true; }); |
| |
| // Start emitting |
| values.connect(); |
| |
| // Block until subscription terminates. |
| while (!completed) {} |
| |
| // Note: consider using ref_count(other) in real code, it's more composable. |
| |
| printf("//! [publish diamond bgthread sample]\n"); |
| } |
| |
| SCENARIO("publish diamond samethread sample"){ |
| printf("//! [publish diamond samethread sample]\n"); |
| |
| /* |
| * Implements the following diamond graph chain with publish+connect diamond without using threads. |
| * |
| * Values |
| * / \ |
| * *2 *100 |
| * \ / |
| * Merge |
| */ |
| |
| std::array<int, 5> a={{1, 2, 3, 4, 5}}; |
| auto values = rxcpp::observable<>::iterate(a). |
| publish(); |
| |
| // Left side multiplies by 2. |
| auto left = values.map( |
| [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} ); |
| |
| // Right side multiplies by 100. |
| auto right = values.map( |
| [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; }); |
| |
| // Merge the left,right sides together. |
| // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...]. |
| auto merged = left.merge(right); |
| |
| // Add subscription to see results |
| merged.subscribe( |
| [](long v) { printf("[3] OnNext: %ld\n", v); }, |
| [&]() { printf("[3] OnCompleted:\n"); }); |
| |
| // Start emitting |
| // - because there are no other threads here, the connect call blocks until the source |
| // calls on_completed. |
| values.connect(); |
| |
| // Note: consider using ref_count(other) in real code, it's more composable. |
| |
| printf("//! [publish diamond samethread sample]\n"); |
| } |
| |
| // see also examples/doxygen/ref_count.cpp for more diamond examples |