Add CSDS xDS interop test (#26007)
* Add CSDS xDS interop test
* Add CSDS test to the test suite
* Fix a typo
* Address comments
* Improve the logging of each attempt
* Improve Python readability
diff --git a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
index 2ca9084..6fb22b8 100755
--- a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
@@ -26,7 +26,7 @@
virtualenv "$VIRTUAL_ENV" -p python3
PYTHON="$VIRTUAL_ENV"/bin/python
"$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
# Prepare generated Python code.
TOOLS_DIR=tools/run_tests
diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
index ae28ead..8825719 100755
--- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
@@ -26,7 +26,7 @@
virtualenv "$VIRTUAL_ENV" -p python3
PYTHON="$VIRTUAL_ENV"/bin/python
"$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
# Prepare generated Python code.
TOOLS_DIR=tools/run_tests
@@ -67,7 +67,7 @@
# they are added into "all".
GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
tools/run_tests/run_xds_tests.py \
- --test_case="all,circuit_breaking,timeout,fault_injection" \
+ --test_case="all,circuit_breaking,timeout,fault_injection,csds" \
--project_id=grpc-testing \
--project_num=830293263384 \
--source_image=projects/grpc-testing/global/images/xds-test-server-4 \
diff --git a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
index 9d45056..21b0d4e 100755
--- a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
@@ -26,7 +26,7 @@
virtualenv "$VIRTUAL_ENV" -p python3
PYTHON="$VIRTUAL_ENV"/bin/python
"$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
# Prepare generated Python code.
TOOLS_DIR=tools/run_tests
diff --git a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
index c877b86..9d87920 100755
--- a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
@@ -26,7 +26,7 @@
virtualenv "$VIRTUAL_ENV" -p python3
PYTHON="$VIRTUAL_ENV"/bin/python
"$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
# Prepare generated Python code.
TOOLS_DIR=tools/run_tests
diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
index 5715b15..6c76f92 100755
--- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
@@ -26,7 +26,7 @@
virtualenv "$VIRTUAL_ENV" -p python3
PYTHON="$VIRTUAL_ENV"/bin/python
"$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
# Prepare generated Python code.
TOOLS_DIR=tools/run_tests
diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index e55cdf2..4b2331c 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -15,6 +15,7 @@
"""Run xDS integration tests on GCP using Traffic Director."""
import argparse
+import datetime
import googleapiclient.discovery
import grpc
import json
@@ -41,6 +42,15 @@
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
+# Envoy protos provided by PyPI package xds-protos
+# Needs to import the generated Python file to load descriptors
+from envoy.service.status.v3 import csds_pb2
+from envoy.service.status.v3 import csds_pb2_grpc
+from envoy.extensions.filters.network.http_connection_manager.v3 import http_connection_manager_pb2
+from envoy.extensions.filters.common.fault.v3 import fault_pb2
+from envoy.extensions.filters.http.fault.v3 import fault_pb2
+from envoy.extensions.filters.http.router.v3 import router_pb2
+
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
@@ -79,10 +89,11 @@
'circuit_breaking',
'timeout',
'fault_injection',
+ 'csds',
]
# Test cases that require the V3 API. Skipped in older runs.
-_V3_TEST_CASES = frozenset(['timeout', 'fault_injection'])
+_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])
# Test cases that require the alpha API. Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(['timeout'])
@@ -363,6 +374,32 @@
return response
+def get_client_xds_config_dump():
+ if CLIENT_HOSTS:
+ hosts = CLIENT_HOSTS
+ else:
+ hosts = ['localhost']
+ for host in hosts:
+ server_address = '%s:%d' % (host, args.stats_port)
+ with grpc.insecure_channel(server_address) as channel:
+ stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
+ logger.debug('Fetching xDS config dump from %s', server_address)
+ response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
+ wait_for_ready=True,
+ timeout=_CONNECTION_TIMEOUT_SEC)
+ logger.debug('Fetched xDS config dump from %s', server_address)
+ if len(response.config) != 1:
+ logger.error('Unexpected number of ClientConfigs %d: %s',
+ len(response.config), response)
+ return None
+ else:
+ # Converting the ClientStatusResponse into JSON, because many
+ # fields are packed in google.protobuf.Any. It will require many
+ # duplicated code to unpack proto message and inspect values.
+ return json_format.MessageToDict(
+ response.config[0], preserving_proto_field_name=True)
+
+
def configure_client(rpc_types, metadata=[], timeout_sec=None):
if CLIENT_HOSTS:
hosts = CLIENT_HOSTS
@@ -1782,6 +1819,92 @@
set_validate_for_proxyless(gcp, True)
+def test_csds(gcp, original_backend_service, instance_group, server_uri):
+ test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
+ sleep_interval_between_attempts_s = datetime.timedelta(
+ seconds=2).total_seconds()
+ logger.info('Running test_csds')
+
+ logger.info('waiting for original backends to become healthy')
+ wait_for_healthy_backends(gcp, original_backend_service, instance_group)
+
+ # Test case timeout: 5 minutes
+ deadline = time.time() + test_csds_timeout_s
+ cnt = 0
+ while time.time() <= deadline:
+ client_config = get_client_xds_config_dump()
+ logger.info('test_csds attempt %d: received xDS config %s', cnt,
+ json.dumps(client_config, indent=2))
+ if client_config is not None:
+ # Got the xDS config dump, now validate it
+ ok = True
+ try:
+ if client_config['node']['locality']['zone'] != args.zone:
+ logger.info('Invalid zone %s != %s',
+ client_config['node']['locality']['zone'],
+ args.zone)
+ ok = False
+ seen = set()
+ for xds_config in client_config['xds_config']:
+ if 'listener_config' in xds_config:
+ listener_name = xds_config['listener_config'][
+ 'dynamic_listeners'][0]['active_state']['listener'][
+ 'name']
+ if listener_name != server_uri:
+ logger.info('Invalid Listener name %s != %s',
+ listener_name, server_uri)
+ ok = False
+ else:
+ seen.add('lds')
+ elif 'route_config' in xds_config:
+ num_vh = len(
+ xds_config['route_config']['dynamic_route_configs']
+ [0]['route_config']['virtual_hosts'])
+ if num_vh <= 0:
+ logger.info('Invalid number of VirtualHosts %s',
+ num_vh)
+ ok = False
+ else:
+ seen.add('rds')
+ elif 'cluster_config' in xds_config:
+ cluster_type = xds_config['cluster_config'][
+ 'dynamic_active_clusters'][0]['cluster']['type']
+ if cluster_type != 'EDS':
+ logger.info('Invalid cluster type %s != EDS',
+ cluster_type)
+ ok = False
+ else:
+ seen.add('cds')
+ elif 'endpoint_config' in xds_config:
+ sub_zone = xds_config["endpoint_config"][
+ "dynamic_endpoint_configs"][0]["endpoint_config"][
+ "endpoints"][0]["locality"]["sub_zone"]
+ if args.zone not in sub_zone:
+ logger.info('Invalid endpoint sub_zone %s',
+ sub_zone)
+ ok = False
+ else:
+ seen.add('eds')
+ want = {'lds', 'rds', 'cds', 'eds'}
+ if seen == want:
+ logger.info('Incomplete xDS config dump, seen=%s', seen)
+ ok = False
+ except:
+ logger.exception('Error in xDS config dump:')
+ ok = False
+ finally:
+ if ok:
+ # Successfully fetched xDS config, and they looks good.
+ logger.info('success')
+ return
+ logger.info('test_csds attempt %d failed', cnt)
+ # Give the client some time to fetch xDS resources
+ time.sleep(sleep_interval_between_attempts_s)
+ cnt += 1
+
+ raise RuntimeError('failed to receive valid xDS config')
+
+
def set_validate_for_proxyless(gcp, validate_for_proxyless):
if not gcp.alpha_compute:
logger.debug(
@@ -1838,7 +1961,7 @@
def get_startup_script(path_to_server_binary, service_port):
if path_to_server_binary:
- return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
+ return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary,
service_port)
else:
return """#!/bin/bash
@@ -2737,6 +2860,8 @@
test_timeout(gcp, backend_service, instance_group)
elif test_case == 'fault_injection':
test_fault_injection(gcp, backend_service, instance_group)
+ elif test_case == 'csds':
+ test_csds(gcp, backend_service, instance_group, server_uri)
else:
logger.error('Unknown test case: %s', test_case)
sys.exit(1)