Merge remote-tracking branch 'upstream-master' into master

Change-Id: I50eb4a662df62db5d494601c99db09ceb78603fa
diff --git a/Ix/CPP/src/cpplinq/linq.hpp b/Ix/CPP/src/cpplinq/linq.hpp
index be77151..6552f79 100644
--- a/Ix/CPP/src/cpplinq/linq.hpp
+++ b/Ix/CPP/src/cpplinq/linq.hpp
@@ -471,22 +471,22 @@
 
     // TODO: skip_while(pred)
 
-    template<typename ITEM = typename element_type>
+    template<typename ITEM = element_type>
     typename std::enable_if<std::is_default_constructible<ITEM>::value, ITEM>::type sum() const {
         ITEM seed{};
         return sum(seed);
     }
 
-    typename element_type sum(typename element_type seed) const {
+    element_type sum(element_type seed) const {
         return std::accumulate(begin(), end(), seed);
     }
 
-    template <typename Selector, typename Result = std::result_of<Selector(typename element_type)>::type>
+    template <typename Selector, typename Result = typename std::result_of<Selector(element_type)>::type>
     typename std::enable_if<std::is_default_constructible<Result>::value, Result>::type sum(Selector sel) const {
         return from(begin(), end()).select(sel).sum();			
     }
 
-    template <typename Selector, typename Result = std::result_of<Selector(typename element_type)>::type>
+    template <typename Selector, typename Result = typename std::result_of<Selector(element_type)>::type>
     Result sum(Selector sel, Result seed) const {
         return from(begin(), end()).select(sel).sum(seed);			
     }
diff --git a/METADATA b/METADATA
index 7823817..2448caf 100644
--- a/METADATA
+++ b/METADATA
@@ -12,7 +12,7 @@
     type: GIT
     value: "https://github.com/Reactive-Extensions/RxCpp.git"
   }
-  version: "b84db4278e54e722fbbae794f573d1142261e9a3"
-  last_upgrade_date { year: 2018 month: 1 day: 30 }
+  version: "a7d5856385f126e874db6010d9dbfd37290c61de"
+  last_upgrade_date { year: 2018 month: 8 day: 10 }
   license_type: NOTICE
 }
diff --git a/README.md b/README.md
index 8106c4a..330ca9c 100644
--- a/README.md
+++ b/README.md
@@ -3,17 +3,21 @@
 Platform    | Status | 
 ----------- | :------------ |
 Windows | [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446)
-Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp)
+Linux & OSX | [![Linux & Osx Status](http://img.shields.io/travis/ReactiveX/RxCpp.svg?style=flat-square)](https://travis-ci.org/ReactiveX/RxCpp)
 
 Source        | Badges |
 ------------- | :--------------- |
-Github | [![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) <br/> [![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases) <br/> [![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v4.0.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
-Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
+Github | [![GitHub license](https://img.shields.io/github/license/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp) <br/> [![GitHub release](https://img.shields.io/github/release/ReactiveX/RxCpp.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp/releases) <br/> [![GitHub commits](https://img.shields.io/github/commits-since/ReactiveX/RxCpp/4.1.0.svg?style=flat-square)](https://github.com/ReactiveX/RxCpp)
+Gitter.im | [![Join in on gitter.im](https://img.shields.io/gitter/room/ReactiveX/RxCpp.svg?style=flat-square)](https://gitter.im/ReactiveX/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 Packages | [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![vcpkg port](https://img.shields.io/badge/vcpkg-port-blue.svg?style=flat-square)](https://github.com/Microsoft/vcpkg/tree/master/ports/rxcpp)
-Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp) <br/> [![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/)
+Documentation | [![rxcpp doxygen documentation](https://img.shields.io/badge/rxcpp-latest-brightgreen.svg?style=flat-square)](http://reactivex.github.io/RxCpp) <br/> [![reactivex intro](https://img.shields.io/badge/reactivex.io-intro-brightgreen.svg?style=flat-square)](http://reactivex.io/intro.html) [![rx marble diagrams](https://img.shields.io/badge/rxmarbles-diagrams-brightgreen.svg?style=flat-square)](http://rxmarbles.com/)
+
+# Usage
+
+__RxCpp__ is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. The unit tests depend on a git submodule for the [Catch](https://github.com/philsquared/Catch) library.
 
 # Example
-Add ```Rx/v2/src``` to the include paths
+Add `Rx/v2/src` to the include paths
 
 [![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html)
 
@@ -124,7 +128,7 @@
 ### Other language implementations
 
 * Java: [RxJava](https://github.com/ReactiveX/RxJava)
-* JavaScript: [RxJS](https://github.com/Reactive-Extensions/RxJS)
+* JavaScript: [rxjs](https://github.com/ReactiveX/rxjs)
 * C#: [Rx.NET](https://github.com/Reactive-Extensions/Rx.NET)
 * [More..](http://reactivex.io/languages.html)
 
@@ -144,11 +148,11 @@
 RxCpp uses a git submodule (in `ext/catch`) for the excellent [Catch](https://github.com/philsquared/Catch) library. The easiest way to ensure that the submodules are included in the clone is to add `--recursive` in the clone command.
 
 ```shell
-git clone --recursive https://github.com/Reactive-Extensions/RxCpp.git
+git clone --recursive https://github.com/ReactiveX/RxCpp.git
 cd RxCpp
 ```
 
-# Building RxCpp
+# Building RxCpp Unit Tests
 
 * RxCpp is regularly tested on OSX and Windows.
 * RxCpp is regularly built with Clang, Gcc and VC
@@ -220,7 +224,7 @@
 
 # Documentation
 
-RxCpp uses Doxygen to generate project [documentation](http://reactive-extensions.github.io/RxCpp).
+RxCpp uses Doxygen to generate project [documentation](http://reactivex.github.io/RxCpp).
 
 When Doxygen+Graphviz is installed, CMake creates a special build task named `doc`. It creates actual documentation and puts it to `projects/doxygen/html/` folder, which can be published to the `gh-pages` branch. Each merged pull request will build the docs and publish them.
 
@@ -230,7 +234,5 @@
 
 Before submitting a feature or substantial code contribution please  discuss it with the team and ensure it follows the product roadmap. Note that all code submissions will be rigorously reviewed and tested by the Rx Team, and only those that meet an extremely high bar for both quality and design/roadmap appropriateness will be merged into the source.
 
-You will be prompted to submit a Contributor License Agreement form after submitting your pull request. This needs to only be done once for any Microsoft OSS project. Fill in the [Contributor License Agreement](https://cla2.msopentech.com/) (CLA).
-
 # Microsoft Open Source Code of Conduct
 This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. 
diff --git a/Rx/v2/examples/doxygen/buffer.cpp b/Rx/v2/examples/doxygen/buffer.cpp
index e88c2ed..58f023a 100644
--- a/Rx/v2/examples/doxygen/buffer.cpp
+++ b/Rx/v2/examples/doxygen/buffer.cpp
@@ -162,7 +162,6 @@
 
 SCENARIO("buffer period+count+coordination sample"){
     printf("//! [buffer period+count+coordination sample]\n");
-    auto start = std::chrono::steady_clock::now();
     auto int1 = rxcpp::observable<>::range(1L, 3L);
     auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
     auto values = int1.
@@ -171,7 +170,7 @@
     values.
         as_blocking().
         subscribe(
-            [start](std::vector<long> v){
+            [](std::vector<long> v){
                 printf("OnNext:");
                 std::for_each(v.begin(), v.end(), [](long a){
                     printf(" %ld", a);
@@ -184,7 +183,6 @@
 
 SCENARIO("buffer period+count sample"){
     printf("//! [buffer period+count sample]\n");
-    auto start = std::chrono::steady_clock::now();
     auto int1 = rxcpp::observable<>::range(1L, 3L);
     auto int2 = rxcpp::observable<>::timer(std::chrono::milliseconds(50));
     auto values = int1.
@@ -192,7 +190,7 @@
         buffer_with_time_or_count(std::chrono::milliseconds(20), 2);
     values.
         subscribe(
-            [start](std::vector<long> v){
+            [](std::vector<long> v){
                 printf("OnNext:");
                 std::for_each(v.begin(), v.end(), [](long a){
                     printf(" %ld", a);
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index f702fba..d1c4ea4 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -9,6 +9,7 @@
     \tparam KeySelector      the type of the key extracting function
     \tparam MarbleSelector   the type of the element extracting function
     \tparam BinaryPredicate  the type of the key comparing function
+    \tparam DurationSelector the type of the function that returns, for each new group, the observable whose first item or termination ends that group
 
     \param  ks  a function that extracts the key for each item (optional)
     \param  ms  a function that extracts the return element for each item (optional)
@@ -63,7 +64,7 @@
     static const bool value = !std::is_same<type, tag_not_valid>::value;
 };
 
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
 struct group_by_traits
 {
     typedef T source_value_type;
@@ -71,6 +72,7 @@
     typedef rxu::decay_t<KeySelector> key_selector_type;
     typedef rxu::decay_t<MarbleSelector> marble_selector_type;
     typedef rxu::decay_t<BinaryPredicate> predicate_type;
+    typedef rxu::decay_t<DurationSelector> duration_selector_type;
 
     static_assert(is_group_by_selector_for<source_value_type, key_selector_type>::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)");
 
@@ -87,14 +89,15 @@
     typedef grouped_observable<key_type, marble_type> grouped_observable_type;
 };
 
-template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class T, class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
 struct group_by
 {
-    typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
+    typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
     typedef typename traits_type::key_selector_type key_selector_type;
     typedef typename traits_type::marble_selector_type marble_selector_type;
     typedef typename traits_type::marble_type marble_type;
     typedef typename traits_type::predicate_type predicate_type;
+    typedef typename traits_type::duration_selector_type duration_selector_type;
     typedef typename traits_type::subject_type subject_type;
     typedef typename traits_type::key_type key_type;
 
@@ -130,21 +133,23 @@
 
     struct group_by_values
     {
-        group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p)
+        group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
             : keySelector(std::move(ks))
             , marbleSelector(std::move(ms))
             , predicate(std::move(p))
+            , durationSelector(std::move(ds))
         {
         }
         mutable key_selector_type keySelector;
         mutable marble_selector_type marbleSelector;
         mutable predicate_type predicate;
+        mutable duration_selector_type durationSelector;
     };
 
     group_by_values initial;
 
-    group_by(key_selector_type ks, marble_selector_type ms, predicate_type p)
-        : initial(std::move(ks), std::move(ms), std::move(p))
+    group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
+        : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds))
     {
     }
 
@@ -206,7 +211,35 @@
                 }
                 auto sub = subject_type();
                 g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first;
-                dest.on_next(make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get())));
+                auto obs = make_dynamic_grouped_observable<key_type, marble_type>(group_by_observable(state, sub, selectedKey.get()));
+                auto durationObs = on_exception(
+                    [&](){
+                        return this->durationSelector(obs);},
+                    [this](rxu::error_ptr e){on_error(e);});
+                if (durationObs.empty()) {
+                    return;
+                }
+
+                dest.on_next(obs);
+                composite_subscription duration_sub;
+                auto ssub = state->source_lifetime.add(duration_sub);
+
+                auto expire_state = state;
+                auto expire_dest = g->second;
+                auto expire = [=]() {
+                    auto g = expire_state->groups.find(selectedKey.get());
+                    if (g != expire_state->groups.end()) {
+                        expire_state->groups.erase(g);
+                        expire_dest.on_completed();
+                    }
+                    expire_state->source_lifetime.remove(ssub);
+                };
+                auto robs = durationObs.get().take(1);
+                duration_sub.add(robs.subscribe(
+                    [](const typename decltype(robs)::value_type &){},
+                    [=](rxu::error_ptr) {expire();},
+                    [=](){expire();}
+                ));
             }
             auto selectedMarble = on_exception(
                 [&](){
@@ -243,33 +276,36 @@
     }
 };
 
-template<class KeySelector, class MarbleSelector, class BinaryPredicate>
+template<class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector>
 class group_by_factory
 {
     typedef rxu::decay_t<KeySelector> key_selector_type;
     typedef rxu::decay_t<MarbleSelector> marble_selector_type;
     typedef rxu::decay_t<BinaryPredicate> predicate_type;
+    typedef rxu::decay_t<DurationSelector> duration_selector_type;
     key_selector_type keySelector;
     marble_selector_type marbleSelector;
     predicate_type predicate;
+    duration_selector_type durationSelector;
 public:
-    group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p)
+    group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds)
         : keySelector(std::move(ks))
         , marbleSelector(std::move(ms))
         , predicate(std::move(p))
+        , durationSelector(std::move(ds))
     {
     }
     template<class Observable>
     struct group_by_factory_traits
     {
         typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
-        typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
-        typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
+        typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> traits_type;
+        typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector> group_by_type;
     };
     template<class Observable>
     auto operator()(Observable&& source)
-        -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)))) {
-        return      source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate)));
+        -> decltype(source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) {
+        return      source.template lift<typename group_by_factory_traits<Observable>::traits_type::grouped_observable_type>(typename group_by_factory_traits<Observable>::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)));
     }
 };
 
@@ -288,61 +324,75 @@
 template<> 
 struct member_overload<group_by_tag>
 {
-    template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, 
+    template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate, class DurationSelector,
         class SourceValue = rxu::value_type_t<Observable>,
-        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
-        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
         class Value = typename Traits::grouped_observable_type>
-    static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p)
-        -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)))) {
-        return      o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p)));
+    static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds)
+        -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)))) {
+        return      o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), std::forward<DurationSelector>(ds)));
     }
 
-    template<class Observable, class KeySelector, class MarbleSelector, 
-        class BinaryPredicate=rxu::less, 
+    template<class Observable, class KeySelector, class MarbleSelector, class BinaryPredicate,
+        class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
         class SourceValue = rxu::value_type_t<Observable>,
-        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
-        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
+        class Value = typename Traits::grouped_observable_type>
+    static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p)
+        -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+        return      o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), std::forward<BinaryPredicate>(p), rxu::ret<observable<int, rxs::detail::never<int>>>()));
+    }
+
+    template<class Observable, class KeySelector, class MarbleSelector,
+        class BinaryPredicate=rxu::less, 
+        class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
+        class SourceValue = rxu::value_type_t<Observable>,
+        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
         class Value = typename Traits::grouped_observable_type>
     static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms)
-        -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()))) {
-        return      o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less()));
+        -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+        return      o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), std::forward<MarbleSelector>(ms), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
     }
 
 
     template<class Observable, class KeySelector, 
         class MarbleSelector=rxu::detail::take_at<0>, 
         class BinaryPredicate=rxu::less, 
+        class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
         class SourceValue = rxu::value_type_t<Observable>,
-        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
-        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
         class Value = typename Traits::grouped_observable_type>
     static auto member(Observable&& o, KeySelector&& ks)
-        -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()))) {
-        return      o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less()));
+        -> decltype(o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+        return      o.template lift<Value>(GroupBy(std::forward<KeySelector>(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
     }
 
     template<class Observable, 
         class KeySelector=rxu::detail::take_at<0>, 
         class MarbleSelector=rxu::detail::take_at<0>, 
         class BinaryPredicate=rxu::less, 
+        class DurationSelector=rxu::ret<observable<int, rxs::detail::never<int>>>,
         class Enabled = rxu::enable_if_all_true_type_t<
             all_observables<Observable>>,
         class SourceValue = rxu::value_type_t<Observable>,
-        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate>,
-        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>>,
+        class Traits = rxo::detail::group_by_traits<SourceValue, rxu::decay_t<Observable>, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>,
+        class GroupBy = rxo::detail::group_by<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<KeySelector>, rxu::decay_t<MarbleSelector>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<DurationSelector>>,
         class Value = typename Traits::grouped_observable_type>
     static auto member(Observable&& o)
-        -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()))) {
-        return      o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less()));
+        -> decltype(o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()))) {
+        return      o.template lift<Value>(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret<observable<int, rxs::detail::never<int>>>()));
     }
 
     template<class... AN>
     static operators::detail::group_by_invalid_t<AN...> member(const AN&...) {
         std::terminate();
         return {};
-        static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool");
-    } 
+        static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (grouped_observable<KeyValue, MarbleValue>) -> Observable"); // fires whenever this catch-all overload is instantiated; the message documents the accepted signatures
+    }
 
 };
 
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index c1d59a9..b50b773 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -317,8 +317,7 @@
 };
 
 inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
-    static observe_on_one_worker r(rxsc::make_run_loop(rl));
-    return r;
+    return observe_on_one_worker(rxsc::make_run_loop(rl));
 }
 
 inline observe_on_one_worker observe_on_event_loop() {
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 3bbb448..97bcabd 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -173,28 +173,9 @@
         -> void {
         std::mutex lock;
         std::condition_variable wake;
+        bool disposed = false;
         rxu::error_ptr error;
 
-        struct tracking
-        {
-            ~tracking()
-            {
-                if (!disposed || !wakened) std::terminate();
-            }
-            tracking()
-            {
-                disposed = false;
-                wakened = false;
-                false_wakes = 0;
-                true_wakes = 0;
-            }
-            std::atomic_bool disposed;
-            std::atomic_bool wakened;
-            std::atomic_int false_wakes;
-            std::atomic_int true_wakes;
-        };
-        auto track = std::make_shared<tracking>();
-
         auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
 
         // keep any error to rethrow at the end.
@@ -213,31 +194,19 @@
 
         auto cs = scbr.get_subscription();
         cs.add(
-            [&, track](){
-                // OSX geting invalid x86 op if notify_one is after the disposed = true
-                // presumably because the condition_variable may already have been awakened
-                // and is now sitting in a while loop on disposed
+            [&](){
+                std::unique_lock<std::mutex> guard(lock);
                 wake.notify_one();
-                track->disposed = true;
+                disposed = true;
             });
 
-        std::unique_lock<std::mutex> guard(lock);
         source.subscribe(std::move(scbr));
 
+        std::unique_lock<std::mutex> guard(lock);
         wake.wait(guard,
-            [&, track](){
-                // this is really not good.
-                // false wakeups were never followed by true wakeups so..
-
-                // anyways this gets triggered before disposed is set now so wait.
-                while (!track->disposed) {
-                    ++track->false_wakes;
-                }
-                ++track->true_wakes;
-                return true;
+            [&](){
+                return disposed;
             });
-        track->wakened = true;
-        if (!track->disposed || !track->wakened) std::terminate();
 
         if (error) {rxu::rethrow_exception(error);}
     }
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
index 0f239be..fc68979 100644
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ b/Rx/v2/src/rxcpp/rx-scheduler.hpp
@@ -458,12 +458,19 @@
         public:
             ~exit_recursed_scope_type()
             {
+                if (that != nullptr) {
                     that->requestor = nullptr;
+                }
             }
             exit_recursed_scope_type(const recursed_scope_type* that)
                 : that(that)
             {
             }
+            exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
+                : that(other.that)
+            {
+                other.that = nullptr;
+            }
         };
     public:
         recursed_scope_type()
diff --git a/Rx/v2/src/rxcpp/rx-subscription.hpp b/Rx/v2/src/rxcpp/rx-subscription.hpp
index 9c00469..ee4e53e 100644
--- a/Rx/v2/src/rxcpp/rx-subscription.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscription.hpp
@@ -379,7 +379,7 @@
 
     composite_subscription()
         : inner_type()
-        , subscription(*static_cast<const inner_type* const>(this))
+        , subscription(*static_cast<const inner_type*>(this))
     {
     }
 
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index cd5f39b..9ce455f 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -428,6 +428,15 @@
         { return std::forward<LHS>(lhs) < std::forward<RHS>(rhs); }
 };
 
+template <class T>
+struct ret
+{
+    template <class LHS>
+    auto operator()(LHS&& ) const
+        -> decltype(T())
+        { return T(); }
+};
+
 template<class T = void>
 struct equal_to
 {
diff --git a/Rx/v2/test/operators/merge_delay_error.cpp b/Rx/v2/test/operators/merge_delay_error.cpp
index 7c7a58d..b53b884 100644
--- a/Rx/v2/test/operators/merge_delay_error.cpp
+++ b/Rx/v2/test/operators/merge_delay_error.cpp
@@ -7,7 +7,7 @@
 
 //merge_delay_error must work the very same way as `merge()` except the error handling
 
-SCENARIO("merge delay error completes", "[merge][join][operators]"){
+SCENARIO("merge_delay_error completes", "[merge][join][operators]"){
     GIVEN("1 hot observable with 3 cold observables of ints."){
         auto sc = rxsc::make_test();
         auto w = sc.create_worker();
@@ -117,7 +117,7 @@
     }
 }
 
-SCENARIO("variadic merge delay error completes with error", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with error", "[merge][join][operators]"){
     GIVEN("1 hot observable with 3 cold observables of ints."){
         auto sc = rxsc::make_test();
         auto w = sc.create_worker();
@@ -211,7 +211,7 @@
     }
 }
 
-SCENARIO("variadic merge delay error completes with 2 errors", "[merge][join][operators]"){
+SCENARIO("variadic merge_delay_error completes with 2 errors", "[merge][join][operators]"){
     GIVEN("1 hot observable with 3 cold observables of ints."){
         auto sc = rxsc::make_test();
         auto w = sc.create_worker();
diff --git a/Rx/v2/test/operators/observe_on.cpp b/Rx/v2/test/operators/observe_on.cpp
index 644ab93..ffa85aa 100644
--- a/Rx/v2/test/operators/observe_on.cpp
+++ b/Rx/v2/test/operators/observe_on.cpp
@@ -1,5 +1,6 @@
 #include "../test.h"
 #include <rxcpp/operators/rx-take.hpp>
+#include <rxcpp/operators/rx-map.hpp>
 #include <rxcpp/operators/rx-observe_on.hpp>
 
 const int static_onnextcalls = 100000;
@@ -136,4 +137,57 @@
 
         }
     }
-}
\ No newline at end of file
+}
+
+class nocompare {
+public:
+    int v;
+};
+
+SCENARIO("observe_on no-comparison", "[observe][observe_on]"){
+    GIVEN("a source"){
+        auto sc = rxsc::make_test();
+        auto so = rx::observe_on_one_worker(sc);
+        auto w = sc.create_worker();
+        const rxsc::test::messages<nocompare> in;
+        const rxsc::test::messages<int> out;
+
+        auto xs = sc.make_hot_observable({
+            in.next(150, nocompare{1}),
+            in.next(210, nocompare{2}),
+            in.next(240, nocompare{3}),
+            in.completed(300)
+        });
+
+        WHEN("observe_on is specified"){
+
+            auto res = w.start(
+                [so, xs]() {
+                    return xs
+                         | rxo::observe_on(so)
+                         | rxo::map([](nocompare v){ return v.v; })
+                         | rxo::as_dynamic();
+                }
+            );
+
+            THEN("the output contains items sent while subscribed"){
+                auto required = rxu::to_vector({
+                    out.next(211, 2),
+                    out.next(241, 3),
+                    out.completed(301)
+                });
+                auto actual = res.get_observer().messages();
+                REQUIRE(required == actual);
+            }
+
+            THEN("there was 1 subscription/unsubscription to the source"){
+                auto required = rxu::to_vector({
+                    out.subscribe(200, 300)
+                });
+                auto actual = xs.subscriptions();
+                REQUIRE(required == actual);
+            }
+
+        }
+    }
+}