Add CSDS xDS interop test (#26007)

* Add CSDS xDS interop test

* Add CSDS test to the test suite

* Fix a typo

* Address comments

* Improve the logging of each attempt

* Improve Python readability
diff --git a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
index 2ca9084..6fb22b8 100755
--- a/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_bazel_python_test_in_docker.sh
@@ -26,7 +26,7 @@
 virtualenv "$VIRTUAL_ENV" -p python3
 PYTHON="$VIRTUAL_ENV"/bin/python
 "$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
 
 # Prepare generated Python code.
 TOOLS_DIR=tools/run_tests
diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
index ae28ead..8825719 100755
--- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh
@@ -26,7 +26,7 @@
 virtualenv "$VIRTUAL_ENV" -p python3
 PYTHON="$VIRTUAL_ENV"/bin/python
 "$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
 
 # Prepare generated Python code.
 TOOLS_DIR=tools/run_tests
@@ -67,7 +67,7 @@
 # they are added into "all".
 GRPC_VERBOSITY=debug GRPC_TRACE=xds_client,xds_resolver,xds_cluster_manager_lb,cds_lb,xds_cluster_resolver_lb,priority_lb,xds_cluster_impl_lb,weighted_target_lb "$PYTHON" \
   tools/run_tests/run_xds_tests.py \
-    --test_case="all,circuit_breaking,timeout,fault_injection" \
+    --test_case="all,circuit_breaking,timeout,fault_injection,csds" \
     --project_id=grpc-testing \
     --project_num=830293263384 \
     --source_image=projects/grpc-testing/global/images/xds-test-server-4 \
diff --git a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
index 9d45056..21b0d4e 100755
--- a/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_csharp_test_in_docker.sh
@@ -26,7 +26,7 @@
 virtualenv "$VIRTUAL_ENV" -p python3
 PYTHON="$VIRTUAL_ENV"/bin/python
 "$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
 
 # Prepare generated Python code.
 TOOLS_DIR=tools/run_tests
diff --git a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
index c877b86..9d87920 100755
--- a/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_php_test_in_docker.sh
@@ -26,7 +26,7 @@
 virtualenv "$VIRTUAL_ENV" -p python3
 PYTHON="$VIRTUAL_ENV"/bin/python
 "$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
 
 # Prepare generated Python code.
 TOOLS_DIR=tools/run_tests
diff --git a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
index 5715b15..6c76f92 100755
--- a/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
+++ b/tools/internal_ci/linux/grpc_xds_ruby_test_in_docker.sh
@@ -26,7 +26,7 @@
 virtualenv "$VIRTUAL_ENV" -p python3
 PYTHON="$VIRTUAL_ENV"/bin/python
 "$PYTHON" -m pip install --upgrade pip==19.3.1
-"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client
+"$PYTHON" -m pip install --upgrade grpcio-tools google-api-python-client google-auth-httplib2 oauth2client xds-protos
 
 # Prepare generated Python code.
 TOOLS_DIR=tools/run_tests
diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index e55cdf2..4b2331c 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -15,6 +15,7 @@
 """Run xDS integration tests on GCP using Traffic Director."""
 
 import argparse
+import datetime
 import googleapiclient.discovery
 import grpc
 import json
@@ -41,6 +42,15 @@
 from src.proto.grpc.testing import messages_pb2
 from src.proto.grpc.testing import test_pb2_grpc
 
+# Envoy protos provided by PyPI package xds-protos
+# Needs to import the generated Python file to load descriptors
+from envoy.service.status.v3 import csds_pb2
+from envoy.service.status.v3 import csds_pb2_grpc
+from envoy.extensions.filters.network.http_connection_manager.v3 import http_connection_manager_pb2
+from envoy.extensions.filters.common.fault.v3 import fault_pb2
+from envoy.extensions.filters.http.fault.v3 import fault_pb2
+from envoy.extensions.filters.http.router.v3 import router_pb2
+
 logger = logging.getLogger()
 console_handler = logging.StreamHandler()
 formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
@@ -79,10 +89,11 @@
     'circuit_breaking',
     'timeout',
     'fault_injection',
+    'csds',
 ]
 
 # Test cases that require the V3 API.  Skipped in older runs.
-_V3_TEST_CASES = frozenset(['timeout', 'fault_injection'])
+_V3_TEST_CASES = frozenset(['timeout', 'fault_injection', 'csds'])
 
 # Test cases that require the alpha API.  Skipped for stable API runs.
 _ALPHA_TEST_CASES = frozenset(['timeout'])
@@ -363,6 +374,32 @@
             return response
 
 
+def get_client_xds_config_dump():
+    if CLIENT_HOSTS:
+        hosts = CLIENT_HOSTS
+    else:
+        hosts = ['localhost']
+    for host in hosts:
+        server_address = '%s:%d' % (host, args.stats_port)
+        with grpc.insecure_channel(server_address) as channel:
+            stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(channel)
+            logger.debug('Fetching xDS config dump from %s', server_address)
+            response = stub.FetchClientStatus(csds_pb2.ClientStatusRequest(),
+                                              wait_for_ready=True,
+                                              timeout=_CONNECTION_TIMEOUT_SEC)
+            logger.debug('Fetched xDS config dump from %s', server_address)
+            if len(response.config) != 1:
+                logger.error('Unexpected number of ClientConfigs %d: %s',
+                             len(response.config), response)
+                return None
+            else:
+                # Converting the ClientStatusResponse into JSON, because many
+                # fields are packed in google.protobuf.Any. It will require many
+                # duplicated code to unpack proto message and inspect values.
+                return json_format.MessageToDict(
+                    response.config[0], preserving_proto_field_name=True)
+
+
 def configure_client(rpc_types, metadata=[], timeout_sec=None):
     if CLIENT_HOSTS:
         hosts = CLIENT_HOSTS
@@ -1782,6 +1819,92 @@
         set_validate_for_proxyless(gcp, True)
 
 
+def test_csds(gcp, original_backend_service, instance_group, server_uri):
+    test_csds_timeout_s = datetime.timedelta(minutes=5).total_seconds()
+    sleep_interval_between_attempts_s = datetime.timedelta(
+        seconds=2).total_seconds()
+    logger.info('Running test_csds')
+
+    logger.info('waiting for original backends to become healthy')
+    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
+
+    # Test case timeout: 5 minutes
+    deadline = time.time() + test_csds_timeout_s
+    cnt = 0
+    while time.time() <= deadline:
+        client_config = get_client_xds_config_dump()
+        logger.info('test_csds attempt %d: received xDS config %s', cnt,
+                    json.dumps(client_config, indent=2))
+        if client_config is not None:
+            # Got the xDS config dump, now validate it
+            ok = True
+            try:
+                if client_config['node']['locality']['zone'] != args.zone:
+                    logger.info('Invalid zone %s != %s',
+                                client_config['node']['locality']['zone'],
+                                args.zone)
+                    ok = False
+                seen = set()
+                for xds_config in client_config['xds_config']:
+                    if 'listener_config' in xds_config:
+                        listener_name = xds_config['listener_config'][
+                            'dynamic_listeners'][0]['active_state']['listener'][
+                                'name']
+                        if listener_name != server_uri:
+                            logger.info('Invalid Listener name %s != %s',
+                                        listener_name, server_uri)
+                            ok = False
+                        else:
+                            seen.add('lds')
+                    elif 'route_config' in xds_config:
+                        num_vh = len(
+                            xds_config['route_config']['dynamic_route_configs']
+                            [0]['route_config']['virtual_hosts'])
+                        if num_vh <= 0:
+                            logger.info('Invalid number of VirtualHosts %s',
+                                        num_vh)
+                            ok = False
+                        else:
+                            seen.add('rds')
+                    elif 'cluster_config' in xds_config:
+                        cluster_type = xds_config['cluster_config'][
+                            'dynamic_active_clusters'][0]['cluster']['type']
+                        if cluster_type != 'EDS':
+                            logger.info('Invalid cluster type %s != EDS',
+                                        cluster_type)
+                            ok = False
+                        else:
+                            seen.add('cds')
+                    elif 'endpoint_config' in xds_config:
+                        sub_zone = xds_config["endpoint_config"][
+                            "dynamic_endpoint_configs"][0]["endpoint_config"][
+                                "endpoints"][0]["locality"]["sub_zone"]
+                        if args.zone not in sub_zone:
+                            logger.info('Invalid endpoint sub_zone %s',
+                                        sub_zone)
+                            ok = False
+                        else:
+                            seen.add('eds')
+                want = {'lds', 'rds', 'cds', 'eds'}
+                if seen == want:
+                    logger.info('Incomplete xDS config dump, seen=%s', seen)
+                    ok = False
+            except:
+                logger.exception('Error in xDS config dump:')
+                ok = False
+            finally:
+                if ok:
+                    # Successfully fetched xDS config, and they looks good.
+                    logger.info('success')
+                    return
+        logger.info('test_csds attempt %d failed', cnt)
+        # Give the client some time to fetch xDS resources
+        time.sleep(sleep_interval_between_attempts_s)
+        cnt += 1
+
+    raise RuntimeError('failed to receive valid xDS config')
+
+
 def set_validate_for_proxyless(gcp, validate_for_proxyless):
     if not gcp.alpha_compute:
         logger.debug(
@@ -1838,7 +1961,7 @@
 
 def get_startup_script(path_to_server_binary, service_port):
     if path_to_server_binary:
-        return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
+        return 'nohup %s --port=%d 1>/dev/null &' % (path_to_server_binary,
                                                      service_port)
     else:
         return """#!/bin/bash
@@ -2737,6 +2860,8 @@
                     test_timeout(gcp, backend_service, instance_group)
                 elif test_case == 'fault_injection':
                     test_fault_injection(gcp, backend_service, instance_group)
+                elif test_case == 'csds':
+                    test_csds(gcp, backend_service, instance_group, server_uri)
                 else:
                     logger.error('Unknown test case: %s', test_case)
                     sys.exit(1)