blob: f0becd4b59a11d05b33b6503f401a5616c5a3d9d [file] [log] [blame]
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Event loop"""
# TODO: optimize this mechanism
#
# - Use shared memory to manage reference counts instead of constantly chatting
# with resource manager
#
# - Use persistent IO buffers.
#
# - Benchmark RTTs.
import threading
import logging
import select
import asyncio
import weakref
from concurrent.futures import CancelledError
from functools import partial
from itertools import count, chain
from enum import Enum
from contextlib import contextmanager
from socket import SHUT_WR
from cytoolz import first
from .channel import (
Channel,
MessageSendContext,
)
from .shm import (
PeerDiedException,
ResourceRecvContext,
ResourceSendingMessageContextMixin,
SendMsgError,
SharedObject,
ShmSharedMemorySendContextMixin,
_assert_valid_oid,
cache_object_until_death,
get_process_resmanc,
)
from .util import (
CannotPickle,
ChainableFuture,
ClosingContextManager,
SafeClosingObject,
cached_property,
decode_exception,
die_due_to_fatal_exception,
encode_exception,
maybe_running_event_loop,
maybe_this_thread_event_loop,
running_event_loop,
the,
tls,
weak_bind,
)
from .waitsem import WaitableSemaphore
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class MessageUndeliverableError(RuntimeError):
    """Error: a message could not be delivered to its destination."""
class NoSuchObjectError(RuntimeError):
    """Error: the named object does not exist."""
class NoSuchMethodError(RuntimeError):
    """Error: the requested method does not exist on the object."""
class ApartmentQuitError(RuntimeError):
    """Error: an operation was attempted on an apartment that has closed."""
def _do_call_oneway(call_gate, call_info, ret_filter):
    """Send CALL_INFO through CALL_GATE without asking for a reply.

    A one-way call has no return value to filter, so RET_FILTER must be
    falsy.  Return whatever ``send_message`` returns; if the peer has
    died, log that at debug level and return None instead.
    """
    assert not ret_filter
    message = OneWayMessage(call_info)
    try:
        return call_gate.send_message(message)
    except PeerDiedException:
        log.debug("failed to send one-way message", exc_info=True)
    return None
def _do_call_async(call_gate, call_info, ret_filter):
    """Start a two-way call and return the future for its reply.

    The call is issued from the current apartment.  The reply future is
    flagged ``async_death`` so that death of the target is reported via
    the apartment's message queue rather than inline.  Any exception
    raised while building or sending the request is stored in the
    future instead of propagating.
    """
    apartment = Apartment.current()
    future, reply_id = apartment.make_reply_future(call_gate)
    future.async_death = True
    if future.done():
        # The target died before we could even send the request.
        return future
    try:
        request = TwoWayMessage(apartment.call_gate,
                                reply_id,
                                (),
                                ret_filter,
                                call_info)
        call_gate.send_message(request)
    except Exception as exc:
        future.set_exception(exc)
    return future
def _do_call_sync(call_gate, call_info, ret_filter):
    """Perform a blocking two-way call and return its result.

    The request carries the calling thread's active-call chain
    (``tls.active_calls``).  After sending, block in the current
    apartment's ``wait_for_reply`` until the reply future resolves.
    Exceptions raised while sending or waiting are stored in the future
    and therefore re-raised by ``future.result()``.
    """
    apartment = Apartment.current()
    future, reply_id = apartment.make_reply_future(call_gate)
    assert not future.async_death
    if not future.done():
        try:
            request = TwoWayMessage(apartment.call_gate,
                                    reply_id,
                                    tls.active_calls,
                                    ret_filter,
                                    call_info)
            call_gate.send_message(request)
            apartment.wait_for_reply(reply_id)
        except Exception as exc:
            future.set_exception(exc)
    assert future.done()
    return future.result()
# TODO(dancol): native support for AIO futures?
# Until then, "aio" calls are simply an alias for the regular asynchronous
# call path.
_do_call_aio = _do_call_async
class FunctionProxy(object):
    """Callable stand-in for a function that runs in another apartment.

    Calling the proxy forwards ``(function, args, kwargs)`` through the
    call gate using the currently selected call mode.  The mode defaults
    to synchronous; the ``call_*`` properties switch it in place and
    return the proxy itself so that they can be chained, e.g.
    ``proxy.call_async(...)``.
    """

    def __init__(self, function, call_gate):
        assert isinstance(call_gate, CallGate)
        self.__function = function
        self.__call_gate = call_gate
        self.__ret_filter = None
        self.__do_call = _do_call_sync

    def set_ret_filter(self, ret_filter):
        """Install RET_FILTER as the return filter; return self."""
        self.__ret_filter = ret_filter
        return self

    def proxy_ret(self):
        """Arrange for the call's return value to come back as a proxy."""
        return self.set_ret_filter(proxy_for)

    @property
    def call_oneway(self):
        """Select one-way (no reply) mode; return self."""
        self.__do_call = _do_call_oneway
        return self

    @property
    def call_async(self):
        """Select asynchronous mode yielding a regular future; return self."""
        self.__do_call = _do_call_async
        return self

    @property
    def call_sync(self):
        """Select synchronous, blocking mode; return self."""
        self.__do_call = _do_call_sync
        return self

    @property
    def call_aio(self):
        """Select asynchronous mode yielding an asyncio future; return self."""
        self.__do_call = _do_call_aio
        return self

    def __call__(self, *args, **kwargs):
        call_info = (self.__function, args, kwargs)
        return self.__do_call(self.__call_gate, call_info, self.__ret_filter)
class Proxy(SharedObject):
    """Stand-in for an object owned by an Apartment.

    Attribute access on a proxy yields a FunctionProxy that calls the
    named method on the real object through the owning apartment's call
    gate.  Names beginning with a double underscore are never forwarded.
    """

    def __init__(self, proxied_cls, call_gate):
        assert isinstance(proxied_cls, type)
        assert isinstance(call_gate, CallGate)
        self.__proxied_cls = proxied_cls
        self.__call_gate = call_gate

    @staticmethod
    def get_proxied_class(proxy):
        """Return the class of the object behind PROXY."""
        assert isinstance(proxy, Proxy)
        # pylint: disable=protected-access
        return proxy.__proxied_cls

    @staticmethod
    def get_call_gate(proxy):
        """Return the call gate through which PROXY makes its calls."""
        assert isinstance(proxy, Proxy)
        # pylint: disable=protected-access
        return proxy.__call_gate

    def __getattr__(self, name):
        # Only reached for attributes not found normally.  Dunder names
        # are refused rather than turned into remote calls.
        if name.startswith("__"):
            raise AttributeError(name)  # No attribute access!
        target = (self, name)
        return self.__call_gate.fn(target)

    def __call__(self, *args, **kwargs):  # Hack: should be generic!
        # "__call__" is rejected by __getattr__, so forward it explicitly.
        remote_call = self.__call_gate.fn((self, "__call__"))
        return remote_call(*args, **kwargs)

    def __repr__(self):
        return "<Proxy for {!r}>".format(self.__proxied_cls)

    def __str__(self):
        return repr(self)
# NOTE(review): not referenced in the visible part of this module; presumably
# read by the shm/resman machinery -- TODO confirm where this flag is used.
_resman_send_eager = True
class Message(object):
    """Base class for messages exchanged between apartments."""

    def dispatch(self, apartment):
        """Process this message inside APARTMENT.

        Subclasses must override; the base version always raises
        NotImplementedError.
        """
        raise NotImplementedError("abstract")

    def apartment_quit_rundown(self):
        """Hook invoked when the apartment is being torn down.

        The base implementation does nothing.
        """
        return None
class RunAsyncFunctionMessage(Message):
    """Synthetic message that implements Apartment.call_async().

    Carries a zero-argument callable plus a future (already marked as
    running) that receives the callable's result or exception.
    """

    def __init__(self, function):
        self.function = function
        future = ChainableFuture()
        future.set_running_or_notify_cancel()
        self.future = future

    def dispatch(self, _apartment):
        """Run the function and resolve the future with its outcome."""
        try:
            value = self.function()
        except BaseException as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(value)

    def apartment_quit_rundown(self):
        """Fail the future because the apartment has quit."""
        self.future.set_exception(ApartmentQuitError())
class OneWayMessage(Message):
    """Function-call message for which no reply is sent."""

    def __init__(self, call):
        Message.__init__(self)
        self.call = call

    def dispatch(self, apartment):
        """Run the call in APARTMENT and discard its result.

        If the call returns a coroutine, it is scheduled on the event
        loop so that it actually runs.
        """
        outcome = apartment.do_call(self.call)
        if asyncio.iscoroutine(outcome):
            # Scheduling happens as a side effect of ensure_future.
            asyncio.ensure_future(outcome)
class TwoWayMessage(Message):
    """Function-call message whose outcome is reported back to the caller.

    REPLY_CALL_GATE and REPLY_ID identify where the reply goes;
    ACTIVE_CALLS is the tuple of call ids already in progress in the
    caller's call chain; RET_FILTER, if truthy, is applied to a
    successful result before it is sent back.
    """

    def __init__(self,  # pylint: disable=too-many-arguments
                 reply_call_gate,
                 reply_id,
                 active_calls,
                 ret_filter,
                 call):
        Message.__init__(self)
        assert the(int, reply_id) > 0  # pylint: disable=compare-to-zero
        assert isinstance(active_calls, tuple)
        self.reply_call_gate = reply_call_gate
        self.reply_id = reply_id
        self.active_calls = active_calls
        self.ret_filter = ret_filter
        self.call = call

    def __repr__(self):
        template = "<TwoWayMessage rcg={!r} rid={!r} ac={!r} rf={!r} c={!r}>"
        return template.format(self.reply_call_gate,
                               self.reply_id,
                               self.active_calls,
                               self.ret_filter,
                               self.call)

    def __send_reply(self, message):
        """Send MESSAGE to the caller; a dead caller is only logged."""
        try:
            self.reply_call_gate.send_message(message)
        except PeerDiedException:
            log.debug("dropping two-way message reply because sender died")

    def __send_error_reply(self, ex):
        reply = ErrorReplyMessage(self.reply_id, encode_exception(ex))
        self.__send_reply(reply)

    def __send_success_reply(self, result):
        if self.ret_filter:
            result = self.ret_filter(result)
        reply = SuccessReplyMessage(self.reply_id, result)
        self.__send_reply(reply)

    def __on_task_done(self, future):
        """Done-callback for coroutine calls: report the task's outcome."""
        # Cancellation is checked first: asking a cancelled future for
        # its exception would itself raise.
        if future.cancelled():
            self.__send_error_reply(CancelledError())
            return
        failure = future.exception()
        if failure:
            self.__send_error_reply(failure)
        else:
            self.__send_success_reply(future.result())

    def dispatch(self, apartment):
        """Run the call in APARTMENT and send back success or failure."""
        # pylint: disable=redefined-variable-type
        try:
            call_id = (self.reply_call_gate.oid, self.reply_id)
            call_chain = tls.active_calls + self.active_calls + (call_id,)
            with tls.set_active_calls(call_chain):
                result = apartment.do_call(self.call)
        except Exception as ex:
            self.__send_error_reply(ex)
            return
        if asyncio.iscoroutine(result):
            # TODO(dancol): cancel future and send reply on detach!
            task = asyncio.ensure_future(result)
            task.add_done_callback(self.__on_task_done)
        else:
            self.__send_success_reply(result)
class SuccessReplyMessage(Message):
    """Reply reporting that a two-way call succeeded with VALUE."""

    def __init__(self, reply_id, value):
        Message.__init__(self)
        assert the(int, reply_id) > 0  # pylint: disable=compare-to-zero
        self.reply_id = reply_id
        self.value = value

    def dispatch(self, apartment):
        """Resolve the pending future for this reply, if still wanted."""
        future = apartment.get_call_future(self.reply_id)
        if future:
            future.set_result(self.value)
        elif __debug__:
            log.debug("reply id %s: success but now irrelevant",
                      self.reply_id)
class ErrorReplyMessage(Message):
    """Reply reporting that a two-way call failed.

    SAVED_EXC is the exception in its encoded form; it is decoded only
    when the reply is dispatched.
    """

    def __init__(self, reply_id, saved_exc):
        Message.__init__(self)
        assert the(int, reply_id) > 0  # pylint: disable=compare-to-zero
        self.reply_id = reply_id
        self.saved_exc = saved_exc

    def dispatch(self, apartment):
        """Fail the pending future for this reply, if still wanted."""
        future = apartment.get_call_future(self.reply_id)
        if future:
            future.set_exception(decode_exception(self.saved_exc))
        elif __debug__:
            log.debug("reply id %s: failed but now irrelevant",
                      self.reply_id,
                      exc_info=decode_exception(self.saved_exc))
class ApartmentHandle(SharedObject):
    """Shareable handle that stands for an Apartment.

    Its main use is as the target of death notifications.
    """

    def __init__(self, name):
        super().__init__()
        # the() checks that NAME really is a str.
        self.name = the(str, name)
class ApartmentFactory(object):
    """Builds one Apartment together with the CallGate that targets it.

    Factories and call gates can be sent between processes; Apartment
    objects cannot.  A factory can produce only a single Apartment:
    make_apartment() gives up the factory's handle and read channel.
    """

    def __init__(self, name="apartment", process_resmanc_oid=None):
        """Create the handle, the channel pair, and the call gate.

        PROCESS_RESMANC_OID names the process that will host the
        apartment; when falsy, the current process's resmanc id is used.
        """
        handle = ApartmentHandle(name)
        read_channel, write_channel = Channel.make_pair(duplex=False)
        self.__handle = handle
        self.__read_channel = read_channel
        self.__call_gate = CallGate(
            write_channel,
            handle.oid,
            process_resmanc_oid or get_process_resmanc().id)

    @property
    def call_gate(self):
        """The call gate for making calls to the new apartment"""
        return self.__call_gate

    def make_apartment(self):
        """Create and return the Apartment for this factory.

        Must run in the process that was designated when the factory
        was created.
        """
        assert get_process_resmanc().id == self.__call_gate.process_oid, \
            ("the host process for an apartment must be specified at factory "
             "creation time")
        apartment = Apartment(self.__handle,
                              self.__read_channel,
                              self.__call_gate)
        # Ownership has moved to the apartment; a second call will fail.
        del self.__handle
        del self.__read_channel
        return apartment
class CallGate(SharedObject):
    """Handle for making function calls to an apartment"""

    # The CallGate object is distinct from the Apartment proper because
    # we can share CallGate instances across processes (via shm.py
    # magic), but we can't share full Apartment objects, which have
    # process affinity and which would be kept alive indefinitely by
    # sharing in any case.  (A CallGate amounts to a weak reference to
    # an Apartment.)

    def __init__(self, write_channel, apartment_oid, process_oid):
        assert isinstance(write_channel, Channel)
        assert _assert_valid_oid(apartment_oid)
        assert _assert_valid_oid(process_oid)
        self.__write_channel = write_channel
        self.__apartment_oid = apartment_oid
        self.__process_oid = process_oid

    @property
    def channel(self):
        """The channel on which commands for the apartment are written"""
        return self.__write_channel

    @property
    def apartment_oid(self):
        """OID of the apartment that receives calls made through this gate"""
        return self.__apartment_oid

    @property
    def process_oid(self):
        """OID of the process that owns the apartment"""
        return self.__process_oid

    def send_message(self, message):
        """Write MESSAGE to the apartment's channel.

        Internal use only.
        """
        # TODO(dancol): this call can block. sometimes that's okay, but
        # we really should have an all-asyncio message path to avoid any
        # possible deadlock in replies.
        assert isinstance(message, Message)
        context_factory = ApartmentSendContext.to(self)
        self.__write_channel.send(message, context_factory)

    def fn(self, function):
        """Return a FunctionProxy that calls FUNCTION through this gate"""
        return FunctionProxy(function, self)

    def _resman_after_pull_hook(self):
        # Keep this gate cached for as long as the target apartment lives.
        cache_object_until_death(self, self.apartment_oid)

    def poll_for_send(self):
        """Block until a message can be written to the channel"""
        self.__write_channel.poll_for_send()
class AttachInfo(object):
    """Record of one event loop's attachment to an apartment.

    ``apartment`` is the attached Apartment; ``count`` is the nesting
    depth of attach() calls and starts at zero.
    """

    def __init__(self, apartment):
        self.count = 0
        self.apartment = the(Apartment, apartment)
def maybe_get_attach_info(loop):
    """Return the attachment record stored on event loop LOOP, or None.

    The record lives in the loop's ``_apartment_attach`` attribute.
    """
    assert isinstance(loop, asyncio.AbstractEventLoop)
    try:
        return loop._apartment_attach  # pylint: disable=protected-access
    except AttributeError:
        return None
def set_attach_info(loop, apartment):
    """Install or clear the attachment record on event loop LOOP.

    Despite the parameter name, a truthy APARTMENT must be an AttachInfo
    instance; it is stored on the loop, which must not already carry
    one.  A falsy APARTMENT removes the existing record, which must be
    present.
    """
    assert isinstance(loop, asyncio.AbstractEventLoop)
    if not apartment:
        assert maybe_get_attach_info(loop)
        delattr(loop, "_apartment_attach")
        return
    assert not maybe_get_attach_info(loop)
    setattr(loop, "_apartment_attach", the(AttachInfo, apartment))
class ApartmentSendContext(
        ShmSharedMemorySendContextMixin,
        MessageSendContext,
        ResourceSendingMessageContextMixin,
):
    """Send context that routes resource-bearing messages through resman"""

    def __init__(self, call_gate):
        super().__init__()
        self.__call_gate = the(CallGate, call_gate)

    def sendmsg(self, pickle_data, block, channel):
        """Send the pickled message.

        Messages that carry no resources take the inherited path.
        Otherwise the process resource-manager client is asked to send
        on behalf of the call gate, polling the gate for writability
        after every attempt that reports the message was not sent.  A
        SendMsgError is unwrapped to its cause when it has one.
        """
        if not self.resources:
            return super().sendmsg(pickle_data, block, channel)
        try:
            while True:
                sent = get_process_resmanc().cg_sendmsg_nonblock(
                    self.__call_gate.oid,
                    bytes(pickle_data),
                    self.fdhs,
                    self.resources)
                if sent:
                    break
                self.__call_gate.poll_for_send()
        except SendMsgError as ex:
            raise ex.__cause__ or ex
        return None

    @classmethod
    def to(cls, call_gate):
        """Return a factory that builds contexts bound to CALL_GATE"""
        return partial(cls, call_gate)
class Apartment(ClosingContextManager, CannotPickle):
    """A habitat for objects.

    The Apartment object is basically the same as the COM concept of the
    same name; it allows for cross-process coherent object method
    calling.  To create an object usable from multiple processes, create
    the object, then call proxy_for(that_object).  You can then send the
    resulting Proxy object to other processes via the low-level shm
    mechanism, pickle, and so on, and these other processes can call
    methods on that proxy and get replies.

    The object, at this point, "lives" inside the apartment that was
    active at the point proxy_for was called.  All calls to object
    methods (via the proxy) take place in threads attached to the
    apartment that owns the object.  If a method (via a proxy) is called
    from outside the object's home apartment, the sending apartment
    sends a message to the object's home apartment and waits for
    a reply.

    Each apartment is associated with an asyncio event loop, which takes
    care of actual IO multiplexing.  We used to do the IO ourselves, but
    it turns out that asyncio does an adequate job already.
    """

    @staticmethod
    def current():
        """Return current thread's running apartment

        It is invalid to call this function unless we're actually running
        an event loop associated with an apartment.
        """
        # pylint: disable=protected-access
        return running_event_loop()._apartment_attach.apartment

    @staticmethod
    def maybe_current():
        """Return running apartment or None

        None is returned both when no event loop is running and when the
        running loop has no apartment attached.
        """
        loop = maybe_running_event_loop()
        if loop:
            # The loop stores an AttachInfo record, not the apartment
            # itself; unwrap it so we return the same kind of object
            # that current() does.
            info = getattr(loop, "_apartment_attach", None)
            if info is not None:
                return info.apartment
        return None

    def __init__(self,  # pylint: disable=too-many-arguments
                 handle,
                 read_channel,
                 call_gate):
        """Set up apartment state.

        HANDLE is the ApartmentHandle naming this apartment, READ_CHANNEL
        is the channel on which incoming messages arrive, and CALL_GATE
        is the gate others use to call into this apartment.
        """
        assert isinstance(read_channel, Channel)
        self.__handle = the(ApartmentHandle, handle)
        self.__read_channel = read_channel
        self.__call_gate = call_gate
        self.__message_queue_semaphore = WaitableSemaphore()
        # Protects everything below
        self.__lock = threading.Lock()
        self.__number_attached = 0
        self.__weak_last_detach_callbacks = weakref.WeakSet()
        # Tracks objects that live here
        self.__owned = {}
        # Pending futures
        self.__reply_id_sequence = count(1)
        self.__pending_calls = {}
        self.__message_queue = []
        # All death callbacks we've added, kept so that we can remove
        # callbacks from resmanc when we close this apartment.
        self.__death_callback_keys = weakref.WeakSet()
        # For blocking calls
        self.__wsem_by_reply_id = {}
        self.__wsem_free_list = []

    @cached_property
    def name(self):
        """The name of this apartment"""
        return self.__handle.name

    @cached_property
    def id(self):
        """The ID of the apartment handle"""
        return self.__handle.oid

    def call_async(self, fn):
        """Call function FN inside the apartment

        Safe to call from any thread.  Bypasses CallGate pickle, reference
        tracking, and so on.

        Return a future supplying the result of that call.
        """
        return self.__enqueue_message(RunAsyncFunctionMessage(fn)).future

    def __read_message_nonblock(self):
        """Return the next message from the channel, or None if none ready"""
        try:
            return self.__read_channel.recv(ResourceRecvContext, block=False)
        except BlockingIOError:
            return None

    def add_weak_last_detach_callback(self, callback):
        """Add a callback run when last thread detaches

        The callback is held in a WeakSet, so the caller must keep it
        alive for as long as it should stay registered.
        """
        self.__weak_last_detach_callbacks.add(callback)

    def remove_weak_last_detach_callback(self, callback):
        """Remove a callback added with add_weak_last_detach_callback"""
        self.__weak_last_detach_callbacks.discard(callback)

    def __add_death_callback(self, oid, callback):
        """Register CALLBACK for the death of OID and remember its key"""
        key = get_process_resmanc().add_death_callback(oid, callback)
        self.__death_callback_keys.add(key)
        return key

    def make_proxy(self, obj):
        """Make a proxy for calling OBJ via this apartment

        OBJ is recorded as owned by this apartment until the proxy dies.
        """
        assert Apartment.current() is self
        proxy = Proxy(type(obj), self.__call_gate)
        proxy_oid = proxy.oid
        assert proxy_oid not in self.__owned
        self.__owned[proxy_oid] = obj
        try:
            def _on_proxy_death():
                with self.__lock:
                    # After close, __owned has already been cleared.
                    if not self.closed:
                        del self.__owned[proxy_oid]
            self.__add_death_callback(proxy_oid, _on_proxy_death)
        except BaseException:
            # Roll back the ownership record if registration failed.
            del self.__owned[proxy.oid]
            raise
        return proxy

    def unproxy(self, proxy):
        """Return object behind proxy

        Raises KeyError if PROXY is not owned by this apartment.
        """
        assert Apartment.current() is self
        assert isinstance(proxy, Proxy)
        return self.__owned[proxy.oid]

    def make_reply_future(self, call_gate):
        """Make a future that we resolve when we receive a reply.

        Return a pair (FUTURE, REPLY_ID) where FUTURE is the future
        resolved upon reply and REPLY_ID is the ID that other apartments
        should include in their messages in order to activate FUTURE.

        FUTURE fails with ApartmentQuitError if the apartment behind
        CALL_GATE dies first.
        """
        assert isinstance(call_gate, CallGate)
        assert Apartment.current() is self, "must be attached"
        # No synchronization necessary here: the counter increment is
        # atomic, and the reply_id is unique, so we can't collide on
        # self.__pending_calls slots.
        reply_future = ChainableFuture()
        reply_future.set_running_or_notify_cancel()  # Cancel not supported
        reply_future.async_death = False
        reply_id = next(self.__reply_id_sequence)

        def _on_call_target_death():
            def _set_exception():
                reply_future.set_exception(ApartmentQuitError())
            if reply_future.async_death:
                # Deliver the failure through the message queue.
                self.call_async(_set_exception)
            else:
                _set_exception()

        # N.B. _on_call_target_death can run before this call even
        # returns.  If it does, _on_reply_future_done will also
        # run immediately.
        death_key = self.__add_death_callback(call_gate.apartment_oid,
                                              _on_call_target_death)
        self.__pending_calls[reply_id] = reply_future

        def _on_reply_future_done(future):
            assert future is reply_future
            get_process_resmanc().remove_death_callback(death_key)
            del self.__pending_calls[reply_id]
            wsem = self.__wsem_by_reply_id.get(reply_id)
            if wsem:
                wsem.release()  # Wake synchronous waiter

        reply_future.add_done_callback(_on_reply_future_done)
        return reply_future, reply_id

    def __enqueue_message(self, message):
        """Add the message to the queue

        If the apartment is already closed, the message's quit-rundown
        hook is run instead.  Return MESSAGE either way.
        """
        with self.__lock:
            if self.closed:
                message.apartment_quit_rundown()
            else:
                self.__message_queue.append(message)
                self.__message_queue_semaphore.release()
        return message

    def __get_wsem_locked(self):
        """Return an unsignaled semaphore, reusing a free one if possible"""
        if self.__wsem_free_list:
            wsem = self.__wsem_free_list.pop()
        else:
            wsem = WaitableSemaphore()
        assert not wsem.get_value()
        return wsem

    @contextmanager
    def __wsem_for_reply_id(self, reply_id):
        """Signal a waitable semaphore when we get a reply.

        Yield the waitable semaphore or None if we've already processed
        the reply by the time we try to make the wsem arrangement.
        """
        with self.__lock:
            if reply_id in self.__pending_calls:
                wsem = self.__get_wsem_locked()
                assert reply_id not in self.__wsem_by_reply_id
                self.__wsem_by_reply_id[reply_id] = wsem
            else:
                wsem = None
        try:
            yield wsem
        finally:
            if wsem:
                with self.__lock:
                    wsem = self.__wsem_by_reply_id.pop(reply_id)
                    wsem.try_acquire_all()  # Reset in case we were signaled
                    assert not wsem.get_value()
                    self.__wsem_free_list.append(wsem)

    def wait_for_reply(self, reply_id):
        """Wait for a response for a message"""
        # The operation of this routine is tricky.  Keep in mind that
        # apartments can be multithreaded, so multiple threads may be
        # pulling from the dispatch queue.  Also keep in mind that we
        # track call causality and want to process on this thread (or any
        # other) "callback" messages having to do with any blocking calls
        # pending, so we want to keep pumping _some_ messages while we
        # wait for a reply.
        #
        # The simplest way of dealing with various TOCTOU races between
        # checking for a reply and poll() is to associate a waitable
        # semaphore with the reply we're expecting.  _Any_ thread that
        # receives a reply ups the semaphore.  Creating a waitable
        # semaphore is expensive, so we keep a free list.
        #
        # While we wait, we retrieve messages and see whether each one
        # might be relevant to the current wait, as indicated by the
        # _predicate function.  We enqueue any messages that aren't
        # immediately relevant.  Some other thread doing the same thing
        # may have already enqueued a message that we care about, so we
        # scan the queue as well as freshly-arriving messages.
        assert Apartment.current() is self
        my_call_id = (self.__call_gate.oid, reply_id)

        def _predicate(message):
            """Return true to dispatch MESSAGE on the current thread"""
            if isinstance(message, TwoWayMessage):
                if my_call_id in message.active_calls:
                    return True  # Callback
                if (message.reply_call_gate is self.__call_gate
                        and message.reply_id == reply_id):
                    return True  # Self-call
            if (isinstance(message, (SuccessReplyMessage, ErrorReplyMessage))
                    and message.reply_id in self.__wsem_by_reply_id):
                return True  # This is a reply to blocking call
            return False  # Enqueue it

        # TODO(dancol): preserve callback stack nesting: force synchronous
        # callbacks to run on this thread, not some other thread in
        # this apartment.
        with self.__wsem_for_reply_id(reply_id) as wsem:
            if not wsem:
                return  # Already completed
            # TODO(dancol): what happens if we're part of a pool and the
            # pool wants to exit?
            poll = select.poll()
            # Note that we *don't* watch the message queue semaphore here.
            # If we did, we'd busy-wait.  We do want to know when our peer
            # dies though, so we have a special death callback that
            # tickles wsem directly if that happens.
            for wait in chain([wsem.fileno()],
                              self.__read_channel.fd_numbers):
                poll.register(wait, select.POLLIN)
            while True:
                if wsem.get_value():
                    return  # Have our value.
                self.__dispatch_some_messages(_predicate)
                message = self.__read_message_nonblock()
                if message:
                    if _predicate(message):
                        message.dispatch(self)
                    else:
                        self.__enqueue_message(message)
                poll.poll()

    def __dispatch_some_messages(self, predicate):
        """Scan the message queue and dispatch some messages

        Dispatch on the current thread any messages for which PREDICATE
        returns true.  Matching messages are removed under the lock and
        dispatched after it has been released.
        """
        to_dispatch = []
        with self.__lock:
            i = 0
            message_queue = self.__message_queue
            message_queue_semaphore = self.__message_queue_semaphore
            assert message_queue_semaphore.get_value() == len(message_queue)
            while i < len(message_queue):
                if predicate(message_queue[i]):
                    # Here, we're using the try_acquire as a plain
                    # decrement in order to keep the semaphore and message
                    # queue lengths consistent as we pluck messages out of
                    # the queue.
                    acq = message_queue_semaphore.try_acquire()
                    assert acq, "checked count above"
                    to_dispatch.append(message_queue.pop(i))
                else:
                    i += 1
            assert message_queue_semaphore.get_value() == len(message_queue)
        for message in to_dispatch:
            message.dispatch(self)

    def do_call(self, call):
        """Actually dispatch a call, resolving function to proxy if needed

        CALL is a (function, args, kwargs) triple.  When function is a
        (proxy, method_name) pair, the method is looked up on the object
        this apartment owns for that proxy.

        Raises NoSuchObjectError if the proxy's object is not owned here
        and NoSuchMethodError if the object has no such attribute.
        """
        # TODO(dancol): get rid of special case for method calls
        assert Apartment.current() is self
        function, args, kwargs = call
        if isinstance(function, tuple):
            proxy, method_name = function
            try:
                obj = self.__owned[proxy.oid]  # Atomic
            except KeyError:
                raise NoSuchObjectError(repr(proxy))
            try:
                function = getattr(obj, method_name)
            except AttributeError:
                # The object exists; it is the method that is missing.
                raise NoSuchMethodError(method_name)
        return function(*args, **kwargs)

    def get_call_future(self, reply_id):
        """Get the future corresponding to REPLY_ID

        Return None if we're not waiting for any such reply.
        """
        assert Apartment.current() is self
        return self.__pending_calls.get(reply_id)

    def __pop_queued_message(self):
        """Return the oldest queued message, or None if the queue is empty"""
        # Need to take the outer lock here to synchronize with
        # __dispatch_some_messages's message queue scanning.  try_acquire
        # is cheap in the common case that we have an empty queue.
        with self.__lock:
            if self.__message_queue_semaphore.try_acquire():
                return self.__message_queue.pop(0)
        return None

    def __check_full(self):
        """Reader callback: drain the queue, then read one channel message

        Any exception is treated as fatal.
        """
        try:
            self.__check_messages_only()
            message = self.__read_message_nonblock()
            if message:
                message.dispatch(self)
        except BaseException:
            die_due_to_fatal_exception("dispatching messages")

    def __check_messages_only(self):
        """Reader callback: dispatch every message currently queued

        Any exception is treated as fatal.
        """
        try:
            assert Apartment.current() is self
            while True:
                message = self.__pop_queued_message()
                if not message:
                    break
                message.dispatch(self)
        except BaseException:
            die_due_to_fatal_exception("dispatching messages")

    @property
    def call_gate(self):
        """The call gate for making calls to this apartment"""
        return self.__call_gate

    def __get_fds_and_callbacks(self):
        """Yield (fd, callback) pairs to register with an event loop"""
        for fd_number in self.__read_channel.fd_numbers:
            yield fd_number, self.__check_full
        yield (self.__message_queue_semaphore.fileno(),
               self.__check_messages_only)

    def __register_event_loop_callbacks(self, loop):
        """Register our readers on LOOP, undoing everything on failure"""
        try:
            for fd, callback in self.__get_fds_and_callbacks():
                # Kept outside the assert so that the call is made even
                # when assertions are compiled out.
                was_registered = loop.remove_reader(fd)
                assert not was_registered, \
                    "fd {!r} should not be registered".format(fd)
                loop.add_reader(fd, callback)
        except BaseException:
            self.__unregister_event_loop_callbacks(loop)
            raise

    def __unregister_event_loop_callbacks(self, loop):
        """Remove our readers from LOOP; failure here is fatal"""
        try:
            for fd, _callback in self.__get_fds_and_callbacks():
                loop.remove_reader(fd)  # Noop if not registered
        except BaseException:
            die_due_to_fatal_exception("unrolling apartment attach")

    @property
    def number_attached(self):
        """Number of event loops attached to this apartment"""
        return self.__number_attached

    def attach(self):
        """Attach this apartment to this thread's current event loop.

        Each attach must be paired with a detach.  Attach and detach can
        nest if balanced.  If a thread exits while attached to an
        apartment, it will be automatically detached.

        Return the AttachInfo record for the loop.  Raises AssertionError
        if the thread has no event loop or the loop is attached to a
        different apartment, and RuntimeError if this apartment is
        closed.
        """
        loop = maybe_this_thread_event_loop()
        if not loop:
            raise AssertionError("no event loop for thread")
        info = maybe_get_attach_info(loop)
        if info:
            # Nested attach: just bump the count.
            if info.apartment is not self:
                raise AssertionError("wrong apartment")
            assert info.count >= 1
            info.count += 1
            return info
        info = AttachInfo(self)
        with self.__lock:
            if self.closed:
                raise RuntimeError("can attach only to running apartments")
            self.__number_attached += 1
            try:
                self.__register_event_loop_callbacks(loop)
            except BaseException:
                self.__number_attached -= 1
                raise
        assert not info.count
        set_attach_info(loop, info)
        info.count = 1
        return info

    def detach(self):
        """Remove association between current event loop and apartment

        A nested detach only decrements the attach count.  When the last
        attached loop detaches, the weak last-detach callbacks are run
        first, on the loop.
        """
        loop = maybe_this_thread_event_loop()
        if not loop:
            raise AssertionError("no event loop for thread")
        info = maybe_get_attach_info(loop)
        if not info:
            raise AssertionError("no apartment attached to event loop")
        if info.apartment is not self:
            raise AssertionError("wrong apartment")
        assert info.count >= 1
        if info.count > 1:
            info.count -= 1
            return
        callbacks = ()
        with self.__lock:
            if self.__number_attached == 1:
                callbacks = tuple(self.__weak_last_detach_callbacks)
                self.__weak_last_detach_callbacks.clear()
        if callbacks:
            if loop.is_running():
                for callback in callbacks:
                    callback()
            else:
                # Run the callbacks with the loop running.
                async def _run_callbacks():
                    for callback in callbacks:
                        callback()
                loop.run_until_complete(_run_callbacks())
        assert info.count == 1
        with self.__lock:
            set_attach_info(loop, None)
            self.__unregister_event_loop_callbacks(loop)
            assert not self.closed
            assert self.__number_attached >= 1
            self.__number_attached -= 1
        info.count = 0

    @contextmanager
    def attached(self):
        """Convenience context manager for attaching"""
        self.attach()
        try:
            yield self
        finally:
            self.detach()

    def _do_close(self):
        """Tear the apartment down.

        Raises AssertionError if any event loop is still attached.
        Queued messages are run down and pending reply futures fail with
        ApartmentQuitError.
        """
        with self.__lock:
            # The attach check synchronizes the call mechanism against
            # detach below, almost like a big reader-writer lock.
            if self.__number_attached:
                raise AssertionError(
                    "cannot close apartment with attached event loops")
            # Shut down the socket and drain the event queue so that we
            # correctly process any incoming reference counts.
            self.__call_gate.channel.shutdown(SHUT_WR)
            while True:
                try:
                    if not self.__read_message_nonblock():
                        break
                except PeerDiedException:
                    break
            while self.__message_queue:
                self.__message_queue.pop().apartment_quit_rundown()
            # We're not going to resolve these futures
            for future in list(self.__pending_calls.values()):
                future.set_exception(ApartmentQuitError())
            assert not self.__pending_calls  # Mutated by future callbacks
            # Make sure all death callbacks disappear
            for death_key in tuple(self.__death_callback_keys):
                get_process_resmanc().remove_death_callback(death_key)
            # Drop the loop variable's strong reference so that the
            # WeakSet can actually empty.
            death_key = None
            assert not self.__death_callback_keys
            self.__owned.clear()
            self.__read_channel.close()
            del self.__handle
            super()._do_close()
class ThreadPoolQuittingError(RuntimeError):
    """Raised when a thread tries to join a thread pool that is quitting."""
class ThreadPool(ClosingContextManager, CannotPickle):
    """Owns an apartment served by a set of worker threads.

    Every worker thread creates its own asyncio event loop, attaches the
    pool's apartment to it, and runs the loop until the pool closes.
    """

    class _State(Enum):
        RUNNING = 0
        QUITTING = 1
        QUIT = 2

    # Class-level default so that __del__ is safe even if __init__ failed
    # before it could set the instance attribute.
    __init_done = False

    def __init__(self, nr_threads=0, name="tp"):
        """Create the apartment and start NR_THREADS worker threads.

        If a worker fails to start, the pool is closed and the exception
        is re-raised.
        """
        self.__monitor = threading.Condition(threading.Lock())
        self.__apartment = ApartmentFactory(name).make_apartment()
        self.__threads = {}  # Maps worker thread -> its event loop
        self.__state = self._State.RUNNING
        self.__init_done = True
        try:
            for _ in range(nr_threads):
                self.add_thread()
        except:
            self.close()
            raise

    @property
    def name(self):
        """Name of this thread pool"""
        return self.apartment.name

    @property
    def call_gate(self):
        """Object for calling into apartment"""
        return self.apartment.call_gate

    @property
    def fn(self):
        """Shortcut for call_gate.fn"""
        return self.call_gate.fn

    @property
    def apartment(self):
        """The apartment to which threadpool threads attach"""
        return self.__apartment

    def add_thread(self):
        """Start one more worker thread and wait for it to initialize.

        Returns once the worker has attached to the apartment; raises
        whatever exception the worker's initialization raised.
        """
        init_future = ChainableFuture()
        thread_name = "{}/{}".format(self.name, len(self.__threads))
        # The worker receives a single mutable list so that it can drop
        # its references to us once it has started.
        worker = threading.Thread(
            target=self.__run_pool_thread,
            args=[[self, init_future]],
            daemon=True,
            name=thread_name,
        )
        worker.start()
        return init_future.result()

    def __thread_init(self, args):
        """Initialize the calling worker thread.

        Return True if the thread is attached and ready to run its loop;
        otherwise record the failure in the init future and return False.
        """
        self_ref, init_future = args
        args.clear()  # Break reference cycle
        loop = None
        set_loop = False
        locked = False
        try:
            thread = threading.current_thread()
            assert self_ref is self
            assert not maybe_this_thread_event_loop()
            self.__monitor.acquire()
            locked = True
            if self.__state is not self._State.RUNNING:
                raise ThreadPoolQuittingError
            assert thread not in self.__threads
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            set_loop = True
            self.__apartment.attach()
            self.__threads[thread] = loop
            init_future.set_result(None)
            self.__monitor.release()
            return True
        except BaseException as ex:
            # Undo, in reverse order, whatever got set up above.
            if set_loop:
                asyncio.set_event_loop(None)
            if loop:
                loop.close()
            if locked:
                self.__monitor.release()
            init_future.set_exception(ex)
            return False

    @staticmethod
    def __run_pool_thread(args):
        """Thread entry point: initialize, run the loop, then clean up.

        Any exception escaping here is treated as fatal.
        """
        try:
            # pylint: disable=protected-access
            pool = args[0]
            if pool.__thread_init(args):
                maybe_this_thread_event_loop().run_forever()
                loop = maybe_this_thread_event_loop()
                maybe_get_attach_info(loop).apartment.detach()
                asyncio.set_event_loop(None)
                loop.close()
        except:
            die_due_to_fatal_exception("running ThreadPool worker")

    @staticmethod
    def __thread_kill():
        # Runs on the worker's own loop and makes run_forever() return.
        maybe_running_event_loop().stop()

    def __kill_thread_gracefully(self, thread):
        """Ask THREAD's loop to stop, join the thread, and forget it."""
        assert thread in self.__threads
        worker_loop = self.__threads[thread]
        worker_loop.call_soon_threadsafe(self.__thread_kill)
        thread.join()
        del self.__threads[thread]

    def _do_close(self):
        """Stop all workers and close the apartment.

        If another close is already in progress, wait for it to finish
        instead of repeating the work.
        """
        # pylint: disable=redefined-variable-type
        states = self._State
        with self.__monitor:
            try:
                state = self.__state
            except AttributeError:
                return  # Can happen during finalization
            if state is states.QUIT:
                return
            if state is states.QUITTING:
                self.__monitor.wait_for(
                    lambda: self.__state is self._State.QUIT)
                return
            self.__state = states.QUITTING
        # Workers are joined without holding the monitor.
        while self.__threads:
            self.__kill_thread_gracefully(first(self.__threads))
        with self.__monitor:
            self.__apartment.close()
            del self.__apartment
            self.__state = states.QUIT
            self.__monitor.notify_all()
        super()._do_close()

    def __del__(self):
        if self.__init_done:
            self.close()
def proxy_for(obj):
    """Return a proxy for OBJ that lives in the current apartment.

    An OBJ that already is a Proxy is returned unchanged.
    """
    if isinstance(obj, Proxy):
        return obj
    return Apartment.current().make_proxy(obj)
def unproxy(proxy):
    """Return the bare object behind PROXY.

    PROXY must be owned by the current apartment.  Anything that is not
    a Proxy is returned unchanged.
    """
    if not isinstance(proxy, Proxy):
        return proxy
    return Apartment.current().unproxy(proxy)
class AutoUnproxy(object):
    """Wrapper around a proxy that unpickles as the unproxied object.

    Pickling reduces the wrapper to a call of unproxy() on the wrapped
    proxy, so the receiver gets the result of that call.
    """

    def __init__(self, proxy):
        self.__proxy = the(Proxy, proxy)

    def __reduce__(self):
        reconstruct_args = (self.__proxy,)
        return unproxy, reconstruct_args
# Function-style spelling of AutoUnproxy.
auto_unproxy = AutoUnproxy # Alias # pylint: disable=invalid-name
class ApartmentAffinityMixin(SafeClosingObject):
    """Mixin that ties an object to the apartment it was created in.

    The object registers a weak last-detach callback with that apartment
    so that it is closed when the last thread detaches.
    """

    def __init__(self):
        super().__init__()
        apartment = Apartment.current()
        self._apartment = apartment
        # The apartment holds the callback only weakly; keep our own
        # reference to it here.
        self.__close_callback = weak_bind(self.close)
        apartment.add_weak_last_detach_callback(self.__close_callback)

    def _check_apartment(self):
        """Assert that we are running in our home apartment.

        Returns True so that it can itself be used inside an assert.
        """
        assert Apartment.current() is self._apartment
        return True

    def _do_close(self):
        self._check_apartment()
        callback = self.__close_callback
        self._apartment.remove_weak_last_detach_callback(callback)
        self._apartment = None
        super()._do_close()