Merge "DO NOT MERGE - Merge pi-dev@5234907 into stage-aosp-master" into stage-aosp-master
diff --git a/.travis.yml b/.travis.yml
index 20a2197..db24163 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,15 +10,15 @@
 matrix:
   include:
 
-  - env: BUILD_TYPE=Debug ASAN=Off RUN_TEST=On
+  - env: BUILD_TYPE=Debug ASAN=Off RUN_TEST=On EXCEPTIONS=On
     os: osx
     osx_image: xcode8
 
-  - env: BUILD_TYPE=Debug ASAN=Off LLVM_VERSION=3.8.0 RUN_TEST=On
+  - env: BUILD_TYPE=Debug ASAN=Off LLVM_VERSION=3.8.0 RUN_TEST=On EXCEPTIONS=On
     os: linux
     compiler: clang
 
-  - env: BUILD_TYPE=Release ASAN=Off GCC_VERSION=4.9 RUN_TEST=On
+  - env: BUILD_TYPE=Release ASAN=Off GCC_VERSION=4.9 RUN_TEST=On EXCEPTIONS=On
     os: linux
     compiler: gcc
     addons:
@@ -28,7 +28,37 @@
         sources:
         - ubuntu-toolchain-r-test
 
-  - env: BUILD_TYPE=Debug ASAN=Off LLVM_VERSION=3.8.0 RUN_TEST=Off PROJECT=doc PUBLISH_DOCS=On DOXYGEN_VERSION=1.8.11
+  - env: BUILD_TYPE=Release ASAN=Off GCC_VERSION=7 RUN_TEST=On EXCEPTIONS=On
+    os: linux
+    compiler: gcc
+    addons: &gcc7
+      apt:
+        packages:
+          - g++-7
+        sources:
+          - ubuntu-toolchain-r-test
+
+  - env: BUILD_TYPE=Debug ASAN=Off GCC_VERSION=8 RUN_TEST=On EXCEPTIONS=Off
+    os: linux
+    compiler: gcc
+    addons: &gcc8
+      apt:
+        packages:
+          - g++-8
+        sources:
+          - ubuntu-toolchain-r-test
+
+  - env: BUILD_TYPE=Release ASAN=Off GCC_VERSION=8 RUN_TEST=On EXCEPTIONS=On
+    os: linux
+    compiler: gcc
+    addons: &gcc8
+      apt:
+        packages:
+          - g++-8
+        sources:
+          - ubuntu-toolchain-r-test
+
+  - env: BUILD_TYPE=Debug ASAN=Off LLVM_VERSION=3.8.0 RUN_TEST=Off EXCEPTIONS=On PROJECT=doc PUBLISH_DOCS=On DOXYGEN_VERSION=1.8.11
     os: linux
     compiler: clang
     addons:
@@ -96,7 +126,7 @@
     if [ -n "$DOXYGEN_VERSION" ]; then
       DOXYGEN_DIR=${DEPS_DIR}/doxygen-${DOXYGEN_VERSION}
       if [[ -z "$(ls -A ${DOXYGEN_DIR})" ]]; then
-        DOXYGEN_URL="http://ftp.stack.nl/pub/users/dimitri/doxygen-${DOXYGEN_VERSION}.linux.bin.tar.gz"
+        DOXYGEN_URL="https://downloads.sourceforge.net/doxygen/doxygen-${DOXYGEN_VERSION}.linux.bin.tar.gz"
         mkdir -p ${DOXYGEN_DIR} && travis_retry wget --quiet -O - ${DOXYGEN_URL} | tar --strip-components=1 -xz -C ${DOXYGEN_DIR}
       fi
       export PATH=${DOXYGEN_DIR}/bin:${PATH}
@@ -127,6 +157,9 @@
     if [ -z "$BUILD_TYPE" ]; then
       BUILD_TYPE=Release;
     fi;
+    if [ -z "$EXCEPTIONS" ]; then
+      EXCEPTIONS=On;
+    fi;
     if [[ "${ASAN}" == "On" ]]; then
       export CXXFLAGS="${CXXFLAGS} -fsanitize=address,undefined,integer -fno-omit-frame-pointer -fno-sanitize=unsigned-integer-overflow";
     fi;
@@ -138,7 +171,7 @@
   # generate build
   ############################################################################
   - cd ${TRAVIS_BUILD_DIR}
-  - cmake . -DCMAKE_BUILD_TYPE=$BUILD_TYPE
+  - cmake . -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DRX_USE_EXCEPTIONS=${EXCEPTIONS}
 
 script:
   ############################################################################
diff --git a/METADATA b/METADATA
index 2448caf..daf68f9 100644
--- a/METADATA
+++ b/METADATA
@@ -6,13 +6,13 @@
 third_party {
   url {
     type: HOMEPAGE
-    value: "https://github.com/Reactive-Extensions/RxCpp/"
+    value: "https://github.com/ReactiveX/RxCpp/"
   }
   url {
     type: GIT
-    value: "https://github.com/Reactive-Extensions/RxCpp.git"
+    value: "https://github.com/ReactiveX/RxCpp.git"
   }
-  version: "a7d5856385f126e874db6010d9dbfd37290c61de"
-  last_upgrade_date { year: 2018 month: 8 day: 10 }
+  version: "aac2fc97bc5fe680446afb5ae81bef0a9c0fbf8a"
+  last_upgrade_date { year: 2019 month: 2 day: 20 }
   license_type: NOTICE
 }
diff --git a/README.md b/README.md
index 330ca9c..228121e 100644
--- a/README.md
+++ b/README.md
@@ -169,12 +169,12 @@
 cmake -G"Xcode" ../CMake -B.
 ```
 
-#### Visual Studio 2013
+#### Visual Studio 2017
 ```batch
 mkdir projects\build
 cd projects\build
-cmake -G"Visual Studio 14" ..\CMake -B.
-msbuild rxcpp.sln
+cmake -G "Visual Studio 15" ..\CMake\
+msbuild Project.sln
 ```
 
 ### makefile builds
@@ -191,7 +191,7 @@
 ```shell
 mkdir projects/build
 cd projects/build
-cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
+cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_EXE_LINKER_FLAGS="-stdlib=libc++" -B. ../CMake
 make
 ```
 
diff --git a/Rx/v2/examples/doxygen/amb.cpp b/Rx/v2/examples/doxygen/amb.cpp
index e7dfaa6..eab7f7e 100644
--- a/Rx/v2/examples/doxygen/amb.cpp
+++ b/Rx/v2/examples/doxygen/amb.cpp
@@ -30,7 +30,7 @@
     printf("//! [implicit amb sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded amb sample"){
     printf("//! [threaded amb sample]\n");
diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp
index 58f023a..503db12 100644
--- a/Rx/v2/examples/doxygen/buffer.cpp
+++ b/Rx/v2/examples/doxygen/buffer.cpp
@@ -35,7 +35,7 @@
     printf("//! [buffer count+skip sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("buffer period+skip+coordination sample"){
     printf("//! [buffer period+skip+coordination sample]\n");
diff --git a/Rx/v2/examples/doxygen/combine_latest.cpp b/Rx/v2/examples/doxygen/combine_latest.cpp
index b220da4..5f2168b 100644
--- a/Rx/v2/examples/doxygen/combine_latest.cpp
+++ b/Rx/v2/examples/doxygen/combine_latest.cpp
@@ -17,7 +17,7 @@
     printf("//! [combine_latest sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("Coordination combine_latest sample"){
     printf("//! [Coordination combine_latest sample]\n");
diff --git a/Rx/v2/examples/doxygen/composite_exception.cpp b/Rx/v2/examples/doxygen/composite_exception.cpp
index 6cbfce3..00f351d 100644
--- a/Rx/v2/examples/doxygen/composite_exception.cpp
+++ b/Rx/v2/examples/doxygen/composite_exception.cpp
@@ -17,11 +17,11 @@
             [](std::exception_ptr composite_e) {
                 printf("OnError %s\n", rxu::what(composite_e).c_str());
                 try { std::rethrow_exception(composite_e); }
-                catch(rxcpp::composite_exception ce) {
+                catch(rxcpp::composite_exception const &ce) {
                     for(std::exception_ptr particular_e : ce.exceptions) {
 
                         try{ std::rethrow_exception(particular_e); }
-                        catch(std::runtime_error error) { printf(" *** %s\n", error.what()); }
+                        catch(std::runtime_error const &error) { printf(" *** %s\n", error.what()); }
 
                     }
                 }
diff --git a/Rx/v2/examples/doxygen/concat_map.cpp b/Rx/v2/examples/doxygen/concat_map.cpp
index 957665f..a087431 100644
--- a/Rx/v2/examples/doxygen/concat_map.cpp
+++ b/Rx/v2/examples/doxygen/concat_map.cpp
@@ -22,7 +22,7 @@
     printf("//! [concat_map sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded concat_map sample"){
     printf("//! [threaded concat_map sample]\n");
diff --git a/Rx/v2/examples/doxygen/create.cpp b/Rx/v2/examples/doxygen/create.cpp
index c2886c2..1f707c6 100644
--- a/Rx/v2/examples/doxygen/create.cpp
+++ b/Rx/v2/examples/doxygen/create.cpp
@@ -90,8 +90,8 @@
             [](int v){
                 printf("OnNext: %d\n", v);
             },
-            [](rxcpp::error_ptr ep){
-                printf("OnError: %s\n", rxu::what(ep));
+            [](rxcpp::util::error_ptr ep){
+                printf("OnError: %s\n", rxcpp::util::what(ep).c_str());
             },
             [](){
                 printf("OnCompleted\n");
diff --git a/Rx/v2/examples/doxygen/error.cpp b/Rx/v2/examples/doxygen/error.cpp
index 7600b94..87b5b78 100644
--- a/Rx/v2/examples/doxygen/error.cpp
+++ b/Rx/v2/examples/doxygen/error.cpp
@@ -9,8 +9,8 @@
     values.
         subscribe(
             [](int v){printf("OnNext: %d\n", v);},
-            [](rxcpp::error_ptr ep){
-                printf("OnError: %s\n", rxu::what(ep));
+            [](rxcpp::util::error_ptr ep){
+                printf("OnError: %s\n", rxcpp::util::what(ep).c_str());
             },
             [](){printf("OnCompleted\n");});
     printf("//! [error sample]\n");
@@ -23,8 +23,8 @@
         as_blocking().
         subscribe(
             [](int v){printf("OnNext: %d\n", v);},
-            [](rxcpp::error_ptr ep){
-                printf("OnError: %s\n", rxu::what(ep));
+            [](rxcpp::util::error_ptr ep){
+                printf("OnError: %s\n", rxcpp::util::what(ep).c_str());
             },
             [](){printf("OnCompleted\n");});
     printf("//! [threaded error sample]\n");
diff --git a/Rx/v2/examples/doxygen/finally.cpp b/Rx/v2/examples/doxygen/finally.cpp
index d947320..253d3a9 100644
--- a/Rx/v2/examples/doxygen/finally.cpp
+++ b/Rx/v2/examples/doxygen/finally.cpp
@@ -26,8 +26,8 @@
     values.
         subscribe(
             [](int v){printf("OnNext: %d\n", v);},
-            [](rxcpp::error_ptr ep){
-                printf("OnError: %s\n", rxu::what(ep));
+            [](rxcpp::util::error_ptr ep){
+                printf("OnError: %s\n", rxcpp::util::what(ep).c_str());
             },
             [](){printf("OnCompleted\n");});
     printf("//! [error finally sample]\n");
diff --git a/Rx/v2/examples/doxygen/flat_map.cpp b/Rx/v2/examples/doxygen/flat_map.cpp
index 0c88cbe..3e0a09f 100644
--- a/Rx/v2/examples/doxygen/flat_map.cpp
+++ b/Rx/v2/examples/doxygen/flat_map.cpp
@@ -22,7 +22,7 @@
     printf("//! [flat_map sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded flat_map sample"){
     printf("//! [threaded flat_map sample]\n");
diff --git a/Rx/v2/examples/doxygen/from.cpp b/Rx/v2/examples/doxygen/from.cpp
index 15186ba..7cc2276 100644
--- a/Rx/v2/examples/doxygen/from.cpp
+++ b/Rx/v2/examples/doxygen/from.cpp
@@ -13,7 +13,7 @@
     printf("//! [from sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded from sample"){
     printf("//! [threaded from sample]\n");
diff --git a/Rx/v2/examples/doxygen/group_by.cpp b/Rx/v2/examples/doxygen/group_by.cpp
index d30f334..74ef859 100644
--- a/Rx/v2/examples/doxygen/group_by.cpp
+++ b/Rx/v2/examples/doxygen/group_by.cpp
@@ -3,6 +3,8 @@
 #include "rxcpp/rx-test.hpp"
 #include "catch.hpp"
 
+#include <sstream>
+
 SCENARIO("group_by sample"){
     printf("//! [group_by sample]\n");
     auto values = rxcpp::observable<>::range(0, 8).
@@ -23,7 +25,7 @@
 }
 
 //! [group_by full intro]
-bool less(int v1, int v2){
+static bool less(int v1, int v2){
     return v1 < v2;
 }
 //! [group_by full intro]
diff --git a/Rx/v2/examples/doxygen/main.cpp b/Rx/v2/examples/doxygen/main.cpp
index 8a831f2..4da59c5 100644
--- a/Rx/v2/examples/doxygen/main.cpp
+++ b/Rx/v2/examples/doxygen/main.cpp
@@ -4,6 +4,9 @@
 #include <iostream>
 #include <thread>
 #include <string>
+
+#include "main.hpp"
+
 std::string get_pid() {
     std::stringstream s;
     s << std::this_thread::get_id();
diff --git a/Rx/v2/examples/doxygen/main.hpp b/Rx/v2/examples/doxygen/main.hpp
new file mode 100644
index 0000000..7439c6a
--- /dev/null
+++ b/Rx/v2/examples/doxygen/main.hpp
@@ -0,0 +1,3 @@
+#pragma once
+
+std::string get_pid();
diff --git a/Rx/v2/examples/doxygen/merge.cpp b/Rx/v2/examples/doxygen/merge.cpp
index 6ff6539..008c378 100644
--- a/Rx/v2/examples/doxygen/merge.cpp
+++ b/Rx/v2/examples/doxygen/merge.cpp
@@ -30,7 +30,7 @@
     printf("//! [implicit merge sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded merge sample"){
     printf("//! [threaded merge sample]\n");
diff --git a/Rx/v2/examples/doxygen/merge_delay_error.cpp b/Rx/v2/examples/doxygen/merge_delay_error.cpp
index 8c28cd8..ae75926 100644
--- a/Rx/v2/examples/doxygen/merge_delay_error.cpp
+++ b/Rx/v2/examples/doxygen/merge_delay_error.cpp
@@ -4,6 +4,8 @@
 #include "rxcpp/rx-test.hpp"
 #include "catch.hpp"
 
+#include <sstream>
+
 SCENARIO("merge_delay_error sample"){
 	printf("//! [merge_delay_error sample]\n");
 	auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(15)).map([](int) {return 1;});
@@ -33,24 +35,24 @@
 	printf("//! [implicit merge_delay_error sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded merge_delay_error sample"){
 	printf("//! [threaded merge_delay_error sample]\n");
 	printf("[thread %s] Start task\n", get_pid().c_str());
-	auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+	auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](long) -> long {
 		printf("[thread %s] Timer1 fired\n", get_pid().c_str());
 		return 1;
 	});
-	auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](int) {
+	auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](long) -> rxcpp::observable<long> {
 		std::stringstream ss;
 		ss << "[thread " << get_pid().c_str() << "] Timer2 failed\n";
 		printf("%s\n", ss.str().c_str());
 		ss.str(std::string());
 		ss << "(Error from thread: " << get_pid().c_str() << ")\n";
-		return rxcpp::observable<>::error<int>(std::runtime_error(ss.str()));
+		return rxcpp::observable<>::error<long>(std::runtime_error(ss.str()));
 	});
-	auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+	auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](long) -> long {
 		printf("[thread %s] Timer3 fired\n", get_pid().c_str());
 		return 3;
 	});
@@ -68,19 +70,19 @@
 SCENARIO("threaded implicit merge_delay_error sample"){
 	printf("//! [threaded implicit merge_delay_error sample]\n");
 	printf("[thread %s] Start task\n", get_pid().c_str());
-	auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](int) {
+	auto o1 = rxcpp::observable<>::timer(std::chrono::milliseconds(10)).map([](long) -> long {
 		printf("[thread %s] Timer1 fired\n", get_pid().c_str());
 		return 1;
 	});
-	auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](int) {
+	auto o2 = rxcpp::observable<>::timer(std::chrono::milliseconds(20)).flat_map([](long) -> rxcpp::observable<long> {
 		std::stringstream ss;
 		ss << "[thread " << get_pid().c_str() << "] Timer2 failed\n";
 		printf("%s\n", ss.str().c_str());
 		ss.str(std::string());
 		ss << "(Error from thread: " << get_pid().c_str() << ")\n";
-		return rxcpp::observable<>::error<int>(std::runtime_error(ss.str()));
+		return rxcpp::observable<>::error<long>(std::runtime_error(ss.str()));
 	});
-	auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](int) {
+	auto o3 = rxcpp::observable<>::timer(std::chrono::milliseconds(30)).map([](long) -> long {
 		printf("[thread %s] Timer3 fired\n", get_pid().c_str());
 		return 3;
 	});
@@ -89,7 +91,7 @@
 	values.
 		as_blocking().
 		subscribe(
-			[](int v){printf("[thread %s] OnNext: %d\n", get_pid().c_str(), v);},
+			[](long v){printf("[thread %s] OnNext: %ld\n", get_pid().c_str(), v);},
 			[](std::exception_ptr eptr) { printf("[thread %s] OnError %s\n", get_pid().c_str(), rxu::what(eptr).c_str()); },
 			[](){printf("[thread %s] OnCompleted\n", get_pid().c_str());});
 	printf("[thread %s] Finish task\n", get_pid().c_str());
diff --git a/Rx/v2/examples/doxygen/observe_on.cpp b/Rx/v2/examples/doxygen/observe_on.cpp
index 927c339..9046999 100644
--- a/Rx/v2/examples/doxygen/observe_on.cpp
+++ b/Rx/v2/examples/doxygen/observe_on.cpp
@@ -3,7 +3,7 @@
 #include "rxcpp/rx-test.hpp"
 #include "catch.hpp"
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("observe_on sample"){
     printf("//! [observe_on sample]\n");
diff --git a/Rx/v2/examples/doxygen/pairwise.cpp b/Rx/v2/examples/doxygen/pairwise.cpp
index 3dd8d34..3133679 100644
--- a/Rx/v2/examples/doxygen/pairwise.cpp
+++ b/Rx/v2/examples/doxygen/pairwise.cpp
@@ -23,7 +23,7 @@
     printf("//! [pairwise short sample]\n");
 }
 
-//std::string get_pid();
+//#include "main.hpp"
 //
 //SCENARIO("threaded flat_map sample"){
 //    printf("//! [threaded flat_map sample]\n");
diff --git a/Rx/v2/examples/doxygen/publish.cpp b/Rx/v2/examples/doxygen/publish.cpp
index d34e991..6c348a2 100644
--- a/Rx/v2/examples/doxygen/publish.cpp
+++ b/Rx/v2/examples/doxygen/publish.cpp
@@ -3,6 +3,9 @@
 #include "rxcpp/rx-test.hpp"
 #include "catch.hpp"
 
+#include <atomic>
+#include <array>
+
 SCENARIO("publish_synchronized sample"){
     printf("//! [publish_synchronized sample]\n");
     auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
@@ -95,3 +98,95 @@
     values.as_blocking().subscribe();
     printf("//! [publish behavior sample]\n");
 }
+
+SCENARIO("publish diamond bgthread sample"){
+    printf("//! [publish diamond bgthread sample]\n");
+
+    /*
+     * Implements the following diamond graph chain with publish+connect on a background thread.
+     *
+     *            Values
+     *          /      \
+     *        *2        *100
+     *          \      /
+     *            Merge
+     */
+    auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
+        take(5).
+        publish();
+
+    // Left side multiplies by 2.
+    auto left = values.map(
+        [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );
+
+    // Right side multiplies by 100.
+    auto right = values.map(
+        [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });
+
+    // Merge the left,right sides together.
+    // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
+    auto merged = left.merge(right);
+
+    std::atomic<bool> completed{false};
+
+    // Add subscription to see results
+    merged.subscribe(
+        [](long v) { printf("[3] OnNext: %ld\n", v); },
+        [&]() { printf("[3] OnCompleted:\n"); completed = true; });
+
+    // Start emitting
+    values.connect();
+
+    // Block until subscription terminates.
+    while (!completed) {}
+
+    // Note: consider using ref_count(other) in real code, it's more composable.
+
+    printf("//! [publish diamond bgthread sample]\n");
+}
+
+SCENARIO("publish diamond samethread sample"){
+    printf("//! [publish diamond samethread sample]\n");
+
+    /*
+     * Implements the following diamond graph chain with publish+connect diamond without using threads.
+     *
+     *            Values
+     *          /      \
+     *        *2        *100
+     *          \      /
+     *            Merge
+     */
+
+    std::array<int, 5> a={{1, 2, 3, 4, 5}};
+    auto values = rxcpp::observable<>::iterate(a).
+        publish();
+
+    // Left side multiplies by 2.
+    auto left = values.map(
+        [](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );
+
+    // Right side multiplies by 100.
+    auto right = values.map(
+        [](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });
+
+    // Merge the left,right sides together.
+    // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
+    auto merged = left.merge(right);
+
+    // Add subscription to see results
+    merged.subscribe(
+        [](long v) { printf("[3] OnNext: %ld\n", v); },
+        [&]() { printf("[3] OnCompleted:\n"); });
+
+    // Start emitting
+    // - because there are no other threads here, the connect call blocks until the source
+    //   calls on_completed.
+    values.connect();
+
+    // Note: consider using ref_count(other) in real code, it's more composable.
+
+    printf("//! [publish diamond samethread sample]\n");
+}
+
+// see also examples/doxygen/ref_count.cpp for more diamond examples
diff --git a/Rx/v2/examples/doxygen/range.cpp b/Rx/v2/examples/doxygen/range.cpp
index 69eecbd..3abb0ec 100644
--- a/Rx/v2/examples/doxygen/range.cpp
+++ b/Rx/v2/examples/doxygen/range.cpp
@@ -13,7 +13,7 @@
     printf("//! [range sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded range sample"){
     printf("//! [threaded range sample]\n");
diff --git a/Rx/v2/examples/doxygen/ref_count.cpp b/Rx/v2/examples/doxygen/ref_count.cpp
new file mode 100644
index 0000000..d056274
--- /dev/null
+++ b/Rx/v2/examples/doxygen/ref_count.cpp
@@ -0,0 +1,55 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+#include <array>
+
+SCENARIO("ref_count other diamond sample"){
+    printf("//! [ref_count other diamond sample]\n");
+
+    /*
+     * Implements the following diamond graph chain with publish+ref_count without using threads.
+     * This version is composable because it does not use connect explicitly.
+     *
+     *            Values
+     *          /      \
+     *        *2        *100
+     *          \      /
+     *            Merge
+     *             |
+     *            RefCount
+     */
+
+    std::array<double, 5> a={{1.0, 2.0, 3.0, 4.0, 5.0}};
+    auto values = rxcpp::observable<>::iterate(a)
+        // The root of the chain is only subscribed to once.
+        .tap([](double v) { printf("[0] OnNext: %lf\n", v); })
+        .publish();
+
+    auto values_to_long = values.map([](double v) { return (long) v; });
+
+    // Left side multiplies by 2.
+    auto left = values_to_long.map(
+        [](long v) -> long {printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2L;} );
+
+    // Right side multiplies by 100.
+    auto right = values_to_long.map(
+        [](long v) -> long {printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100L; });
+
+    // Merge the left,right sides together.
+    // The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
+    auto merged = left.merge(right);
+
+    // When this value is subscribed to, it calls connect on values.
+    auto connect_on_subscribe = merged.ref_count(values);
+
+    // This immediately starts emitting all values and blocks until they are completed.
+    connect_on_subscribe.subscribe(
+        [](long v) { printf("[3] OnNext: %ld\n", v); },
+        [&]() { printf("[3] OnCompleted:\n"); });
+
+    printf("//! [ref_count other diamond sample]\n");
+}
+
+// see also examples/doxygen/publish.cpp for non-ref_count diamonds
diff --git a/Rx/v2/examples/doxygen/replay.cpp b/Rx/v2/examples/doxygen/replay.cpp
index d6f08ed..2340851 100644
--- a/Rx/v2/examples/doxygen/replay.cpp
+++ b/Rx/v2/examples/doxygen/replay.cpp
@@ -3,7 +3,7 @@
 #include "rxcpp/rx-test.hpp"
 #include "catch.hpp"
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("replay sample"){
     printf("//! [replay sample]\n");
diff --git a/Rx/v2/examples/doxygen/skip_until.cpp b/Rx/v2/examples/doxygen/skip_until.cpp
index c539d8e..d99cb6e 100644
--- a/Rx/v2/examples/doxygen/skip_until.cpp
+++ b/Rx/v2/examples/doxygen/skip_until.cpp
@@ -15,7 +15,7 @@
     printf("//! [skip_until sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded skip_until sample"){
     printf("//! [threaded skip_until sample]\n");
diff --git a/Rx/v2/examples/doxygen/subscribe_on.cpp b/Rx/v2/examples/doxygen/subscribe_on.cpp
index e2614bc..7a9da50 100644
--- a/Rx/v2/examples/doxygen/subscribe_on.cpp
+++ b/Rx/v2/examples/doxygen/subscribe_on.cpp
@@ -3,7 +3,7 @@
 #include "rxcpp/rx-test.hpp"
 #include "catch.hpp"
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("subscribe_on sample"){
     printf("//! [subscribe_on sample]\n");
diff --git a/Rx/v2/examples/doxygen/take_until.cpp b/Rx/v2/examples/doxygen/take_until.cpp
index 5c98bc4..082171b 100644
--- a/Rx/v2/examples/doxygen/take_until.cpp
+++ b/Rx/v2/examples/doxygen/take_until.cpp
@@ -26,7 +26,7 @@
     printf("//! [take_until time sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("threaded take_until sample"){
     printf("//! [threaded take_until sample]\n");
diff --git a/Rx/v2/examples/doxygen/with_latest_from.cpp b/Rx/v2/examples/doxygen/with_latest_from.cpp
index 200f446..cf0a422 100644
--- a/Rx/v2/examples/doxygen/with_latest_from.cpp
+++ b/Rx/v2/examples/doxygen/with_latest_from.cpp
@@ -17,7 +17,7 @@
     printf("//! [with_latest_from sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("Coordination with_latest_from sample"){
     printf("//! [Coordination with_latest_from sample]\n");
diff --git a/Rx/v2/examples/doxygen/zip.cpp b/Rx/v2/examples/doxygen/zip.cpp
index c5cd07b..6bd295b 100644
--- a/Rx/v2/examples/doxygen/zip.cpp
+++ b/Rx/v2/examples/doxygen/zip.cpp
@@ -17,7 +17,7 @@
     printf("//! [zip sample]\n");
 }
 
-std::string get_pid();
+#include "main.hpp"
 
 SCENARIO("Coordination zip sample"){
     printf("//! [Coordination zip sample]\n");
diff --git a/Rx/v2/src/rxcpp/operators/rx-publish.hpp b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
index dc38191..bc686fc 100644
--- a/Rx/v2/src/rxcpp/operators/rx-publish.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-publish.hpp
@@ -21,6 +21,18 @@
     \sample
     \snippet publish.cpp publish behavior sample
     \snippet output.txt publish behavior sample
+
+    \sample
+    \snippet publish.cpp publish diamond samethread sample
+    \snippet output.txt publish diamond samethread sample
+
+    \sample
+    \snippet publish.cpp publish diamond bgthread sample
+    \snippet output.txt publish diamond bgthread sample
+
+    \sample
+    \snippet ref_count.cpp ref_count other diamond sample
+    \snippet output.txt ref_count other diamond sample
 */
 
 #if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP)
diff --git a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
index 55dde05..b68315d 100644
--- a/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
@@ -4,10 +4,26 @@
 
 /*! \file rx-ref_count.hpp
 
-    \brief  takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source.
-            The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection.
+    \brief  Make some \c connectable_observable behave like an ordinary \c observable.
+            Uses a reference count of the subscribers to control the connection to the published observable.
 
-    \return An observable that emitting the items from its source.
+            The first subscription will cause a call to \c connect(), and the last \c unsubscribe will unsubscribe the connection.
+
+            There are 2 variants of the operator:
+            \li \c ref_count(): calls \c connect on the \c source \c connectable_observable.
+            \li \c ref_count(other): calls \c connect on the \c other \c connectable_observable.
+
+    \tparam ConnectableObservable the type of the \c other \c connectable_observable (optional)
+    \param  other \c connectable_observable to call \c connect on (optional)
+
+    If \c other is omitted, then \c source is used instead (which must be a \c connectable_observable).
+    Otherwise, \c source can be a regular \c observable.
+
+    \return An \c observable that emits the items from its \c source.
+
+    \sample
+    \snippet ref_count.cpp ref_count other diamond sample
+    \snippet output.txt ref_count other diamond sample
  */
 
 #if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP)
@@ -30,29 +46,100 @@
 };
 template<class... AN>
 using ref_count_invalid_t = typename ref_count_invalid<AN...>::type;
-    
-template<class T, class ConnectableObservable>
+
+// ref_count(other) takes a regular observable source, not a connectable_observable.
+// use template specialization to avoid instantiating 'subscribe' for two different types
+// which would cause a compilation error.
+template <typename connectable_type, typename observable_type>
+struct ref_count_state_base {
+    ref_count_state_base(connectable_type other, observable_type source)
+        : connectable(std::move(other))
+        , subscribable(std::move(source)) {}
+
+    connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
+    observable_type subscribable; // subscribes to this if non-empty.
+
+    template <typename Subscriber>
+    void subscribe(Subscriber&& o) {
+        subscribable.subscribe(std::forward<Subscriber>(o));
+    }
+};
+
+// Note: explicit specializations have to be at namespace scope prior to C++17.
+template <typename connectable_type>
+struct ref_count_state_base<connectable_type, void> {
+    explicit ref_count_state_base(connectable_type c)
+        : connectable(std::move(c)) {}
+
+    connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
+
+    template <typename Subscriber>
+    void subscribe(Subscriber&& o) {
+        connectable.subscribe(std::forward<Subscriber>(o));
+    }
+};
+
+template<class T,
+         class ConnectableObservable,
+         class Observable = void> // note: type order flipped versus the operator.
 struct ref_count : public operator_base<T>
 {
-    typedef rxu::decay_t<ConnectableObservable> source_type;
+    typedef rxu::decay_t<Observable> observable_type;
+    typedef rxu::decay_t<ConnectableObservable> connectable_type;
 
-    struct ref_count_state : public std::enable_shared_from_this<ref_count_state>
+    // ref_count() == false
+    // ref_count(other) == true
+    using has_observable_t = rxu::negation<std::is_same<void, Observable>>;
+    static constexpr bool has_observable_v = has_observable_t::value;
+
+    struct ref_count_state : public std::enable_shared_from_this<ref_count_state>,
+                             public ref_count_state_base<ConnectableObservable, Observable>
     {
-        explicit ref_count_state(source_type o)
-            : source(std::move(o))
+        template <class HasObservable = has_observable_t,
+                  class Enabled = rxu::enable_if_all_true_type_t<
+                      rxu::negation<HasObservable>>>
+        explicit ref_count_state(connectable_type source)
+            : ref_count_state_base<ConnectableObservable, Observable>(std::move(source))
             , subscribers(0)
         {
         }
 
-        source_type source;
+        template <bool HasObservableV = has_observable_v>
+        ref_count_state(connectable_type other,
+                        typename std::enable_if<HasObservableV, observable_type>::type source)
+            : ref_count_state_base<ConnectableObservable, Observable>(std::move(other),
+                                                                      std::move(source))
+            , subscribers(0)
+        {
+        }
+
         std::mutex lock;
         long subscribers;
         composite_subscription connection;
     };
     std::shared_ptr<ref_count_state> state;
 
-    explicit ref_count(source_type o)
-        : state(std::make_shared<ref_count_state>(std::move(o)))
+    // connectable_observable<T> source = ...;
+    // source.ref_count();
+    //
+    // calls connect on source after the subscribe on source.
+    template <class HasObservable = has_observable_t,
+              class Enabled = rxu::enable_if_all_true_type_t<
+                  rxu::negation<HasObservable>>>
+    explicit ref_count(connectable_type source)
+        : state(std::make_shared<ref_count_state>(std::move(source)))
+    {
+    }
+
+    // connectable_observable<?> other = ...;
+    // observable<T> source = ...;
+    // source.ref_count(other);
+    //
+    // calls connect on 'other' after the subscribe on 'source'.
+    template <bool HasObservableV = has_observable_v>
+    ref_count(connectable_type other,
+              typename std::enable_if<HasObservableV, observable_type>::type source)
+        : state(std::make_shared<ref_count_state>(std::move(other), std::move(source)))
     {
     }
 
@@ -70,9 +157,9 @@
                     keepAlive->connection = composite_subscription();
                 }
             });
-        keepAlive->source.subscribe(std::forward<Subscriber>(o));
+        keepAlive->subscribe(std::forward<Subscriber>(o));
         if (needConnect) {
-            keepAlive->source.connect(keepAlive->connection);
+            keepAlive->connectable.connect(keepAlive->connection);
         }
     }
 };
@@ -104,11 +191,28 @@
         return Result(RefCount(std::forward<ConnectableObservable>(o)));
     }
 
+    template<class Observable,
+        class ConnectableObservable,
+        class Enabled = rxu::enable_if_all_true_type_t<
+            is_observable<Observable>,
+            is_connectable_observable<ConnectableObservable>>,
+        class SourceValue = rxu::value_type_t<Observable>,
+        class RefCount = rxo::detail::ref_count<SourceValue,
+            rxu::decay_t<ConnectableObservable>,
+            rxu::decay_t<Observable>>,
+        class Value = rxu::value_type_t<RefCount>,
+        class Result = observable<Value, RefCount>
+        >
+    static Result member(Observable&& o, ConnectableObservable&& other) {
+        return Result(RefCount(std::forward<ConnectableObservable>(other),
+                               std::forward<Observable>(o)));
+    }
+
     template<class... AN>
     static operators::detail::ref_count_invalid_t<AN...> member(AN...) {
         std::terminate();
         return {};
-        static_assert(sizeof...(AN) == 10000, "ref_count takes no arguments");
+        static_assert(sizeof...(AN) == 10000, "ref_count takes (optional ConnectableObservable)");
     }
 };
     
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 1f5c5e4..1eb47db 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -27,6 +27,8 @@
 #define RXCPP_USE_EXCEPTIONS 1
 #endif
 
+#define RXCPP_NORETURN __declspec(noreturn)
+
 #elif defined(__clang__)
 
 #if __has_feature(cxx_rvalue_references)
@@ -42,6 +44,12 @@
 #define RXCPP_USE_EXCEPTIONS 1
 #endif
 
+#if __has_feature(cxx_attributes)
+#define RXCPP_NORETURN [[noreturn]]
+#else
+#define RXCPP_NORETURN __attribute__ ((noreturn))
+#endif
+
 #elif defined(__GNUG__)
 
 #define GCC_VERSION (__GNUC__ * 10000 + \
@@ -64,6 +72,8 @@
 #define RXCPP_USE_EXCEPTIONS 1
 #endif
 
+#define RXCPP_NORETURN __attribute__ ((noreturn))
+
 #endif
 
 //
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 97bcabd..4f42007 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -574,6 +574,17 @@
         static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
     }
 
+    /*! @copydoc rx-ref_count.hpp
+     */
+    template<class... AN>
+    auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
+        /// \cond SHOW_SERVICE_MEMBERS
+        -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+        /// \endcond
+    {
+        return      observable_member(ref_count_tag{},                *this, std::forward<AN>(an)...);
+    }
+
     /*! @copydoc rxcpp::operators::as_blocking
      */
     template<class... AN>
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index ee4e53e..2d6bcb6 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -117,6 +117,14 @@
             std::terminate();
         }
     }
+
+    explicit subscription(std::shared_ptr<base_subscription_state> s)
+        : state(std::move(s))
+    {
+        if (!state) {
+            std::terminate();
+        }
+    }
 public:
 
     subscription()
@@ -178,9 +186,23 @@
     weak_state_type get_weak() {
         return state;
     }
+
+    // Atomically promote weak subscription to strong.
+    // Calls std::terminate if w has already expired.
     static subscription lock(weak_state_type w) {
         return subscription(w);
     }
+
+    // Atomically try to promote weak subscription to strong.
+    // Returns an empty maybe<> if w has already expired.
+    static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
+        auto strong_subscription = w.lock();
+        if (!strong_subscription) {
+            return rxu::detail::maybe<subscription>{};
+        } else {
+            return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
+        }
+    }
 };
 
 inline bool operator<(const subscription& lhs, const subscription& rhs) {
@@ -223,8 +245,14 @@
     typedef subscription::weak_state_type weak_subscription;
     struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
     {
+        // invariant: cannot access this data without the lock held.
         std::set<subscription> subscriptions;
+        // double checked locking:
+        //    issubscribed must be loaded again after each lock acquisition.
+        // invariant:
+        //    never call subscription::unsubscribe with lock held.
         std::mutex lock;
+        // invariant: transitions from 'true' to 'false' exactly once, at any time.
         std::atomic<bool> issubscribed;
 
         ~composite_subscription_state()
@@ -242,29 +270,78 @@
         {
         }
 
+        // Atomically add 's' to the set of subscriptions.
+        //
+        // If unsubscribe() has already occurred, this immediately
+        // calls s.unsubscribe().
+        //
+        // cs.unsubscribe() [must] happens-before s.unsubscribe()
+        //
+        // Due to the un-atomic nature of calling 's.unsubscribe()',
+        // it is possible to observe the unintuitive
+        // add(s)=>s.unsubscribe() prior
+        // to any of the unsubscribe()=>sN.unsubscribe().
         inline weak_subscription add(subscription s) {
-            if (!issubscribed) {
+            if (!issubscribed) {  // load.acq [seq_cst]
                 s.unsubscribe();
             } else if (s.is_subscribed()) {
                 std::unique_lock<decltype(lock)> guard(lock);
-                subscriptions.insert(s);
+                if (!issubscribed) {  // load.acq [seq_cst]
+                    // unsubscribe was called concurrently.
+                    guard.unlock();
+                    // invariant: do not call unsubscribe with lock held.
+                    s.unsubscribe();
+                } else {
+                    subscriptions.insert(s);
+                }
             }
             return s.get_weak();
         }
 
+        // Atomically remove 'w' from the set of subscriptions.
+        //
+        // This does nothing if 'w' was already previously removed,
+        // or refers to an expired value.
         inline void remove(weak_subscription w) {
-            if (issubscribed && !w.expired()) {
-                auto s = subscription::lock(w);
+            if (issubscribed) { // load.acq [seq_cst]
+                rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);
+
+                if (maybe_subscription.empty()) {
+                  // Do nothing if the subscription has already expired.
+                  return;
+                }
+
                 std::unique_lock<decltype(lock)> guard(lock);
-                subscriptions.erase(std::move(s));
+                // invariant: subscriptions must be accessed under the lock.
+
+                if (issubscribed) { // load.acq [seq_cst]
+                  subscription& s = maybe_subscription.get();
+                  subscriptions.erase(std::move(s));
+                } // else unsubscribe() was called concurrently; this becomes a no-op.
             }
         }
 
+        // Atomically clear all subscriptions that were observably added
+        // (and not subsequently observably removed).
+        //
+        // Un-atomically call unsubscribe on those subscriptions.
+        //
+        // forall subscriptions in {add(s1),add(s2),...}
+        //                         - {remove(s3), remove(s4), ...}:
+        //   cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
+        //
+        // cs.unsubscribe() observed-before cs.clear ==> do nothing.
         inline void clear() {
-            if (issubscribed) {
+            if (issubscribed) { // load.acq [seq_cst]
                 std::unique_lock<decltype(lock)> guard(lock);
 
+                if (!issubscribed) { // load.acq [seq_cst]
+                  // unsubscribe was called concurrently.
+                  return;
+                }
+
                 std::set<subscription> v(std::move(subscriptions));
+                // invariant: do not call unsubscribe with lock held.
                 guard.unlock();
                 std::for_each(v.begin(), v.end(),
                               [](const subscription& s) {
@@ -272,11 +349,29 @@
             }
         }
 
+        // Atomically clear all subscriptions that were observably added
+        // (and not subsequently observably removed).
+        //
+        // Un-atomically call unsubscribe on those subscriptions.
+        //
+        // Switches to an 'unsubscribed' state, all subsequent
+        // adds are immediately unsubscribed.
+        //
+        // cs.unsubscribe() [must] happens-before
+        //     cs.add(s) ==> s.unsubscribe()
+        //
+        // forall subscriptions in {add(s1),add(s2),...}
+        //                         - {remove(s3), remove(s4), ...}:
+        //   cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
         inline void unsubscribe() {
-            if (issubscribed.exchange(false)) {
+            if (issubscribed.exchange(false)) {  // cas.acq_rel [seq_cst]
                 std::unique_lock<decltype(lock)> guard(lock);
 
+                // is_subscribed can only transition to 'false' once,
+                // does not need an extra atomic access here.
+
                 std::set<subscription> v(std::move(subscriptions));
+                // invariant: do not call unsubscribe with lock held.
                 guard.unlock();
                 std::for_each(v.begin(), v.end(),
                               [](const subscription& s) {
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index 9ce455f..e5867e5 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -899,7 +899,7 @@
 }
 
 // Replace std::rethrow_exception to be compatible with our error_ptr typedef.
-[[noreturn]] inline void rethrow_exception(error_ptr e) {
+RXCPP_NORETURN inline void rethrow_exception(error_ptr e) {
 #if RXCPP_USE_EXCEPTIONS
   std::rethrow_exception(e);
 #else
@@ -917,7 +917,7 @@
 // A replacement for the "throw" keyword which is illegal when
 // exceptions are disabled with -fno-exceptions.
 template <typename E>
-[[noreturn]] inline void throw_exception(E&& e) {
+RXCPP_NORETURN inline void throw_exception(E&& e) {
 #if RXCPP_USE_EXCEPTIONS
   throw std::forward<E>(e);
 #else
@@ -930,7 +930,7 @@
 
 // TODO: Do we really need this? rxu::rethrow_exception(rxu::current_exception())
 // would have the same semantics in either case.
-[[noreturn]] inline void rethrow_current_exception() {
+RXCPP_NORETURN inline void rethrow_current_exception() {
 #if RXCPP_USE_EXCEPTIONS
   std::rethrow_exception(std::current_exception());
 #else
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
index e31ed55..5145e92 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
@@ -72,6 +72,7 @@
             state->lifetime.add([keepAlive](){
                 std::unique_lock<std::mutex> guard(keepAlive->lock);
                 auto expired = std::move(keepAlive->q);
+                keepAlive->q = new_worker_state::queue_item_time{};
                 if (!keepAlive->q.empty()) std::terminate();
                 keepAlive->wake.notify_one();
 
diff --git a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
index b7f7d68..020b0f8 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
@@ -43,6 +43,7 @@
         mutable std::list<time_point_type> time_points;
         mutable count_type count;
         mutable period_type period;
+        mutable composite_subscription replayLifetime;
     public:
         mutable coordination_type coordination;
         mutable coordinator_type coordinator;
@@ -56,9 +57,13 @@
         }
 
     public:
-        explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator)
+        ~replay_observer_state(){
+            replayLifetime.unsubscribe();
+        }
+        explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime)
             : count(_count)
             , period(_period)
+            , replayLifetime(_replayLifetime)
             , coordination(std::move(_coordination))
             , coordinator(std::move(_coordinator))
         {
@@ -66,6 +71,7 @@
 
         void add(T v) const {
             std::unique_lock<std::mutex> guard(lock);
+
             if (!count.empty()) {
                 if (values.size() == count.get())
                     remove_oldest();
@@ -89,11 +95,12 @@
     std::shared_ptr<replay_observer_state> state;
 
 public:
-    replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription cs)
-        : base_type(cs)
+    replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime)
+        : base_type(subscriberLifetime)
     {
-        auto coordinator = coordination.create_coordinator(cs);
-        state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator));
+        replayLifetime.add(subscriberLifetime);
+        auto coordinator = coordination.create_coordinator(replayLifetime);
+        state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime));
     }
 
     subscriber<T> get_subscriber() const {
@@ -129,22 +136,22 @@
 
 public:
     explicit replay(Coordination cn, composite_subscription cs = composite_subscription())
-        : s(count_type(), period_type(), cn, cs)
+        : s(count_type(), period_type(), cn, cs, composite_subscription{})
     {
     }
 
     replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription())
-        : s(count_type(std::move(count)), period_type(), cn, cs)
+        : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{})
     {
     }
 
     replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
-        : s(count_type(), period_type(period), cn, cs)
+        : s(count_type(), period_type(period), cn, cs, composite_subscription{})
     {
     }
 
     replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
-        : s(count_type(count), period_type(period), cn, cs)
+        : s(count_type(count), period_type(period), cn, cs, composite_subscription{})
     {
     }
 
@@ -163,9 +170,8 @@
     observable<T> get_observable() const {
         auto keepAlive = s;
         auto observable = make_observable_dynamic<T>([=](subscriber<T> o){
-            if (keepAlive.get_subscription().is_subscribed()) {
-                for (auto&& value: get_values())
-                    o.on_next(value);
+            for (auto&& value: get_values()) {
+                o.on_next(value);
             }
             keepAlive.add(keepAlive.get_subscriber(), std::move(o));
         });
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index dcb998f..c2d1530 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -88,6 +88,15 @@
     ${TEST_DIR}/operators/zip.cpp
 )
 
+set(TEST_COMPILE_DEFINITIONS "")
+set(TEST_COMMAND_ARGUMENTS "")
+
+if (NOT RX_USE_EXCEPTIONS)
+    MESSAGE( STATUS "no exceptions" )
+    list(APPEND TEST_COMPILE_DEFINITIONS CATCH_CONFIG_DISABLE_EXCEPTIONS)
+    list(APPEND TEST_COMMAND_ARGUMENTS -e)
+endif()
+
 
 add_executable(rxcppv2_test ${TEST_DIR}/test.cpp ${TEST_SOURCES})
 add_executable(rxcpp::tests ALIAS rxcppv2_test)
@@ -98,6 +107,7 @@
 )
 target_compile_options(rxcppv2_test PUBLIC ${RX_COMPILE_OPTIONS})
 target_compile_features(rxcppv2_test PUBLIC ${RX_COMPILE_FEATURES})
+target_compile_definitions(rxcppv2_test PUBLIC ${TEST_COMPILE_DEFINITIONS})
 target_include_directories(rxcppv2_test
     PUBLIC ${RX_SRC_DIR} ${RX_CATCH_DIR}
     )
@@ -110,7 +120,7 @@
     set(ONE_TEST_FULL_NAME "rxcpp_test_${ONE_TEST_NAME}")
     add_executable( ${ONE_TEST_FULL_NAME} ${ONE_TEST_SOURCE} )
     add_executable( rxcpp::${ONE_TEST_NAME} ALIAS ${ONE_TEST_FULL_NAME})
-    target_compile_definitions(${ONE_TEST_FULL_NAME} PUBLIC "CATCH_CONFIG_MAIN")
+    target_compile_definitions(${ONE_TEST_FULL_NAME} PUBLIC "CATCH_CONFIG_MAIN" ${TEST_COMPILE_DEFINITIONS})
     target_compile_options(${ONE_TEST_FULL_NAME} PUBLIC ${RX_COMPILE_OPTIONS})
     target_compile_features(${ONE_TEST_FULL_NAME} PUBLIC ${RX_COMPILE_FEATURES})
     target_include_directories(${ONE_TEST_FULL_NAME}
@@ -118,7 +128,7 @@
         )
     target_link_libraries(${ONE_TEST_FULL_NAME} ${CMAKE_THREAD_LIBS_INIT})
 
-    add_test(NAME ${ONE_TEST_NAME} COMMAND ${ONE_TEST_FULL_NAME})
+    add_test(NAME ${ONE_TEST_NAME} COMMAND ${ONE_TEST_FULL_NAME} ${TEST_COMMAND_ARGUMENTS})
 endforeach(ONE_TEST_SOURCE ${TEST_SOURCES})
 
 
diff --git a/Rx/v2/test/operators/buffer.cpp b/Rx/v2/test/operators/buffer.cpp
index 2aec5d1..e1b980c 100644
--- a/Rx/v2/test/operators/buffer.cpp
+++ b/Rx/v2/test/operators/buffer.cpp
@@ -455,7 +455,7 @@
     }
 }
 
-SCENARIO("buffer with time on intervals", "[buffer_with_time][operators][long][hide]"){
+SCENARIO("buffer with time on intervals", "[buffer_with_time][operators][long][!hide]"){
     GIVEN("7 intervals of 2 seconds"){
         WHEN("the period is 2sec and the initial is 5sec"){
             // time:   |-----------------|
@@ -500,7 +500,7 @@
     }
 }
 
-SCENARIO("buffer with time on intervals, implicit coordination", "[buffer_with_time][operators][long][hide]"){
+SCENARIO("buffer with time on intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){
     GIVEN("7 intervals of 2 seconds"){
         WHEN("the period is 2sec and the initial is 5sec"){
             // time:   |-----------------|
@@ -543,7 +543,7 @@
     }
 }
 
-SCENARIO("buffer with time on overlapping intervals", "[buffer_with_time][operators][long][hide]"){
+SCENARIO("buffer with time on overlapping intervals", "[buffer_with_time][operators][long][!hide]"){
     GIVEN("5 intervals of 2 seconds"){
         WHEN("the period is 2sec and the initial is 5sec"){
             // time:   |-------------|
@@ -587,7 +587,7 @@
     }
 }
 
-SCENARIO("buffer with time on overlapping intervals, implicit coordination", "[buffer_with_time][operators][long][hide]"){
+SCENARIO("buffer with time on overlapping intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){
     GIVEN("5 intervals of 2 seconds"){
         WHEN("the period is 2sec and the initial is 5sec"){
             // time:   |-------------|
@@ -631,7 +631,7 @@
     }
 }
 
-SCENARIO("buffer with time on intervals, error", "[buffer_with_time][operators][long][hide]"){
+SCENARIO("buffer with time on intervals, error", "[buffer_with_time][operators][long][!hide]"){
     GIVEN("5 intervals of 2 seconds"){
         WHEN("the period is 2sec and the initial is 5sec"){
             // time:   |-------------|
diff --git a/Rx/v2/test/operators/concat.cpp b/Rx/v2/test/operators/concat.cpp
index 86f4a7e..88dcac7 100644
--- a/Rx/v2/test/operators/concat.cpp
+++ b/Rx/v2/test/operators/concat.cpp
@@ -5,7 +5,7 @@
 
 const int static_onnextcalls = 1000000;
 
-SCENARIO("synchronize concat ranges", "[hide][range][synchronize][concat][perf]"){
+SCENARIO("synchronize concat ranges", "[!hide][range][synchronize][concat][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("some ranges"){
         WHEN("generating ints"){
@@ -33,7 +33,7 @@
     }
 }
 
-SCENARIO("observe_on concat ranges", "[hide][range][observe_on][concat][perf]"){
+SCENARIO("observe_on concat ranges", "[!hide][range][observe_on][concat][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("some ranges"){
         WHEN("generating ints"){
@@ -61,7 +61,7 @@
     }
 }
 
-SCENARIO("serialize concat ranges", "[hide][range][serialize][concat][perf]"){
+SCENARIO("serialize concat ranges", "[!hide][range][serialize][concat][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("some ranges"){
         WHEN("generating ints"){
diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp
index cc163b1..cfd6f4f 100644
--- a/Rx/v2/test/operators/concat_map.cpp
+++ b/Rx/v2/test/operators/concat_map.cpp
@@ -8,7 +8,7 @@
 
 static const int static_tripletCount = 100;
 
-SCENARIO("concat_transform pythagorian ranges", "[hide][range][concat_transform][pythagorian][perf]"){
+SCENARIO("concat_transform pythagorian ranges", "[!hide][range][concat_transform][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
@@ -53,7 +53,7 @@
     }
 }
 
-SCENARIO("synchronize concat_transform pythagorian ranges", "[hide][range][concat_transform][synchronize][pythagorian][perf]"){
+SCENARIO("synchronize concat_transform pythagorian ranges", "[!hide][range][concat_transform][synchronize][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
@@ -101,7 +101,7 @@
     }
 }
 
-SCENARIO("observe_on concat_transform pythagorian ranges", "[hide][range][concat_transform][observe_on][pythagorian][perf]"){
+SCENARIO("observe_on concat_transform pythagorian ranges", "[!hide][range][concat_transform][observe_on][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
@@ -150,7 +150,7 @@
     }
 }
 
-SCENARIO("serialize concat_transform pythagorian ranges", "[hide][range][concat_transform][serialize][pythagorian][perf]"){
+SCENARIO("serialize concat_transform pythagorian ranges", "[!hide][range][concat_transform][serialize][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp
index 2e01a30..e1837b0 100644
--- a/Rx/v2/test/operators/flat_map.cpp
+++ b/Rx/v2/test/operators/flat_map.cpp
@@ -8,7 +8,7 @@
 
 static const int static_tripletCount = 100;
 
-SCENARIO("pythagorian for loops", "[hide][for][pythagorian][perf]"){
+SCENARIO("pythagorian for loops", "[!hide][for][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("a for loop"){
         WHEN("generating pythagorian triplets"){
@@ -45,7 +45,7 @@
     }
 }
 
-SCENARIO("merge_transform pythagorian ranges", "[hide][range][merge_transform][pythagorian][perf]"){
+SCENARIO("merge_transform pythagorian ranges", "[!hide][range][merge_transform][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
@@ -89,7 +89,7 @@
     }
 }
 
-SCENARIO("synchronize merge_transform pythagorian ranges", "[hide][range][merge_transform][synchronize][pythagorian][perf]"){
+SCENARIO("synchronize merge_transform pythagorian ranges", "[!hide][range][merge_transform][synchronize][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
@@ -137,7 +137,7 @@
     }
 }
 
-SCENARIO("observe_on merge_transform pythagorian ranges", "[hide][range][merge_transform][observe_on][pythagorian][perf]"){
+SCENARIO("observe_on merge_transform pythagorian ranges", "[!hide][range][merge_transform][observe_on][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
@@ -185,7 +185,7 @@
     }
 }
 
-SCENARIO("serialize merge_transform pythagorian ranges", "[hide][range][merge_transform][serialize][pythagorian][perf]"){
+SCENARIO("serialize merge_transform pythagorian ranges", "[!hide][range][merge_transform][serialize][pythagorian][perf]"){
     const int& tripletCount = static_tripletCount;
     GIVEN("some ranges"){
         WHEN("generating pythagorian triplets"){
diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp
index b575cd9..645bda4 100644
--- a/Rx/v2/test/operators/group_by.cpp
+++ b/Rx/v2/test/operators/group_by.cpp
@@ -11,7 +11,7 @@
 #include <locale>
 #include <sstream>
 
-SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[hide][pi][group_by][observe_on][long][perf]"){
+SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[!hide][pi][group_by][observe_on][long][perf]"){
     GIVEN("a for loop"){
         WHEN("partitioning pi series across all hardware threads"){
 
@@ -93,7 +93,7 @@
     }
 }
 
-SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[hide][pi][observe_on][long][perf]"){
+SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[!hide][pi][observe_on][long][perf]"){
     GIVEN("a for loop"){
         WHEN("partitioning pi series across all hardware threads"){
 
diff --git a/Rx/v2/test/operators/merge.cpp b/Rx/v2/test/operators/merge.cpp
index 9a7f28c..917d74d 100644
--- a/Rx/v2/test/operators/merge.cpp
+++ b/Rx/v2/test/operators/merge.cpp
@@ -6,7 +6,7 @@
 const int static_onnextcalls = 1000000;
 
 
-SCENARIO("synchronize merge ranges", "[hide][range][synchronize][merge][perf]"){
+SCENARIO("synchronize merge ranges", "[!hide][range][synchronize][merge][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("some ranges"){
         WHEN("generating ints"){
@@ -34,7 +34,7 @@
     }
 }
 
-SCENARIO("observe_on merge ranges", "[hide][range][observe_on][merge][perf]"){
+SCENARIO("observe_on merge ranges", "[!hide][range][observe_on][merge][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("some ranges"){
         WHEN("generating ints"){
@@ -62,7 +62,7 @@
     }
 }
 
-SCENARIO("serialize merge ranges", "[hide][range][serialize][merge][perf]"){
+SCENARIO("serialize merge ranges", "[!hide][range][serialize][merge][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("some ranges"){
         WHEN("generating ints"){
diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp
index b53b884..83172ec 100644
--- a/Rx/v2/test/operators/merge_delay_error.cpp
+++ b/Rx/v2/test/operators/merge_delay_error.cpp
@@ -3,8 +3,6 @@
 #include <rxcpp/operators/rx-merge_delay_error.hpp>
 #include <rxcpp/operators/rx-observe_on.hpp>
 
-const int static_onnextcalls = 1000000;
-
 //merge_delay_error must work the very same way as `merge()` except the error handling
 
 SCENARIO("merge_delay_error completes", "[merge][join][operators]"){
diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp
index ffa85aa..aac2d40 100644
--- a/Rx/v2/test/operators/observe_on.cpp
+++ b/Rx/v2/test/operators/observe_on.cpp
@@ -5,7 +5,7 @@
 
 const int static_onnextcalls = 100000;
 
-SCENARIO("range observed on new_thread", "[hide][range][observe_on_debug][observe_on][long][perf]"){
+SCENARIO("range observed on new_thread", "[!hide][range][observe_on_debug][observe_on][long][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a range"){
         WHEN("multicasting a million ints"){
diff --git a/Rx/v2/test/operators/publish.cpp b/Rx/v2/test/operators/publish.cpp
index 87c8fff..fe578d7 100644
--- a/Rx/v2/test/operators/publish.cpp
+++ b/Rx/v2/test/operators/publish.cpp
@@ -2,9 +2,11 @@
 #include <rxcpp/operators/rx-publish.hpp>
 #include <rxcpp/operators/rx-connect_forever.hpp>
 #include <rxcpp/operators/rx-ref_count.hpp>
+#include <rxcpp/operators/rx-map.hpp>
+#include <rxcpp/operators/rx-merge.hpp>
 
 
-SCENARIO("publish range", "[hide][range][subject][publish][subject][operators]"){
+SCENARIO("publish range", "[!hide][range][subject][publish][subject][operators]"){
     GIVEN("a range"){
         WHEN("published"){
             auto published = rxs::range<int>(0, 10).publish();
@@ -38,6 +40,104 @@
     }
 }
 
+SCENARIO("publish ref_count", "[range][subject][publish][ref_count][operators]"){
+    GIVEN("a range"){
+        WHEN("ref_count is used"){
+            auto published = rxs::range<int>(0, 3).publish().ref_count();
+
+            std::vector<int> results;
+            published.subscribe(
+            // on_next
+                [&](int v){
+                    results.push_back(v);
+                },
+            // on_completed
+                [](){});
+
+            std::vector<int> expected_results;
+            expected_results.push_back(0);
+            expected_results.push_back(1);
+            expected_results.push_back(2);
+            expected_results.push_back(3);
+
+            CHECK(results == expected_results);
+        }
+        WHEN("ref_count(other) is used"){
+            auto published = rxs::range<double>(0, 10).publish();
+            auto map_to_int = published.map([](double v) { return (long) v; });
+
+            // Ensures that 'ref_count(other)' has the source value type,
+            // not the publisher's value type.
+            auto with_ref_count = map_to_int.ref_count(published);
+
+            std::vector<long> results;
+
+            with_ref_count.subscribe(
+            // on_next
+                [&](long v){
+                    results.push_back(v);
+                },
+            // on_completed
+                [](){});
+
+            std::vector<long> expected_results;
+            for (long i = 0; i <= 10; ++i) {
+              expected_results.push_back(i);
+            }
+            CHECK(results == expected_results);
+        }
+        WHEN("ref_count(other) is used in a diamond"){
+            auto source = rxs::range<double>(0, 3);
+
+            int published_on_next_count = 0;
+            // Ensure we only subscribe once to 'published' when its in a diamond.
+            auto next = source.map(
+                [&](double v) {
+                    published_on_next_count++;
+                    return v;
+                }
+            );
+            auto published = next.publish();
+
+            // Ensures that 'x.ref_count(other)' has the 'x' value type, not the other's value
+            // type.
+            auto map_to_int = published.map([](double v) { return (long) v; });
+
+            auto left = map_to_int.map([](long v) { return v * 2; });
+            auto right = map_to_int.map([](long v) { return v * 100; });
+
+            auto merge = left.merge(right);
+            auto with_ref_count = merge.ref_count(published);
+
+            std::vector<long> results;
+
+            with_ref_count.subscribe(
+            // on_next
+                [&](long v){
+                    results.push_back(v);
+                },
+            // on_completed
+                [](){});
+
+            // Ensure we only subscribe once to 'published' when its in a diamond.
+            CHECK(published_on_next_count == 4);
+
+            std::vector<long> expected_results;
+            expected_results.push_back(0);
+            expected_results.push_back(0);
+            expected_results.push_back(2);
+            expected_results.push_back(100);
+            expected_results.push_back(4);
+            expected_results.push_back(200);
+            expected_results.push_back(6);
+            expected_results.push_back(300);
+
+            // Ensure left,right is interleaved without being biased towards one side.
+            CHECK(results == expected_results);
+        }
+    }
+}
+
 SCENARIO("publish basic", "[publish][multicast][subject][operators]"){
     GIVEN("a test hot observable of longs"){
         auto sc = rxsc::make_test();
diff --git a/Rx/v2/test/operators/scan.cpp b/Rx/v2/test/operators/scan.cpp
index f1d64e9..1cb38a5 100644
--- a/Rx/v2/test/operators/scan.cpp
+++ b/Rx/v2/test/operators/scan.cpp
@@ -3,7 +3,7 @@
 #include <rxcpp/operators/rx-take.hpp>
 #include <rxcpp/operators/rx-scan.hpp>
 
-SCENARIO("scan: issue 41", "[scan][operators][issue][hide]"){
+SCENARIO("scan: issue 41", "[scan][operators][issue][!hide]"){
     GIVEN("map of scan of interval"){
         auto sc = rxsc::make_current_thread();
         auto so = rxcpp::synchronize_in_one_worker(sc);
diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp
index 936fbc1..ef8a8c7 100644
--- a/Rx/v2/test/operators/subscribe_on.cpp
+++ b/Rx/v2/test/operators/subscribe_on.cpp
@@ -8,7 +8,7 @@
 
 static const int static_subscriptions = 50000;
 
-SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){
+SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){
     const int& subscriptions = static_subscriptions;
     GIVEN("a for loop"){
         WHEN("subscribe 50K times"){
@@ -48,7 +48,7 @@
     }
 }
 
-SCENARIO("for loop subscribes to map with subscribe_on", "[hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){
+SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){
     const int& subscriptions = static_subscriptions;
     GIVEN("a for loop"){
         WHEN("subscribe 50K times"){
diff --git a/Rx/v2/test/sources/interval.cpp b/Rx/v2/test/sources/interval.cpp
index 4a7c8dc..9ab2fca 100644
--- a/Rx/v2/test/sources/interval.cpp
+++ b/Rx/v2/test/sources/interval.cpp
@@ -1,6 +1,6 @@
 #include "../test.h"
 
-SCENARIO("schedule_periodically", "[hide][periodically][scheduler][long][perf][sources]"){
+SCENARIO("schedule_periodically", "[!hide][periodically][scheduler][long][perf][sources]"){
     GIVEN("schedule_periodically"){
         WHEN("the period is 1sec and the initial is 2sec"){
             using namespace std::chrono;
@@ -21,7 +21,7 @@
     }
 }
 
-SCENARIO("schedule_periodically by duration", "[hide][periodically][scheduler][long][perf][sources]"){
+SCENARIO("schedule_periodically by duration", "[!hide][periodically][scheduler][long][perf][sources]"){
     GIVEN("schedule_periodically_duration"){
         WHEN("the period is 1sec and the initial is 2sec"){
             using namespace std::chrono;
@@ -64,7 +64,7 @@
     }
 }
 
-SCENARIO("intervals", "[hide][periodically][interval][scheduler][long][perf][sources]"){
+SCENARIO("intervals", "[!hide][periodically][interval][scheduler][long][perf][sources]"){
     GIVEN("10 intervals of 1 seconds"){
         WHEN("the period is 1sec and the initial is 2sec"){
             using namespace std::chrono;
diff --git a/Rx/v2/test/sources/timer.cpp b/Rx/v2/test/sources/timer.cpp
index 4f62b10..3f46b21 100644
--- a/Rx/v2/test/sources/timer.cpp
+++ b/Rx/v2/test/sources/timer.cpp
@@ -1,6 +1,6 @@
 #include "../test.h"
 
-SCENARIO("timer", "[hide][periodically][timer][scheduler][long][perf][sources]"){
+SCENARIO("timer", "[!hide][periodically][timer][scheduler][long][perf][sources]"){
     GIVEN("the timer of 1 sec"){
         WHEN("the period is 1 sec"){
             using namespace std::chrono;
diff --git a/Rx/v2/test/subjects/subject.cpp b/Rx/v2/test/subjects/subject.cpp
index 9e21614..09318a3 100644
--- a/Rx/v2/test/subjects/subject.cpp
+++ b/Rx/v2/test/subjects/subject.cpp
@@ -10,7 +10,7 @@
 const int static_onnextcalls = 10000000;
 static int aliased = 0;
 
-SCENARIO("for loop locks mutex", "[hide][for][mutex][long][perf]"){
+SCENARIO("for loop locks mutex", "[!hide][for][mutex][long][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a for loop"){
         WHEN("locking mutex 100 million times"){
@@ -52,7 +52,7 @@
     }
 };
 }
-SCENARIO("for loop calls void on_next(int)", "[hide][for][asyncobserver][baseline][perf]"){
+SCENARIO("for loop calls void on_next(int)", "[!hide][for][asyncobserver][baseline][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a for loop"){
         WHEN("calling on_next 100 million times"){
@@ -137,7 +137,7 @@
     }
 };
 }
-SCENARIO("for loop calls ready on_next(int)", "[hide][for][asyncobserver][ready][perf]"){
+SCENARIO("for loop calls ready on_next(int)", "[!hide][for][asyncobserver][ready][perf]"){
     static const int& onnextcalls = static_onnextcalls;
     GIVEN("a for loop"){
         WHEN("calling on_next 100 million times"){
@@ -191,7 +191,7 @@
         onnext(v); return ready.get_future();}
 };
 }
-SCENARIO("for loop calls std::future<unit> on_next(int)", "[hide][for][asyncobserver][future][long][perf]"){
+SCENARIO("for loop calls std::future<unit> on_next(int)", "[!hide][for][asyncobserver][future][long][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a for loop"){
         WHEN("calling on_next 100 million times"){
@@ -218,7 +218,7 @@
     }
 }
 
-SCENARIO("for loop calls observer", "[hide][for][observer][perf]"){
+SCENARIO("for loop calls observer", "[!hide][for][observer][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a for loop"){
         WHEN("observing 100 million ints"){
@@ -244,7 +244,7 @@
     }
 }
 
-SCENARIO("for loop calls subscriber", "[hide][for][subscriber][perf]"){
+SCENARIO("for loop calls subscriber", "[!hide][for][subscriber][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a for loop"){
         WHEN("observing 100 million ints"){
@@ -270,7 +270,7 @@
     }
 }
 
-SCENARIO("range calls subscriber", "[hide][range][subscriber][perf]"){
+SCENARIO("range calls subscriber", "[!hide][range][subscriber][perf]"){
     const int& onnextcalls = static_onnextcalls;
     GIVEN("a range"){
         WHEN("observing 100 million ints"){
@@ -296,7 +296,7 @@
     }
 }
 
-SCENARIO("for loop calls subject", "[hide][for][subject][subjects][long][perf]"){
+SCENARIO("for loop calls subject", "[!hide][for][subject][subjects][long][perf]"){
     static const int& onnextcalls = static_onnextcalls;
     GIVEN("a for loop and a subject"){
         WHEN("multicasting a million ints"){
@@ -370,7 +370,7 @@
     }
 }
 
-SCENARIO("range calls subject", "[hide][range][subject][subjects][long][perf]"){
+SCENARIO("range calls subject", "[!hide][range][subject][subjects][long][perf]"){
     static const int& onnextcalls = static_onnextcalls;
     GIVEN("a range and a subject"){
         WHEN("multicasting a million ints"){
diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp
index ab70448..33218a7 100644
--- a/Rx/v2/test/subscriptions/subscription.cpp
+++ b/Rx/v2/test/subscriptions/subscription.cpp
@@ -8,7 +8,7 @@
 
 #include <sstream>
 
-SCENARIO("observe subscription", "[hide]"){
+SCENARIO("observe subscription", "[!hide]"){
     GIVEN("observable of ints"){
         WHEN("subscribe"){
             auto observers = std::make_shared<std::list<rxcpp::subscriber<int>>>();
@@ -26,7 +26,7 @@
 
 static const int static_subscriptions = 10000;
 
-SCENARIO("for loop subscribes to map", "[hide][for][just][subscribe][long][perf]"){
+SCENARIO("for loop subscribes to map", "[!hide][for][just][subscribe][long][perf]"){
     const int& subscriptions = static_subscriptions;
     GIVEN("a for loop"){
         WHEN("subscribe 100K times"){
@@ -71,7 +71,7 @@
     }
 }
 
-SCENARIO("for loop subscribes to combine_latest", "[hide][for][just][combine_latest][subscribe][long][perf]"){
+SCENARIO("for loop subscribes to combine_latest", "[!hide][for][just][combine_latest][subscribe][long][perf]"){
     const int& subscriptions = static_subscriptions;
     GIVEN("a for loop"){
         WHEN("subscribe 100K times"){
@@ -109,7 +109,7 @@
     }
 }
 
-SCENARIO("synchronized range debug", "[hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){
+SCENARIO("synchronized range debug", "[!hide][subscribe][range][synchronize_debug][synchronize][long][perf]"){
     GIVEN("range"){
         WHEN("synchronized"){
             using namespace std::chrono;
@@ -218,7 +218,7 @@
     }
 }
 
-SCENARIO("observe_on range debug", "[hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){
+SCENARIO("observe_on range debug", "[!hide][subscribe][range][observe_on_debug][observe_on][long][perf]"){
     GIVEN("range"){
         WHEN("observed on"){
             using namespace std::chrono;
diff --git a/appveyor.yml b/appveyor.yml
index 68ea434..42882b2 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -10,8 +10,6 @@
 
 environment:
   matrix:
-  - APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2013
-    VSVER: Visual Studio 12 2013 Win64
   - APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2015
     VSVER: Visual Studio 14 2015 Win64
   - APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2017
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 542a91a..3d07447 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -59,6 +59,7 @@
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-replay.hpp
+   ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-retry.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-scan.hpp
@@ -83,13 +84,16 @@
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
+   ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-zip.hpp
+   ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-composite_exception.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-connectable_observable.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-coordination.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-coroutine.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-includes.hpp
+   ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-lite.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-notification.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-observable.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/rx-observer.hpp
@@ -114,6 +118,7 @@
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/schedulers/rx-virtualtime.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/sources/rx-create.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/sources/rx-defer.hpp
+   ${RXCPP_DIR}/Rx/v2/src/rxcpp/sources/rx-empty.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/sources/rx-error.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/sources/rx-interval.hpp
    ${RXCPP_DIR}/Rx/v2/src/rxcpp/sources/rx-iterate.hpp
@@ -130,8 +135,12 @@
 # Grouping all the source files puts them into a virtual folder in Visual Studio
 source_group("src" FILES ${RX_SOURCES})
 
+if (MSVC)
+# This 'RxCpp' build target only appears to be a virtual project for IDEs.
+# It won't actually build correctly since it is missing the shared.cmake integration.
 add_library(RxCpp SHARED ${RX_SOURCES})
 SET_TARGET_PROPERTIES(RxCpp PROPERTIES LINKER_LANGUAGE CXX)
+endif (MSVC)
 
 set(CMAKE_SKIP_INSTALL_ALL_DEPENDENCY TRUE CACHE BOOL "Don't require all projects to be built in order to install" FORCE)
 
diff --git a/projects/CMake/shared.cmake b/projects/CMake/shared.cmake
index ac0d09c..f7f7a31 100644
--- a/projects/CMake/shared.cmake
+++ b/projects/CMake/shared.cmake
@@ -1,5 +1,7 @@
 FIND_PACKAGE(Threads)
 
+option(RX_USE_EXCEPTIONS "Use C++ exceptions" ON)
+
 # define some compiler settings
 
 MESSAGE( STATUS "CMAKE_CXX_COMPILER_ID: " ${CMAKE_CXX_COMPILER_ID} )
@@ -13,13 +15,21 @@
         -Wno-error=unused-command-line-argument
         -ftemplate-depth=1024
         )
+    if (NOT RX_USE_EXCEPTIONS)
+        MESSAGE( STATUS "no exceptions" )
+        list(APPEND RX_COMPILE_OPTIONS -fno-exceptions)
+    endif()
 elseif (CMAKE_CXX_COMPILER_ID MATCHES "GNU")
     MESSAGE( STATUS "gnu compiler version: " ${CMAKE_CXX_COMPILER_VERSION} )
     MESSAGE( STATUS "using gnu settings" )
     set(RX_COMPILE_OPTIONS
         -Wall -Wextra -Werror -Wunused
         )
-elseif (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
+    if (NOT RX_USE_EXCEPTIONS)
+        MESSAGE( STATUS "no exceptions" )
+        list(APPEND RX_COMPILE_OPTIONS -fno-exceptions)
+    endif()
+  elseif (CMAKE_CXX_COMPILER_ID MATCHES "MSVC")
     MESSAGE( STATUS "msvc compiler version: " ${CMAKE_CXX_COMPILER_VERSION} )
     MESSAGE( STATUS "using msvc settings" )
     set(RX_COMPILE_OPTIONS
@@ -29,9 +39,13 @@
         /bigobj
         /DUNICODE /D_UNICODE # it is a new millenium
         )
+    if (NOT RX_USE_EXCEPTIONS)
+        MESSAGE( STATUS "no exceptions" )
+        list(APPEND RX_COMPILE_OPTIONS /EHs-c-)
+    endif()
     if (NOT CMAKE_CXX_COMPILER_VERSION VERSION_LESS "19.0.23506.0")
-        set(RX_COMPILE_OPTIONS
-            ${RX_COMPILE_OPTIONS}
+        MESSAGE( STATUS "with coroutines" )
+        list(APPEND RX_COMPILE_OPTIONS
             /await # enable coroutines
             )
     endif()
@@ -54,4 +68,4 @@
 
 set(IX_SRC_DIR ${RXCPP_DIR}/Ix/CPP/src)
 set(RX_SRC_DIR ${RXCPP_DIR}/Rx/v2/src)
-set(RX_CATCH_DIR ${RXCPP_DIR}/ext/catch/include)
+set(RX_CATCH_DIR ${RXCPP_DIR}/ext/catch/single_include/catch2)
diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt
index 6620a7a..1c1ba4c 100644
--- a/projects/doxygen/CMakeLists.txt
+++ b/projects/doxygen/CMakeLists.txt
@@ -84,6 +84,7 @@
         ${DOXY_EXAMPLES_SRC_DIR}/publish.cpp
         ${DOXY_EXAMPLES_SRC_DIR}/range.cpp
         ${DOXY_EXAMPLES_SRC_DIR}/reduce.cpp
+        ${DOXY_EXAMPLES_SRC_DIR}/ref_count.cpp
         ${DOXY_EXAMPLES_SRC_DIR}/repeat.cpp
         ${DOXY_EXAMPLES_SRC_DIR}/replay.cpp
         ${DOXY_EXAMPLES_SRC_DIR}/retry.cpp