Improve readability
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
index 708a274..2b3be97 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
@@ -23,7 +23,7 @@
cdef object _poller
cdef object _poller_running
- cdef _polling(self)
+ cdef void _poll(self) except *
cdef class CallbackCompletionQueue(BaseCompletionQueue):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
index 85ae0c4..c09c276 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pyx.pxi
@@ -36,11 +36,11 @@
self._shutdown_completed = asyncio.get_event_loop().create_future()
self._poller = None
self._poller_running = asyncio.get_event_loop().create_future()
- self._poller = threading.Thread(target=self._polling_wrapper)
+ self._poller = threading.Thread(target=self._poll_wrapper)
self._poller.daemon = True
self._poller.start()
- cdef _polling(self):
+ cdef void _poll(self) except *:
cdef grpc_event event
cdef CallbackContext *context
cdef object waiter
@@ -53,7 +53,7 @@
NULL)
if event.type == GRPC_QUEUE_TIMEOUT:
- raise NotImplementedError()
+ raise AssertionError("Core should not return timeout error!")
elif event.type == GRPC_QUEUE_SHUTDOWN:
self._shutdown = True
grpc_call_soon_threadsafe(self._shutdown_completed.set_result, None)
@@ -64,8 +64,8 @@
<CallbackWrapper>context.callback_wrapper,
event.success)
- def _polling_wrapper(self):
- self._polling()
+ def _poll_wrapper(self):
+ self._poll()
async def shutdown(self):
grpc_completion_queue_shutdown(self._cq)
@@ -103,4 +103,4 @@
elif grpc_aio_engine is AsyncIOEngine.POLLER:
return PollerCompletionQueue()
else:
- raise ValueError('Unexpected engine type [%s]' % grpc_aio_engine)
+ raise ValueError('Unsupported engine type [%s]' % grpc_aio_engine)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
index 27a37df..bf76dfe 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
@@ -15,13 +15,14 @@
cdef bint _grpc_aio_initialized = False
# NOTE(lidiz) Theoretically, applications can run in multiple event loops as
-# long as they are in the same thread with same magic. However, I don't think
-# we should support this use case. So, the gRPC Python Async Stack should use
-# a single event loop picked by "init_grpc_aio".
-cdef object _grpc_aio_loop
-cdef object _event_loop_thread_ident
+# long as they are in the same thread with same magic. This is not a supported
+# use case. So, the gRPC Python Async Stack should use a single event loop
+# picked by "init_grpc_aio".
+cdef object _grpc_aio_loop # asyncio.AbstractEventLoop
+cdef int64_t _event_loop_thread_ident
cdef str _GRPC_ASYNCIO_ENGINE = os.environ.get('GRPC_ASYNCIO_ENGINE', 'default').lower()
grpc_aio_engine = None
+cdef object _grpc_initialization_lock = threading.Lock()
class AsyncIOEngine(enum.Enum):
@@ -36,51 +37,50 @@
global _event_loop_thread_ident
global grpc_aio_engine
- # Marks this function as called
- if _grpc_aio_initialized:
- return
- else:
- _grpc_aio_initialized = True
+ with _grpc_initialization_lock:
+ # Marks this function as called
+ if _grpc_aio_initialized:
+ return
+ else:
+ _grpc_aio_initialized = True
- # Picks the engine for gRPC AsyncIO Stack
- for engine_type in AsyncIOEngine:
- if engine_type.value == _GRPC_ASYNCIO_ENGINE:
- grpc_aio_engine = engine_type
- break
- if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
- grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
+ # Picks the engine for gRPC AsyncIO Stack
+ for engine_type in AsyncIOEngine:
+ if engine_type.value == _GRPC_ASYNCIO_ENGINE:
+ grpc_aio_engine = engine_type
+ break
+ if grpc_aio_engine is None or grpc_aio_engine is AsyncIOEngine.DEFAULT:
+ grpc_aio_engine = AsyncIOEngine.CUSTOM_IO_MANAGER
- # Anchors the event loop that the gRPC library going to use.
- _grpc_aio_loop = asyncio.get_event_loop()
- _event_loop_thread_ident = threading.current_thread().ident
+ # Anchors the event loop that the gRPC library going to use.
+ _grpc_aio_loop = asyncio.get_event_loop()
+ _event_loop_thread_ident = threading.current_thread().ident
- if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
- # Activates asyncio IO manager.
- # NOTE(lidiz) Custom IO manager must be activated before the first
- # `grpc_init()`. Otherwise, some special configurations in Core won't
- # pick up the change, and resulted in SEGFAULT or ABORT.
- install_asyncio_iomgr()
+ if grpc_aio_engine is AsyncIOEngine.CUSTOM_IO_MANAGER:
+ # Activates asyncio IO manager.
+ # NOTE(lidiz) Custom IO manager must be activated before the first
+ # `grpc_init()`. Otherwise, some special configurations in Core won't
+ # pick up the change, and resulted in SEGFAULT or ABORT.
+ install_asyncio_iomgr()
- # TODO(https://github.com/grpc/grpc/issues/22244) we need a the
- # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
- # library won't shutdown cleanly.
- grpc_init()
+ # TODO(https://github.com/grpc/grpc/issues/22244) we need a the
+ # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
+ # library won't shutdown cleanly.
+ grpc_init()
- # Timers are triggered by the Asyncio loop. We disable
- # the background thread that is being used by the native
- # gRPC iomgr.
- grpc_timer_manager_set_threading(False)
+ # Timers are triggered by the Asyncio loop. We disable
+ # the background thread that is being used by the native
+ # gRPC iomgr.
+ grpc_timer_manager_set_threading(False)
- # gRPC callbaks are executed within the same thread used by the Asyncio
- # event loop, as it is being done by the other Asyncio callbacks.
- Executor.SetThreadingAll(False)
- else:
- # TODO(https://github.com/grpc/grpc/issues/22244) we need a the
- # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
- # library won't shutdown cleanly.
- grpc_init()
-
- _grpc_aio_initialized = False
+ # gRPC callbaks are executed within the same thread used by the Asyncio
+ # event loop, as it is being done by the other Asyncio callbacks.
+ Executor.SetThreadingAll(False)
+ else:
+ # TODO(https://github.com/grpc/grpc/issues/22244) we need a the
+ # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
+ # library won't shutdown cleanly.
+ grpc_init()
def grpc_aio_loop():