| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Event loop""" |
| |
| # TODO: optimize this mechanism |
| # |
| # - Use shared memory to manage reference counts instead of constantly chatting |
| # with resource manager |
| # |
| # - Use persistent IO buffers. |
| # |
| # - Benchmark RTTs. |
| |
| import threading |
| import logging |
| import select |
| import asyncio |
| import weakref |
| |
| from concurrent.futures import CancelledError |
| from functools import partial |
| from itertools import count, chain |
| from enum import Enum |
| from contextlib import contextmanager |
| from socket import SHUT_WR |
| |
| from cytoolz import first |
| |
| from .channel import ( |
| Channel, |
| MessageSendContext, |
| ) |
| |
| from .shm import ( |
| PeerDiedException, |
| ResourceRecvContext, |
| ResourceSendingMessageContextMixin, |
| SendMsgError, |
| SharedObject, |
| ShmSharedMemorySendContextMixin, |
| _assert_valid_oid, |
| cache_object_until_death, |
| get_process_resmanc, |
| ) |
| |
| from .util import ( |
| CannotPickle, |
| ChainableFuture, |
| ClosingContextManager, |
| SafeClosingObject, |
| cached_property, |
| decode_exception, |
| die_due_to_fatal_exception, |
| encode_exception, |
| maybe_running_event_loop, |
| maybe_this_thread_event_loop, |
| running_event_loop, |
| the, |
| tls, |
| weak_bind, |
| ) |
| |
| from .waitsem import WaitableSemaphore |
| |
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
| |
class MessageUndeliverableError(RuntimeError):
    """Signals that a message could not be delivered to its destination."""
| |
class NoSuchObjectError(RuntimeError):
    """Signals that a named object does not exist."""
| |
class NoSuchMethodError(RuntimeError):
    """Signals that an object has no method of the requested name."""
| |
class ApartmentQuitError(RuntimeError):
    """Signals an operation on an apartment that has been closed."""
| |
def _do_call_oneway(call_gate, call_info, ret_filter):
    """Send CALL_INFO through CALL_GATE without asking for a reply.

    RET_FILTER must be falsy: there is no return value to filter.
    Return whatever send_message() returns.  If the peer has died, the
    failure is logged at debug level and None is returned.
    """
    assert not ret_filter
    try:
        outcome = call_gate.send_message(OneWayMessage(call_info))
    except PeerDiedException:
        log.debug("failed to send one-way message", exc_info=True)
        outcome = None
    return outcome
| |
def _do_call_async(call_gate, call_info, ret_filter):
    """Send a two-way call and return the reply future without waiting.

    The reply future is created by the current apartment and flagged
    with async_death = True.  A failure while building or sending the
    message is stored on the future rather than raised.
    """
    apartment = Apartment.current()
    future, reply_id = apartment.make_reply_future(call_gate)
    future.async_death = True
    if future.done():
        # Already resolved while the future was being set up.
        return future
    try:
        call_gate.send_message(
            TwoWayMessage(apartment.call_gate,
                          reply_id,
                          (),
                          ret_filter,
                          call_info))
    except Exception as error:
        future.set_exception(error)
    return future
| |
def _do_call_sync(call_gate, call_info, ret_filter):
    """Send a two-way call and block until its reply arrives.

    The current thread's active-call chain (tls.active_calls) travels
    with the message.  Return the call's result, or raise whatever
    exception ended up stored on the reply future.
    """
    apartment = Apartment.current()
    future, reply_id = apartment.make_reply_future(call_gate)
    assert not future.async_death
    already_done = future.done()
    if not already_done:
        try:
            call_gate.send_message(
                TwoWayMessage(apartment.call_gate,
                              reply_id,
                              tls.active_calls,
                              ret_filter,
                              call_info))
            apartment.wait_for_reply(reply_id)
        except Exception as error:
            future.set_exception(error)
    assert future.done()
    return future.result()
| |
# TODO(dancol): native support for AIO futures?
# Until then the "aio" call style is an alias for the regular async
# implementation, so it yields the same kind of future.
_do_call_aio = _do_call_async
| |
class FunctionProxy(object):
    """Callable stand-in for a function reached through a call gate.

    The call style and an optional return filter are chosen by chaining
    the accessors below before invoking the proxy, for example
    ``proxy.call_async(...)``.  A new proxy makes synchronous calls and
    has no return filter.
    """

    def __init__(self, function, call_gate):
        """Remember FUNCTION and the CALL_GATE used to reach it."""
        assert isinstance(call_gate, CallGate)
        self.__function = function
        self.__call_gate = call_gate
        self.__ret_filter = None
        self.__do_call = _do_call_sync

    def set_ret_filter(self, ret_filter):
        """Install RET_FILTER as the return filter; return self."""
        self.__ret_filter = ret_filter
        return self

    def proxy_ret(self):
        """Use proxy_for as the return filter; return self."""
        return self.set_ret_filter(proxy_for)

    @property
    def call_oneway(self):
        """Switch this proxy to one-way calls; evaluates to self."""
        self.__do_call = _do_call_oneway
        return self

    @property
    def call_async(self):
        """Switch this proxy to asynchronous calls; evaluates to self."""
        self.__do_call = _do_call_async
        return self

    @property
    def call_sync(self):
        """Switch this proxy to synchronous calls; evaluates to self."""
        self.__do_call = _do_call_sync
        return self

    @property
    def call_aio(self):
        """Switch this proxy to the aio call style; evaluates to self."""
        self.__do_call = _do_call_aio
        return self

    def __call__(self, *args, **kwargs):
        """Perform the call using the currently selected call style."""
        call_info = (self.__function, args, kwargs)
        return self.__do_call(self.__call_gate, call_info, self.__ret_filter)
| |
class Proxy(SharedObject):
    """Proxy for an Apartment-owned object

    Attribute access on a proxy yields a FunctionProxy that calls the
    same-named method through the proxy's call gate.  Calling the proxy
    itself forwards to the remote "__call__".
    """

    def __init__(self, proxied_cls, call_gate):
        """Record the proxied object's type and the gate used to call it."""
        assert isinstance(proxied_cls, type)
        assert isinstance(call_gate, CallGate)
        self.__proxied_cls = proxied_cls
        self.__call_gate = call_gate

    @staticmethod
    def get_proxied_class(proxy):
        """Return the type of the object proxied"""
        assert isinstance(proxy, Proxy)
        return proxy.__proxied_cls  # pylint: disable=protected-access

    @staticmethod
    def get_call_gate(proxy):
        """Return the call gateway used to call methods"""
        assert isinstance(proxy, Proxy)
        return proxy.__call_gate  # pylint: disable=protected-access

    def __getattr__(self, name):
        # __getattr__ runs only when normal lookup has failed.  Dunder
        # names are never forwarded.  Our own name-mangled fields can
        # reach this point only on an instance whose __init__ has not
        # run; forwarding them would read self.__call_gate, fail again,
        # and recurse until RecursionError.  Report them as missing.
        if name.startswith(("__", "_Proxy__")):
            raise AttributeError(name)  # No attribute access!
        return self.__call_gate.fn((self, name))

    def __call__(self, *args, **kwargs):  # Hack: should be generic!
        return self.__call_gate.fn((self, "__call__"))(*args, **kwargs)

    def __repr__(self):
        return "<Proxy for {!r}>".format(self.__proxied_cls)

    def __str__(self):
        return repr(self)
| |
# NOTE(review): nothing in the visible part of this file reads this flag;
# presumably the shm/resman layer consults it when sending objects -- confirm.
# Indentation was lost in this copy of the file, so also confirm whether this
# is a module global or a class attribute of Proxy.
_resman_send_eager = True
| |
class Message(object):
    """Base class for messages sent between apartments."""

    def dispatch(self, apartment):
        """Process this message inside APARTMENT.

        Subclasses must override; the base version always raises
        NotImplementedError.
        """
        raise NotImplementedError("abstract")

    def apartment_quit_rundown(self):
        """Hook called while the apartment is being torn down.

        The base implementation does nothing.
        """
        return None
| |
class RunAsyncFunctionMessage(Message):
    """Synthetic message implementing Apartment.call_async().

    Carries a zero-argument callable plus the future that receives the
    callable's result or exception.
    """

    def __init__(self, function):
        self.function = function
        future = ChainableFuture()
        future.set_running_or_notify_cancel()
        self.future = future

    def dispatch(self, _apartment):
        """Run the function and store its outcome on the future."""
        future = self.future
        try:
            outcome = self.function()
        except BaseException as error:
            future.set_exception(error)
            return
        future.set_result(outcome)

    def apartment_quit_rundown(self):
        """Fail the future: the apartment closed before we could run."""
        self.future.set_exception(ApartmentQuitError())
| |
class OneWayMessage(Message):
    """Message requesting a function call whose result is discarded."""

    def __init__(self, call):
        Message.__init__(self)
        self.call = call

    def dispatch(self, apartment):
        """Perform the call; schedule it if it produced a coroutine."""
        outcome = apartment.do_call(self.call)
        if not asyncio.iscoroutine(outcome):
            return
        # ensure_future is used purely for its scheduling side effect.
        asyncio.ensure_future(outcome)
| |
class TwoWayMessage(Message):
    """Message requesting a function call and a reply with its outcome.

    The reply (success or error) is sent back through reply_call_gate
    and tagged with reply_id.  active_calls is the sender's chain of
    in-progress call ids.
    """

    def __init__(self,  # pylint: disable=too-many-arguments
                 reply_call_gate,
                 reply_id,
                 active_calls,
                 ret_filter,
                 call):
        Message.__init__(self)
        assert the(int, reply_id) > 0  # pylint: disable=compare-to-zero
        assert isinstance(active_calls, tuple)
        self.reply_call_gate = reply_call_gate
        self.reply_id = reply_id
        self.active_calls = active_calls
        self.ret_filter = ret_filter
        self.call = call

    def __repr__(self):
        template = "<TwoWayMessage rcg={!r} rid={!r} ac={!r} rf={!r} c={!r}>"
        return template.format(
            self.reply_call_gate,
            self.reply_id,
            self.active_calls,
            self.ret_filter,
            self.call)

    def __send_reply(self, message):
        """Send MESSAGE to the caller; a dead caller is only logged."""
        try:
            self.reply_call_gate.send_message(message)
        except PeerDiedException:
            log.debug("dropping two-way message reply because sender died")

    def __send_error_reply(self, ex):
        """Reply with the encoded form of exception EX."""
        reply = ErrorReplyMessage(self.reply_id, encode_exception(ex))
        self.__send_reply(reply)

    def __send_success_reply(self, result):
        """Reply with RESULT, passed through ret_filter if one is set."""
        if self.ret_filter:
            result = self.ret_filter(result)
        reply = SuccessReplyMessage(self.reply_id, result)
        self.__send_reply(reply)

    def __on_task_done(self, future):
        """Done-callback for a call that returned a coroutine."""
        if future.cancelled():
            self.__send_error_reply(CancelledError())
            return
        error = future.exception()
        if error:
            self.__send_error_reply(error)
        else:
            self.__send_success_reply(future.result())

    def dispatch(self, apartment):
        """Perform the call and reply with its outcome.

        While the call runs, the thread's active-call chain is extended
        with the sender's chain and this call's own id.  A coroutine
        result is scheduled and answered when it completes.
        """
        # pylint: disable=redefined-variable-type
        try:
            call_id = (self.reply_call_gate.oid, self.reply_id)
            chain_for_call = tls.active_calls + self.active_calls + (call_id,)
            with tls.set_active_calls(chain_for_call):
                result = apartment.do_call(self.call)
        except Exception as ex:
            self.__send_error_reply(ex)
            return
        if asyncio.iscoroutine(result):
            # TODO(dancol): cancel future and send reply on detach!
            task = asyncio.ensure_future(result)
            task.add_done_callback(self.__on_task_done)
        else:
            self.__send_success_reply(result)
| |
class SuccessReplyMessage(Message):
    """Reply reporting that a two-way call succeeded."""

    def __init__(self, reply_id, value):
        Message.__init__(self)
        assert the(int, reply_id) > 0  # pylint: disable=compare-to-zero
        self.reply_id = reply_id
        self.value = value

    def dispatch(self, apartment):
        """Resolve the pending future for reply_id, if there still is one."""
        future = apartment.get_call_future(self.reply_id)
        if future:
            future.set_result(self.value)
        elif __debug__:
            log.debug("reply id %s: success but now irrelevant",
                      self.reply_id)
| |
class ErrorReplyMessage(Message):
    """Reply reporting that a two-way call failed."""

    def __init__(self, reply_id, saved_exc):
        Message.__init__(self)
        assert the(int, reply_id) > 0  # pylint: disable=compare-to-zero
        self.reply_id = reply_id
        self.saved_exc = saved_exc

    def dispatch(self, apartment):
        """Fail the pending future for reply_id, if there still is one."""
        future = apartment.get_call_future(self.reply_id)
        if future:
            future.set_exception(decode_exception(self.saved_exc))
        elif __debug__:
            log.debug("reply id %s: failed but now irrelevant",
                      self.reply_id,
                      exc_info=decode_exception(self.saved_exc))
| |
class ApartmentHandle(SharedObject):
    """Shareable handle standing for an Apartment.

    Its main use is as the target of death notifications.
    """

    def __init__(self, name):
        super().__init__()
        # the() enforces that NAME is a str.
        checked_name = the(str, name)
        self.name = checked_name
| |
class ApartmentFactory(object):
    """Builds one Apartment together with the CallGate that reaches it.

    Factories and call gates can be sent between processes; Apartment
    objects cannot.  Each factory can produce only a single Apartment.
    """

    def __init__(self, name="apartment", process_resmanc_oid=None):
        """Create the handle, channel pair and call gate.

        NAME names the apartment handle.  PROCESS_RESMANC_OID is the id
        of the process that will host the apartment; when falsy, the
        current process's resmanc id is used.
        """
        self.__handle = ApartmentHandle(name)
        read_end, write_end = Channel.make_pair(duplex=False)
        self.__read_channel = read_end
        handle_oid = self.__handle.oid
        host_oid = process_resmanc_oid or get_process_resmanc().id
        self.__call_gate = CallGate(write_end, handle_oid, host_oid)

    @property
    def call_gate(self):
        """The call gate for making calls to the new apartment"""
        return self.__call_gate

    def make_apartment(self):
        """Create and return this factory's Apartment.

        Must run in the process chosen at construction time.  The handle
        and read channel are handed over to the apartment and removed
        from the factory.
        """
        assert get_process_resmanc().id == self.__call_gate.process_oid, \
            ("the host process for an apartment must be specified at factory "
             "creation time")
        apartment = Apartment(self.__handle,
                              self.__read_channel,
                              self.__call_gate)
        del self.__handle
        del self.__read_channel
        return apartment
| |
class CallGate(SharedObject):
    """Handle for making function calls to an apartment"""

    # A CallGate is kept separate from the Apartment itself: gates can
    # be shared across processes (via shm.py magic), whereas a full
    # Apartment has process affinity and would be kept alive forever if
    # it were shared.  In effect a CallGate is a weak reference to an
    # Apartment.

    def __init__(self, write_channel, apartment_oid, process_oid):
        """Bind the gate to a write channel and the two OIDs it serves."""
        assert isinstance(write_channel, Channel)
        assert _assert_valid_oid(apartment_oid)
        assert _assert_valid_oid(process_oid)
        self.__write_channel = write_channel
        self.__apartment_oid = apartment_oid
        self.__process_oid = process_oid

    @property
    def channel(self):
        """Channel object to which we write commands"""
        return self.__write_channel

    @property
    def apartment_oid(self):
        """The OID of the apartment that receives calls made through us"""
        return self.__apartment_oid

    @property
    def process_oid(self):
        """The OID of the process that owns the apartment"""
        return self.__process_oid

    def send_message(self, message):
        """Write MESSAGE to the apartment's channel.

        Internal use only.
        """
        # TODO(dancol): this call can block. sometimes that's okay, but
        # we really should have an all-asyncio message path to avoid any
        # possible deadlock in replies.
        assert isinstance(message, Message)
        context_factory = ApartmentSendContext.to(self)
        self.__write_channel.send(message, context_factory)

    def fn(self, function):
        """Return a FunctionProxy that calls FUNCTION through this gate."""
        return FunctionProxy(function, self)

    def _resman_after_pull_hook(self):
        # Cache this gate, keyed to the target apartment's OID.
        cache_object_until_death(self, self.apartment_oid)

    def poll_for_send(self):
        """Wait until we can write a message"""
        self.__write_channel.poll_for_send()
| |
class AttachInfo(object):
    """Attachment record stored on an event loop.

    Holds the attached apartment and a counter that Apartment.attach()
    and Apartment.detach() use to track nested attachments.
    """

    def __init__(self, apartment):
        checked = the(Apartment, apartment)
        self.apartment = checked
        self.count = 0
| |
def maybe_get_attach_info(loop):
    """Return the attach record stored on event loop LOOP, or None."""
    assert isinstance(loop, asyncio.AbstractEventLoop)
    try:
        return loop._apartment_attach  # pylint: disable=protected-access
    except AttributeError:
        return None
| |
def set_attach_info(loop, apartment):
    """Store or clear the attach record on event loop LOOP.

    Despite its name, APARTMENT must be an AttachInfo instance (this is
    enforced with the()), or a falsy value to remove the record.
    Setting requires that no record is present; clearing requires that
    one is.
    """
    assert isinstance(loop, asyncio.AbstractEventLoop)
    if not apartment:
        assert maybe_get_attach_info(loop)
        delattr(loop, "_apartment_attach")
        return
    assert not maybe_get_attach_info(loop)
    setattr(loop, "_apartment_attach", the(AttachInfo, apartment))
| |
class ApartmentSendContext(
        ShmSharedMemorySendContextMixin,
        MessageSendContext,
        ResourceSendingMessageContextMixin,
):
    """Send context that routes resource-bearing messages via resman."""

    def __init__(self, call_gate):
        super().__init__()
        self.__call_gate = the(CallGate, call_gate)

    def sendmsg(self, pickle_data, block, channel):
        """Send the pickled message.

        Messages without resources take the inherited path.  Messages
        carrying resources go through the process resmanc's non-blocking
        send, retried after waiting for the gate to become writable.  A
        SendMsgError is unwrapped to its cause when it has one.
        """
        if not self.resources:
            return super().sendmsg(pickle_data, block, channel)
        gate = self.__call_gate
        try:
            while True:
                sent = get_process_resmanc().cg_sendmsg_nonblock(
                    gate.oid,
                    bytes(pickle_data),
                    self.fdhs,
                    self.resources)
                if sent:
                    break
                gate.poll_for_send()
        except SendMsgError as ex:
            raise ex.__cause__ or ex
        return None

    @classmethod
    def to(cls, call_gate):
        """Return a factory that builds contexts bound to CALL_GATE."""
        return partial(cls, call_gate)
| |
| class Apartment(ClosingContextManager, CannotPickle): |
| """A habitat for objects. |
| |
| The Apartment object is basically the same as the COM concept of the |
| same name; it allows for cross-process coherent object method |
| calling. To create an object usable from multiple processes, create |
| the object, then call proxy_for(that_object). You can then send the |
| resulting Proxy object to other processes via the low-level shm |
| mechanism, pickle, and so on, and these other processes can call |
| methods on that proxy and get replies. |
| |
| The object, at this point, "lives" inside the apartment that was |
| active at the point proxy_for was called. All calls to object |
| methods (via the proxy) take place in threads attached to the |
| apartment that owns the object. If a method (via a proxy) is called |
| from outside the object's home apartment, the sending apartment |
| sends a message to the object's home apartment and waits for |
| a reply. |
| |
| Each apartment is associated with an asyncio event loop, which takes |
| care of actual IO multiplexing. We used to do the IO ourselves, but |
| it turns out that asyncio does an adequate job already. |
| |
| [1] shm preserves object identity across process boundaries |
| """ |
| |
@staticmethod
def current():
    """Return the apartment attached to the running event loop.

    Only valid while this thread is running an event loop that has an
    apartment attached.
    """
    # pylint: disable=protected-access
    loop = running_event_loop()
    attach_info = loop._apartment_attach
    return attach_info.apartment
| |
@staticmethod
def maybe_current():
    """Return running apartment or None

    None is returned when no event loop is running, or when the running
    loop has no apartment attached.
    """
    loop = maybe_running_event_loop()
    if not loop:
        return None
    # The loop attribute holds an AttachInfo record, not the apartment
    # itself; unwrap it so the result agrees with current().
    info = getattr(loop, "_apartment_attach", None)
    if info is None:
        return None
    return info.apartment
| |
def __init__(self,  # pylint: disable=too-many-arguments
             handle,
             read_channel,
             call_gate):
    """Set up apartment state; use ApartmentFactory to create instances."""
    assert isinstance(read_channel, Channel)

    self.__handle = the(ApartmentHandle, handle)
    self.__read_channel = read_channel
    self.__call_gate = call_gate
    self.__message_queue_semaphore = WaitableSemaphore()

    # Guards the state initialized below this point.
    self.__lock = threading.Lock()

    # Attachment bookkeeping.
    self.__number_attached = 0
    self.__weak_last_detach_callbacks = weakref.WeakSet()

    # Objects living in this apartment, keyed by proxy OID.
    self.__owned = dict()

    # Outstanding calls and the local message queue.
    self.__reply_id_sequence = count(1)
    self.__pending_calls = dict()
    self.__message_queue = list()

    # Keys of every death callback we registered, kept so that they
    # can be removed from resmanc when the apartment closes.
    self.__death_callback_keys = weakref.WeakSet()

    # Support for blocking calls: semaphores by reply id plus a free
    # list for reuse.
    self.__wsem_by_reply_id = dict()
    self.__wsem_free_list = list()
| |
@cached_property
def name(self):
    """Name of this apartment, as recorded on its handle."""
    handle = self.__handle
    return handle.name
| |
@cached_property
def id(self):
    """OID of this apartment's handle."""
    handle = self.__handle
    return handle.oid
| |
def call_async(self, fn):
    """Queue zero-argument callable FN to run inside the apartment.

    Safe to call from any thread.  Bypasses CallGate pickle, reference
    tracking, and so on.

    Return a future that receives FN's result or exception.  If the
    apartment is already closed, the future fails with
    ApartmentQuitError.
    """
    message = RunAsyncFunctionMessage(fn)
    self.__enqueue_message(message)
    return message.future
| |
def __read_message_nonblock(self):
    """Read one message from the channel, or None if none is ready."""
    try:
        message = self.__read_channel.recv(ResourceRecvContext, block=False)
    except BlockingIOError:
        message = None
    return message
| |
def add_weak_last_detach_callback(self, callback):
    """Register CALLBACK to run when the last event loop detaches.

    The callback is held in a WeakSet, so the caller must keep it
    alive.
    """
    registry = self.__weak_last_detach_callbacks
    registry.add(callback)
| |
def remove_weak_last_detach_callback(self, callback):
    """Unregister CALLBACK; a callback that is not registered is ignored."""
    registry = self.__weak_last_detach_callbacks
    registry.discard(callback)
| |
def __add_death_callback(self, oid, callback):
    """Register CALLBACK for the death of OID and remember its key.

    Return the key handed back by resmanc.
    """
    resmanc = get_process_resmanc()
    key = resmanc.add_death_callback(oid, callback)
    self.__death_callback_keys.add(key)
    return key
| |
def make_proxy(self, obj):
    """Create a Proxy through which OBJ can be called via this apartment.

    OBJ is recorded as owned by the apartment under the proxy's OID and
    is dropped again when the proxy dies.  Must be called while attached
    to this apartment.
    """
    assert Apartment.current() is self
    proxy = Proxy(type(obj), self.__call_gate)
    proxy_oid = proxy.oid
    assert proxy_oid not in self.__owned
    self.__owned[proxy_oid] = obj

    def _forget_dead_proxy():
        with self.__lock:
            if not self.closed:
                del self.__owned[proxy_oid]

    try:
        self.__add_death_callback(proxy_oid, _forget_dead_proxy)
    except BaseException:
        # Undo the ownership record if registration failed.
        del self.__owned[proxy.oid]
        raise
    return proxy
| |
def unproxy(self, proxy):
    """Return the owned object that PROXY stands for.

    Raises KeyError if this apartment owns no object for PROXY.
    """
    assert Apartment.current() is self
    assert isinstance(proxy, Proxy)
    owned = self.__owned
    return owned[proxy.oid]
| |
def make_reply_future(self, call_gate):
    """Make a future that we resolve when we receive a reply.

    CALL_GATE is the gate of the apartment being called.  A death
    callback is registered on its apartment_oid so that the future
    fails with ApartmentQuitError if that apartment dies.

    Return a pair (FUTURE, REPLY_ID) where FUTURE is the future
    resolved upon reply and REPLY_ID is the ID that other apartments
    should include in their messages in order to activate FUTURE.

    FUTURE.async_death starts out False.  When a caller sets it to
    True, the death notification is delivered through call_async()
    instead of being applied directly in the death callback.
    """
    assert isinstance(call_gate, CallGate)
    assert Apartment.current() is self, "must be attached"

    # No synchronization necessary here: the counter increment is
    # atomic, and the reply_id is unique, so we can't collide on
    # self.__pending_calls slots.

    reply_future = ChainableFuture()
    reply_future.set_running_or_notify_cancel()  # Cancel not supported
    reply_future.async_death = False
    reply_id = next(self.__reply_id_sequence)

    def _on_call_target_death():
        def _set_exception():
            reply_future.set_exception(ApartmentQuitError())
        if reply_future.async_death:
            self.call_async(_set_exception)
        else:
            _set_exception()

    # N.B. _on_call_target_death can run before this call even
    # returns.  If it does, _on_reply_future_done will also
    # run immediately.
    death_key = self.__add_death_callback(call_gate.apartment_oid,
                                          _on_call_target_death)

    # Publish the future before attaching the done-callback, which
    # removes this same entry again.
    self.__pending_calls[reply_id] = reply_future
    def _on_reply_future_done(future):
        assert future is reply_future
        get_process_resmanc().remove_death_callback(death_key)
        del self.__pending_calls[reply_id]
        wsem = self.__wsem_by_reply_id.get(reply_id)
        if wsem:
            wsem.release()  # Wake synchronous waiter
    reply_future.add_done_callback(_on_reply_future_done)

    return reply_future, reply_id
| |
def __enqueue_message(self, message):
    """Append MESSAGE to the local queue and signal the semaphore.

    If the apartment is already closed, the message's quit rundown is
    run instead.  Return MESSAGE either way.
    """
    with self.__lock:
        if not self.closed:
            self.__message_queue.append(message)
            self.__message_queue_semaphore.release()
        else:
            message.apartment_quit_rundown()
    return message
| |
def __get_wsem_locked(self):
    """Return an unsignaled semaphore, reusing the free list if possible.

    The caller is expected to hold self.__lock.
    """
    free_list = self.__wsem_free_list
    wsem = free_list.pop() if free_list else WaitableSemaphore()
    assert not wsem.get_value()
    return wsem
| |
@contextmanager
def __wsem_for_reply_id(self, reply_id):
    """Signal a waitable semaphore when we get a reply.

    Yield the waitable semaphore or None if we've already processed
    the reply by the time we try to make the wsem arrangement.

    On exit the semaphore is unregistered, drained back to zero, and
    returned to the free list.
    """
    with self.__lock:
        # A reply id still in __pending_calls has not been answered.
        if reply_id in self.__pending_calls:
            wsem = self.__get_wsem_locked()
            assert reply_id not in self.__wsem_by_reply_id
            self.__wsem_by_reply_id[reply_id] = wsem
        else:
            wsem = None
    # The lock is not held while the caller's body runs.
    try:
        yield wsem
    finally:
        if wsem:
            with self.__lock:
                wsem = self.__wsem_by_reply_id.pop(reply_id)
                wsem.try_acquire_all()  # Reset in case we were signaled
                assert not wsem.get_value()
                self.__wsem_free_list.append(wsem)
| |
def wait_for_reply(self, reply_id):
    """Wait for a response for a message

    Block the calling thread until the reply for REPLY_ID has been
    processed, dispatching related messages on this thread meanwhile.
    Return None immediately if the reply was already processed.  Must
    be called while attached to this apartment.
    """
    # The operation of this routine is tricky. Keep in mind that
    # apartments can be multithreaded, so multiple threads may be pulling
    # from the dispatch queue. Also keep in mind that we track call
    # causality and want to process on this thread (or any other)
    # "callback" messages having to do with any blocking calls
    # pending, so we want to keep pumping _some_ messages while we
    # wait for a reply.
    #
    # The simplest way of dealing with various TOCTOU races between
    # checking for a reply and poll() is to associate a waitable
    # semaphore with the reply we're expecting. _Any_ thread that
    # receives a reply ups the semaphore. Creating a waitable
    # semaphore is expensive, so we keep a free list.
    #
    # While we wait, we retrieve messages and see whether each one
    # might be relevant to the current wait, as indicated by the
    # _predicate function. We enqueue any messages that aren't
    # immediately relevant. Some other thread doing the same thing may
    # have already enqueued a message that we care about, so we scan
    # the queue as well as freshly-arriving messages.
    assert Apartment.current() is self
    # Same (gate oid, reply id) shape that TwoWayMessage.dispatch adds
    # to the active-call chain.
    my_call_id = (self.__call_gate.oid, reply_id)

    def _predicate(message):
        """Return true to dispatch MESSAGE on the current thread"""
        if isinstance(message, TwoWayMessage):
            if my_call_id in message.active_calls:
                return True  # Callback
            if (message.reply_call_gate is self.__call_gate
                    and message.reply_id == reply_id):
                return True  # Self-call
        if (isinstance(message, (SuccessReplyMessage, ErrorReplyMessage))
                and message.reply_id in self.__wsem_by_reply_id):
            return True  # This is a reply to blocking call
        return False  # Enqueue it

    # TODO(dancol): preserve callback stack nesting: force synchronous
    # callbacks to run on this thread, not some other thread in
    # this apartment.

    with self.__wsem_for_reply_id(reply_id) as wsem:
        if not wsem:
            return  # Already completed
        # TODO(dancol): what happens if we're part of a pool and the pool
        # wants to exit?
        poll = select.poll()
        # Note that we *don't* watch the message queue semaphore here.
        # If we did, we'd busy-wait. We do want to know when our peer
        # dies though, so we have a special death callback that tickles
        # wsem directly if that happens.
        for wait in chain([wsem.fileno()], self.__read_channel.fd_numbers):
            poll.register(wait, select.POLLIN)
        while True:
            if wsem.get_value():
                return  # Have our value.
            # First drain relevant messages that are already queued,
            # then try to read one fresh message from the channel.
            self.__dispatch_some_messages(_predicate)
            message = self.__read_message_nonblock()
            if message:
                if _predicate(message):
                    message.dispatch(self)
                else:
                    self.__enqueue_message(message)
            # Sleep until wsem or one of the channel fds is readable.
            poll.poll()
| |
def __dispatch_some_messages(self, predicate):
    """Scan the message queue and dispatch some messages

    Dispatch on the current thread any messages for which PREDICATE
    returns true.  Matching messages are removed from the queue while
    the lock is held and dispatched after it has been released.
    """
    to_dispatch = []
    with self.__lock:
        i = 0
        message_queue = self.__message_queue
        message_queue_semaphore = self.__message_queue_semaphore
        # Invariant: the semaphore's value equals the queue length.
        assert message_queue_semaphore.get_value() == len(message_queue)
        while i < len(message_queue):
            if predicate(message_queue[i]):
                # Here, we're using the try_acquire as a plain decrement in
                # order to keep the semaphore and message queue lengths
                # consistent as we pluck messages out of the queue.
                acq = message_queue_semaphore.try_acquire()
                assert acq, "checked count above"
                # pop(i) shifts later items down, so i stays put.
                to_dispatch.append(message_queue.pop(i))
            else:
                i += 1
        assert message_queue_semaphore.get_value() == len(message_queue)
    for message in to_dispatch:
        message.dispatch(self)
| |
def do_call(self, call):
    """Actually dispatch a call, resolving function to proxy if needed

    CALL is a (function, args, kwargs) triple.  FUNCTION is either a
    callable or a (proxy, method_name) pair naming a method of an object
    owned by this apartment.

    Return whatever the called function returns.

    Raises NoSuchObjectError if this apartment owns no object for the
    proxy, and NoSuchMethodError if the owned object has no attribute
    named METHOD_NAME.
    """
    # TODO(dancol): get rid of special case for method calls
    assert Apartment.current() is self
    function, args, kwargs = call
    if isinstance(function, tuple):
        proxy, method_name = function
        try:
            obj = self.__owned[proxy.oid]  # Atomic
        except KeyError:
            raise NoSuchObjectError(repr(proxy))
        try:
            function = getattr(obj, method_name)
        except AttributeError:
            # The object exists; it is the method that is missing.
            raise NoSuchMethodError(method_name)
    return function(*args, **kwargs)
| |
def get_call_future(self, reply_id):
    """Look up the pending future registered under REPLY_ID.

    Return None if we're not waiting for any such reply.
    """
    assert Apartment.current() is self
    pending = self.__pending_calls
    return pending.get(reply_id)
| |
def __pop_queued_message(self):
    """Remove and return the oldest queued message, or None if empty."""
    # The lock synchronizes us with the queue scan done by
    # __dispatch_some_messages.  try_acquire is cheap in the common
    # case of an empty queue.
    with self.__lock:
        if not self.__message_queue_semaphore.try_acquire():
            return None
        return self.__message_queue.pop(0)
| |
def __check_full(self):
    """Drain the local queue, then read and dispatch one channel message.

    Any exception is handed to die_due_to_fatal_exception.
    """
    try:
        self.__check_messages_only()
        incoming = self.__read_message_nonblock()
        if incoming:
            incoming.dispatch(self)
    except BaseException:
        die_due_to_fatal_exception("dispatching messages")
| |
def __check_messages_only(self):
    """Dispatch queued messages until the local queue is empty.

    Any exception is handed to die_due_to_fatal_exception.
    """
    try:
        assert Apartment.current() is self
        queued = self.__pop_queued_message()
        while queued:
            queued.dispatch(self)
            queued = self.__pop_queued_message()
    except BaseException:
        die_due_to_fatal_exception("dispatching messages")
| |
@property
def call_gate(self):
    """Call gate through which calls to this apartment are made."""
    gate = self.__call_gate
    return gate
| |
def __get_fds_and_callbacks(self):
    """Yield (fd, callback) pairs to install on an event loop.

    Channel fds use the full check; the message-queue semaphore fd uses
    the queue-only check.
    """
    yield from ((fd_number, self.__check_full)
                for fd_number in self.__read_channel.fd_numbers)
    semaphore_fd = self.__message_queue_semaphore.fileno()
    yield semaphore_fd, self.__check_messages_only
| |
def __register_event_loop_callbacks(self, loop):
    """Install our fd readers on LOOP, rolling back if anything fails."""
    try:
        for fd, callback in self.__get_fds_and_callbacks():
            # NOTE: the remove_reader call lives inside the assert, so
            # it only runs when assertions are enabled.
            assert not loop.remove_reader(fd), \
                "fd {!r} should not be registered".format(fd)
            loop.add_reader(fd, callback)
    except BaseException:
        self.__unregister_event_loop_callbacks(loop)
        raise
| |
def __unregister_event_loop_callbacks(self, loop):
    """Remove our fd readers from LOOP.

    Any exception is handed to die_due_to_fatal_exception.
    """
    try:
        for fd, _unused in self.__get_fds_and_callbacks():
            # Removing a reader that is not registered is a no-op.
            loop.remove_reader(fd)
    except BaseException:
        die_due_to_fatal_exception("unrolling apartment attach")
| |
@property
def number_attached(self):
    """How many event loops are currently attached to this apartment."""
    attached_count = self.__number_attached
    return attached_count
| |
def attach(self):
    """Attach this apartment to this thread's current event loop.

    Each attach must be paired with a detach; balanced attaches may
    nest.  Return the loop's AttachInfo record.

    Raises AssertionError if the thread has no event loop or the loop
    is attached to a different apartment, and RuntimeError if the
    apartment is closed.

    (The original documentation also states that a thread exiting while
    attached is detached automatically; that mechanism is not visible
    in this method.)
    """
    loop = maybe_this_thread_event_loop()
    if not loop:
        raise AssertionError("no event loop for thread")

    existing = maybe_get_attach_info(loop)
    if existing:
        # Nested attach: only the counter changes.
        if existing.apartment is not self:
            raise AssertionError("wrong apartment")
        assert existing.count >= 1
        existing.count += 1
        return existing

    fresh = AttachInfo(self)
    with self.__lock:
        if self.closed:
            raise RuntimeError("can attach only to running apartments")
        self.__number_attached += 1
        try:
            self.__register_event_loop_callbacks(loop)
        except BaseException:
            self.__number_attached -= 1
            raise
    assert not fresh.count
    set_attach_info(loop, fresh)
    fresh.count = 1
    return fresh
| |
def detach(self):
    """Remove association between current event loop and apartment

    A nested detach only decrements the attach counter.  The outermost
    detach first runs the weak last-detach callbacks if this is the only
    attached loop, then unregisters the loop.

    Raises AssertionError if the thread has no event loop, the loop has
    nothing attached, or it is attached to a different apartment.
    """
    loop = maybe_this_thread_event_loop()
    if not loop:
        raise AssertionError("no event loop for thread")
    info = maybe_get_attach_info(loop)
    if not info:
        raise AssertionError("no apartment attached to event loop")
    if info.apartment is not self:
        raise AssertionError("wrong apartment")
    assert info.count >= 1
    if info.count > 1:
        # Nested attach: just unwind one level.
        info.count -= 1
        return
    callbacks = ()
    with self.__lock:
        if self.__number_attached == 1:
            # Snapshot and clear under the lock; run them after it is
            # released.
            callbacks = tuple(self.__weak_last_detach_callbacks)
            self.__weak_last_detach_callbacks.clear()
    if callbacks:
        if loop.is_running():
            for callback in callbacks:
                callback()
        else:
            # Run the callbacks inside the loop even though it is not
            # currently running.
            async def _run_callbacks():
                for callback in callbacks:
                    callback()
            loop.run_until_complete(_run_callbacks())
    assert info.count == 1
    with self.__lock:
        set_attach_info(loop, None)
        self.__unregister_event_loop_callbacks(loop)
        assert not self.closed
        assert self.__number_attached >= 1
        self.__number_attached -= 1
    info.count = 0
| |
@contextmanager
def attached(self):
    """Context manager pairing attach() with detach().

    Yields this apartment; detach() runs on exit even if the body
    raises.
    """
    self.attach()
    try:
        yield self
    finally:
        self.detach()
| |
def _do_close(self):
    """Run down the apartment's messaging state on close.

    Raises AssertionError if any event loop is still attached.
    Otherwise, in order: shuts down the write side of the call
    gate's channel, reads remaining incoming messages until there
    are none (or the peer has died), runs down every queued message,
    fails every pending call with ApartmentQuitError, removes the
    registered death callbacks, clears the owned-object table,
    closes the read channel, drops the handle, and finally chains to
    the superclass `_do_close`.
    """
    # NOTE(review): this copy of the file has lost its indentation, so
    # the extent of the `with self.__lock` block below is a
    # reconstruction -- confirm against the upstream source.
    with self.__lock:
        # The attach check synchronizes the call mechanism against
        # detach below, almost like a big reader-writer lock.
        if self.__number_attached:
            raise AssertionError(
                "cannot close apartment with attached event loops")

    # Shut down the socket and drain the event queue so that we
    # correctly process any incoming reference counts.
    self.__call_gate.channel.shutdown(SHUT_WR)
    while True:
        try:
            # A falsy result means there was nothing more to read.
            if not self.__read_message_nonblock():
                break
        except PeerDiedException:
            # Peer is gone; nothing further can arrive.
            break

    # Give each still-queued message its quit-time rundown.
    while self.__message_queue:
        self.__message_queue.pop().apartment_quit_rundown()

    # We're not going to resolve these futures
    # (iterate over a copy: completing a future mutates the dict).
    for future in list(self.__pending_calls.values()):
        future.set_exception(ApartmentQuitError())
    assert not self.__pending_calls  # Mutated by future callbacks

    # Make sure all death callbacks disappear
    for death_key in tuple(self.__death_callback_keys):
        get_process_resmanc().remove_death_callback(death_key)
    # Drop the loop variable's reference to the last key before
    # checking that the key collection is empty.
    # NOTE(review): presumably the collection holds keys weakly -- confirm.
    death_key = None
    assert not self.__death_callback_keys

    self.__owned.clear()
    self.__read_channel.close()
    del self.__handle
    super()._do_close()
| |
class ThreadPoolQuittingError(RuntimeError):
    """Raised when a thread tries to join a thread pool that is quitting.

    Worker initialization raises this when the pool is no longer in
    its running state.
    """
| |
class ThreadPool(ClosingContextManager, CannotPickle):
    """Owns an apartment together with the worker threads attached to it.

    Every worker is a daemon thread running its own asyncio event
    loop, attached to the pool's apartment for the lifetime of the
    loop.  Closing the pool stops and joins every worker and then
    closes the apartment.
    """

    class _State(Enum):
        # Lifecycle: RUNNING -> QUITTING (close in progress) -> QUIT.
        RUNNING = 0
        QUITTING = 1
        QUIT = 2

    # Class-level default so __del__ is safe on a half-built instance.
    __init_done = False

    def __init__(self, nr_threads=0, name="tp"):
        self.__monitor = threading.Condition(threading.Lock())
        self.__apartment = ApartmentFactory(name).make_apartment()
        self.__threads = {}  # worker thread -> that worker's event loop
        self.__state = self._State.RUNNING
        self.__init_done = True
        try:
            for _ in range(nr_threads):
                self.add_thread()
        except BaseException:
            # Do not leave a partially populated pool behind.
            self.close()
            raise

    @property
    def apartment(self):
        """The apartment to which threadpool threads attach"""
        return self.__apartment

    @property
    def name(self):
        """Name of this thread pool (the apartment's name)"""
        return self.apartment.name

    @property
    def call_gate(self):
        """Object for calling into the apartment"""
        return self.apartment.call_gate

    @property
    def fn(self):
        """Shortcut for call_gate.fn"""
        return self.call_gate.fn

    def add_thread(self):
        """Start one more worker thread and wait for it to initialize.

        Blocks until the worker reports the outcome of its
        initialization; an initialization failure is re-raised here.
        """
        started = ChainableFuture()
        worker_name = "{}/{}".format(self.name, len(self.__threads))
        worker = threading.Thread(
            target=self.__run_pool_thread,
            # One positional argument: a mutable list that the worker
            # empties once it has unpacked it.
            args=[[self, started]],
            daemon=True,
            name=worker_name,
        )
        worker.start()
        return started.result()

    def __thread_init(self, args):
        """Set up the calling worker thread; return True on success.

        Creates and installs a fresh event loop, attaches it to the
        apartment and records the thread, all while holding the
        monitor.  Any failure is undone, delivered through the init
        future, and reported as a False return.
        """
        self_ref, init_future = args
        args.clear()  # Break reference cycle
        loop = None
        loop_installed = False
        monitor_held = False
        try:
            me = threading.current_thread()
            assert self_ref is self
            assert not maybe_this_thread_event_loop()
            self.__monitor.acquire()
            monitor_held = True
            if self.__state is not self._State.RUNNING:
                raise ThreadPoolQuittingError
            assert me not in self.__threads
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop_installed = True
            self.__apartment.attach()
            self.__threads[me] = loop
            init_future.set_result(None)
            self.__monitor.release()
            return True
        except BaseException as ex:
            # Unwind exactly as far as we got, then hand the error to
            # whoever is blocked in add_thread.
            if loop_installed:
                asyncio.set_event_loop(None)
            if loop:
                loop.close()
            if monitor_held:
                self.__monitor.release()
            init_future.set_exception(ex)
            return False

    @staticmethod
    def __run_pool_thread(args):
        """Worker thread entry point: initialize, run the loop, tear down."""
        try:
            # pylint: disable=protected-access
            # The pool is reached only through args[0] and never bound
            # to a local, so this frame keeps no reference to it once
            # initialization has cleared `args`.
            if not args[0].__thread_init(args):
                return
            maybe_this_thread_event_loop().run_forever()
            loop = maybe_this_thread_event_loop()
            maybe_get_attach_info(loop).apartment.detach()
            asyncio.set_event_loop(None)
            loop.close()
        except BaseException:
            die_due_to_fatal_exception("running ThreadPool worker")

    @staticmethod
    def __thread_kill():
        """Stop the running event loop; scheduled onto a worker's loop."""
        maybe_running_event_loop().stop()

    def __kill_thread_gracefully(self, thread):
        """Ask one worker's loop to stop, join the thread, forget it."""
        assert thread in self.__threads
        worker_loop = self.__threads[thread]
        worker_loop.call_soon_threadsafe(self.__thread_kill)
        thread.join()
        del self.__threads[thread]

    def _do_close(self):
        """Stop all workers and close the apartment.

        If another close is already in progress, wait for it to
        finish instead of repeating the work.
        """
        # pylint: disable=redefined-variable-type
        with self.__monitor:
            try:
                current = self.__state
            except AttributeError:
                return  # Can happen during finalization
            if current is self._State.QUITTING:
                self.__monitor.wait_for(
                    lambda: self.__state is self._State.QUIT)
                return
            if current is self._State.QUIT:
                return
            self.__state = self._State.QUITTING
        # Workers are stopped and joined without holding the monitor.
        while self.__threads:
            self.__kill_thread_gracefully(next(iter(self.__threads)))
        with self.__monitor:
            self.__apartment.close()
            del self.__apartment
            self.__state = self._State.QUIT
            self.__monitor.notify_all()
        super()._do_close()

    def __del__(self):
        # Only close pools whose __init__ got far enough to need it.
        if self.__init_done:
            self.close()
| |
def proxy_for(obj):
    """Return a proxy for OBJ that lives in the current apartment.

    An OBJ that is already a Proxy is returned unchanged.
    """
    if isinstance(obj, Proxy):
        return obj
    return Apartment.current().make_proxy(obj)
| |
def unproxy(proxy):
    """Return the bare object behind PROXY.

    PROXY must be owned by the current apartment.  Anything that is
    not a Proxy is passed through unchanged.
    """
    if not isinstance(proxy, Proxy):
        return proxy
    return Apartment.current().unproxy(proxy)
| |
class AutoUnproxy:
    """Wrapper around a proxy that unpickles as the unproxied object.

    Pickling an instance records a call to `unproxy` on the wrapped
    proxy, so the unpickling side receives that call's result rather
    than an AutoUnproxy.
    """

    def __init__(self, proxy):
        # `the` is handed the expected type along with the value;
        # presumably it type-checks and returns the value -- confirm.
        checked = the(Proxy, proxy)
        self.__proxy = checked

    def __reduce__(self):
        reconstruct_args = (self.__proxy,)
        return unproxy, reconstruct_args


auto_unproxy = AutoUnproxy  # Alias  # pylint: disable=invalid-name
| |
class ApartmentAffinityMixin(SafeClosingObject):
    """Mixin that ties an object to the apartment it was created in.

    On construction the object records the current apartment and
    registers its own `close` (wrapped with `weak_bind`) as one of
    that apartment's weak last-detach callbacks.
    """

    def __init__(self):
        super().__init__()
        home = Apartment.current()
        self._apartment = home
        on_last_detach = weak_bind(self.close)
        self.__close_callback = on_last_detach
        home.add_weak_last_detach_callback(on_last_detach)

    def _check_apartment(self):
        """Assert that the current apartment is ours; return True.

        The True return lets callers use this inside an `assert`.
        """
        current = Apartment.current()
        assert current is self._apartment
        return True

    def _do_close(self):
        """Unregister the last-detach callback and drop the apartment."""
        self._check_apartment()
        home = self._apartment
        home.remove_weak_last_detach_callback(self.__close_callback)
        self._apartment = None
        super()._do_close()