
The Reactive Extensions for C++ (RxCpp) is a library of algorithms for values-distributed-in-time. The Range-v3 library does the same for values-distributed-in-space.

Packages: NuGet, vcpkg
Documentation: rxcpp doxygen documentation, reactivex intro, rx marble diagrams


RxCpp is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. The unit tests depend on a git submodule for the Catch library.


Add Rx/v2/src to the include paths

lines from bytes

#include "rxcpp/rx.hpp"
namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;

#include <regex>
#include <random>
using namespace std;
using namespace std::chrono;

int main()
{
    random_device rd;   // non-deterministic generator
    mt19937 gen(rd());
    uniform_int_distribution<> dist(4, 18);

    // for testing purposes, produce a byte stream from lines of text
    auto bytes = range(0, 10) |
        flat_map([&](int i){
            auto body = from((uint8_t)('A' + i)) |
                repeat(dist(gen)) |
                as_dynamic();
            auto delim = from((uint8_t)'\r');
            return from(body, delim) | concat();
        }) |
        window(17) |
        flat_map([](observable<uint8_t> w){
            return w |
                reduce(
                    vector<uint8_t>(),
                    [](vector<uint8_t> v, uint8_t b){
                        v.push_back(b);
                        return v;
                    }) |
                as_dynamic();
        }) |
        tap([](vector<uint8_t>& v){
            // print input packet of bytes
            copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
            cout << endl;
        });

    // recover lines of text from byte stream
    auto removespaces = [](string s){
        s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
        return s;
    };

    // create strings split on \r
    auto strings = bytes |
        concat_map([](vector<uint8_t> v){
            string s(v.begin(), v.end());
            regex delim(R"/(\r)/");
            cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
            cregex_token_iterator end;
            vector<string> splits(cursor, end);
            return iterate(move(splits));
        }) |
        filter([](const string& s){
            return !s.empty();
        }) |
        publish() |
        ref_count();

    // filter to last string in each line
    auto closes = strings |
        filter(
            [](const string& s){
                return s.back() == '\r';
            }) |
        Rx::map([](const string&){return 0;});

    // group strings by line
    auto linewindows = strings |
        window_toggle(closes | start_with(0), [=](int){return closes;});

    // reduce the strings for a line into one string
    auto lines = linewindows |
        flat_map([&](observable<string> w) {
            return w | start_with<string>("") | sum() | Rx::map(removespaces);
        });

    // print result
    lines |
        subscribe<string>(println(cout));

    return 0;
}
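
The splitting step in the example depends on a detail of the standard regex token iterator: the submatch list {-1, 0} yields the text before each match followed by the match itself, so every '\r' arrives as its own token and line ends can be detected later. The sketch below shows that behavior using only the standard library. It is an illustration, not part of RxCpp: `split_keep_delims` and `reassemble_lines` are hypothetical helper names, and the second function is a plain, non-reactive approximation of what the pipeline computes, assuming that a trailing partial line is simply dropped.

```cpp
#include <cassert>
#include <cstdint>
#include <regex>
#include <string>
#include <vector>

// Split a packet on '\r', keeping each delimiter as its own token.
// The submatch list {-1, 0} asks the iterator for the text before each
// match (-1) followed by the match itself (0). Text after the last match
// is returned as a final token when it is not empty.
std::vector<std::string> split_keep_delims(const std::string& s)
{
    static const std::regex delim(R"/(\r)/");
    std::sregex_token_iterator cursor(s.begin(), s.end(), delim, {-1, 0});
    std::sregex_token_iterator end;
    return std::vector<std::string>(cursor, end);
}

// Hypothetical helper (not an RxCpp API): rebuild complete lines from
// packets whose boundaries do not line up with line boundaries.
// Text after the last '\r' is treated as an unfinished line and dropped.
std::vector<std::string> reassemble_lines(
    const std::vector<std::vector<std::uint8_t>>& packets)
{
    std::vector<std::string> lines;
    std::string current;
    for (const auto& packet : packets) {
        const std::string text(packet.begin(), packet.end());
        for (const auto& token : split_keep_delims(text)) {
            if (token.empty()) {
                continue;               // empty prefix before a leading '\r'
            }
            if (token == "\r") {
                lines.push_back(current);   // delimiter closes the line
                current.clear();
            } else {
                current += token;       // fragment of the current line
            }
        }
    }
    return lines;
}
```

Calling `reassemble_lines` with the packets "AA\rB" and "B\rC" joins the fragment "B" from each packet into one line, which is the same effect the window_toggle and sum steps produce in the reactive version.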

Reactive Extensions

The ReactiveX Observable model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. It frees you from tangled webs of callbacks, and thereby makes your code more readable and less prone to bugs.

Credit ReactiveX.io

Other language implementations


Cloning RxCpp

RxCpp uses a git submodule (in ext/catch) for the excellent Catch library. The easiest way to ensure that the submodules are included in the clone is to add --recursive in the clone command.

git clone --recursive https://github.com/ReactiveX/RxCpp.git
cd RxCpp

Building RxCpp Unit Tests

  • RxCpp is regularly tested on OSX and Windows.
  • RxCpp is regularly built with Clang, GCC and VC.
  • RxCpp depends on the latest compiler releases.

RxCpp uses CMake to create build files for several platforms and IDEs.

IDE builds

Xcode

mkdir projects/build
cd projects/build
cmake -G"Xcode" ../CMake -B.

Visual Studio 2017

mkdir projects\build
cd projects\build
cmake -G "Visual Studio 15" ..\CMake\
msbuild Project.sln

makefile builds

OSX

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
make

Linux --- Clang

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_EXE_LINKER_FLAGS="-stdlib=libc++" -B. ../CMake
make

Linux --- GCC

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
make

Windows

mkdir projects\build
cd projects\build
cmake -G"NMake Makefiles" -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ..\CMake
nmake

The build only produces test and example binaries.

Running tests

  • You can use the CMake test runner, ctest.
  • You can run the test binaries (rxcpp_test_*) directly.
  • Tests can be selected by name or tag. Example of selecting by tag:

rxcpp_test_subscription [perf]


Documentation

RxCpp uses Doxygen to generate project documentation.

When Doxygen and Graphviz are installed, CMake creates a special build task named doc. It generates the documentation and writes it to the projects/doxygen/html/ folder, which can be published to the gh-pages branch. Each merged pull request builds the docs and publishes them.

Developers Material

Contributing Code

Before submitting a feature or substantial code contribution please discuss it with the team and ensure it follows the product roadmap. Note that all code submissions will be rigorously reviewed and tested by the Rx Team, and only those that meet an extremely high bar for both quality and design/roadmap appropriateness will be merged into the source.

Microsoft Open Source Code of Conduct

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.