blob: 5145e924da524534d87934c53f20fd8296757056 [file] [log] [blame]
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#pragma once
#if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
#define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP
#include "../rx-includes.hpp"
namespace rxcpp {
namespace schedulers {
typedef std::function<std::thread(std::function<void()>)> thread_factory;
struct new_thread : public scheduler_interface
{
private:
typedef new_thread this_type;
new_thread(const this_type&);
struct new_worker : public worker_interface
{
private:
typedef new_worker this_type;
typedef detail::action_queue queue_type;
new_worker(const this_type&);
struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
{
typedef detail::schedulable_queue<
typename clock_type::time_point> queue_item_time;
typedef queue_item_time::item_type item_type;
virtual ~new_worker_state()
{
}
explicit new_worker_state(composite_subscription cs)
: lifetime(cs)
{
}
composite_subscription lifetime;
mutable std::mutex lock;
mutable std::condition_variable wake;
mutable queue_item_time q;
std::thread worker;
recursion r;
};
std::shared_ptr<new_worker_state> state;
public:
virtual ~new_worker()
{
}
explicit new_worker(std::shared_ptr<new_worker_state> ws)
: state(ws)
{
}
new_worker(composite_subscription cs, thread_factory& tf)
: state(std::make_shared<new_worker_state>(cs))
{
auto keepAlive = state;
state->lifetime.add([keepAlive](){
std::unique_lock<std::mutex> guard(keepAlive->lock);
auto expired = std::move(keepAlive->q);
keepAlive->q = new_worker_state::queue_item_time{};
if (!keepAlive->q.empty()) std::terminate();
keepAlive->wake.notify_one();
if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
guard.unlock();
keepAlive->worker.join();
}
else {
keepAlive->worker.detach();
}
});
state->worker = tf([keepAlive](){
// take ownership
queue_type::ensure(std::make_shared<new_worker>(keepAlive));
// release ownership
RXCPP_UNWIND_AUTO([]{
queue_type::destroy();
});
for(;;) {
std::unique_lock<std::mutex> guard(keepAlive->lock);
if (keepAlive->q.empty()) {
keepAlive->wake.wait(guard, [keepAlive](){
return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
});
}
if (!keepAlive->lifetime.is_subscribed()) {
break;
}
auto& peek = keepAlive->q.top();
if (!peek.what.is_subscribed()) {
keepAlive->q.pop();
continue;
}
if (clock_type::now() < peek.when) {
keepAlive->wake.wait_until(guard, peek.when);
continue;
}
auto what = peek.what;
keepAlive->q.pop();
keepAlive->r.reset(keepAlive->q.empty());
guard.unlock();
what(keepAlive->r.get_recurse());
}
});
}
virtual clock_type::time_point now() const {
return clock_type::now();
}
virtual void schedule(const schedulable& scbl) const {
schedule(now(), scbl);
}
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
if (scbl.is_subscribed()) {
std::unique_lock<std::mutex> guard(state->lock);
state->q.push(new_worker_state::item_type(when, scbl));
state->r.reset(false);
}
state->wake.notify_one();
}
};
mutable thread_factory factory;
public:
new_thread()
: factory([](std::function<void()> start){
return std::thread(std::move(start));
})
{
}
explicit new_thread(thread_factory tf)
: factory(tf)
{
}
virtual ~new_thread()
{
}
virtual clock_type::time_point now() const {
return clock_type::now();
}
virtual worker create_worker(composite_subscription cs) const {
return worker(cs, std::make_shared<new_worker>(cs, factory));
}
};
inline scheduler make_new_thread() {
static scheduler instance = make_scheduler<new_thread>();
return instance;
}
inline scheduler make_new_thread(thread_factory tf) {
return make_scheduler<new_thread>(tf);
}
}
}
#endif