| # Copyright 2015 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| cimport cpython |
| |
| import threading |
| import time |
| |
| _INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( |
| 'Internal gRPC call error %d. ' + |
| 'Please report to https://github.com/grpc/grpc/issues') |
| |
| |
| cdef str _call_error_metadata(metadata): |
| return 'metadata was invalid: %s' % metadata |
| |
| |
| cdef str _call_error_no_metadata(c_call_error): |
| return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error |
| |
| |
| cdef str _call_error(c_call_error, metadata): |
| if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: |
| return _call_error_metadata(metadata) |
| else: |
| return _call_error_no_metadata(c_call_error) |
| |
| |
| cdef _check_call_error_no_metadata(c_call_error): |
| if c_call_error != GRPC_CALL_OK: |
| return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error |
| else: |
| return None |
| |
| |
| cdef _check_and_raise_call_error_no_metadata(c_call_error): |
| error = _check_call_error_no_metadata(c_call_error) |
| if error is not None: |
| raise ValueError(error) |
| |
| |
| cdef _check_call_error(c_call_error, metadata): |
| if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: |
| return _call_error_metadata(metadata) |
| else: |
| return _check_call_error_no_metadata(c_call_error) |
| |
| |
| cdef void _raise_call_error_no_metadata(c_call_error) except *: |
| raise ValueError(_call_error_no_metadata(c_call_error)) |
| |
| |
| cdef void _raise_call_error(c_call_error, metadata) except *: |
| raise ValueError(_call_error(c_call_error, metadata)) |
| |
| |
| cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue): |
| grpc_completion_queue_shutdown(c_completion_queue) |
| grpc_completion_queue_destroy(c_completion_queue) |
| |
| |
| cdef class _CallState: |
| |
| def __cinit__(self): |
| self.due = set() |
| |
| |
| cdef class _ChannelState: |
| |
| def __cinit__(self): |
| self.condition = threading.Condition() |
| self.open = True |
| self.integrated_call_states = {} |
| self.segregated_call_states = set() |
| self.connectivity_due = set() |
| self.closed_reason = None |
| |
| |
| cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): |
| cdef grpc_call_error c_call_error |
| cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None) |
| tag.prepare() |
| cpython.Py_INCREF(tag) |
| with nogil: |
| c_call_error = grpc_call_start_batch( |
| c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL) |
| return c_call_error, tag |
| |
| |
| cdef object _operate_from_integrated_call( |
| _ChannelState channel_state, _CallState call_state, object operations, |
| object user_tag): |
| cdef grpc_call_error c_call_error |
| cdef _BatchOperationTag tag |
| with channel_state.condition: |
| if call_state.due: |
| c_call_error, tag = _operate(call_state.c_call, operations, user_tag) |
| if c_call_error == GRPC_CALL_OK: |
| call_state.due.add(tag) |
| channel_state.integrated_call_states[tag] = call_state |
| return True |
| else: |
| _raise_call_error_no_metadata(c_call_error) |
| else: |
| return False |
| |
| |
| cdef object _operate_from_segregated_call( |
| _ChannelState channel_state, _CallState call_state, object operations, |
| object user_tag): |
| cdef grpc_call_error c_call_error |
| cdef _BatchOperationTag tag |
| with channel_state.condition: |
| if call_state.due: |
| c_call_error, tag = _operate(call_state.c_call, operations, user_tag) |
| if c_call_error == GRPC_CALL_OK: |
| call_state.due.add(tag) |
| return True |
| else: |
| _raise_call_error_no_metadata(c_call_error) |
| else: |
| return False |
| |
| |
| cdef _cancel( |
| _ChannelState channel_state, _CallState call_state, grpc_status_code code, |
| str details): |
| cdef grpc_call_error c_call_error |
| with channel_state.condition: |
| if call_state.due: |
| c_call_error = grpc_call_cancel_with_status( |
| call_state.c_call, code, _encode(details), NULL) |
| _check_and_raise_call_error_no_metadata(c_call_error) |
| |
| |
| cdef _next_call_event( |
| _ChannelState channel_state, grpc_completion_queue *c_completion_queue, |
| on_success, deadline): |
| tag, event = _latent_event(c_completion_queue, deadline) |
| with channel_state.condition: |
| on_success(tag) |
| channel_state.condition.notify_all() |
| return event |
| |
| |
| # TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler. |
| cdef void _call( |
| _ChannelState channel_state, _CallState call_state, |
| grpc_completion_queue *c_completion_queue, on_success, int flags, method, |
| host, object deadline, CallCredentials credentials, |
| object operationses_and_user_tags, object metadata, |
| object context) except *: |
| """Invokes an RPC. |
| |
| Args: |
| channel_state: A _ChannelState with its "open" attribute set to True. RPCs |
| may not be invoked on a closed channel. |
| call_state: An empty _CallState to be altered (specifically assigned a |
| c_call and having its due set populated) if the RPC invocation is |
| successful. |
| c_completion_queue: A grpc_completion_queue to be used for the call's |
| operations. |
| on_success: A behavior to be called if attempting to start operations for |
| the call succeeds. If called the behavior will be called while holding the |
| channel_state condition and passed the tags associated with operations |
| that were successfully started for the call. |
| flags: Flags to be passed to gRPC Core as part of call creation. |
| method: The fully-qualified name of the RPC method being invoked. |
| host: A "host" string to be passed to gRPC Core as part of call creation. |
| deadline: A float for the deadline of the RPC, or None if the RPC is to have |
| no deadline. |
| credentials: A _CallCredentials for the RPC or None. |
| operationses_and_user_tags: A sequence of length-two sequences the first |
| element of which is a sequence of Operations and the second element of |
| which is an object to be used as a tag. A SendInitialMetadataOperation |
| must be present in the first element of this value. |
| metadata: The metadata for this call. |
| context: Context object for distributed tracing. |
| """ |
| cdef grpc_slice method_slice |
| cdef grpc_slice host_slice |
| cdef grpc_slice *host_slice_ptr |
| cdef grpc_call_credentials *c_call_credentials |
| cdef grpc_call_error c_call_error |
| cdef tuple error_and_wrapper_tag |
| cdef _BatchOperationTag wrapper_tag |
| with channel_state.condition: |
| if channel_state.open: |
| method_slice = _slice_from_bytes(method) |
| if host is None: |
| host_slice_ptr = NULL |
| else: |
| host_slice = _slice_from_bytes(host) |
| host_slice_ptr = &host_slice |
| call_state.c_call = grpc_channel_create_call( |
| channel_state.c_channel, NULL, flags, |
| c_completion_queue, method_slice, host_slice_ptr, |
| _timespec_from_time(deadline), NULL) |
| grpc_slice_unref(method_slice) |
| if host_slice_ptr: |
| grpc_slice_unref(host_slice) |
| if context is not None: |
| set_census_context_on_call(call_state, context) |
| if credentials is not None: |
| c_call_credentials = credentials.c() |
| c_call_error = grpc_call_set_credentials( |
| call_state.c_call, c_call_credentials) |
| grpc_call_credentials_release(c_call_credentials) |
| if c_call_error != GRPC_CALL_OK: |
| grpc_call_unref(call_state.c_call) |
| call_state.c_call = NULL |
| _raise_call_error_no_metadata(c_call_error) |
| started_tags = set() |
| for operations, user_tag in operationses_and_user_tags: |
| c_call_error, tag = _operate(call_state.c_call, operations, user_tag) |
| if c_call_error == GRPC_CALL_OK: |
| started_tags.add(tag) |
| else: |
| grpc_call_cancel(call_state.c_call, NULL) |
| grpc_call_unref(call_state.c_call) |
| call_state.c_call = NULL |
| _raise_call_error(c_call_error, metadata) |
| else: |
| call_state.due.update(started_tags) |
| on_success(started_tags) |
| else: |
| raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason) |
| cdef void _process_integrated_call_tag( |
| _ChannelState state, _BatchOperationTag tag) except *: |
| cdef _CallState call_state = state.integrated_call_states.pop(tag) |
| call_state.due.remove(tag) |
| if not call_state.due: |
| grpc_call_unref(call_state.c_call) |
| call_state.c_call = NULL |
| |
| |
| cdef class IntegratedCall: |
| |
| def __cinit__(self, _ChannelState channel_state, _CallState call_state): |
| self._channel_state = channel_state |
| self._call_state = call_state |
| |
| def operate(self, operations, tag): |
| return _operate_from_integrated_call( |
| self._channel_state, self._call_state, operations, tag) |
| |
| def cancel(self, code, details): |
| _cancel(self._channel_state, self._call_state, code, details) |
| |
| |
| cdef IntegratedCall _integrated_call( |
| _ChannelState state, int flags, method, host, object deadline, |
| object metadata, CallCredentials credentials, operationses_and_user_tags, |
| object context): |
| call_state = _CallState() |
| |
| def on_success(started_tags): |
| for started_tag in started_tags: |
| state.integrated_call_states[started_tag] = call_state |
| |
| _call( |
| state, call_state, state.c_call_completion_queue, on_success, flags, |
| method, host, deadline, credentials, operationses_and_user_tags, metadata, context) |
| |
| return IntegratedCall(state, call_state) |
| |
| |
| cdef object _process_segregated_call_tag( |
| _ChannelState state, _CallState call_state, |
| grpc_completion_queue *c_completion_queue, _BatchOperationTag tag): |
| call_state.due.remove(tag) |
| if not call_state.due: |
| grpc_call_unref(call_state.c_call) |
| call_state.c_call = NULL |
| state.segregated_call_states.remove(call_state) |
| _destroy_c_completion_queue(c_completion_queue) |
| return True |
| else: |
| return False |
| |
| |
| cdef class SegregatedCall: |
| |
| def __cinit__(self, _ChannelState channel_state, _CallState call_state): |
| self._channel_state = channel_state |
| self._call_state = call_state |
| |
| def operate(self, operations, tag): |
| return _operate_from_segregated_call( |
| self._channel_state, self._call_state, operations, tag) |
| |
| def cancel(self, code, details): |
| _cancel(self._channel_state, self._call_state, code, details) |
| |
| def next_event(self): |
| def on_success(tag): |
| _process_segregated_call_tag( |
| self._channel_state, self._call_state, self._c_completion_queue, tag) |
| return _next_call_event( |
| self._channel_state, self._c_completion_queue, on_success, None) |
| |
| |
| cdef SegregatedCall _segregated_call( |
| _ChannelState state, int flags, method, host, object deadline, |
| object metadata, CallCredentials credentials, operationses_and_user_tags, |
| object context): |
| cdef _CallState call_state = _CallState() |
| cdef SegregatedCall segregated_call |
| cdef grpc_completion_queue *c_completion_queue |
| |
| def on_success(started_tags): |
| state.segregated_call_states.add(call_state) |
| |
| with state.condition: |
| if state.open: |
| c_completion_queue = (grpc_completion_queue_create_for_next(NULL)) |
| else: |
| raise ValueError('Cannot invoke RPC on closed channel!') |
| |
| try: |
| _call( |
| state, call_state, c_completion_queue, on_success, flags, method, host, |
| deadline, credentials, operationses_and_user_tags, metadata, |
| context) |
| except: |
| _destroy_c_completion_queue(c_completion_queue) |
| raise |
| |
| segregated_call = SegregatedCall(state, call_state) |
| segregated_call._c_completion_queue = c_completion_queue |
| return segregated_call |
| |
| |
| cdef object _watch_connectivity_state( |
| _ChannelState state, grpc_connectivity_state last_observed_state, |
| object deadline): |
| cdef _ConnectivityTag tag = _ConnectivityTag(object()) |
| with state.condition: |
| if state.open: |
| cpython.Py_INCREF(tag) |
| grpc_channel_watch_connectivity_state( |
| state.c_channel, last_observed_state, _timespec_from_time(deadline), |
| state.c_connectivity_completion_queue, <cpython.PyObject *>tag) |
| state.connectivity_due.add(tag) |
| else: |
| raise ValueError('Cannot invoke RPC: %s' % state.closed_reason) |
| completed_tag, event = _latent_event( |
| state.c_connectivity_completion_queue, None) |
| with state.condition: |
| state.connectivity_due.remove(completed_tag) |
| state.condition.notify_all() |
| return event |
| |
| |
| cdef _close(Channel channel, grpc_status_code code, object details, |
| drain_calls): |
| cdef _ChannelState state = channel._state |
| cdef _CallState call_state |
| encoded_details = _encode(details) |
| with state.condition: |
| if state.open: |
| state.open = False |
| state.closed_reason = details |
| for call_state in set(state.integrated_call_states.values()): |
| grpc_call_cancel_with_status( |
| call_state.c_call, code, encoded_details, NULL) |
| for call_state in state.segregated_call_states: |
| grpc_call_cancel_with_status( |
| call_state.c_call, code, encoded_details, NULL) |
| # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity |
| # watching. |
| |
| if drain_calls: |
| while not _calls_drained(state): |
| event = channel.next_call_event() |
| if event.completion_type == CompletionType.queue_timeout: |
| continue |
| event.tag(event) |
| else: |
| while state.integrated_call_states: |
| state.condition.wait() |
| while state.segregated_call_states: |
| state.condition.wait() |
| while state.connectivity_due: |
| state.condition.wait() |
| |
| _destroy_c_completion_queue(state.c_call_completion_queue) |
| _destroy_c_completion_queue(state.c_connectivity_completion_queue) |
| grpc_channel_destroy(state.c_channel) |
| state.c_channel = NULL |
| grpc_shutdown() |
| state.condition.notify_all() |
| else: |
| # Another call to close already completed in the past or is currently |
| # being executed in another thread. |
| while state.c_channel != NULL: |
| state.condition.wait() |
| |
| |
| cdef _calls_drained(_ChannelState state): |
| return not (state.integrated_call_states or state.segregated_call_states or |
| state.connectivity_due) |
| |
| cdef class Channel: |
| |
| def __cinit__( |
| self, bytes target, object arguments, |
| ChannelCredentials channel_credentials): |
| arguments = () if arguments is None else tuple(arguments) |
| fork_handlers_and_grpc_init() |
| self._state = _ChannelState() |
| self._vtable.copy = &_copy_pointer |
| self._vtable.destroy = &_destroy_pointer |
| self._vtable.cmp = &_compare_pointer |
| cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( |
| arguments) |
| cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) |
| if channel_credentials is None: |
| self._state.c_channel = grpc_insecure_channel_create( |
| <char *>target, c_arguments, NULL) |
| else: |
| c_channel_credentials = channel_credentials.c() |
| self._state.c_channel = grpc_secure_channel_create( |
| c_channel_credentials, <char *>target, c_arguments, NULL) |
| grpc_channel_credentials_release(c_channel_credentials) |
| self._state.c_call_completion_queue = ( |
| grpc_completion_queue_create_for_next(NULL)) |
| self._state.c_connectivity_completion_queue = ( |
| grpc_completion_queue_create_for_next(NULL)) |
| self._arguments = arguments |
| |
| def target(self): |
| cdef char *c_target |
| with self._state.condition: |
| c_target = grpc_channel_get_target(self._state.c_channel) |
| target = <bytes>c_target |
| gpr_free(c_target) |
| return target |
| |
| def integrated_call( |
| self, int flags, method, host, object deadline, object metadata, |
| CallCredentials credentials, operationses_and_tags, |
| object context = None): |
| return _integrated_call( |
| self._state, flags, method, host, deadline, metadata, credentials, |
| operationses_and_tags, context) |
| |
| def next_call_event(self): |
| def on_success(tag): |
| if tag is not None: |
| _process_integrated_call_tag(self._state, tag) |
| if is_fork_support_enabled(): |
| queue_deadline = time.time() + 1.0 |
| else: |
| queue_deadline = None |
| return _next_call_event(self._state, self._state.c_call_completion_queue, |
| on_success, queue_deadline) |
| |
| def segregated_call( |
| self, int flags, method, host, object deadline, object metadata, |
| CallCredentials credentials, operationses_and_tags, |
| object context = None): |
| return _segregated_call( |
| self._state, flags, method, host, deadline, metadata, credentials, |
| operationses_and_tags, context) |
| |
| def check_connectivity_state(self, bint try_to_connect): |
| with self._state.condition: |
| if self._state.open: |
| return grpc_channel_check_connectivity_state( |
| self._state.c_channel, try_to_connect) |
| else: |
| raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason) |
| |
| def watch_connectivity_state( |
| self, grpc_connectivity_state last_observed_state, object deadline): |
| return _watch_connectivity_state(self._state, last_observed_state, deadline) |
| |
| def close(self, code, details): |
| _close(self, code, details, False) |
| |
| def close_on_fork(self, code, details): |
| _close(self, code, details, True) |