blob: bf76dfebd6a8ecaa8456f411b3c5362b266a3d48 [file] [log] [blame]
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef bint _grpc_aio_initialized = False
# NOTE(lidiz) Theoretically, applications can run in multiple event loops as
# long as they are in the same thread with same magic. This is not a supported
# use case. So, the gRPC Python Async Stack should use a single event loop
# picked by "init_grpc_aio".
cdef object _grpc_aio_loop # asyncio.AbstractEventLoop
cdef int64_t _event_loop_thread_ident
cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower()
grpc_aio_engine = None
cdef object _grpc_initialization_lock = threading.Lock()
class AsyncIOEngine(enum.Enum):
DEFAULT = 'default'
CUSTOM_IO_MANAGER = 'custom'
POLLER = 'poller'
def init_grpc_aio():
global _grpc_aio_initialized
global _grpc_aio_loop
global _event_loop_thread_ident
global grpc_aio_engine
with _grpc_initialization_lock:
# Marks this function as called
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
# Picks the engine for gRPC AsyncIO Stack
for engine_type in AsyncIOEngine:
if engine_type.value == _GRPC_ASYNCIO_ENGINE:
grpc_aio_engine = engine_type
break
if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
_event_loop_thread_ident = threading.current_thread().ident
if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
# Activates asyncio IO manager.
# NOTE(lidiz) Custom IO manager must be activated before the first
# `grpc_init()`. Otherwise, some special configurations in Core won't
# pick up the change, and resulted in SEGFAULT or ABORT.
install_asyncio_iomgr()
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(False)
# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
else:
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
def grpc_aio_loop():
"""Returns the one-and-only gRPC Aio event loop."""
return _grpc_aio_loop
def grpc_schedule_coroutine(object coro):
"""Thread-safely schedules coroutine to gRPC Aio event loop.
If invoked within the same thread as the event loop, return an
Asyncio.Task. Otherwise, return a concurrent.futures.Future (the sync
Future). For non-asyncio threads, sync Future objects are probably easier
to handle (without worrying other thread-safety stuff).
"""
if _event_loop_thread_ident != threading.current_thread().ident:
return asyncio.run_coroutine_threadsafe(coro, _grpc_aio_loop)
else:
return _grpc_aio_loop.create_task(coro)
def grpc_call_soon_threadsafe(object func, *args):
# TODO(lidiz) After we are confident, we can drop this assert. Otherwsie,
# we should limit this function to non-grpc-event-loop thread.
assert _event_loop_thread_ident != threading.current_thread().ident
return _grpc_aio_loop.call_soon_threadsafe(func, *args)