blob: 191b1c17264ccde8a753cee80da48c805b7f11f2 [file] [log] [blame]
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
import grpc
_NOT_YET_OBSERVED = object()
_LOGGER = logging.getLogger(__name__)
def _cancel(handler):
return handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!')
def _is_active(handler):
return handler.is_active()
def _time_remaining(unused_handler):
raise NotImplementedError()
def _add_callback(handler, callback):
return handler.add_callback(callback)
def _initial_metadata(handler):
return handler.initial_metadata()
def _trailing_metadata(handler):
trailing_metadata, unused_code, unused_details = handler.termination()
return trailing_metadata
def _code(handler):
unused_trailing_metadata, code, unused_details = handler.termination()
return code
def _details(handler):
unused_trailing_metadata, unused_code, details = handler.termination()
return details
class _Call(grpc.Call):
def __init__(self, handler):
self._handler = handler
def cancel(self):
_cancel(self._handler)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)
class _RpcErrorCall(grpc.RpcError, grpc.Call):
def __init__(self, handler):
self._handler = handler
def cancel(self):
_cancel(self._handler)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)
def _next(handler):
read = handler.take_response()
if read.code is None:
return read.response
elif read.code is grpc.StatusCode.OK:
raise StopIteration()
else:
raise _RpcErrorCall(handler)
class _HandlerExtras(object):
def __init__(self):
self.condition = threading.Condition()
self.unary_response = _NOT_YET_OBSERVED
self.cancelled = False
def _with_extras_cancel(handler, extras):
with extras.condition:
if handler.cancel(grpc.StatusCode.CANCELLED, 'Locally cancelled!'):
extras.cancelled = True
return True
else:
return False
def _extras_without_cancelled(extras):
with extras.condition:
return extras.cancelled
def _running(handler):
return handler.is_active()
def _done(handler):
return not handler.is_active()
def _with_extras_unary_response(handler, extras):
with extras.condition:
if extras.unary_response is _NOT_YET_OBSERVED:
read = handler.take_response()
if read.code is None:
extras.unary_response = read.response
return read.response
else:
raise _RpcErrorCall(handler)
else:
return extras.unary_response
def _exception(unused_handler):
raise NotImplementedError('TODO!')
def _traceback(unused_handler):
raise NotImplementedError('TODO!')
def _add_done_callback(handler, callback, future):
adapted_callback = lambda: callback(future)
if not handler.add_callback(adapted_callback):
callback(future)
class _FutureCall(grpc.Future, grpc.Call):
def __init__(self, handler, extras):
self._handler = handler
self._extras = extras
def cancel(self):
return _with_extras_cancel(self._handler, self._extras)
def cancelled(self):
return _extras_without_cancelled(self._extras)
def running(self):
return _running(self._handler)
def done(self):
return _done(self._handler)
def result(self):
return _with_extras_unary_response(self._handler, self._extras)
def exception(self):
return _exception(self._handler)
def traceback(self):
return _traceback(self._handler)
def add_done_callback(self, fn):
_add_done_callback(self._handler, fn, self)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)
def consume_requests(request_iterator, handler):
def _consume():
while True:
try:
request = next(request_iterator)
added = handler.add_request(request)
if not added:
break
except StopIteration:
handler.close_requests()
break
except Exception: # pylint: disable=broad-except
details = 'Exception iterating requests!'
_LOGGER.exception(details)
handler.cancel(grpc.StatusCode.UNKNOWN, details)
consumption = threading.Thread(target=_consume)
consumption.start()
def blocking_unary_response(handler):
read = handler.take_response()
if read.code is None:
unused_trailing_metadata, code, unused_details = handler.termination()
if code is grpc.StatusCode.OK:
return read.response
else:
raise _RpcErrorCall(handler)
else:
raise _RpcErrorCall(handler)
def blocking_unary_response_with_call(handler):
read = handler.take_response()
if read.code is None:
unused_trailing_metadata, code, unused_details = handler.termination()
if code is grpc.StatusCode.OK:
return read.response, _Call(handler)
else:
raise _RpcErrorCall(handler)
else:
raise _RpcErrorCall(handler)
def future_call(handler):
return _FutureCall(handler, _HandlerExtras())
class ResponseIteratorCall(grpc.Call):
def __init__(self, handler):
self._handler = handler
def __iter__(self):
return self
def __next__(self):
return _next(self._handler)
def next(self):
return _next(self._handler)
def cancel(self):
_cancel(self._handler)
def is_active(self):
return _is_active(self._handler)
def time_remaining(self):
return _time_remaining(self._handler)
def add_callback(self, callback):
return _add_callback(self._handler, callback)
def initial_metadata(self):
return _initial_metadata(self._handler)
def trailing_metadata(self):
return _trailing_metadata(self._handler)
def code(self):
return _code(self._handler)
def details(self):
return _details(self._handler)