| // Copyright (C) 2020 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| #include "sync_coro.h" |
| |
| #include <array> |
| #include <boost/coroutine2/all.hpp> |
| #include <thread> |
| #include <utility> |
| |
| #include "pyerr.h" |
| #include "pyerrfmt.h" |
| #include "pylog.h" |
| #include "pyobj.h" |
| #include "pyparsetuple.h" |
| #include "pythread.h" |
| #include "simple_variant.h" |
| |
| // Facility for exposing synchronous code via the coroutine API. |
| |
| // Why not greenlets? Because we want a coroutine model, not a tree |
| // model. We use separately-allocated coroutine stacks, whereas |
| // greenlet uses the C stack and copies the lower stack segment to a |
| // heap object on switch. |
| |
| // TODO(dancol): remove this pile of hacks when we switch to co_await |
| // in C++ and we can get rid of the whole sync_coro thing. |
| |
// Refuse to build against interpreters this file has not been ported to.
#if PY_VERSION_HEX >= 0x03090000
# error "Port me to Python 3.9"
#endif

// TS_HAS_EXC_INFO is 1 for interpreters at or after 3.7.0a3 and 0
// for anything older.
#if PY_VERSION_HEX < 0x030700A3
# define TS_HAS_EXC_INFO 0
#else
# define TS_HAS_EXC_INFO 1
#endif
| |
| namespace dctv { |
| |
| struct SynchronousCoroutine; |
| |
| struct SynchronousYieldFunction final : BasePyObject { |
| explicit SynchronousYieldFunction(SynchronousCoroutine* sync_coro); |
| unique_pyref call(pyref args, pyref kwargs); |
| void invalidate() noexcept; |
| static PyTypeObject pytype; |
| private: |
| SynchronousCoroutine* sync_coro; |
| }; |
| |
| struct SynchronousCoroutine final : BasePyObject, |
| HasPyCtor, |
| SupportsWeakRefs { |
| explicit SynchronousCoroutine(pyref args, pyref kwargs={}); |
| ~SynchronousCoroutine() noexcept; |
| unique_pyref send(pyref obj); |
| unique_pyref next(); |
| unique_pyref throw_(pyref type, pyref value={}, pyref traceback={}); |
| void close() noexcept; |
| |
| unique_pyref sync_yield(unique_pyref value); |
| |
| static PyTypeObject pytype; |
| private: |
| // DummyAdlHack works around |
| // https://github.com/boostorg/coroutine2/issues/26 by preferring |
| // stdlib's tuple apply(). |
| using DummyAdlHack = std::array<int, 0>; |
| using StackSwitcher = boost::coroutines2::coroutine<DummyAdlHack>; |
| struct State; |
| |
| struct ThreadCoroutine { |
| inline ThreadCoroutine() noexcept = default; |
| inline ThreadCoroutine(ThreadCoroutine&& other) noexcept; |
| inline ThreadCoroutine& operator=(ThreadCoroutine&& other) noexcept; |
| inline void start_thread(std::thread&& thread) noexcept; |
| inline void join() noexcept; |
| inline void notify_before_exit() noexcept; |
| void set_state(State* state_slot, State&& new_state) noexcept; |
| template<typename Predicate> |
| void set_state_and_wait(State* state_slot, |
| State&& new_state, |
| Predicate&& predicate) noexcept; |
| inline std::thread::id get_coroutine_tid() const noexcept; |
| private: |
| Mutex mutex; |
| Condition condition; |
| std::thread thread; |
| }; |
| |
| struct StackCoroutine { |
| explicit inline StackCoroutine( |
| StackSwitcher::push_type&& pusher) noexcept; |
| inline void set_puller(StackSwitcher::pull_type* puller) noexcept; |
| inline void join() noexcept; |
| inline void notify_before_exit() noexcept; |
| void set_state(State* state_slot, State&& new_state) noexcept; |
| template<typename Predicate> |
| void set_state_and_wait(State* state_slot, |
| State&& new_state, |
| Predicate&& predicate) noexcept; |
| inline std::thread::id get_coroutine_tid() const noexcept; |
| private: |
| StackSwitcher::push_type pusher; |
| StackSwitcher::pull_type* puller = nullptr; |
| std::thread::id coroutine_tid = std::thread::id(); |
| }; |
| |
| using Coroutine = SimpleVariant<ThreadCoroutine, StackCoroutine>; |
| |
| using ValueOrException = SimpleVariant<unique_pyref, PyExceptionInfo>; |
| |
| struct BaseState { |
| VARIANT_VIRTUAL inline bool is_running() const noexcept |
| { return false; } |
| VARIANT_VIRTUAL inline bool safe_to_close() const noexcept |
| { return false; } |
| VARIANT_VIRTUAL inline bool can_complete_sync_yield() const noexcept |
| { return false; } |
| VARIANT_VIRTUAL inline State send( |
| SynchronousCoroutine* /*sync_coro*/, |
| ValueOrException /*value_or_exception*/) // NOLINT |
| { assume(false); abort(); } |
| VARIANT_VIRTUAL inline unique_pyref sync_yield( |
| SynchronousCoroutine* /*sync_coro*/, |
| unique_pyref /*yield_value*/) |
| { assume(false); abort(); } |
| VARIANT_VIRTUAL inline unique_pyref iter_next( |
| SynchronousCoroutine* /*sync_coro*/) |
| { assume(false); abort(); } |
| VARIANT_VIRTUAL inline unique_pyref complete_sync_yield( |
| SynchronousCoroutine* /*sync_coro*/) |
| { assume(false); abort(); } |
| }; |
| |
| struct TerminalState : BaseState { |
| inline State send( |
| SynchronousCoroutine* /*sync_coro*/, |
| ValueOrException /*value_or_exception*/) VARIANT_OVERRIDE |
| { throw_pyerr_msg(PyExc_RuntimeError, "coroutine terminated"); } |
| inline bool safe_to_close() const noexcept VARIANT_OVERRIDE |
| { return true; } |
| }; |
| |
| struct States { |
| struct Initial final : BaseState { |
| inline Initial(pyref args, pyref kwargs); |
| inline State send(SynchronousCoroutine* sync_coro, |
| ValueOrException value_or_exception) |
| VARIANT_OVERRIDE; |
| inline bool safe_to_close() const noexcept VARIANT_OVERRIDE |
| { return true; } |
| private: |
| unique_pyref function; |
| bool on_stack; |
| }; |
| |
| struct WaitingForInjection final : BaseState { |
| inline State send(SynchronousCoroutine* sync_coro, |
| ValueOrException value_or_exception) |
| VARIANT_OVERRIDE; |
| }; |
| |
| struct HaveInjectedValue final : BaseState { |
| explicit inline HaveInjectedValue(unique_pyref value) |
| : value(std::move(value)) |
| {} |
| inline bool is_running() const noexcept VARIANT_OVERRIDE |
| { return true; } |
| inline bool can_complete_sync_yield() const noexcept VARIANT_OVERRIDE |
| { return true; } |
| inline unique_pyref complete_sync_yield(SynchronousCoroutine* sync_coro) |
| VARIANT_OVERRIDE; |
| private: |
| unique_pyref value; |
| }; |
| |
| struct HaveInjectedException final : BaseState { |
| explicit inline HaveInjectedException(PyExceptionInfo exception) |
| : exception(std::move(exception)) |
| {} |
| bool is_running() const noexcept VARIANT_OVERRIDE |
| { return true; } |
| bool can_complete_sync_yield() const noexcept VARIANT_OVERRIDE |
| { return true; } |
| unique_pyref complete_sync_yield(SynchronousCoroutine* sync_coro) |
| VARIANT_OVERRIDE; |
| private: |
| PyExceptionInfo exception; |
| }; |
| |
| struct Running final : BaseState { |
| inline bool is_running() const noexcept VARIANT_OVERRIDE |
| { return true; } |
| inline unique_pyref sync_yield( |
| SynchronousCoroutine* sync_coro, |
| unique_pyref yield_value) VARIANT_OVERRIDE; |
| }; |
| |
| struct YieldingValue final : BaseState { |
| explicit YieldingValue(unique_pyref yield_value) |
| : yield_value(std::move(yield_value)) |
| {} |
| inline unique_pyref iter_next( |
| SynchronousCoroutine* sync_coro) VARIANT_OVERRIDE; |
| private: |
| unique_pyref yield_value; |
| }; |
| |
| struct ExitedWithException final : TerminalState { |
| explicit ExitedWithException(PyExceptionInfo exit_exception) |
| : exit_exception(std::move(exit_exception)) |
| {} |
| inline unique_pyref iter_next( |
| SynchronousCoroutine* sync_coro) VARIANT_OVERRIDE; |
| private: |
| PyExceptionInfo exit_exception; |
| }; |
| |
| struct ExitedWithValue final : TerminalState { |
| explicit ExitedWithValue(unique_pyref exit_value) |
| : exit_value(std::move(exit_value).notnull()) |
| {} |
| inline unique_pyref iter_next( |
| SynchronousCoroutine* sync_coro) VARIANT_OVERRIDE; |
| private: |
| unique_pyref exit_value; |
| }; |
| }; |
| |
| // Numbers in comments are index numbers useful for inspecting the |
| // variant state in a debugger. |
| using StateVariant = SimpleVariant< |
| States::Initial /* 0 */, |
| States::WaitingForInjection /* 1 */, |
| States::HaveInjectedValue /* 2 */, |
| States::HaveInjectedException /* 3 */, |
| States::Running /* 4 */, |
| States::YieldingValue /* 5 */, |
| States::ExitedWithException /* 6 */, |
| States::ExitedWithValue /* 7 */>; |
| |
| struct State : private StateVariant { |
| using StateVariant::StateVariant; |
| using variant_base = BaseState; |
| |
| VARIANT_FORWARD(is_running); |
| VARIANT_FORWARD(safe_to_close); |
| VARIANT_FORWARD(can_complete_sync_yield); |
| VARIANT_FORWARD(send); |
| VARIANT_FORWARD(sync_yield); |
| VARIANT_FORWARD(iter_next); |
| VARIANT_FORWARD(complete_sync_yield); |
| }; |
| |
| unique_pyref do_send(ValueOrException value_or_exception); |
| unique_pyref py_throw(pyref args); |
| unique_pyref py_close() noexcept; |
| void run_thread(unique_pyref&& function) noexcept; |
| void set_state(State&& new_state) noexcept; |
| State state; |
| Coroutine coroutine; |
| static PyMethodDef pymethods[]; |
| static PyAsyncMethods pyasync; |
| }; |
| |
| SynchronousYieldFunction::SynchronousYieldFunction( |
| SynchronousCoroutine* sync_coro) |
| : sync_coro(sync_coro) |
| {} |
| |
| unique_pyref |
| SynchronousYieldFunction::call(pyref args, pyref kwargs) |
| { |
| PARSEPYARGS( |
| (pyref, value) |
| )(args, kwargs); |
| if (!this->sync_coro) |
| throw_pyerr_msg(PyExc_RuntimeError, |
| "stale synchronous coroutine yielder"); |
| return this->sync_coro->sync_yield(value.addref()); |
| } |
| |
| void |
| SynchronousYieldFunction::invalidate() noexcept |
| { |
| this->sync_coro = nullptr; |
| } |
| |
| SynchronousCoroutine::ThreadCoroutine& |
| SynchronousCoroutine::ThreadCoroutine::operator=( |
| ThreadCoroutine&& other) noexcept |
| { |
| if (this != &other) { |
| this->thread = std::move(other.thread); |
| } |
| return *this; |
| } |
| |
| void |
| SynchronousCoroutine::ThreadCoroutine::start_thread( |
| std::thread&& thread) noexcept |
| { |
| assert(!this->thread.joinable()); |
| this->thread = std::move(thread); |
| } |
| |
| |
| SynchronousCoroutine::ThreadCoroutine::ThreadCoroutine( |
| ThreadCoroutine&& other) noexcept |
| : thread(std::move(other.thread)) |
| { |
| // We can't move mutexes and conditions, so just build them anew. |
| // That's okay, because we move only during initialization. |
| } |
| |
| void |
| SynchronousCoroutine::ThreadCoroutine::join() noexcept |
| { |
| this->thread.join(); |
| } |
| |
| void |
| SynchronousCoroutine::ThreadCoroutine::notify_before_exit() noexcept |
| { |
| this->condition.notify_one(); |
| } |
| |
| void |
| SynchronousCoroutine::ThreadCoroutine::set_state( |
| State* state_slot, |
| State&& new_state) noexcept |
| { |
| UniqueLock lock(this->mutex); |
| *state_slot = std::move(new_state); |
| } |
| |
| template<typename Predicate> |
| void |
| SynchronousCoroutine::ThreadCoroutine::set_state_and_wait( |
| State* state_slot, |
| State&& new_state, |
| Predicate&& predicate) noexcept |
| { |
| // Reduce ping-ponging by keeping the GIL released across replacing |
| // the state and waiting for the new state. |
| using std::swap; |
| assert_gil_held(); |
| UniqueLock lock(this->mutex); |
| with_gil_released([&] { |
| swap(*state_slot, new_state); |
| this->condition.notify_one(); |
| this->condition.wait_for( |
| lock, std::forward<Predicate>(predicate)); |
| }); |
| } |
| |
| std::thread::id |
| SynchronousCoroutine::ThreadCoroutine::get_coroutine_tid() const noexcept |
| { |
| return this->thread.get_id(); |
| } |
| |
| SynchronousCoroutine::StackCoroutine::StackCoroutine( |
| StackSwitcher::push_type&& pusher) noexcept |
| : pusher(std::move(pusher)) |
| {} |
| |
| void |
| SynchronousCoroutine::StackCoroutine::join() noexcept |
| { |
| assume(!this->pusher); |
| } |
| |
| void |
| SynchronousCoroutine::StackCoroutine::notify_before_exit() noexcept |
| { |
| this->coroutine_tid = std::thread::id(); |
| } |
| |
| void |
| SynchronousCoroutine::StackCoroutine::set_state( |
| State* state_slot, |
| State&& new_state) noexcept |
| { |
| *state_slot = std::move(new_state); |
| } |
| |
| template<typename Functor> |
| static |
| void |
| with_saved_py_thread_state(Functor&& f) |
| { |
| PyThreadState* tstate = PyThreadState_GET(); |
| assert(!tstate->use_tracing); |
| #if PY_VERSION_HEX < 0x03080000 |
| assert(!tstate->in_coroutine_wrapper); |
| #endif |
| auto saved_recursion_depth = tstate->recursion_depth; |
| auto saved_frame = tstate->frame; |
| #if PY_VERSION_HEX >= 0x03080000 |
| auto saved_exc_type = tstate->curexc_type; |
| auto saved_exc_value = tstate->curexc_value; |
| auto saved_exc_traceback = tstate->curexc_traceback; |
| #elif PY_VERSION_HEX >= 0x03070000 |
| auto saved_exc_info = tstate->exc_info; |
| auto saved_exc_state = tstate->exc_state; |
| #else |
| auto saved_exc_type = tstate->exc_type; |
| auto saved_exc_value = tstate->exc_value; |
| auto saved_exc_traceback = tstate->exc_traceback; |
| #endif |
| f(); |
| assert(tstate == PyThreadState_GET()); |
| #if PY_VERSION_HEX >= 0x03080000 |
| tstate->curexc_traceback = saved_exc_traceback; |
| tstate->curexc_value = saved_exc_value; |
| tstate->curexc_type = saved_exc_type; |
| #elif PY_VERSION_HEX >= 0x03070000 |
| tstate->exc_state = saved_exc_state; |
| tstate->exc_info = saved_exc_info; |
| #else |
| tstate->exc_traceback = saved_exc_traceback; |
| tstate->exc_value = saved_exc_value; |
| tstate->exc_type = saved_exc_type; |
| #endif |
| tstate->frame = saved_frame; |
| tstate->recursion_depth = saved_recursion_depth; |
| } |
| |
| template<typename Predicate> |
| void |
| SynchronousCoroutine::StackCoroutine::set_state_and_wait( |
| State* state_slot, |
| State&& new_state, |
| Predicate&& predicate) noexcept |
| { |
| with_saved_py_thread_state([&] { |
| bool we_are_coroutine = state_slot->is_running(); |
| *state_slot = std::move(new_state); |
| while (!predicate()) { |
| if (we_are_coroutine) { |
| assert(std::this_thread::get_id() == this->coroutine_tid); |
| this->coroutine_tid = std::thread::id(); |
| (*this->puller)(); |
| this->coroutine_tid = std::this_thread::get_id(); |
| } else { |
| assert(this->coroutine_tid == std::thread::id()); |
| this->pusher(DummyAdlHack()); |
| assert(this->coroutine_tid == std::thread::id()); |
| } |
| } |
| }); |
| } |
| |
| void |
| SynchronousCoroutine::StackCoroutine::set_puller( |
| StackSwitcher::pull_type* puller) noexcept |
| { |
| assume(!this->puller); |
| this->puller = puller; |
| this->coroutine_tid = std::this_thread::get_id(); |
| } |
| |
| std::thread::id |
| SynchronousCoroutine::StackCoroutine::get_coroutine_tid() const noexcept |
| { |
| return this->coroutine_tid; |
| } |
| |
| SynchronousCoroutine::SynchronousCoroutine(pyref args, pyref kwargs) |
| : state(States::Initial(args, kwargs)) |
| {} |
| |
| SynchronousCoroutine::States::Initial::Initial(pyref args, pyref kwargs) |
| { |
| PARSEPYARGS( |
| (pyref, function) |
| (OPTIONAL_ARGS_FOLLOW) |
| (KWONLY_ARGS_FOLLOW) |
| (bool, on_stack, false) |
| )(args, kwargs); |
| this->function = function.addref(); |
| this->on_stack = on_stack; |
| } |
| |
| SynchronousCoroutine::State |
| SynchronousCoroutine::States::Initial::send( |
| SynchronousCoroutine* sync_coro, |
| ValueOrException value_or_exception) |
| { |
| bool initial_value_valid = false; |
| value_or_exception.visit([&](const auto& value) { |
| using Value = std::decay_t<decltype(value)>; |
| if constexpr (std::is_same_v<Value, unique_pyref>) { |
| if (value == Py_None) |
| initial_value_valid = true; |
| } |
| }); |
| |
| if (!initial_value_valid) |
| throw_pyerr_msg( |
| PyExc_TypeError, |
| "can't send non-None value to just-started coroutine"); |
| |
| // N.B. we hold the GIL throughout this function, ensuring that the |
| // thread remains dormant until the driver loop sends it to |
| // the coroutine. |
| if (this->on_stack) { |
| struct Functor { |
| Functor(unique_pyref fn, SynchronousCoroutine* sync_coro) |
| : fn(std::move(fn)), sync_coro(sync_coro) |
| {} |
| void operator()(StackSwitcher::pull_type& puller) const noexcept { |
| auto* coro = this->sync_coro->coroutine.get_if<StackCoroutine>(); |
| coro->set_puller(&puller); |
| this->sync_coro->run_thread(this->fn.addref()); |
| } |
| private: |
| unique_pyref fn; |
| SynchronousCoroutine* sync_coro; |
| }; |
| StackSwitcher::push_type pusher(Functor(this->function.addref(), sync_coro)); |
| StackCoroutine stack_coroutine { std::move(pusher) }; |
| sync_coro->coroutine = std::move(stack_coroutine); |
| } else { |
| // Rely on the default value being a ThreadCoroutine so that the |
| // mutex is pre-initialized. |
| assert(sync_coro->coroutine.get_if<ThreadCoroutine>()); |
| sync_coro->coroutine.get_if<ThreadCoroutine>()->start_thread( |
| std::thread([sync_coro](unique_pyref function) { |
| sync_coro->run_thread(std::move(function)); |
| }, addref(this->function))) ; |
| } |
| return States::Running(); |
| } |
| |
| SynchronousCoroutine::State |
| SynchronousCoroutine::States::WaitingForInjection::send( |
| SynchronousCoroutine* /*sync_coro*/, |
| ValueOrException value_or_exception) |
| { |
| return value_or_exception.visit([&](auto& value) { |
| using Value = std::decay_t<decltype(value)>; |
| if constexpr (std::is_same_v<Value, unique_pyref>) { |
| return State(States::HaveInjectedValue{std::move(value)}); |
| } else { // NOLINT |
| return State(States::HaveInjectedException{std::move(value)}); |
| } |
| }); |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::States::HaveInjectedValue::complete_sync_yield( |
| SynchronousCoroutine* sync_coro) |
| { |
| unique_pyref value = std::move(this->value); |
| sync_coro->set_state(States::Running()); |
| return value; |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::States::HaveInjectedException::complete_sync_yield( |
| SynchronousCoroutine* sync_coro) |
| { |
| std::move(this->exception).restore(); |
| sync_coro->set_state(States::Running()); |
| return {}; |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::States::Running::sync_yield( |
| SynchronousCoroutine* sync_coro, |
| unique_pyref yield_value) |
| { |
| State new_state = States::YieldingValue(std::move(yield_value)); |
| sync_coro->coroutine.visit([&](auto& coro) { |
| coro.set_state_and_wait( |
| &sync_coro->state, |
| std::move(new_state), |
| [&] { return sync_coro->state.can_complete_sync_yield(); }); |
| }); |
| return sync_coro->state.complete_sync_yield(sync_coro); |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::States::YieldingValue::iter_next( |
| SynchronousCoroutine* sync_coro) |
| { |
| unique_pyref yield_value = std::move(this->yield_value.notnull()); |
| sync_coro->set_state(States::WaitingForInjection()); |
| return yield_value; |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::States::ExitedWithValue::iter_next( |
| SynchronousCoroutine* sync_coro) |
| { |
| sync_coro->coroutine.visit([&](auto& coro){ |
| coro.join(); |
| }); |
| _PyGen_SetStopIterationValue(this->exit_value.get()); |
| return {}; |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::States::ExitedWithException::iter_next( |
| SynchronousCoroutine* sync_coro) |
| { |
| sync_coro->coroutine.visit([&](auto& coro){ |
| coro.join(); |
| }); |
| std::move(this->exit_exception).restore(); |
| return {}; |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::do_send(ValueOrException value_or_exception) |
| { |
| State new_state = this->state.send(this, std::move(value_or_exception)); |
| assume(new_state.is_running()); |
| this->coroutine.visit([&](auto& coro) { |
| coro.set_state_and_wait( |
| &this->state, |
| std::move(new_state), |
| [&]{ |
| return !this->state.is_running(); |
| }); |
| }); |
| return this->state.iter_next(this); |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::send(pyref obj) |
| { |
| return this->do_send(addref(obj)); |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::next() |
| { |
| return this->do_send(addref(Py_None)); |
| } |
| |
| static |
| PyExceptionInfo |
| exception_info_from_throw_args(pyref type, pyref value, pyref traceback) |
| { |
| PyExceptionInfo info; |
| |
| // See the logic in _gen_throw in Python. We replicate the type |
| // coercion logic. |
| |
| type.notnull(); |
| if (value == Py_None) |
| value = pyref(); |
| if (traceback == Py_None) |
| traceback = pyref(); |
| else if (traceback) |
| check_pytype(&PyTraceBack_Type, traceback); |
| |
| if (PyExceptionClass_Check(type.get())) { |
| info.type = addref(type); |
| info.value = addref(value); |
| info.traceback = addref(traceback); |
| if (info.value || info.traceback) |
| info.normalize(); |
| } else if (PyExceptionInstance_Check(type.get())) { |
| if (value && value != Py_None) |
| throw_pyerr_msg(PyExc_TypeError, |
| "instance exception may not have a separate value"); |
| info.type = addref(pytype(type)); |
| info.value = addref(type); |
| info.traceback = addref(traceback); |
| if (!info.traceback) |
| info.traceback = adopt(PyException_GetTraceback(info.value.get())); |
| } else { |
| throw_pyerr_fmt(PyExc_TypeError, |
| "bad throw() arguments: (%s, %s, %s)", |
| repr(type), repr(value), repr(traceback)); |
| } |
| |
| assume(info.type); |
| return info; |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::throw_(pyref type, pyref value, pyref traceback) |
| { |
| return this->do_send( |
| exception_info_from_throw_args(type, value, traceback)); |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::py_throw(pyref args) |
| { |
| PARSEPYARGS( |
| (pyref, type) |
| (OPTIONAL_ARGS_FOLLOW) |
| (pyref, value) |
| (pyref, traceback) |
| )(args); |
| return this->throw_(type, value, traceback); |
| } |
| |
| void |
| SynchronousCoroutine::close() noexcept |
| { |
| if (!this->state.safe_to_close()) { |
| PyExceptionSaver saved; |
| while (!this->state.safe_to_close()) { |
| unique_pyref ret; |
| try { |
| ret = this->throw_(PyExc_GeneratorExit); |
| } catch (...) { |
| _set_pending_cxx_exception_as_pyexception(); |
| } |
| if (ret) { |
| PyErr_WarnFormat( |
| nullptr, 2, |
| "ignoring value yielded during coroutine close: %R", |
| ret.get()); |
| PyErr_Clear(); |
| } else { |
| assert(PyErr_Occurred()); |
| PyExceptionInfo ex_info = PyExceptionInfo::fetch(); |
| if (ex_info.type != PyExc_GeneratorExit) { |
| std::move(ex_info).restore(); |
| pyref this_type = ::dctv::pytype(this); |
| PyErr_WriteUnraisable(this_type.get()); |
| } |
| } |
| } |
| } |
| assume(this->state.safe_to_close()); |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::py_close() noexcept |
| { |
| // TODO(dancol): just autogenerate "return None" in wraperr. |
| this->close(); |
| return addref(Py_None); |
| } |
| |
| unique_pyref |
| SynchronousCoroutine::sync_yield(unique_pyref value) |
| { |
| std::thread::id correct_tid = this->coroutine.visit([&](auto& coro) { |
| return coro.get_coroutine_tid(); |
| }); |
| if (correct_tid != std::this_thread::get_id()) |
| throw_pyerr_msg(PyExc_RuntimeError, "sync yield on wrong thread"); |
| return this->state.sync_yield(this, std::move(value)); |
| } |
| |
| SynchronousCoroutine::~SynchronousCoroutine() noexcept |
| { |
| this->close(); |
| } |
| |
| void |
| SynchronousCoroutine::run_thread(unique_pyref&& in_function) noexcept |
| { |
| with_gil_acquired([&] { |
| unique_pyref function = std::move(in_function); |
| unique_pyref value; |
| try { |
| auto yielder = make_pyobj<SynchronousYieldFunction>(this); |
| FINALLY(yielder->invalidate()); |
| value = call(function, yielder); |
| } catch (...) { |
| assert(this->state.is_running()); |
| _set_pending_cxx_exception_as_pyexception(); |
| PyExceptionInfo errinfo = PyExceptionInfo::fetch(); |
| this->set_state(States::ExitedWithException(std::move(errinfo))); |
| return; |
| } |
| assert(this->state.is_running()); |
| this->set_state(States::ExitedWithValue(std::move(value))); |
| function.reset(); // Before releasing GIL |
| }); |
| this->coroutine.visit([&](auto& coro) { |
| coro.notify_before_exit(); |
| }); |
| } |
| |
| void |
| SynchronousCoroutine::set_state(State&& state) noexcept |
| { |
| this->coroutine.visit([&](auto& coro) { |
| coro.set_state(&this->state, std::move(state)); |
| }); |
| } |
| |
| PyTypeObject SynchronousYieldFunction::pytype = |
| make_py_type<SynchronousYieldFunction>( |
| "dctv._native.SynchronousYieldFunction", |
| "Synchronous yield-alike", |
| [](PyTypeObject* t) { |
| t->tp_call = wraperr<&SynchronousYieldFunction::call>(); |
| }); |
| |
| PyMethodDef SynchronousCoroutine::pymethods[] = { |
| make_methoddef("send", |
| wraperr<&SynchronousCoroutine::send>(), |
| METH_O, |
| "Send value into the coroutine"), |
| make_methoddef("throw", |
| wraperr<&SynchronousCoroutine::py_throw>(), |
| METH_VARARGS, |
| "Send exception into the coroutine"), |
| make_methoddef("close", |
| wraperr<&SynchronousCoroutine::py_close>(), |
| METH_NOARGS, |
| "Close the coroutine"), |
| { 0 }, |
| }; |
| |
| PyAsyncMethods SynchronousCoroutine::pyasync = { |
| return_self /* am_await */, |
| nullptr /* am_aiter */, |
| nullptr /* am_anext */, |
| }; |
| |
| PyTypeObject SynchronousCoroutine::pytype = |
| make_py_type<SynchronousCoroutine>( |
| "dctv._native.SynchronousCoroutine", |
| "Coroutine interface to synchronous code", |
| [](PyTypeObject* t) { |
| t->tp_iter = return_self; |
| t->tp_as_async = &SynchronousCoroutine::pyasync; |
| t->tp_iternext = wraperr<&SynchronousCoroutine::next>(); |
| t->tp_methods = SynchronousCoroutine::pymethods; |
| }); |
| |
| void |
| init_sync_coro(pyref m) |
| { |
| register_type(m, &SynchronousYieldFunction::pytype); |
| register_type(m, &SynchronousCoroutine::pytype); |
| } |
| |
| } // namespace dctv |