blob: 5d7e56492f84fb262b848ddc5ea174fdd277f8a8 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Transport library for ProtoRPC.
Contains underlying infrastructure used for communicating RPCs over low level
transports such as HTTP.
Includes HTTP transport built over httplib (imported as six.moves.http_client).
"""
import six.moves.http_client
import logging
import os
import socket
import sys
import urlparse
from . import messages
from . import protobuf
from . import remote
from . import util
import six
# Names exported by a star-import of this module.
__all__ = [
    'RpcStateError',
    'HttpTransport',
    'LocalTransport',
    'Rpc',
    'Transport',
]
class RpcStateError(messages.Error):
    """Error signalling an attempt to move an RPC into an invalid state."""
class Rpc(object):
    """Represents a client side RPC.

    An RPC is created by the transport class and is used with a single RPC.
    While an RPC is still in process, the response is set to None.  When it
    is complete the response will contain the response message.

    The base class does not know how to complete an RPC: _wait_impl raises
    NotImplementedError.  The transports in this module assign a callable to
    the instance's _wait_impl attribute that finishes the RPC by calling
    set_response or set_status.
    """

    def __init__(self, request):
        """Constructor.

        Args:
          request: Request associated with this RPC.
        """
        self.__request = request
        self.__response = None
        # Every RPC starts out RUNNING; __set_state moves it out exactly once.
        self.__state = remote.RpcState.RUNNING
        self.__error_message = None
        self.__error_name = None

    @property
    def request(self):
        """Request associated with RPC."""
        return self.__request

    @property
    def response(self):
        """Response associated with RPC.

        Calls wait() first, then raises the error class (if any) that
        remote.RpcError.from_state() returns for the final state.
        """
        self.wait()
        self.__check_status()
        return self.__response

    @property
    def state(self):
        """State associated with RPC.  Does not wait for completion."""
        return self.__state

    @property
    def error_message(self):
        """Error, if any, associated with RPC.  Calls wait() first."""
        self.wait()
        return self.__error_message

    @property
    def error_name(self):
        """Error name, if any, associated with RPC.  Calls wait() first."""
        self.wait()
        return self.__error_name

    def wait(self):
        """Wait for an RPC to finish.

        Only delegates to _wait_impl while the RPC is still RUNNING, so
        calling it again on a finished RPC does nothing.
        """
        if self.__state == remote.RpcState.RUNNING:
            self._wait_impl()

    def _wait_impl(self):
        """Implementation for wait().

        Raises:
          NotImplementedError: Always, unless replaced by a transport.
        """
        raise NotImplementedError()

    def __check_status(self):
        """Raise the error corresponding to the current state, if any.

        remote.RpcError.from_state() supplies the error class; None means
        there is nothing to raise.
        """
        error_class = remote.RpcError.from_state(self.__state)
        if error_class is not None:
            if error_class is remote.ApplicationError:
                # Application errors also carry the error name.
                raise error_class(self.__error_message, self.__error_name)
            else:
                raise error_class(self.__error_message)

    def __set_state(self, state, error_message=None, error_name=None):
        """Move the RPC out of the RUNNING state.

        Args:
          state: New state; must not be RUNNING.
          error_message: Optional error message stored with the state.
          error_name: Optional error name stored with the state.

        Raises:
          RpcStateError: If the RPC has already left RUNNING, or if the
            requested state is RUNNING.
        """
        if self.__state != remote.RpcState.RUNNING:
            raise RpcStateError(
                'RPC must be in RUNNING state to change to %s' % state)
        if state == remote.RpcState.RUNNING:
            raise RpcStateError('RPC is already in RUNNING state')
        self.__state = state
        self.__error_message = error_message
        self.__error_name = error_name

    def set_response(self, response):
        """Store the response message and mark the RPC as OK.

        Args:
          response: messages.Message instance to store as the response.

        Raises:
          TypeError: If response is not a messages.Message instance.
          RpcStateError: If the RPC is no longer RUNNING.
        """
        # TODO: Even more specific type checking.
        if not isinstance(response, messages.Message):
            # Wrap in a 1-tuple: a bare tuple-valued response would otherwise
            # be unpacked as the format arguments and garble the message.
            raise TypeError(
                'Expected Message type, received %r' % (response,))
        self.__response = response
        self.__set_state(remote.RpcState.OK)

    def set_status(self, status):
        """Finish the RPC with the state and error details from a status.

        Args:
          status: Status object; check_initialized() is called on it and its
            state, error_message and error_name attributes are recorded.

        Raises:
          RpcStateError: If the RPC is no longer RUNNING, or if the status
            state is RUNNING.
        """
        status.check_initialized()
        self.__set_state(status.state,
                         status.error_message,
                         status.error_name)
class Transport(object):
    """Base class for ProtoRPC transports.

    Supplies the protocol handling shared by transports, for example one
    that sends and receives messages over HTTP.

    Subclasses override _start_rpc, which is handed a RemoteInfo instance and
    a request Message.  The transport is expected to set the rpc response or
    raise an exception before termination.
    """

    @util.positional(1)
    def __init__(self, protocol=protobuf):
        """Constructor.

        Args:
          protocol: If string, will look up a protocol from the default
            Protocols instance by name, falling back to a lookup by content
            type.  Can also be an instance of remote.ProtocolConfig.  If
            neither, it must be an object that implements a protocol
            interface by implementing encode_message, decode_message and set
            CONTENT_TYPE.  For example, the modules protobuf and protojson
            can be used directly.
        """
        if isinstance(protocol, six.string_types):
            registry = remote.Protocols.get_default()
            try:
                protocol = registry.lookup_by_name(protocol)
            except KeyError:
                # Not a registered name; treat the string as a content type.
                protocol = registry.lookup_by_content_type(protocol)

        if not isinstance(protocol, remote.ProtocolConfig):
            # Bare protocol object: wrap it in a default configuration.
            self.__protocol = protocol
            self.__protocol_config = remote.ProtocolConfig(
                protocol,
                'default',
                default_content_type=protocol.CONTENT_TYPE)
            return

        self.__protocol = protocol.protocol
        self.__protocol_config = protocol

    @property
    def protocol(self):
        """Protocol implementation used by this transport."""
        return self.__protocol

    @property
    def protocol_config(self):
        """ProtocolConfig associated with this transport."""
        return self.__protocol_config

    def send_rpc(self, remote_info, request):
        """Begin sending an RPC over the transport.

        Args:
          remote_info: RemoteInfo instance describing remote method.
          request: Request message to send to service.

        Returns:
          Whatever _start_rpc returns for the checked request; by contract an
          Rpc instance initialized with the request.
        """
        request.check_initialized()
        return self._start_rpc(remote_info, request)

    def _start_rpc(self, remote_info, request):
        """Start a remote procedure call.  Must be overridden.

        Args:
          remote_info: RemoteInfo instance describing remote method.
          request: Request message to send to service.

        Returns:
          An Rpc instance initialized with the request.
        """
        raise NotImplementedError()
class HttpTransport(Transport):
    """Transport that talks to services over HTTP or HTTPS."""

    @util.positional(2)
    def __init__(self, service_url, protocol=protobuf):
        """Constructor.

        Args:
          service_url: URL where the service is located.  All communication
            via the transport will go to this URL.
          protocol: The protocol implementation.  Must implement
            encode_message and decode_message.  Can also be an instance of
            remote.ProtocolConfig.
        """
        super(HttpTransport, self).__init__(protocol=protocol)
        self.__service_url = service_url

    def __parse_status_body(self, response, content):
        """Try to decode an RpcStatus from an error response body.

        Args:
          response: HTTPResponse object.
          content: Content read from HTTP response.

        Returns:
          The decoded, initialized RpcStatus, or None when the content type
          does not match the protocol's default, decoding fails, or the
          decoded message is not initialized.
        """
        content_type = response.getheader('content-type')
        if content_type == self.protocol_config.default_content_type:
            try:
                parsed = self.protocol.decode_message(remote.RpcStatus,
                                                      content)
            except Exception as parse_error:
                logging.warning(
                    'An error occurred trying to parse status: %s\n%s',
                    str(parse_error), content)
            else:
                if parsed.is_initialized():
                    return parsed
                logging.warning(
                    'Body does not result in an initialized RpcStatus '
                    'message:\n%s',
                    content)
        return None

    def __get_rpc_status(self, response, content):
        """Get RPC status from HTTP response.

        Args:
          response: HTTPResponse object.
          content: Content read from HTTP response.

        Returns:
          RpcStatus object parsed from response, else an RpcStatus with a
          generic HTTP error.
        """
        # Only statuses of 400 and above are checked for RpcStatus content.
        if response.status >= 400:
            parsed = self.__parse_status_body(response, content)
            if parsed is not None:
                return parsed

        # No usable RpcStatus: forward the body, or the standard reason
        # phrase for the status code when the body is blank.
        detail = content
        if not detail.strip():
            detail = six.moves.http_client.responses.get(response.status,
                                                         'Unknown Error')
        message = 'HTTP Error %s: %s' % (response.status,
                                         detail or 'Unknown Error')
        return remote.RpcStatus(state=remote.RpcState.SERVER_ERROR,
                                error_message=message)

    def __set_response(self, remote_info, connection, rpc):
        """Set response on RPC.

        Sets response or status from HTTP request.  Implements the wait
        method of Rpc instance.  The connection is always closed afterwards.

        Args:
          remote_info: Remote info for invoked RPC.
          connection: HTTPConnection that is making request.
          rpc: Rpc instance.
        """
        try:
            http_response = connection.getresponse()
            body = http_response.read()
            if http_response.status == six.moves.http_client.OK:
                decoded = self.protocol.decode_message(
                    remote_info.response_type, body)
                rpc.set_response(decoded)
            else:
                rpc.set_status(self.__get_rpc_status(http_response, body))
        finally:
            connection.close()

    def _start_rpc(self, remote_info, request):
        """Start a remote procedure call.

        Args:
          remote_info: A RemoteInfo instance for this RPC.
          request: The request message for this RPC.

        Returns:
          An Rpc instance initialized with a Request.

        Raises:
          remote.RpcError: Passed through unchanged if raised while sending.
          remote.NetworkError: For socket errors and any other exception
            raised while sending the request.
        """
        # The method name is appended to the service URL after a dot.
        method_url = '%s.%s' % (self.__service_url,
                                remote_info.method.__name__)
        payload = self.protocol.encode_message(request)
        target = urlparse.urlparse(method_url)

        connection_class = (six.moves.http_client.HTTPSConnection
                            if target.scheme == 'https'
                            else six.moves.http_client.HTTPConnection)
        connection = connection_class(target.hostname, target.port)

        try:
            self._send_http_request(connection, target.path, payload)
            rpc = Rpc(request)
        except remote.RpcError:
            # Pass through all ProtoRPC errors
            connection.close()
            raise
        except socket.error as err:
            connection.close()
            raise remote.NetworkError(
                'Socket error: %s %r' % (type(err).__name__, err.args),
                err)
        except Exception as err:
            connection.close()
            raise remote.NetworkError('Error communicating with HTTP server',
                                      err)

        # Every handler above raises, so reaching here means the send worked.
        def finish_rpc():
            self.__set_response(remote_info, connection, rpc)

        # Reading the HTTP response is deferred until the Rpc is waited on.
        rpc._wait_impl = finish_rpc
        return rpc

    def _send_http_request(self, connection, http_path, encoded_request):
        """POST the encoded request over the given connection.

        Args:
          connection: HTTPConnection used to issue the request.
          http_path: Path component of the URL to post to.
          encoded_request: Request body, already encoded by the protocol.
        """
        request_headers = {
            'Content-type': self.protocol_config.default_content_type,
            'Content-length': len(encoded_request),
        }
        connection.request('POST',
                           http_path,
                           encoded_request,
                           headers=request_headers)
class LocalTransport(Transport):
    """Local transport that sends messages directly to services.

    Useful in tests or creating code that can work with either local or
    remote services.  Using LocalTransport is preferable to simply
    instantiating a single instance of a service and reusing it.  The entire
    request process involves instantiating a new instance of a service,
    initializing it with request state and then invoking the remote method
    for every request.
    """

    def __init__(self, service_factory):
        """Constructor.

        Args:
          service_factory: Service factory or class.  Called with no
            arguments to create a service instance for each RPC.
        """
        super(LocalTransport, self).__init__()
        # A factory may expose the underlying class as 'service_class';
        # otherwise the factory itself is taken to be the class.
        self.__service_class = getattr(service_factory,
                                       'service_class',
                                       service_factory)
        self.__service_factory = service_factory

    @property
    def service_class(self):
        """Service class: the factory's service_class, else the factory."""
        return self.__service_class

    @property
    def service_factory(self):
        """Factory passed to the constructor."""
        return self.__service_factory

    def _start_rpc(self, remote_info, request):
        """Start a remote procedure call.

        The service is not invoked here; the call happens when the returned
        Rpc is waited on.

        Args:
          remote_info: RemoteInfo instance describing remote method.
          request: Request message to send to service.

        Returns:
          An Rpc instance initialized with the request.
        """
        rpc = Rpc(request)

        def wait_impl():
            """Invoke the service method and record its response.

            Raises:
              remote.ApplicationError: Passed through from the service.
              remote.ServerError: For any other exception raised by the
                service method, or a response of the wrong type.
            """
            instance = self.__service_factory()
            try:
                initialize_request_state = instance.initialize_request_state
            except AttributeError:
                # Services without this hook simply skip initialization.
                pass
            else:
                # os.uname is only available on Unix; fall back to the
                # portable socket.gethostname() elsewhere.
                uname = getattr(os, 'uname', None)
                if uname is not None:
                    node_name = uname()[1]
                else:
                    node_name = socket.gethostname()
                host = six.text_type(node_name)
                initialize_request_state(
                    remote.RequestState(remote_host=host,
                                        remote_address=u'127.0.0.1',
                                        server_host=host,
                                        server_port=-1))
            try:
                response = remote_info.method(instance, request)
                if not isinstance(response, remote_info.response_type):
                    # Explicit raise so the check survives ``python -O``; it
                    # is reported as a ServerError below, same text as the
                    # assert statement it replaces.
                    raise AssertionError()
            except remote.ApplicationError:
                raise
            except Exception:
                # Only Exception subclasses are converted, so
                # KeyboardInterrupt and SystemExit still propagate.
                exc_type, exc_value, exc_traceback = sys.exc_info()
                message = 'Unexpected error %s: %s' % (exc_type.__name__,
                                                       exc_value)
                # six.reraise needs an exception instance as its value: on
                # Python 3 it reads value.__traceback__, which a plain
                # string does not have.
                six.reraise(remote.ServerError,
                            remote.ServerError(message),
                            exc_traceback)
            rpc.set_response(response)

        rpc._wait_impl = wait_impl
        return rpc