blob: 4014e8236809ad60194b8d732e64b8e4c4d3b60e [file] [log] [blame]
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Invocation-side implementation of gRPC Asyncio Python."""
import asyncio
import time
from typing import Any, Optional, Sequence, Text, Tuple
import grpc
from grpc import _common
from grpc._cython import cygrpc
from . import _base_call
from ._call import UnaryStreamCall, UnaryUnaryCall
from ._interceptor import (InterceptedUnaryUnaryCall,
UnaryUnaryClientInterceptor)
from ._typing import (ChannelArgumentType, DeserializingFunction, MetadataType,
SerializingFunction)
from ._utils import _timeout_to_deadline
class UnaryUnaryMultiCallable:
    """Client-side factory that builds asynchronous unary-unary RPC calls.

    An instance is bound to one channel and one method; invoking it creates a
    call object for a single RPC on that method.
    """

    _channel: cygrpc.AioChannel
    _method: bytes
    _request_serializer: SerializingFunction
    _response_deserializer: DeserializingFunction
    _interceptors: Optional[Sequence[UnaryUnaryClientInterceptor]]
    _loop: asyncio.AbstractEventLoop

    def __init__(self, channel: cygrpc.AioChannel, method: bytes,
                 request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction,
                 interceptors: Optional[Sequence[UnaryUnaryClientInterceptor]]
                ) -> None:
        """Binds the multi-callable to a channel and a method.

        Args:
          channel: The cygrpc.AioChannel the calls are issued on.
          method: The encoded name of the RPC method.
          request_serializer: Serializer handed to every created call.
          response_deserializer: Deserializer handed to every created call.
          interceptors: Unary-unary client interceptors, or None. When this
            is None or empty, calls are created without interception.
        """
        # The event loop is captured first, exactly as it was originally.
        self._loop = asyncio.get_event_loop()
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._interceptors = interceptors

    def __call__(self,
                 request: Any,
                 *,
                 timeout: Optional[float] = None,
                 metadata: Optional[MetadataType] = None,
                 credentials: Optional[grpc.CallCredentials] = None,
                 wait_for_ready: Optional[bool] = None,
                 compression: Optional[grpc.Compression] = None
                ) -> _base_call.UnaryUnaryCall:
        """Asynchronously invokes the underlying RPC.

        Args:
          request: The request value for the RPC.
          timeout: An optional duration of time in seconds to allow
            for the RPC.
          metadata: Optional :term:`metadata` to be transmitted to the
            service-side of the RPC. Not supported yet.
          credentials: An optional CallCredentials for the RPC. Only valid for
            secure Channel.
          wait_for_ready: This is an EXPERIMENTAL argument. An optional
            flag to enable wait for ready mechanism. Not supported yet.
          compression: An element of grpc.compression, e.g.
            grpc.compression.Gzip. This is an EXPERIMENTAL option. Not
            supported yet.

        Returns:
          A Call object instance which is an awaitable object.

        Raises:
          NotImplementedError: If metadata, wait_for_ready or compression is
            given a truthy value.
          RpcError: Indicating that the RPC terminated with non-OK status. The
            raised RpcError will also be a Call for the RPC affording the RPC's
            metadata, status code, and details.
        """
        # Options that are accepted by the signature but not wired up yet.
        # They are checked in declaration order; the first truthy one wins.
        unsupported = (
            (metadata, "TODO: metadata not implemented yet"),
            (wait_for_ready, "TODO: wait_for_ready not implemented yet"),
            (compression, "TODO: compression not implemented yet"),
        )
        for value, message in unsupported:
            if value:
                raise NotImplementedError(message)

        if self._interceptors:
            # The intercepted call receives the raw timeout, not a deadline.
            return InterceptedUnaryUnaryCall(
                self._interceptors,
                request,
                timeout,
                credentials,
                self._channel,
                self._method,
                self._request_serializer,
                self._response_deserializer,
            )

        # Without interceptors the timeout is converted into a deadline here.
        deadline = _timeout_to_deadline(timeout)
        return UnaryUnaryCall(
            request,
            deadline,
            credentials,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
        )
class UnaryStreamMultiCallable:
    """Afford invoking a unary-stream RPC from client-side in an asynchronous way."""

    # Declared at class level for parity with UnaryUnaryMultiCallable.
    _channel: cygrpc.AioChannel
    _method: bytes
    _request_serializer: SerializingFunction
    _response_deserializer: DeserializingFunction
    _loop: asyncio.AbstractEventLoop

    def __init__(self, channel: cygrpc.AioChannel, method: bytes,
                 request_serializer: SerializingFunction,
                 response_deserializer: DeserializingFunction) -> None:
        """Binds the multi-callable to a channel and a method.

        Args:
          channel: The cygrpc.AioChannel the calls are issued on.
          method: The encoded name of the RPC method.
          request_serializer: Serializer handed to every created call.
          response_deserializer: Deserializer handed to every created call.
        """
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._loop = asyncio.get_event_loop()

    def __call__(self,
                 request: Any,
                 *,
                 timeout: Optional[float] = None,
                 metadata: Optional[MetadataType] = None,
                 credentials: Optional[grpc.CallCredentials] = None,
                 wait_for_ready: Optional[bool] = None,
                 compression: Optional[grpc.Compression] = None
                ) -> _base_call.UnaryStreamCall:
        """Asynchronously invokes the underlying RPC.

        Args:
          request: The request value for the RPC.
          timeout: An optional duration of time in seconds to allow
            for the RPC.
          metadata: Optional :term:`metadata` to be transmitted to the
            service-side of the RPC. Not supported yet.
          credentials: An optional CallCredentials for the RPC. Only valid for
            secure Channel.
          wait_for_ready: This is an EXPERIMENTAL argument. An optional
            flag to enable wait for ready mechanism. Not supported yet.
          compression: An element of grpc.compression, e.g.
            grpc.compression.Gzip. This is an EXPERIMENTAL option. Not
            supported yet.

        Returns:
          A UnaryStreamCall object for the invoked RPC.

        Raises:
          NotImplementedError: If metadata, wait_for_ready or compression is
            given a truthy value.
        """
        if metadata:
            raise NotImplementedError("TODO: metadata not implemented yet")

        if wait_for_ready:
            raise NotImplementedError(
                "TODO: wait_for_ready not implemented yet")

        if compression:
            raise NotImplementedError("TODO: compression not implemented yet")

        # The call object expects a deadline rather than a relative timeout.
        deadline = _timeout_to_deadline(timeout)

        return UnaryStreamCall(
            request,
            deadline,
            credentials,
            self._channel,
            self._method,
            self._request_serializer,
            self._response_deserializer,
        )
class Channel:
    """Asynchronous Channel implementation.

    A cygrpc.AioChannel-backed implementation.
    """

    _channel: cygrpc.AioChannel
    _unary_unary_interceptors: Optional[Sequence[UnaryUnaryClientInterceptor]]

    def __init__(self, target: Text, options: Optional[ChannelArgumentType],
                 credentials: Optional[grpc.ChannelCredentials],
                 compression: Optional[grpc.Compression],
                 interceptors: Optional[Sequence[UnaryUnaryClientInterceptor]]):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel. Not supported yet.
          interceptors: An optional list of interceptors that would be used for
            intercepting any RPC executed with that channel.

        Raises:
          NotImplementedError: If compression is given a truthy value.
          ValueError: If any interceptor is not a UnaryUnaryClientInterceptor.
        """
        if compression:
            raise NotImplementedError("TODO: compression not implemented yet")

        if interceptors is None:
            self._unary_unary_interceptors = None
        else:
            # Materialize once so a one-shot iterable is not exhausted before
            # validation, and validate by isinstance alone so interceptors do
            # not need to be hashable.
            candidates = list(interceptors)
            invalid_interceptors = [
                interceptor for interceptor in candidates
                if not isinstance(interceptor, UnaryUnaryClientInterceptor)
            ]
            if invalid_interceptors:
                raise ValueError(
                    "Interceptor must be "
                    "UnaryUnaryClientInterceptors, the following are invalid: {}"
                    .format(invalid_interceptors))
            self._unary_unary_interceptors = candidates

        # The underlying channel is only created once the arguments are valid.
        self._channel = cygrpc.AioChannel(_common.encode(target), options,
                                          credentials)

    def check_connectivity_state(self, try_to_connect: bool = False
                                ) -> grpc.ChannelConnectivity:
        """Check the connectivity state of a channel.

        This is an EXPERIMENTAL API.

        Args:
          try_to_connect: a bool indicate whether the Channel should try to
            connect to peer or not.

        Returns:
          A ChannelConnectivity object.
        """
        result = self._channel.check_connectivity_state(try_to_connect)
        # Translate the cygrpc state into the public grpc.ChannelConnectivity.
        return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]

    async def watch_connectivity_state(
            self,
            last_observed_state: grpc.ChannelConnectivity,
            timeout_seconds: Optional[float] = None,
    ) -> Optional[grpc.ChannelConnectivity]:
        """Watch for a change in connectivity state.

        This is an EXPERIMENTAL API.

        Once the channel connectivity state is different from
        last_observed_state, the function will return the new connectivity
        state. If deadline expires BEFORE the state is changed, None will be
        returned.

        Args:
          last_observed_state: The connectivity state the caller last saw; the
            wait ends once the channel is in a different state.
          timeout_seconds: An optional duration in seconds to wait for a
            change. None means no deadline is passed to the underlying channel.

        Returns:
          A ChannelConnectivity object or None.
        """
        # The underlying channel takes an absolute deadline, not a duration.
        deadline = (time.time() + timeout_seconds
                    if timeout_seconds is not None else None)

        # The first element of the enum value is what the cygrpc layer is given.
        result = await self._channel.watch_connectivity_state(
            last_observed_state.value[0], deadline)

        if result is None:
            return None
        return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]

    def unary_unary(
            self,
            method: Text,
            request_serializer: Optional[SerializingFunction] = None,
            response_deserializer: Optional[DeserializingFunction] = None
    ) -> UnaryUnaryMultiCallable:
        """Creates a UnaryUnaryMultiCallable for a unary-unary method.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional behaviour for serializing the request
            message. Request goes unserialized in case None is passed.
          response_deserializer: Optional behaviour for deserializing the
            response message. Response goes undeserialized in case None
            is passed.

        Returns:
          A UnaryUnaryMultiCallable value for the named unary-unary method.
        """
        return UnaryUnaryMultiCallable(self._channel, _common.encode(method),
                                       request_serializer,
                                       response_deserializer,
                                       self._unary_unary_interceptors)

    def unary_stream(
            self,
            method: Text,
            request_serializer: Optional[SerializingFunction] = None,
            response_deserializer: Optional[DeserializingFunction] = None
    ) -> UnaryStreamMultiCallable:
        """Creates a UnaryStreamMultiCallable for a unary-stream method.

        The channel's interceptors are not passed on to the created
        multi-callable.

        Args:
          method: The name of the RPC method.
          request_serializer: Optional serializer for the request message,
            forwarded unchanged to the multi-callable.
          response_deserializer: Optional deserializer for the response
            messages, forwarded unchanged to the multi-callable.

        Returns:
          A UnaryStreamMultiCallable value for the named unary-stream method.
        """
        return UnaryStreamMultiCallable(self._channel, _common.encode(method),
                                        request_serializer,
                                        response_deserializer)

    def stream_unary(
            self,
            method: Text,
            request_serializer: Optional[SerializingFunction] = None,
            response_deserializer: Optional[DeserializingFunction] = None):
        """Placeholder method for stream-unary calls.

        Not implemented: the arguments are ignored and None is returned.
        """

    def stream_stream(
            self,
            method: Text,
            request_serializer: Optional[SerializingFunction] = None,
            response_deserializer: Optional[DeserializingFunction] = None):
        """Placeholder method for stream-stream calls.

        Not implemented: the arguments are ignored and None is returned.
        """

    async def _close(self):
        """Closes the underlying cygrpc channel."""
        # TODO: Send cancellation status
        self._channel.close()

    async def __aenter__(self):
        """Starts an asynchronous context manager.

        Returns:
          Channel the channel that was instantiated.
        """
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Finishes the asynchronous context manager by closing the channel.

        The exception arguments are ignored; nothing is returned, so an
        exception raised inside the context is not suppressed.
        """
        await self._close()

    async def close(self):
        """Closes this Channel and releases all resources held by it.

        Closing the Channel will proactively terminate all RPCs active with the
        Channel and it is not valid to invoke new RPCs with the Channel.

        This method is idempotent.
        """
        await self._close()