blob: 1504ac6025ba030371185b772401e57e0a580515 [file] [log] [blame]
# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Tuple
from absl import flags
from absl.testing import absltest
import grpc
from framework import xds_url_map_testcase
from framework.helpers import skips
from framework.test_app import client_app
# Short aliases for test-framework names used throughout this module (types as
# well as RPC-type constants).
HostRule = xds_url_map_testcase.HostRule
PathMatcher = xds_url_map_testcase.PathMatcher
GcpResourceManager = xds_url_map_testcase.GcpResourceManager
DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
XdsTestClient = client_app.XdsTestClient
ExpectedResult = xds_url_map_testcase.ExpectedResult
_Lang = skips.Lang
logger = logging.getLogger(__name__)
# Declare the key flags of xds_url_map_testcase as key flags of this module as
# well, so they are treated as this test module's own flags (e.g. in --help).
flags.adopt_module_key_flags(xds_url_map_testcase)
# The first batch of RPCs doesn't count towards the result of the test case.
# It is only meant to prove that communication between the driver and the
# client works.
_NUM_RPCS = 10
# Passed as `length` to assertRpcStatusCode; presumably the number of seconds
# over which RPC results are collected — TODO confirm against the framework.
_LENGTH_OF_RPC_SENDING_SEC = 16
# We use sleep to synchronize the test driver and the client. Even though the
# client sends at the QPS rate, we can't assert that exactly
# QPS * SLEEP_DURATION RPCs are finished; the final number of completed RPCs
# might be slightly more or less.
_NON_RANDOM_ERROR_TOLERANCE = 0.01
# For test cases that involve random number generation, we want to be looser
# about the final result. Otherwise, we would need a longer test duration
# (sleep duration) and a more accurate communication mechanism. The accuracy
# of random number generation is not what these tests are checking.
_ERROR_TOLERANCE = 0.2
# Passed as `app_timeout` in the delay-injected cases (presumably the client's
# per-RPC deadline in seconds). It is much shorter than the injected 20s delay,
# so delayed RPCs are expected to end with DEADLINE_EXCEEDED. Also used as the
# sleep interval in _wait_until_backlog_cleared.
_DELAY_CASE_APPLICATION_TIMEOUT_SEC = 1
# Default timeout, in seconds, of _wait_until_backlog_cleared.
_BACKLOG_WAIT_TIME_SEC = 20
def _build_fault_injection_route_rule(abort_percentage: int = 0,
                                      delay_percentage: int = 0):
    """Build a URL-map route rule that injects faults into UnaryCall.

    The rule matches only the full path '/grpc.testing.TestService/UnaryCall'
    and sends it to the default backend service. Its fault injection policy
    aborts RPCs with HTTP status 401 and delays RPCs by a fixed 20 seconds.

    Args:
      abort_percentage: Percentage (0-100) of matched RPCs to abort.
      delay_percentage: Percentage (0-100) of matched RPCs to delay.

    Returns:
      A dict describing the route rule, meant to be placed in a path
      matcher's 'routeRules' list.
    """
    abort_policy = {
        'httpStatus': 401,
        'percentage': abort_percentage,
    }
    delay_policy = {
        'fixedDelay': {
            'seconds': '20'
        },
        'percentage': delay_percentage,
    }
    unary_call_match = {'fullPathMatch': '/grpc.testing.TestService/UnaryCall'}
    return {
        'priority': 0,
        'matchRules': [unary_call_match],
        'service': GcpResourceManager().default_backend_service(),
        'routeAction': {
            'faultInjectionPolicy': {
                'abort': abort_policy,
                'delay': delay_policy,
            }
        },
    }
def _wait_until_backlog_cleared(test_client: XdsTestClient,
                                timeout: int = _BACKLOG_WAIT_TIME_SEC):
    """Wait until the number of completed RPCs catches up with started RPCs.

    For delay injected test cases, there might be a backlog of RPCs due to slow
    initialization of the client. E.g., if initialization took 20s and qps is
    25, then there will be a backlog of 500 RPCs. In normal test cases, this is
    fine, because RPCs will fail immediately. But for delay injected test cases,
    the RPC might linger much longer and affect the stability of test results.

    The client's accumulated stats are polled until, for both UnaryCall and
    EmptyCall, the gap between started and completed (succeeded + failed) RPCs
    is at most 1.1 * QPS.

    Args:
      test_client: The xDS test client whose accumulated stats are polled.
      timeout: Maximum number of seconds to keep polling.

    Raises:
      RuntimeError: If the backlog does not clear within `timeout` seconds.
    """
    logger.info('Waiting for RPC backlog to clear for %d seconds', timeout)
    # The backlog is considered healthy when the gap between started and
    # completed RPCs is at most 1.1 seconds' worth of traffic at the
    # configured QPS.
    max_backlog = xds_url_map_testcase.QPS.value * 1.1
    deadline = time.time() + timeout
    while time.time() < deadline:
        stats = test_client.get_load_balancer_accumulated_stats()
        all_clear = True
        for rpc_type in (RpcTypeUnaryCall, RpcTypeEmptyCall):
            started = stats.num_rpcs_started_by_method.get(rpc_type, 0)
            completed = (stats.num_rpcs_succeeded_by_method.get(rpc_type, 0) +
                         stats.num_rpcs_failed_by_method.get(rpc_type, 0))
            if abs(started - completed) > max_backlog:
                logger.info(
                    'RPC backlog exist: rpc_type=%s started=%s completed=%s',
                    rpc_type, started, completed)
                all_clear = False
            else:
                logger.info(
                    'RPC backlog clear: rpc_type=%s started=%s completed=%s',
                    rpc_type, started, completed)
        if all_clear:
            # The backlogs of both RPC types are clear: success.
            return
        # Sleep once per poll rather than once per backlogged RPC type. All
        # types were evaluated against the same stats snapshot, so extra
        # sleeps would only delay fetching fresh stats.
        time.sleep(_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
    raise RuntimeError('failed to clear RPC backlog in %s seconds' % timeout)
def _is_supported(config: skips.TestConfig) -> bool:
    """Return whether the client under test supports these fault injection tests.

    Clients in every language are supported, except Node clients older than
    v1.4.x.
    """
    if config.client_lang != _Lang.NODE:
        return True
    return not config.version_lt('v1.4.x')
class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
    """A fault policy with 0% abort and 0% delay must not affect UnaryCall.

    Every UnaryCall RPC is expected to finish with OK.
    """

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        return _is_supported(config)

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        # Install a UnaryCall route whose fault policy never fires.
        zero_fault_rule = _build_fault_injection_route_rule(abort_percentage=0,
                                                            delay_percentage=0)
        path_matcher["routeRules"] = [zero_fault_rule]
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        self.assertNumEndpoints(xds_config, 1)
        unary_route = xds_config.rds['virtualHosts'][0]['routes'][0]
        fault = unary_route['typedPerFilterConfig']['envoy.filters.http.fault']
        # A 0% fraction may be serialized without a numerator, so it
        # defaults to 0.
        delay = fault['delay']
        self.assertEqual('20s', delay['fixedDelay'])
        self.assertEqual(0, delay['percentage'].get('numerator', 0))
        self.assertEqual('MILLION', delay['percentage']['denominator'])
        abort = fault['abort']
        self.assertEqual(401, abort['httpStatus'])
        self.assertEqual(0, abort['percentage'].get('numerator', 0))
        self.assertEqual('MILLION', abort['percentage']['denominator'])

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        self.configure_and_send(test_client,
                                rpc_types=(RpcTypeUnaryCall,),
                                num_rpcs=_NUM_RPCS)
        all_succeed = ExpectedResult(rpc_type=RpcTypeUnaryCall,
                                     status_code=grpc.StatusCode.OK,
                                     ratio=1)
        self.assertRpcStatusCode(test_client,
                                 expected=(all_succeed,),
                                 length=_LENGTH_OF_RPC_SENDING_SEC,
                                 tolerance=_NON_RANDOM_ERROR_TOLERANCE)
class TestNonMatchingFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):
    """EmptyCall is not fault injected, so it should succeed.

    The UnaryCall route gets 100% abort and 100% delay, while the route used
    by every other RPC is left without a fault filter.
    """

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        return _is_supported(config)

    @staticmethod
    def client_init_config(rpc: str, metadata: str):
        # The Python interop client gets stuck when traffic is slow (here a
        # 20s delay is injected). This test only checks that un-injected
        # traffic is unaffected, so sending only EmptyCall (ignoring `rpc`)
        # is sufficient.
        return 'EmptyCall', metadata

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        full_fault_rule = _build_fault_injection_route_rule(
            abort_percentage=100, delay_percentage=100)
        path_matcher["routeRules"] = [full_fault_rule]
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        self.assertNumEndpoints(xds_config, 1)
        routes = xds_config.rds['virtualHosts'][0]['routes']
        # Route 0 matches UnaryCall and carries the fault filter.
        unary_route = routes[0]
        self.assertEqual("/grpc.testing.TestService/UnaryCall",
                         unary_route['match']['path'])
        fault = unary_route['typedPerFilterConfig']['envoy.filters.http.fault']
        delay = fault['delay']
        self.assertEqual('20s', delay['fixedDelay'])
        self.assertEqual(1000000, delay['percentage']['numerator'])
        self.assertEqual('MILLION', delay['percentage']['denominator'])
        abort = fault['abort']
        self.assertEqual(401, abort['httpStatus'])
        self.assertEqual(1000000, abort['percentage']['numerator'])
        self.assertEqual('MILLION', abort['percentage']['denominator'])
        # Route 1, serving all other RPCs, must not have a fault filter.
        other_filters = routes[1].get('typedPerFilterConfig', {})
        self.assertNotIn('envoy.filters.http.fault', other_filters)

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        # No configure_and_send here; the client presumably already sends
        # EmptyCall because of client_init_config.
        all_succeed = ExpectedResult(rpc_type=RpcTypeEmptyCall,
                                     status_code=grpc.StatusCode.OK,
                                     ratio=1)
        self.assertRpcStatusCode(test_client,
                                 expected=(all_succeed,),
                                 length=_LENGTH_OF_RPC_SENDING_SEC,
                                 tolerance=_NON_RANDOM_ERROR_TOLERANCE)
@absltest.skip('20% RPC might pass immediately, reason unknown')
class TestAlwaysDelay(xds_url_map_testcase.XdsUrlMapTestCase):
    """Delay every UnaryCall by 20s; all of them should exceed the deadline."""

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        return _is_supported(config)

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        delay_all_rule = _build_fault_injection_route_rule(
            abort_percentage=0, delay_percentage=100)
        path_matcher["routeRules"] = [delay_all_rule]
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        self.assertNumEndpoints(xds_config, 1)
        unary_route = xds_config.rds['virtualHosts'][0]['routes'][0]
        fault = unary_route['typedPerFilterConfig']['envoy.filters.http.fault']
        delay = fault['delay']
        self.assertEqual('20s', delay['fixedDelay'])
        self.assertEqual(1000000, delay['percentage']['numerator'])
        self.assertEqual('MILLION', delay['percentage']['denominator'])

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        self.configure_and_send(test_client,
                                rpc_types=(RpcTypeUnaryCall,),
                                num_rpcs=_NUM_RPCS,
                                app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
        # Let RPCs that piled up during client startup drain first.
        _wait_until_backlog_cleared(test_client)
        all_time_out = ExpectedResult(
            rpc_type=RpcTypeUnaryCall,
            status_code=grpc.StatusCode.DEADLINE_EXCEEDED,
            ratio=1)
        self.assertRpcStatusCode(test_client,
                                 expected=(all_time_out,),
                                 length=_LENGTH_OF_RPC_SENDING_SEC,
                                 tolerance=_NON_RANDOM_ERROR_TOLERANCE)
class TestAlwaysAbort(xds_url_map_testcase.XdsUrlMapTestCase):
    """Abort every UnaryCall with HTTP 401, surfaced as UNAUTHENTICATED."""

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        return _is_supported(config)

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        abort_all_rule = _build_fault_injection_route_rule(
            abort_percentage=100, delay_percentage=0)
        path_matcher["routeRules"] = [abort_all_rule]
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        self.assertNumEndpoints(xds_config, 1)
        unary_route = xds_config.rds['virtualHosts'][0]['routes'][0]
        fault = unary_route['typedPerFilterConfig']['envoy.filters.http.fault']
        abort = fault['abort']
        self.assertEqual(401, abort['httpStatus'])
        self.assertEqual(1000000, abort['percentage']['numerator'])
        self.assertEqual('MILLION', abort['percentage']['denominator'])

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        self.configure_and_send(test_client,
                                rpc_types=(RpcTypeUnaryCall,),
                                num_rpcs=_NUM_RPCS)
        all_aborted = ExpectedResult(
            rpc_type=RpcTypeUnaryCall,
            status_code=grpc.StatusCode.UNAUTHENTICATED,
            ratio=1)
        self.assertRpcStatusCode(test_client,
                                 expected=(all_aborted,),
                                 length=_LENGTH_OF_RPC_SENDING_SEC,
                                 tolerance=_NON_RANDOM_ERROR_TOLERANCE)
class TestDelayHalf(xds_url_map_testcase.XdsUrlMapTestCase):
    """Delay half of UnaryCalls by 20s; about half should exceed the deadline."""

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        return _is_supported(config)

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        delay_half_rule = _build_fault_injection_route_rule(
            abort_percentage=0, delay_percentage=50)
        path_matcher["routeRules"] = [delay_half_rule]
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        self.assertNumEndpoints(xds_config, 1)
        unary_route = xds_config.rds['virtualHosts'][0]['routes'][0]
        fault = unary_route['typedPerFilterConfig']['envoy.filters.http.fault']
        delay = fault['delay']
        self.assertEqual('20s', delay['fixedDelay'])
        self.assertEqual(500000, delay['percentage']['numerator'])
        self.assertEqual('MILLION', delay['percentage']['denominator'])

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        self.configure_and_send(test_client,
                                rpc_types=(RpcTypeUnaryCall,),
                                num_rpcs=_NUM_RPCS,
                                app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)
        # Let RPCs that piled up during client startup drain first.
        _wait_until_backlog_cleared(test_client)
        half_time_out = ExpectedResult(
            rpc_type=RpcTypeUnaryCall,
            status_code=grpc.StatusCode.DEADLINE_EXCEEDED,
            ratio=0.5)
        # The delay is applied randomly, hence the looser tolerance.
        self.assertRpcStatusCode(test_client,
                                 expected=(half_time_out,),
                                 length=_LENGTH_OF_RPC_SENDING_SEC,
                                 tolerance=_ERROR_TOLERANCE)
class TestAbortHalf(xds_url_map_testcase.XdsUrlMapTestCase):
    """Abort half of UnaryCalls with HTTP 401 (UNAUTHENTICATED)."""

    @staticmethod
    def is_supported(config: skips.TestConfig) -> bool:
        return _is_supported(config)

    @staticmethod
    def url_map_change(
            host_rule: HostRule,
            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
        abort_half_rule = _build_fault_injection_route_rule(
            abort_percentage=50, delay_percentage=0)
        path_matcher["routeRules"] = [abort_half_rule]
        return host_rule, path_matcher

    def xds_config_validate(self, xds_config: DumpedXdsConfig):
        self.assertNumEndpoints(xds_config, 1)
        unary_route = xds_config.rds['virtualHosts'][0]['routes'][0]
        fault = unary_route['typedPerFilterConfig']['envoy.filters.http.fault']
        abort = fault['abort']
        self.assertEqual(401, abort['httpStatus'])
        self.assertEqual(500000, abort['percentage']['numerator'])
        self.assertEqual('MILLION', abort['percentage']['denominator'])

    def rpc_distribution_validate(self, test_client: XdsTestClient):
        self.configure_and_send(test_client,
                                rpc_types=(RpcTypeUnaryCall,),
                                num_rpcs=_NUM_RPCS)
        half_aborted = ExpectedResult(
            rpc_type=RpcTypeUnaryCall,
            status_code=grpc.StatusCode.UNAUTHENTICATED,
            ratio=0.5)
        # The abort is applied randomly, hence the looser tolerance.
        self.assertRpcStatusCode(test_client,
                                 expected=(half_aborted,),
                                 length=_LENGTH_OF_RPC_SENDING_SEC,
                                 tolerance=_ERROR_TOLERANCE)
if __name__ == '__main__':
    # Run the test cases defined in this module with absl's test runner.
    absltest.main()