blob: c938f55adff087544a1317273ae35c8f2e4e5944 [file] [log] [blame]
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class _WatchConnectivityFailed(Exception):
"""Dedicated exception class for watch connectivity failed.
It might be failed due to deadline exceeded.
"""
cdef CallbackFailureHandler _WATCH_CONNECTIVITY_FAILURE_HANDLER = CallbackFailureHandler(
'watch_connectivity_state',
'Timed out',
_WatchConnectivityFailed)
cdef class AioChannel:
def __cinit__(self, bytes target, tuple options, ChannelCredentials credentials):
if options is None:
options = ()
cdef _ChannelArgs channel_args = _ChannelArgs(options)
self._target = target
self.cq = CallbackCompletionQueue()
self._loop = asyncio.get_event_loop()
self._status = AIO_CHANNEL_STATUS_READY
if credentials is None:
self.channel = grpc_insecure_channel_create(
<char *>target,
channel_args.c_args(),
NULL)
else:
self.channel = grpc_secure_channel_create(
<grpc_channel_credentials *> credentials.c(),
<char *>target,
channel_args.c_args(),
NULL)
def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"
def check_connectivity_state(self, bint try_to_connect):
"""A Cython wrapper for Core's check connectivity state API."""
return grpc_channel_check_connectivity_state(
self.channel,
try_to_connect,
)
async def watch_connectivity_state(self,
grpc_connectivity_state last_observed_state,
object deadline):
"""Watch for one connectivity state change.
Keeps mirroring the behavior from Core, so we can easily switch to
other design of API if necessary.
"""
if self._status == AIO_CHANNEL_STATUS_DESTROYED:
# TODO(lidiz) switch to UsageError
raise RuntimeError('Channel is closed.')
cdef gpr_timespec c_deadline = _timespec_from_time(deadline)
cdef object future = self._loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(
future,
_WATCH_CONNECTIVITY_FAILURE_HANDLER)
grpc_channel_watch_connectivity_state(
self.channel,
last_observed_state,
c_deadline,
self.cq.c_ptr(),
wrapper.c_functor())
try:
await future
except _WatchConnectivityFailed:
return False
else:
return True
def close(self):
grpc_channel_destroy(self.channel)
self._status = AIO_CHANNEL_STATUS_DESTROYED
def call(self,
bytes method,
object deadline,
object python_call_credentials):
"""Assembles a Cython Call object.
Returns:
The _AioCall object.
"""
if self._status == AIO_CHANNEL_STATUS_DESTROYED:
# TODO(lidiz) switch to UsageError
raise RuntimeError('Channel is closed.')
cdef CallCredentials cython_call_credentials
if python_call_credentials is not None:
cython_call_credentials = python_call_credentials._credentials
else:
cython_call_credentials = None
cdef _AioCall call = _AioCall(self, deadline, method, cython_call_credentials)
return call