blob: b1b5bbdcab30dd304d6f254b087dc9beda234d11 [file] [log] [blame]
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests exposure of SSL auth context"""
import pickle
import unittest
import logging
import grpc
from grpc import _channel
from grpc.experimental import session_cache
import six
from tests.unit import test_common
from tests.unit import resources
# Raw payloads for the test RPC.  The handler in this file never inspects the
# request body; _RESPONSE is not referenced by the code visible here.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'

# Fully-qualified method path invoked by every client in this module.
_UNARY_UNARY = '/test/UnaryUnary'

# Name substituted for the target during TLS host-name verification, since
# the clients connect to 'localhost'.
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'

# Peer identities the server is expected to report when the client presents
# the test certificate (compared order-insensitively).
_CLIENT_IDS = (
    b'*.test.google.fr',
    b'waterzooi.test.google.be',
    b'*.test.youtube.com',
    b'192.168.1.3',
)

# Keys of the dict that the RPC handler pickles back to the client.
_ID = 'id'
_ID_KEY = 'id_key'
_AUTH_CTX = 'auth_ctx'

# TLS material loaded from the shared test resources.
_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()

# A single (private key, certificate chain) pair for the server credentials.
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)

# Channel options shared by all secure channels in this module.
_PROPERTY_OPTIONS = (
    ('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE),
)
def handle_unary_unary(request, servicer_context):
    """Reply with a pickled snapshot of the caller's authentication details.

    Args:
      request: The request payload; it is not inspected.
      servicer_context: The context of the RPC being served, queried for
        peer identities, the peer identity key and the auth context.

    Returns:
      Pickled bytes of a dict keyed by _ID, _ID_KEY and _AUTH_CTX.
    """
    auth_report = {}
    auth_report[_ID] = servicer_context.peer_identities()
    auth_report[_ID_KEY] = servicer_context.peer_identity_key()
    auth_report[_AUTH_CTX] = servicer_context.auth_context()
    return pickle.dumps(auth_report)
class AuthContextTest(unittest.TestCase):
    """Checks the auth context a server-side handler observes.

    Each test starts a throwaway server whose single UnaryUnary method
    pickles the peer identities, the peer identity key and the auth context
    seen by the handler, and then inspects that reply on the client side.
    """

    def _start_server(self, server_credentials=None):
        """Start a test server exposing the UnaryUnary method.

        The server is stopped through a registered cleanup, so it is torn
        down even when the RPC or an assertion in the test fails.

        Args:
          server_credentials: Server credentials for a secure port. When
            None, an insecure port is opened instead.

        Returns:
          The port the server was bound to.
        """
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
                grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        if server_credentials is None:
            port = server.add_insecure_port('[::]:0')
        else:
            port = server.add_secure_port('[::]:0', server_credentials)
        server.start()
        # Guarantee shutdown regardless of how the test method exits.
        self.addCleanup(server.stop, None)
        return port

    def _fetch_auth_data(self, channel):
        """Invoke the test RPC once, close the channel, return the reply.

        Args:
          channel: An open channel to the server started by the test.

        Returns:
          The dict the handler pickled, keyed by _ID, _ID_KEY and _AUTH_CTX.
        """
        try:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
        finally:
            # Release the channel even if the RPC raised.
            channel.close()
        # The payload was produced by the server this test started, so
        # unpickling it is safe here.
        return pickle.loads(response)

    def testInsecure(self):
        """An insecure channel exposes no identity and an empty context."""
        port = self._start_server()
        channel = grpc.insecure_channel('localhost:%d' % port)
        auth_data = self._fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({}, auth_data[_AUTH_CTX])

    def testSecureNoCert(self):
        """TLS without a client certificate yields no peer identity."""
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel = grpc.secure_channel(
            'localhost:{}'.format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS)
        auth_data = self._fetch_auth_data(channel)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({
            'transport_security_type': [b'ssl'],
            'ssl_session_reused': [b'false'],
        }, auth_data[_AUTH_CTX])

    def testSecureClientCert(self):
        """TLS with a client certificate exposes the certificate's names."""
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True)
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN)
        channel = grpc.secure_channel(
            'localhost:{}'.format(port),
            channel_creds,
            options=_PROPERTY_OPTIONS)
        auth_data = self._fetch_auth_data(channel)

        auth_ctx = auth_data[_AUTH_CTX]
        six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
        self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
        self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
        self.assertSequenceEqual([b'*.test.google.com'],
                                 auth_ctx['x509_common_name'])

    def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
                                expect_ssl_session_reused):
        """Run one RPC on a fresh secure channel and check session reuse.

        Args:
          channel_creds: Channel credentials for the new channel.
          channel_options: Options passed to grpc.secure_channel.
          port: Port of the server to connect to on localhost.
          expect_ssl_session_reused: Expected value of the
            'ssl_session_reused' auth-context property, e.g. [b'true'].
        """
        channel = grpc.secure_channel(
            'localhost:{}'.format(port), channel_creds, options=channel_options)
        # The channel is closed inside the helper, so each call to this
        # method performs its own connection.
        auth_data = self._fetch_auth_data(channel)
        self.assertEqual(expect_ssl_session_reused,
                         auth_data[_AUTH_CTX]['ssl_session_reused'])

    def testSessionResumption(self):
        """A second connection sharing the session cache resumes the session."""
        # Set up a secure server
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel_options = _PROPERTY_OPTIONS + (
            ('grpc.ssl_session_cache', cache),)

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b'false'])

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(
            channel_creds,
            channel_options,
            port,
            expect_ssl_session_reused=[b'true'])
if __name__ == '__main__':
    # Set up default logging before the tests run.
    logging.basicConfig()
    # Run every test in this module with per-test verbose output.
    unittest.main(verbosity=2)