blob: a500897029326b7a1801a94004ebbb311b6a9711 [file] [log] [blame]
# Copyright 2018 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_channelz.v1.channelz."""
import unittest
from concurrent import futures
import grpc
from grpc_channelz.v1 import channelz
from grpc_channelz.v1 import channelz_pb2
from grpc_channelz.v1 import channelz_pb2_grpc
from tests.unit import test_common
from tests.unit.framework.common import test_constants
_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary'
_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary'
_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream'
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'
_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),)
_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),)
_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),)
def _successful_unary_unary(request, servicer_context):
return _RESPONSE
def _failed_unary_unary(request, servicer_context):
servicer_context.set_code(grpc.StatusCode.INTERNAL)
servicer_context.set_details("Channelz Test Intended Failure")
def _successful_stream_stream(request_iterator, servicer_context):
for _ in request_iterator:
yield _RESPONSE
class _GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY:
return grpc.unary_unary_rpc_method_handler(_successful_unary_unary)
elif handler_call_details.method == _FAILED_UNARY_UNARY:
return grpc.unary_unary_rpc_method_handler(_failed_unary_unary)
elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM:
return grpc.stream_stream_rpc_method_handler(
_successful_stream_stream)
else:
return None
class _ChannelServerPair(object):
def __init__(self):
# Server will enable channelz service
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=3),
options=_DISABLE_REUSE_PORT +
_ENABLE_CHANNELZ)
port = self.server.add_insecure_port('[::]:0')
self.server.add_generic_rpc_handlers((_GenericHandler(),))
self.server.start()
# Channel will enable channelz service...
self.channel = grpc.insecure_channel('localhost:%d' % port,
_ENABLE_CHANNELZ)
def _generate_channel_server_pairs(n):
return [_ChannelServerPair() for i in range(n)]
def _close_channel_server_pairs(pairs):
for pair in pairs:
pair.server.stop(None)
pair.channel.close()
class ChannelzServicerTest(unittest.TestCase):
def _send_successful_unary_unary(self, idx):
_, r = self._pairs[idx].channel.unary_unary(
_SUCCESSFUL_UNARY_UNARY).with_call(_REQUEST)
self.assertEqual(r.code(), grpc.StatusCode.OK)
def _send_failed_unary_unary(self, idx):
try:
self._pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY).with_call(
_REQUEST)
except grpc.RpcError:
return
else:
self.fail("This call supposed to fail")
def _send_successful_stream_stream(self, idx):
response_iterator = self._pairs[idx].channel.stream_stream(
_SUCCESSFUL_STREAM_STREAM).__call__(
iter([_REQUEST] * test_constants.STREAM_LENGTH))
cnt = 0
for _ in response_iterator:
cnt += 1
self.assertEqual(cnt, test_constants.STREAM_LENGTH)
def _get_channel_id(self, idx):
"""Channel id may not be consecutive"""
resp = self._channelz_stub.GetTopChannels(
channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
self.assertGreater(len(resp.channel), idx)
return resp.channel[idx].ref.channel_id
def setUp(self):
self._pairs = []
# This server is for Channelz info fetching only
# It self should not enable Channelz
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=3),
options=_DISABLE_REUSE_PORT +
_DISABLE_CHANNELZ)
port = self._server.add_insecure_port('[::]:0')
channelz.add_channelz_servicer(self._server)
self._server.start()
# This channel is used to fetch Channelz info only
# Channelz should not be enabled
self._channel = grpc.insecure_channel('localhost:%d' % port,
_DISABLE_CHANNELZ)
self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)
def tearDown(self):
self._server.stop(None)
self._channel.close()
_close_channel_server_pairs(self._pairs)
def test_get_top_channels_basic(self):
self._pairs = _generate_channel_server_pairs(1)
resp = self._channelz_stub.GetTopChannels(
channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
self.assertEqual(len(resp.channel), 1)
self.assertEqual(resp.end, True)
def test_get_top_channels_high_start_id(self):
self._pairs = _generate_channel_server_pairs(1)
resp = self._channelz_stub.GetTopChannels(
channelz_pb2.GetTopChannelsRequest(start_channel_id=10000))
self.assertEqual(len(resp.channel), 0)
self.assertEqual(resp.end, True)
def test_successful_request(self):
self._pairs = _generate_channel_server_pairs(1)
self._send_successful_unary_unary(0)
resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
self.assertEqual(resp.channel.data.calls_started, 1)
self.assertEqual(resp.channel.data.calls_succeeded, 1)
self.assertEqual(resp.channel.data.calls_failed, 0)
def test_failed_request(self):
self._pairs = _generate_channel_server_pairs(1)
self._send_failed_unary_unary(0)
resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
self.assertEqual(resp.channel.data.calls_started, 1)
self.assertEqual(resp.channel.data.calls_succeeded, 0)
self.assertEqual(resp.channel.data.calls_failed, 1)
def test_many_requests(self):
self._pairs = _generate_channel_server_pairs(1)
k_success = 7
k_failed = 9
for i in range(k_success):
self._send_successful_unary_unary(0)
for i in range(k_failed):
self._send_failed_unary_unary(0)
resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
self.assertEqual(resp.channel.data.calls_succeeded, k_success)
self.assertEqual(resp.channel.data.calls_failed, k_failed)
def test_many_channel(self):
k_channels = 4
self._pairs = _generate_channel_server_pairs(k_channels)
resp = self._channelz_stub.GetTopChannels(
channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
self.assertEqual(len(resp.channel), k_channels)
def test_many_requests_many_channel(self):
k_channels = 4
self._pairs = _generate_channel_server_pairs(k_channels)
k_success = 11
k_failed = 13
for i in range(k_success):
self._send_successful_unary_unary(0)
self._send_successful_unary_unary(2)
for i in range(k_failed):
self._send_failed_unary_unary(1)
self._send_failed_unary_unary(2)
# The first channel saw only successes
resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
self.assertEqual(resp.channel.data.calls_started, k_success)
self.assertEqual(resp.channel.data.calls_succeeded, k_success)
self.assertEqual(resp.channel.data.calls_failed, 0)
# The second channel saw only failures
resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(1)))
self.assertEqual(resp.channel.data.calls_started, k_failed)
self.assertEqual(resp.channel.data.calls_succeeded, 0)
self.assertEqual(resp.channel.data.calls_failed, k_failed)
# The third channel saw both successes and failures
resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(2)))
self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
self.assertEqual(resp.channel.data.calls_succeeded, k_success)
self.assertEqual(resp.channel.data.calls_failed, k_failed)
# The fourth channel saw nothing
resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(3)))
self.assertEqual(resp.channel.data.calls_started, 0)
self.assertEqual(resp.channel.data.calls_succeeded, 0)
self.assertEqual(resp.channel.data.calls_failed, 0)
def test_many_subchannels(self):
k_channels = 4
self._pairs = _generate_channel_server_pairs(k_channels)
k_success = 17
k_failed = 19
for i in range(k_success):
self._send_successful_unary_unary(0)
self._send_successful_unary_unary(2)
for i in range(k_failed):
self._send_failed_unary_unary(1)
self._send_failed_unary_unary(2)
gtc_resp = self._channelz_stub.GetTopChannels(
channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
self.assertEqual(len(gtc_resp.channel), k_channels)
for i in range(k_channels):
# If no call performed in the channel, there shouldn't be any subchannel
if gtc_resp.channel[i].data.calls_started == 0:
self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0)
continue
# Otherwise, the subchannel should exist
self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0)
gsc_resp = self._channelz_stub.GetSubchannel(
channelz_pb2.GetSubchannelRequest(
subchannel_id=gtc_resp.channel[i].subchannel_ref[0].
subchannel_id))
self.assertEqual(gtc_resp.channel[i].data.calls_started,
gsc_resp.subchannel.data.calls_started)
self.assertEqual(gtc_resp.channel[i].data.calls_succeeded,
gsc_resp.subchannel.data.calls_succeeded)
self.assertEqual(gtc_resp.channel[i].data.calls_failed,
gsc_resp.subchannel.data.calls_failed)
def test_server_basic(self):
self._pairs = _generate_channel_server_pairs(1)
resp = self._channelz_stub.GetServers(
channelz_pb2.GetServersRequest(start_server_id=0))
self.assertEqual(len(resp.server), 1)
def test_get_one_server(self):
self._pairs = _generate_channel_server_pairs(1)
gss_resp = self._channelz_stub.GetServers(
channelz_pb2.GetServersRequest(start_server_id=0))
self.assertEqual(len(gss_resp.server), 1)
gs_resp = self._channelz_stub.GetServer(
channelz_pb2.GetServerRequest(
server_id=gss_resp.server[0].ref.server_id))
self.assertEqual(gss_resp.server[0].ref.server_id,
gs_resp.server.ref.server_id)
def test_server_call(self):
self._pairs = _generate_channel_server_pairs(1)
k_success = 23
k_failed = 29
for i in range(k_success):
self._send_successful_unary_unary(0)
for i in range(k_failed):
self._send_failed_unary_unary(0)
resp = self._channelz_stub.GetServers(
channelz_pb2.GetServersRequest(start_server_id=0))
self.assertEqual(len(resp.server), 1)
self.assertEqual(resp.server[0].data.calls_started,
k_success + k_failed)
self.assertEqual(resp.server[0].data.calls_succeeded, k_success)
self.assertEqual(resp.server[0].data.calls_failed, k_failed)
def test_many_subchannels_and_sockets(self):
k_channels = 4
self._pairs = _generate_channel_server_pairs(k_channels)
k_success = 3
k_failed = 5
for i in range(k_success):
self._send_successful_unary_unary(0)
self._send_successful_unary_unary(2)
for i in range(k_failed):
self._send_failed_unary_unary(1)
self._send_failed_unary_unary(2)
gtc_resp = self._channelz_stub.GetTopChannels(
channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
self.assertEqual(len(gtc_resp.channel), k_channels)
for i in range(k_channels):
# If no call performed in the channel, there shouldn't be any subchannel
if gtc_resp.channel[i].data.calls_started == 0:
self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0)
continue
# Otherwise, the subchannel should exist
self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0)
gsc_resp = self._channelz_stub.GetSubchannel(
channelz_pb2.GetSubchannelRequest(
subchannel_id=gtc_resp.channel[i].subchannel_ref[0].
subchannel_id))
self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
gs_resp = self._channelz_stub.GetSocket(
channelz_pb2.GetSocketRequest(
socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
self.assertEqual(gsc_resp.subchannel.data.calls_started,
gs_resp.socket.data.streams_started)
self.assertEqual(gsc_resp.subchannel.data.calls_started,
gs_resp.socket.data.streams_succeeded)
# Calls started == messages sent, only valid for unary calls
self.assertEqual(gsc_resp.subchannel.data.calls_started,
gs_resp.socket.data.messages_sent)
# Only receive responses when the RPC was successful
self.assertEqual(gsc_resp.subchannel.data.calls_succeeded,
gs_resp.socket.data.messages_received)
if gs_resp.socket.remote.HasField("tcpip_address"):
address = gs_resp.socket.remote.tcpip_address.ip_address
self.assertTrue(
len(address) == 4 or len(address) == 16, address)
if gs_resp.socket.local.HasField("tcpip_address"):
address = gs_resp.socket.local.tcpip_address.ip_address
self.assertTrue(
len(address) == 4 or len(address) == 16, address)
def test_streaming_rpc(self):
self._pairs = _generate_channel_server_pairs(1)
# In C++, the argument for _send_successful_stream_stream is message length.
# Here the argument is still channel idx, to be consistent with the other two.
self._send_successful_stream_stream(0)
gc_resp = self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
self.assertEqual(gc_resp.channel.data.calls_started, 1)
self.assertEqual(gc_resp.channel.data.calls_succeeded, 1)
self.assertEqual(gc_resp.channel.data.calls_failed, 0)
# Subchannel exists
self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
gsc_resp = self._channelz_stub.GetSubchannel(
channelz_pb2.GetSubchannelRequest(
subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id))
self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
# Socket exists
self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
gs_resp = self._channelz_stub.GetSocket(
channelz_pb2.GetSocketRequest(
socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
self.assertEqual(gs_resp.socket.data.streams_started, 1)
self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
self.assertEqual(gs_resp.socket.data.streams_failed, 0)
self.assertEqual(gs_resp.socket.data.messages_sent,
test_constants.STREAM_LENGTH)
self.assertEqual(gs_resp.socket.data.messages_received,
test_constants.STREAM_LENGTH)
def test_server_sockets(self):
self._pairs = _generate_channel_server_pairs(1)
self._send_successful_unary_unary(0)
self._send_failed_unary_unary(0)
gs_resp = self._channelz_stub.GetServers(
channelz_pb2.GetServersRequest(start_server_id=0))
self.assertEqual(len(gs_resp.server), 1)
self.assertEqual(gs_resp.server[0].data.calls_started, 2)
self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1)
self.assertEqual(gs_resp.server[0].data.calls_failed, 1)
gss_resp = self._channelz_stub.GetServerSockets(
channelz_pb2.GetServerSocketsRequest(
server_id=gs_resp.server[0].ref.server_id, start_socket_id=0))
# If the RPC call failed, it will raise a grpc.RpcError
# So, if there is no exception raised, considered pass
def test_server_listen_sockets(self):
self._pairs = _generate_channel_server_pairs(1)
gss_resp = self._channelz_stub.GetServers(
channelz_pb2.GetServersRequest(start_server_id=0))
self.assertEqual(len(gss_resp.server), 1)
self.assertEqual(len(gss_resp.server[0].listen_socket), 1)
gs_resp = self._channelz_stub.GetSocket(
channelz_pb2.GetSocketRequest(
socket_id=gss_resp.server[0].listen_socket[0].socket_id))
# If the RPC call failed, it will raise a grpc.RpcError
# So, if there is no exception raised, considered pass
def test_invalid_query_get_server(self):
try:
self._channelz_stub.GetServer(
channelz_pb2.GetServerRequest(server_id=10000))
except BaseException as e:
self.assertIn('StatusCode.NOT_FOUND', str(e))
else:
self.fail('Invalid query not detected')
def test_invalid_query_get_channel(self):
try:
self._channelz_stub.GetChannel(
channelz_pb2.GetChannelRequest(channel_id=10000))
except BaseException as e:
self.assertIn('StatusCode.NOT_FOUND', str(e))
else:
self.fail('Invalid query not detected')
def test_invalid_query_get_subchannel(self):
try:
self._channelz_stub.GetSubchannel(
channelz_pb2.GetSubchannelRequest(subchannel_id=10000))
except BaseException as e:
self.assertIn('StatusCode.NOT_FOUND', str(e))
else:
self.fail('Invalid query not detected')
def test_invalid_query_get_socket(self):
try:
self._channelz_stub.GetSocket(
channelz_pb2.GetSocketRequest(socket_id=10000))
except BaseException as e:
self.assertIn('StatusCode.NOT_FOUND', str(e))
else:
self.fail('Invalid query not detected')
def test_invalid_query_get_server_sockets(self):
try:
self._channelz_stub.GetServerSockets(
channelz_pb2.GetServerSocketsRequest(
server_id=10000,
start_socket_id=0,
))
except BaseException as e:
self.assertIn('StatusCode.NOT_FOUND', str(e))
else:
self.fail('Invalid query not detected')
if __name__ == '__main__':
unittest.main(verbosity=2)