xds: add CsdsService
diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
index 9470b67..4001345 100644
--- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
@@ -196,6 +196,11 @@
   }
 
   @Override
+  Node getNode() {
+    return node;
+  }
+
+  @Override
   public String toString() {
     return logId.toString();
   }
@@ -313,11 +318,9 @@
     stopwatch.reset().start();
   }
 
-  /**
-   * Returns the latest accepted version of the given resource type.
-   */
   // Must be synchronized.
-  private String getCurrentVersion(ResourceType type) {
+  @Override
+  String getCurrentVersion(ResourceType type) {
     String version;
     switch (type) {
       case LDS:
diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
index b30048b..b76422d 100644
--- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
@@ -1174,6 +1174,7 @@
     return resources.isEmpty() ? null : resources.keySet();
   }
 
+  @Override
   Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
     Map<String, ResourceMetadata> metadataMap = new HashMap<>();
     for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
diff --git a/xds/src/main/java/io/grpc/xds/CsdsService.java b/xds/src/main/java/io/grpc/xds/CsdsService.java
new file mode 100644
index 0000000..5f3f4ba
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/CsdsService.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.util.Timestamps;
+import io.envoyproxy.envoy.admin.v3.ClientResourceStatus;
+import io.envoyproxy.envoy.admin.v3.ClustersConfigDump;
+import io.envoyproxy.envoy.admin.v3.ClustersConfigDump.DynamicCluster;
+import io.envoyproxy.envoy.admin.v3.EndpointsConfigDump;
+import io.envoyproxy.envoy.admin.v3.EndpointsConfigDump.DynamicEndpointConfig;
+import io.envoyproxy.envoy.admin.v3.ListenersConfigDump;
+import io.envoyproxy.envoy.admin.v3.ListenersConfigDump.DynamicListener;
+import io.envoyproxy.envoy.admin.v3.ListenersConfigDump.DynamicListenerState;
+import io.envoyproxy.envoy.admin.v3.RoutesConfigDump;
+import io.envoyproxy.envoy.admin.v3.RoutesConfigDump.DynamicRouteConfig;
+import io.envoyproxy.envoy.service.status.v3.ClientConfig;
+import io.envoyproxy.envoy.service.status.v3.ClientStatusDiscoveryServiceGrpc;
+import io.envoyproxy.envoy.service.status.v3.ClientStatusRequest;
+import io.envoyproxy.envoy.service.status.v3.ClientStatusResponse;
+import io.envoyproxy.envoy.service.status.v3.PerXdsConfig;
+import io.grpc.ExperimentalApi;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.internal.ObjectPool;
+import io.grpc.stub.StreamObserver;
+import io.grpc.xds.AbstractXdsClient.ResourceType;
+import io.grpc.xds.XdsClient.ResourceMetadata;
+import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus;
+import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState;
+import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The CSDS service provides information about the status of a running xDS client.
+ *
+ * <p><a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.proto">
+ * Client Status Discovery Service</a> is a service that exposes xDS config of a given client. See
+ * the full design at <a href="https://github.com/grpc/proposal/blob/master/A40-csds-support.md">
+ * gRFC A40: xDS Configuration Dump via Client Status Discovery Service in gRPC</a>.
+ *
+ * @since 1.37.0
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8016")
+public final class CsdsService extends
+    ClientStatusDiscoveryServiceGrpc.ClientStatusDiscoveryServiceImplBase {
+  private static final Logger logger = Logger.getLogger(CsdsService.class.getName());
+  private final XdsClientPoolFactory xdsClientPoolFactory;
+
+  @VisibleForTesting
+  CsdsService(XdsClientPoolFactory xdsClientPoolFactory) {
+    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolProvider");
+  }
+
+  private CsdsService() {
+    this(SharedXdsClientPoolProvider.getDefaultProvider());
+  }
+
+  /** Creates an instance. */
+  public static CsdsService newInstance() {
+    return new CsdsService();
+  }
+
+  @Override
+  public void fetchClientStatus(
+      ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
+    if (handleRequest(request, responseObserver)) {
+      responseObserver.onCompleted();
+    }
+  }
+
+  @Override
+  public StreamObserver<ClientStatusRequest> streamClientStatus(
+      final StreamObserver<ClientStatusResponse> responseObserver) {
+    return new StreamObserver<ClientStatusRequest>() {
+      @Override
+      public void onNext(ClientStatusRequest request) {
+        handleRequest(request, responseObserver);
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        onCompleted();
+      }
+
+      @Override
+      public void onCompleted() {
+        responseObserver.onCompleted();
+      }
+    };
+  }
+
+  private boolean handleRequest(
+      ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
+    try {
+      responseObserver.onNext(getConfigDumpForRequest(request));
+      return true;
+    } catch (StatusException e) {
+      responseObserver.onError(e);
+    } catch (Exception e) {
+      logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
+      responseObserver.onError(new StatusException(
+          Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)));
+    }
+    return false;
+  }
+
+  private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
+      throws StatusException {
+    if (request.getNodeMatchersCount() > 0) {
+      throw new StatusException(
+          Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
+    }
+
+    ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get();
+    if (xdsClientPool == null) {
+      return ClientStatusResponse.getDefaultInstance();
+    }
+
+    XdsClient xdsClient = null;
+    try {
+      xdsClient = xdsClientPool.getObject();
+      return ClientStatusResponse.newBuilder()
+          .addConfig(getClientConfigForXdsClient(xdsClient))
+          .build();
+    } finally {
+      if (xdsClient != null) {
+        xdsClientPool.returnObject(xdsClient);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) {
+    ListenersConfigDump ldsConfig = dumpLdsConfig(
+        xdsClient.getSubscribedResourcesMetadata(ResourceType.LDS),
+        xdsClient.getCurrentVersion(ResourceType.LDS));
+    RoutesConfigDump rdsConfig = dumpRdsConfig(
+        xdsClient.getSubscribedResourcesMetadata(ResourceType.RDS));
+    ClustersConfigDump cdsConfig = dumpCdsConfig(
+        xdsClient.getSubscribedResourcesMetadata(ResourceType.CDS),
+        xdsClient.getCurrentVersion(ResourceType.CDS));
+    EndpointsConfigDump edsConfig = dumpEdsConfig(
+        xdsClient.getSubscribedResourcesMetadata(ResourceType.EDS));
+
+    return ClientConfig.newBuilder()
+        .setNode(xdsClient.getNode().toEnvoyProtoNode())
+        .addXdsConfig(PerXdsConfig.newBuilder().setListenerConfig(ldsConfig))
+        .addXdsConfig(PerXdsConfig.newBuilder().setRouteConfig(rdsConfig))
+        .addXdsConfig(PerXdsConfig.newBuilder().setClusterConfig(cdsConfig))
+        .addXdsConfig(PerXdsConfig.newBuilder().setEndpointConfig(edsConfig))
+        .build();
+  }
+
+  @VisibleForTesting
+  static ListenersConfigDump dumpLdsConfig(
+      Map<String, ResourceMetadata> resourcesMetadata, String version) {
+    ListenersConfigDump.Builder ldsConfig = ListenersConfigDump.newBuilder();
+    for (Map.Entry<String, ResourceMetadata> entry : resourcesMetadata.entrySet()) {
+      ldsConfig.addDynamicListeners(buildDynamicListener(entry.getKey(), entry.getValue()));
+    }
+    return ldsConfig.setVersionInfo(version).build();
+  }
+
+  @VisibleForTesting
+  static DynamicListener buildDynamicListener(String name, ResourceMetadata metadata) {
+    DynamicListener.Builder dynamicListener = DynamicListener.newBuilder()
+        .setName(name)
+        .setClientStatus(metadataStatusToClientStatus(metadata.getStatus()));
+    if (metadata.getErrorState() != null) {
+      dynamicListener.setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
+    }
+    DynamicListenerState.Builder dynamicListenerState = DynamicListenerState.newBuilder()
+        .setVersionInfo(metadata.getVersion())
+        .setLastUpdated(Timestamps.fromNanos(metadata.getUpdateTimeNanos()));
+    if (metadata.getRawResource() != null) {
+      dynamicListenerState.setListener(metadata.getRawResource());
+    }
+    return dynamicListener.setActiveState(dynamicListenerState).build();
+  }
+
+  @VisibleForTesting
+  static RoutesConfigDump dumpRdsConfig(Map<String, ResourceMetadata> resourcesMetadata) {
+    RoutesConfigDump.Builder rdsConfig = RoutesConfigDump.newBuilder();
+    for (ResourceMetadata metadata : resourcesMetadata.values()) {
+      rdsConfig.addDynamicRouteConfigs(buildDynamicRouteConfig(metadata));
+    }
+    return rdsConfig.build();
+  }
+
+  @VisibleForTesting
+  static DynamicRouteConfig buildDynamicRouteConfig(ResourceMetadata metadata) {
+    DynamicRouteConfig.Builder dynamicRouteConfig = DynamicRouteConfig.newBuilder()
+        .setVersionInfo(metadata.getVersion())
+        .setClientStatus(metadataStatusToClientStatus(metadata.getStatus()))
+        .setLastUpdated(Timestamps.fromNanos(metadata.getUpdateTimeNanos()));
+    if (metadata.getErrorState() != null) {
+      dynamicRouteConfig.setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
+    }
+    if (metadata.getRawResource() != null) {
+      dynamicRouteConfig.setRouteConfig(metadata.getRawResource());
+    }
+    return dynamicRouteConfig.build();
+  }
+
+  @VisibleForTesting
+  static ClustersConfigDump dumpCdsConfig(
+      Map<String, ResourceMetadata> resourcesMetadata, String version) {
+    ClustersConfigDump.Builder cdsConfig = ClustersConfigDump.newBuilder();
+    for (ResourceMetadata metadata : resourcesMetadata.values()) {
+      cdsConfig.addDynamicActiveClusters(buildDynamicCluster(metadata));
+    }
+    return cdsConfig.setVersionInfo(version).build();
+  }
+
+  @VisibleForTesting
+  static DynamicCluster buildDynamicCluster(ResourceMetadata metadata) {
+    DynamicCluster.Builder dynamicCluster = DynamicCluster.newBuilder()
+        .setVersionInfo(metadata.getVersion())
+        .setClientStatus(metadataStatusToClientStatus(metadata.getStatus()))
+        .setLastUpdated(Timestamps.fromNanos(metadata.getUpdateTimeNanos()));
+    if (metadata.getErrorState() != null) {
+      dynamicCluster.setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
+    }
+    if (metadata.getRawResource() != null) {
+      dynamicCluster.setCluster(metadata.getRawResource());
+    }
+    return dynamicCluster.build();
+  }
+
+  @VisibleForTesting
+  static EndpointsConfigDump dumpEdsConfig(Map<String, ResourceMetadata> resourcesMetadata) {
+    EndpointsConfigDump.Builder edsConfig = EndpointsConfigDump.newBuilder();
+    for (ResourceMetadata metadata : resourcesMetadata.values()) {
+      edsConfig.addDynamicEndpointConfigs(buildDynamicEndpointConfig(metadata));
+    }
+    return edsConfig.build();
+  }
+
+  @VisibleForTesting
+  static DynamicEndpointConfig buildDynamicEndpointConfig(ResourceMetadata metadata) {
+    DynamicEndpointConfig.Builder dynamicRouteConfig = DynamicEndpointConfig.newBuilder()
+        .setVersionInfo(metadata.getVersion())
+        .setClientStatus(metadataStatusToClientStatus(metadata.getStatus()))
+        .setLastUpdated(Timestamps.fromNanos(metadata.getUpdateTimeNanos()));
+    if (metadata.getErrorState() != null) {
+      dynamicRouteConfig.setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
+    }
+    if (metadata.getRawResource() != null) {
+      dynamicRouteConfig.setEndpointConfig(metadata.getRawResource());
+    }
+    return dynamicRouteConfig.build();
+  }
+
+  @VisibleForTesting
+  static ClientResourceStatus metadataStatusToClientStatus(ResourceMetadataStatus status) {
+    switch (status) {
+      case UNKNOWN:
+        return ClientResourceStatus.UNKNOWN;
+      case DOES_NOT_EXIST:
+        return ClientResourceStatus.DOES_NOT_EXIST;
+      case REQUESTED:
+        return ClientResourceStatus.REQUESTED;
+      case ACKED:
+        return ClientResourceStatus.ACKED;
+      case NACKED:
+        return ClientResourceStatus.NACKED;
+      default:
+        throw new AssertionError("Unexpected ResourceMetadataStatus: " + status);
+    }
+  }
+
+  private static io.envoyproxy.envoy.admin.v3.UpdateFailureState metadataUpdateFailureStateToProto(
+      UpdateFailureState errorState) {
+    return io.envoyproxy.envoy.admin.v3.UpdateFailureState.newBuilder()
+        .setLastUpdateAttempt(Timestamps.fromNanos(errorState.getFailedUpdateTimeNanos()))
+        .setDetails(errorState.getFailedDetails())
+        .setVersionInfo(errorState.getFailedVersion())
+        .build();
+  }
+}
diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
index 712109c..9f8ae2a 100644
--- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
+++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
@@ -70,7 +70,13 @@
   }
 
   @Override
-  public ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException {
+  @Nullable
+  public ObjectPool<XdsClient> get() {
+    return xdsClientPool;
+  }
+
+  @Override
+  public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
     ObjectPool<XdsClient> ref = xdsClientPool;
     if (ref == null) {
       synchronized (lock) {
diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java
index 9f1798a..791410e 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClient.java
@@ -24,8 +24,10 @@
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Any;
 import io.grpc.Status;
+import io.grpc.xds.AbstractXdsClient.ResourceType;
 import io.grpc.xds.Endpoints.DropOverload;
 import io.grpc.xds.Endpoints.LocalityLbEndpoints;
+import io.grpc.xds.EnvoyProtoData.Node;
 import io.grpc.xds.EnvoyServerProtoData.Listener;
 import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
 import io.grpc.xds.Filter.NamedFilterConfig;
@@ -422,10 +424,10 @@
     }
 
     static ResourceMetadata newResourceMetadataAcked(
-        Any resource, String version, long updateTime) {
-      checkNotNull(resource, "resource");
+        Any rawResource, String version, long updateTimeNanos) {
+      checkNotNull(rawResource, "rawResource");
       return new ResourceMetadata(
-          ResourceMetadataStatus.ACKED, version, updateTime, resource, null);
+          ResourceMetadataStatus.ACKED, version, updateTimeNanos, rawResource, null);
     }
 
     static ResourceMetadata newResourceMetadataNacked(
@@ -527,6 +529,24 @@
   }
 
   /**
+   * Returns gRPC representation of {@link io.envoyproxy.envoy.config.core.v3.Node}.
+   */
+  Node getNode() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the latest accepted version of the given resource type.
+   */
+  String getCurrentVersion(ResourceType type) {
+    throw new UnsupportedOperationException();
+  }
+
+  Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
    * Registers a data watcher for the given LDS resource.
    */
   void watchLdsResource(String resourceName, LdsResourceWatcher watcher) {
diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
index f0cc0b6..b979d6e 100644
--- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
+++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java
@@ -152,7 +152,7 @@
   public void start(Listener2 listener) {
     this.listener = checkNotNull(listener, "listener");
     try {
-      xdsClientPool = xdsClientPoolFactory.getXdsClientPool();
+      xdsClientPool = xdsClientPoolFactory.getOrCreate();
     } catch (Exception e) {
       listener.onError(
           Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
index bb51455..40aa4f9 100644
--- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
+++ b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
@@ -79,7 +79,10 @@
   interface XdsClientPoolFactory {
     void setBootstrapOverride(Map<String, ?> bootstrap);
 
-    ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException;
+    @Nullable
+    ObjectPool<XdsClient> get();
+
+    ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException;
   }
 
   /**
diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
new file mode 100644
index 0000000..1d108c7
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
@@ -0,0 +1,891 @@
+/*
+ * Copyright 2021 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth.assertWithMessage;
+import static io.grpc.xds.AbstractXdsClient.ResourceType.CDS;
+import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
+import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
+import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Any;
+import com.google.protobuf.Timestamp;
+import io.envoyproxy.envoy.admin.v3.ClientResourceStatus;
+import io.envoyproxy.envoy.admin.v3.ClustersConfigDump;
+import io.envoyproxy.envoy.admin.v3.ClustersConfigDump.DynamicCluster;
+import io.envoyproxy.envoy.admin.v3.EndpointsConfigDump;
+import io.envoyproxy.envoy.admin.v3.EndpointsConfigDump.DynamicEndpointConfig;
+import io.envoyproxy.envoy.admin.v3.ListenersConfigDump;
+import io.envoyproxy.envoy.admin.v3.ListenersConfigDump.DynamicListener;
+import io.envoyproxy.envoy.admin.v3.ListenersConfigDump.DynamicListenerState;
+import io.envoyproxy.envoy.admin.v3.RoutesConfigDump;
+import io.envoyproxy.envoy.admin.v3.RoutesConfigDump.DynamicRouteConfig;
+import io.envoyproxy.envoy.admin.v3.UpdateFailureState;
+import io.envoyproxy.envoy.config.cluster.v3.Cluster;
+import io.envoyproxy.envoy.config.core.v3.Node;
+import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
+import io.envoyproxy.envoy.config.listener.v3.Listener;
+import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
+import io.envoyproxy.envoy.service.status.v3.ClientConfig;
+import io.envoyproxy.envoy.service.status.v3.ClientStatusDiscoveryServiceGrpc;
+import io.envoyproxy.envoy.service.status.v3.ClientStatusRequest;
+import io.envoyproxy.envoy.service.status.v3.ClientStatusResponse;
+import io.envoyproxy.envoy.service.status.v3.PerXdsConfig;
+import io.envoyproxy.envoy.type.matcher.v3.NodeMatcher;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
+import io.grpc.internal.ObjectPool;
+import io.grpc.internal.testing.StreamRecorder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import io.grpc.xds.AbstractXdsClient.ResourceType;
+import io.grpc.xds.XdsClient.ResourceMetadata;
+import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus;
+import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link CsdsService}. */
+@RunWith(Enclosed.class)
+public class CsdsServiceTest {
+  private static final String NODE_ID =
+      "projects/42/networks/default/nodes/5c85b298-6f5b-4722-b74a-f7d1f0ccf5ad";
+  private static final EnvoyProtoData.Node BOOTSTRAP_NODE =
+      EnvoyProtoData.Node.newBuilder().setId(NODE_ID).build();
+  private static final XdsClient XDS_CLIENT_NO_RESOURCES = new XdsClient() {
+    @Override
+    EnvoyProtoData.Node getNode() {
+      return BOOTSTRAP_NODE;
+    }
+
+    @Override
+    String getCurrentVersion(ResourceType type) {
+      return "getCurrentVersion." + type.name();
+    }
+
+    @Override
+    Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
+      return ImmutableMap.of();
+    }
+  };
+
+  @RunWith(JUnit4.class)
+  public static class ServiceTests {
+    private static final CsdsService CSDS_SERVICE_MINIMAL =
+        new CsdsService(new FakeXdsClientPoolFactory(XDS_CLIENT_NO_RESOURCES));
+    private static final ClientStatusRequest REQUEST = ClientStatusRequest.getDefaultInstance();
+    private static final ClientStatusRequest REQUEST_INVALID =
+        ClientStatusRequest.newBuilder().addNodeMatchers(NodeMatcher.getDefaultInstance()).build();
+
+    @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
+
+    private ClientStatusDiscoveryServiceGrpc.ClientStatusDiscoveryServiceBlockingStub csdsStub;
+    private ClientStatusDiscoveryServiceGrpc.ClientStatusDiscoveryServiceStub csdsAsyncStub;
+
+    @Before
+    public void setUp() {
+      csdsStub = ClientStatusDiscoveryServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
+      csdsAsyncStub = ClientStatusDiscoveryServiceGrpc.newStub(grpcServerRule.getChannel());
+    }
+
+    /** Until XdsClient successfully initialized, expect response to be empty. */
+    @Test
+    public void fetchClientConfig_xdsClientPoolNotInitialized() {
+      grpcServerRule.getServiceRegistry().addService(CsdsService.newInstance());
+      ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST);
+      assertThat(response).isEqualTo(ClientStatusResponse.getDefaultInstance());
+    }
+
+    /** Status.INVALID_ARGUMENT on unexpected request fields. */
+    @Test
+    public void fetchClientConfig_invalidArgument() {
+      grpcServerRule.getServiceRegistry().addService(CSDS_SERVICE_MINIMAL);
+      try {
+        ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST_INVALID);
+        fail("Should've failed, got response: " + response);
+      } catch (StatusRuntimeException e) {
+        verifyRequestInvalidResponseStatus(e.getStatus());
+      }
+    }
+
+    /** Unexpected exceptions translated to internal error status. */
+    @Test
+    public void fetchClientConfig_unexpectedException() {
+      XdsClient throwingXdsClient = new XdsClient() {
+        @Override
+        Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
+          throw new IllegalArgumentException("IllegalArgumentException");
+        }
+      };
+      grpcServerRule.getServiceRegistry()
+          .addService(new CsdsService(new FakeXdsClientPoolFactory(throwingXdsClient)));
+
+      try {
+        ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST);
+        fail("Should've failed, got response: " + response);
+      } catch (StatusRuntimeException e) {
+        assertThat(e.getStatus().getCode()).isEqualTo(Code.INTERNAL);
+        assertThat(e.getStatus().getDescription()).isEqualTo("Unexpected internal error");
+      }
+    }
+
+    /** ClientStatusResponse contains valid ClientConfig with the correct shape. */
+    @Test
+    public void fetchClientConfig_happyPath() {
+      grpcServerRule.getServiceRegistry().addService(CSDS_SERVICE_MINIMAL);
+      verifyResponse(csdsStub.fetchClientStatus(REQUEST));
+    }
+
+    @Test
+    public void streamClientStatus_happyPath() {
+      CsdsService csdsService =
+          new CsdsService(new FakeXdsClientPoolFactory(XDS_CLIENT_NO_RESOURCES) {
+            boolean calledOnce;
+
+            @Override
+            @Nullable
+            public ObjectPool<XdsClient> get() {
+              // xDS client not ready on the first call, then becomes ready.
+              if (!calledOnce) {
+                calledOnce = true;
+                return null;
+              } else {
+                return super.get();
+              }
+            }
+          });
+
+      grpcServerRule.getServiceRegistry().addService(csdsService);
+
+      StreamRecorder<ClientStatusResponse> responseObserver = StreamRecorder.create();
+      StreamObserver<ClientStatusRequest> requestObserver =
+          csdsAsyncStub.streamClientStatus(responseObserver);
+
+      requestObserver.onNext(REQUEST);
+      requestObserver.onNext(REQUEST);
+      requestObserver.onNext(REQUEST);
+      requestObserver.onCompleted();
+
+      List<ClientStatusResponse> responses = responseObserver.getValues();
+      assertThat(responses.size()).isEqualTo(3);
+      // Empty response on XdsClient not ready.
+      assertThat(responses.get(0)).isEqualTo(ClientStatusResponse.getDefaultInstance());
+      // The following calls return ClientConfig's successfully.
+      verifyResponse(responses.get(1));
+      verifyResponse(responses.get(2));
+    }
+
+    @Test
+    public void streamClientStatus_requestInvalid() {
+      grpcServerRule.getServiceRegistry().addService(CSDS_SERVICE_MINIMAL);
+
+      StreamRecorder<ClientStatusResponse> responseObserver = StreamRecorder.create();
+      StreamObserver<ClientStatusRequest> requestObserver =
+          csdsAsyncStub.streamClientStatus(responseObserver);
+
+      requestObserver.onNext(REQUEST);
+      requestObserver.onNext(REQUEST_INVALID);
+      requestObserver.onNext(REQUEST);
+      requestObserver.onCompleted();
+
+      List<ClientStatusResponse> responses = responseObserver.getValues();
+      assertThat(responses.size()).isEqualTo(1);
+      verifyResponse(responses.get(0));
+      assertThat(responseObserver.getError()).isNotNull();
+      verifyRequestInvalidResponseStatus(Status.fromThrowable(responseObserver.getError()));
+    }
+
+    @Test
+    public void streamClientStatus_onClientError() {
+      grpcServerRule.getServiceRegistry().addService(CSDS_SERVICE_MINIMAL);
+
+      StreamRecorder<ClientStatusResponse> responseObserver = StreamRecorder.create();
+      StreamObserver<ClientStatusRequest> requestObserver =
+          csdsAsyncStub.streamClientStatus(responseObserver);
+
+      requestObserver.onNext(REQUEST);
+      requestObserver.onError(new StatusRuntimeException(Status.DATA_LOSS));
+
+      List<ClientStatusResponse> responses = responseObserver.getValues();
+      assertThat(responses.size()).isEqualTo(1);
+      verifyResponse(responses.get(0));
+      // Server quietly closes its side.
+      assertThat(responseObserver.getError()).isNull();
+    }
+
+    private void verifyResponse(ClientStatusResponse response) {
+      assertThat(response.getConfigCount()).isEqualTo(1);
+      ClientConfig clientConfig = response.getConfig(0);
+      verifyClientConfigNode(clientConfig);
+      verifyClientConfigNoResources(clientConfig);
+    }
+
+    private void verifyRequestInvalidResponseStatus(Status status) {
+      assertThat(status.getCode()).isEqualTo(Code.INVALID_ARGUMENT);
+      assertThat(status.getDescription()).isEqualTo("node_matchers not supported");
+    }
+  }
+
+  @RunWith(JUnit4.class)
+  public static class MetadataToProtoTests {
+    private static final String LDS_RESOURCE = "listener.googleapis.com";
+    private static final String RDS_RESOURCE = "route-configuration.googleapis.com";
+    private static final String CDS_RESOURCE = "cluster.googleapis.com";
+    private static final String EDS_RESOURCE = "cluster-load-assignment.googleapis.com";
+    private static final String VERSION_ACK = "42";
+    private static final String VERSION_NACK = "43";
+    private static final String ERROR = "Parse error line 1\n Parse error line 2";
+
+    // Test timestamps.
+    private static final Timestamp TIMESTAMP_ZERO = Timestamp.getDefaultInstance();
+    private static final long NANOS_LAST_UPDATE = 1577923199_606042047L;
+    private static final Timestamp TIMESTAMP_LAST_UPDATE = Timestamp.newBuilder()
+        .setSeconds(1577923199L)  // 2020-01-01T23:59:59Z
+        .setNanos(606042047)
+        .build();
+    private static final long NANOS_FAILED_UPDATE = 1609545599_732105843L;
+    private static final Timestamp TIMESTAMP_FAILED_UPDATE = Timestamp.newBuilder()
+        .setSeconds(1609545599L)  // 2021-01-01T23:59:59Z
+        .setNanos(732105843)
+        .build();
+
+    // Raw resources.
+    private static final Any RAW_LISTENER =
+        Any.pack(Listener.newBuilder().setName(LDS_RESOURCE).build());
+    private static final Any RAW_ROUTE_CONFIGURATION =
+        Any.pack(RouteConfiguration.newBuilder().setName(RDS_RESOURCE).build());
+    private static final Any RAW_CLUSTER =
+        Any.pack(Cluster.newBuilder().setName(CDS_RESOURCE).build());
+    private static final Any RAW_CLUSTER_LOAD_ASSIGNMENT =
+        Any.pack(ClusterLoadAssignment.newBuilder().setClusterName(EDS_RESOURCE).build());
+
+    // Test metadata: no data received states.
+    private static final ResourceMetadata METADATA_UNKNOWN =
+        ResourceMetadata.newResourceMetadataUnknown();
+    private static final ResourceMetadata METADATA_DOES_NOT_EXIST =
+        ResourceMetadata.newResourceMetadataDoesNotExist();
+    private static final ResourceMetadata METADATA_REQUESTED =
+        ResourceMetadata.newResourceMetadataRequested();
+
+    // Test metadata: resource acknowledged state, per resource type.
+    private static final ResourceMetadata METADATA_ACKED_LDS = ResourceMetadata
+        .newResourceMetadataAcked(RAW_LISTENER, VERSION_ACK, NANOS_LAST_UPDATE);
+    private static final ResourceMetadata METADATA_ACKED_RDS = ResourceMetadata
+        .newResourceMetadataAcked(RAW_ROUTE_CONFIGURATION, VERSION_ACK, NANOS_LAST_UPDATE);
+    private static final ResourceMetadata METADATA_ACKED_CDS = ResourceMetadata
+        .newResourceMetadataAcked(RAW_CLUSTER, VERSION_ACK, NANOS_LAST_UPDATE);
+    private static final ResourceMetadata METADATA_ACKED_EDS = ResourceMetadata
+        .newResourceMetadataAcked(RAW_CLUSTER_LOAD_ASSIGNMENT, VERSION_ACK, NANOS_LAST_UPDATE);
+
+    // Test resources list.
+    private static final ImmutableMap<String, ResourceMetadata> RESOURCES_METADATA =
+        ImmutableMap.of("A", METADATA_UNKNOWN, "B", METADATA_REQUESTED);
+
+    /* LDS tests */
+
+    @Test
+    public void dumpLdsConfig() {
+      ListenersConfigDump ldsConfig = CsdsService.dumpLdsConfig(RESOURCES_METADATA, VERSION_ACK);
+      assertThat(ldsConfig.getVersionInfo()).isEqualTo(VERSION_ACK);
+      assertThat(ldsConfig.getStaticListenersCount()).isEqualTo(0);
+      assertThat(ldsConfig.getDynamicListenersCount()).isEqualTo(2);
+      // Minimal check to confirm that resources generated from corresponding metadata.
+      DynamicListener listenerA = ldsConfig.getDynamicListeners(0);
+      assertThat(listenerA.getName()).isEqualTo("A");
+      assertThat(listenerA.getClientStatus()).isEqualTo(ClientResourceStatus.UNKNOWN);
+      DynamicListener listenerB = ldsConfig.getDynamicListeners(1);
+      assertThat(listenerB.getName()).isEqualTo("B");
+      assertThat(listenerB.getClientStatus()).isEqualTo(ClientResourceStatus.REQUESTED);
+    }
+
+    @Test
+    public void buildDynamicListener_metadataUnknown() {
+      DynamicListener dynamicListener =
+          CsdsService.buildDynamicListener(LDS_RESOURCE, METADATA_UNKNOWN);
+      verifyDynamicListener(dynamicListener, ClientResourceStatus.UNKNOWN);
+      verifyDynamicListenerStateNoData(dynamicListener.getActiveState());
+    }
+
+    @Test
+    public void buildDynamicListener_metadataDoesNotExist() {
+      DynamicListener dynamicListener =
+          CsdsService.buildDynamicListener(LDS_RESOURCE, METADATA_DOES_NOT_EXIST);
+      verifyDynamicListener(dynamicListener, ClientResourceStatus.DOES_NOT_EXIST);
+      verifyDynamicListenerStateNoData(dynamicListener.getActiveState());
+    }
+
+    @Test
+    public void buildDynamicListener_metadataRequested() {
+      DynamicListener dynamicListener =
+          CsdsService.buildDynamicListener(LDS_RESOURCE, METADATA_REQUESTED);
+      verifyDynamicListener(dynamicListener, ClientResourceStatus.REQUESTED);
+      verifyDynamicListenerStateNoData(dynamicListener.getActiveState());
+    }
+
+    @Test
+    public void buildDynamicListener_metadataAcked() {
+      DynamicListener dynamicListener =
+          CsdsService.buildDynamicListener(LDS_RESOURCE, METADATA_ACKED_LDS);
+      verifyDynamicListener(dynamicListener, ClientResourceStatus.ACKED);
+      verifyDynamicListenerStateAccepted(dynamicListener.getActiveState());
+    }
+
+    @Test
+    public void buildDynamicListener_metadataNackedFromRequested() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_REQUESTED, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicListener dynamicListener = CsdsService.buildDynamicListener(LDS_RESOURCE, metadata);
+      verifyDynamicListener(dynamicListener, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicListener.getErrorState());
+      verifyDynamicListenerStateNoData(dynamicListener.getActiveState());
+    }
+
+    @Test
+    public void buildDynamicListener_metadataNackedFromAcked() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_ACKED_LDS, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicListener dynamicListener = CsdsService.buildDynamicListener(LDS_RESOURCE, metadata);
+      verifyDynamicListener(dynamicListener, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicListener.getErrorState());
+      verifyDynamicListenerStateAccepted(dynamicListener.getActiveState());
+    }
+
+    private void verifyDynamicListener(
+        DynamicListener dynamicListener, ClientResourceStatus status) {
+      assertWithMessage("name").that(dynamicListener.getName()).isEqualTo(LDS_RESOURCE);
+      assertWithMessage("active_state").that(dynamicListener.hasActiveState()).isTrue();
+      assertWithMessage("warming_state").that(dynamicListener.hasWarmingState()).isFalse();
+      assertWithMessage("draining_state").that(dynamicListener.hasDrainingState()).isFalse();
+      assertWithMessage("error_state").that(dynamicListener.hasErrorState())
+          .isEqualTo(status.equals(ClientResourceStatus.NACKED));
+      assertWithMessage("client_status").that(dynamicListener.getClientStatus()).isEqualTo(status);
+    }
+
+    private void verifyDynamicListenerStateNoData(DynamicListenerState dynamicListenerState) {
+      assertWithMessage("version_info").that(dynamicListenerState.getVersionInfo()).isEmpty();
+      assertWithMessage("listener").that(dynamicListenerState.hasListener()).isFalse();
+      assertWithMessage("last_updated").that(dynamicListenerState.getLastUpdated())
+          .isEqualTo(TIMESTAMP_ZERO);
+    }
+
+    private void verifyDynamicListenerStateAccepted(DynamicListenerState dynamicListenerState) {
+      assertWithMessage("version_info").that(dynamicListenerState.getVersionInfo())
+          .isEqualTo(VERSION_ACK);
+      assertWithMessage("listener").that(dynamicListenerState.hasListener()).isTrue();
+      assertWithMessage("listener").that(dynamicListenerState.getListener())
+          .isEqualTo(RAW_LISTENER);
+      assertWithMessage("last_updated").that(dynamicListenerState.getLastUpdated())
+          .isEqualTo(TIMESTAMP_LAST_UPDATE);
+    }
+
+    /* RDS tests */
+
+    @Test
+    public void dumpRdsConfig() {
+      RoutesConfigDump rdsConfig = CsdsService.dumpRdsConfig(RESOURCES_METADATA);
+      assertThat(rdsConfig.getStaticRouteConfigsCount()).isEqualTo(0);
+      assertThat(rdsConfig.getDynamicRouteConfigsCount()).isEqualTo(2);
+      // Minimal check to confirm that resources generated from corresponding metadata.
+      assertThat(rdsConfig.getDynamicRouteConfigs(0).getClientStatus())
+          .isEqualTo(ClientResourceStatus.UNKNOWN);
+      assertThat(rdsConfig.getDynamicRouteConfigs(1).getClientStatus())
+          .isEqualTo(ClientResourceStatus.REQUESTED);
+    }
+
+    @Test
+    public void buildDynamicRouteConfig_metadataUnknown() {
+      verifyDynamicRouteConfigNoData(
+          CsdsService.buildDynamicRouteConfig(METADATA_UNKNOWN),
+          ClientResourceStatus.UNKNOWN);
+    }
+
+    @Test
+    public void buildDynamicRouteConfig_metadataDoesNotExist() {
+      verifyDynamicRouteConfigNoData(
+          CsdsService.buildDynamicRouteConfig(METADATA_DOES_NOT_EXIST),
+          ClientResourceStatus.DOES_NOT_EXIST);
+    }
+
+    @Test
+    public void buildDynamicRouteConfig_metadataRequested() {
+      verifyDynamicRouteConfigNoData(
+          CsdsService.buildDynamicRouteConfig(METADATA_REQUESTED),
+          ClientResourceStatus.REQUESTED);
+    }
+
+    @Test
+    public void buildDynamicRouteConfig_metadataAcked() {
+      verifyDynamicRouteConfigAccepted(
+          CsdsService.buildDynamicRouteConfig(METADATA_ACKED_RDS),
+          ClientResourceStatus.ACKED);
+    }
+
+    @Test
+    public void buildDynamicRouteConfig_metadataNackedFromRequested() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_REQUESTED, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicRouteConfig dynamicRouteConfig = CsdsService.buildDynamicRouteConfig(metadata);
+      verifyDynamicRouteConfigNoData(dynamicRouteConfig, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicRouteConfig.getErrorState());
+    }
+
+    @Test
+    public void buildDynamicRouteConfig_metadataNackedFromAcked() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_ACKED_RDS, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicRouteConfig dynamicRouteConfig = CsdsService.buildDynamicRouteConfig(metadata);
+      verifyDynamicRouteConfigAccepted(dynamicRouteConfig, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicRouteConfig.getErrorState());
+    }
+
+    private void verifyDynamicRouteConfigNoData(
+        DynamicRouteConfig dynamicRouteConfig, ClientResourceStatus status) {
+      assertWithMessage("version_info").that(dynamicRouteConfig.getVersionInfo()).isEmpty();
+      assertWithMessage("route_config").that(dynamicRouteConfig.hasRouteConfig()).isFalse();
+      assertWithMessage("last_updated").that(dynamicRouteConfig.getLastUpdated())
+          .isEqualTo(TIMESTAMP_ZERO);
+      assertWithMessage("error_state").that(dynamicRouteConfig.hasErrorState())
+          .isEqualTo(status.equals(ClientResourceStatus.NACKED));
+      assertWithMessage("client_status").that(dynamicRouteConfig.getClientStatus())
+          .isEqualTo(status);
+    }
+
+    private void verifyDynamicRouteConfigAccepted(
+        DynamicRouteConfig dynamicRouteConfig, ClientResourceStatus status) {
+      assertWithMessage("version_info").that(dynamicRouteConfig.getVersionInfo())
+          .isEqualTo(VERSION_ACK);
+      assertWithMessage("route_config").that(dynamicRouteConfig.hasRouteConfig()).isTrue();
+      assertWithMessage("route_config").that(dynamicRouteConfig.getRouteConfig())
+          .isEqualTo(RAW_ROUTE_CONFIGURATION);
+      assertWithMessage("last_updated").that(dynamicRouteConfig.getLastUpdated())
+          .isEqualTo(TIMESTAMP_LAST_UPDATE);
+      assertWithMessage("error_state").that(dynamicRouteConfig.hasErrorState())
+          .isEqualTo(status.equals(ClientResourceStatus.NACKED));
+      assertWithMessage("client_status").that(dynamicRouteConfig.getClientStatus())
+          .isEqualTo(status);
+    }
+
+    /* CDS tests */
+
+    @Test
+    public void dumpCdsConfig() {
+      ClustersConfigDump cdsConfig = CsdsService.dumpCdsConfig(RESOURCES_METADATA, VERSION_ACK);
+      assertThat(cdsConfig.getVersionInfo()).isEqualTo(VERSION_ACK);
+      assertThat(cdsConfig.getStaticClustersCount()).isEqualTo(0);
+      assertThat(cdsConfig.getDynamicWarmingClustersCount()).isEqualTo(0);
+      assertThat(cdsConfig.getDynamicActiveClustersCount()).isEqualTo(2);
+      // Minimal check to confirm that resources generated from corresponding metadata.
+      assertThat(cdsConfig.getDynamicActiveClusters(0).getClientStatus())
+          .isEqualTo(ClientResourceStatus.UNKNOWN);
+      assertThat(cdsConfig.getDynamicActiveClusters(1).getClientStatus())
+          .isEqualTo(ClientResourceStatus.REQUESTED);
+    }
+
+    @Test
+    public void buildDynamicCluster_metadataUnknown() {
+      verifyDynamicClusterNoData(
+          CsdsService.buildDynamicCluster(METADATA_UNKNOWN),
+          ClientResourceStatus.UNKNOWN);
+    }
+
+    @Test
+    public void buildDynamicCluster_metadataDoesNotExist() {
+      verifyDynamicClusterNoData(
+          CsdsService.buildDynamicCluster(METADATA_DOES_NOT_EXIST),
+          ClientResourceStatus.DOES_NOT_EXIST);
+    }
+
+    @Test
+    public void buildDynamicCluster_metadataRequested() {
+      verifyDynamicClusterNoData(
+          CsdsService.buildDynamicCluster(METADATA_REQUESTED),
+          ClientResourceStatus.REQUESTED);
+    }
+
+    @Test
+    public void buildDynamicCluster_metadataAcked() {
+      verifyDynamicClusterAccepted(
+          CsdsService.buildDynamicCluster(METADATA_ACKED_CDS),
+          ClientResourceStatus.ACKED);
+    }
+
+    @Test
+    public void buildDynamicCluster_metadataNackedFromRequested() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_REQUESTED, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicCluster dynamicCluster = CsdsService.buildDynamicCluster(metadata);
+      verifyDynamicClusterNoData(dynamicCluster, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicCluster.getErrorState());
+    }
+
+    @Test
+    public void buildDynamicCluster_metadataNackedFromAcked() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_ACKED_CDS, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicCluster dynamicCluster = CsdsService.buildDynamicCluster(metadata);
+      verifyDynamicClusterAccepted(dynamicCluster, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicCluster.getErrorState());
+    }
+
+    private void verifyDynamicClusterNoData(
+        DynamicCluster dynamicCluster, ClientResourceStatus status) {
+      assertWithMessage("version_info").that(dynamicCluster.getVersionInfo()).isEmpty();
+      assertWithMessage("route_config").that(dynamicCluster.hasCluster()).isFalse();
+      assertWithMessage("last_updated").that(dynamicCluster.getLastUpdated())
+          .isEqualTo(TIMESTAMP_ZERO);
+      assertWithMessage("error_state").that(dynamicCluster.hasErrorState())
+          .isEqualTo(status.equals(ClientResourceStatus.NACKED));
+      assertWithMessage("client_status").that(dynamicCluster.getClientStatus()).isEqualTo(status);
+    }
+
+    private void verifyDynamicClusterAccepted(
+        DynamicCluster dynamicCluster, ClientResourceStatus status) {
+      assertWithMessage("version_info").that(dynamicCluster.getVersionInfo())
+          .isEqualTo(VERSION_ACK);
+      assertWithMessage("route_config").that(dynamicCluster.hasCluster()).isTrue();
+      assertWithMessage("route_config").that(dynamicCluster.getCluster()).isEqualTo(RAW_CLUSTER);
+      assertWithMessage("last_updated").that(dynamicCluster.getLastUpdated())
+          .isEqualTo(TIMESTAMP_LAST_UPDATE);
+      assertWithMessage("error_state").that(dynamicCluster.hasErrorState())
+          .isEqualTo(status.equals(ClientResourceStatus.NACKED));
+      assertWithMessage("client_status").that(dynamicCluster.getClientStatus()).isEqualTo(status);
+    }
+
+    /* EDS tests */
+
+    @Test
+    public void dumpEdsConfig() {
+      EndpointsConfigDump edsConfig = CsdsService.dumpEdsConfig(RESOURCES_METADATA);
+      assertThat(edsConfig.getStaticEndpointConfigsCount()).isEqualTo(0);
+      assertThat(edsConfig.getDynamicEndpointConfigsCount()).isEqualTo(2);
+      // Minimal check to confirm that resources generated from corresponding metadata.
+      assertThat(edsConfig.getDynamicEndpointConfigs(0).getClientStatus())
+          .isEqualTo(ClientResourceStatus.UNKNOWN);
+      assertThat(edsConfig.getDynamicEndpointConfigs(1).getClientStatus())
+          .isEqualTo(ClientResourceStatus.REQUESTED);
+    }
+
+    @Test
+    public void buildDynamicEndpointConfig_metadataUnknown() {
+      buildDynamicEndpointConfigNoData(
+          CsdsService.buildDynamicEndpointConfig(METADATA_UNKNOWN),
+          ClientResourceStatus.UNKNOWN);
+    }
+
+    @Test
+    public void buildDynamicEndpointConfig_metadataDoesNotExist() {
+      buildDynamicEndpointConfigNoData(
+          CsdsService.buildDynamicEndpointConfig(METADATA_DOES_NOT_EXIST),
+          ClientResourceStatus.DOES_NOT_EXIST);
+    }
+
+    @Test
+    public void buildDynamicEndpointConfig_metadataRequested() {
+      buildDynamicEndpointConfigNoData(
+          CsdsService.buildDynamicEndpointConfig(METADATA_REQUESTED),
+          ClientResourceStatus.REQUESTED);
+    }
+
+    @Test
+    public void buildDynamicEndpointConfig_metadataAcked() {
+      verifyDynamicEndpointConfigAccepted(
+          CsdsService.buildDynamicEndpointConfig(METADATA_ACKED_EDS),
+          ClientResourceStatus.ACKED);
+    }
+
+    @Test
+    public void buildDynamicEndpointConfig_metadataNackedFromRequested() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_REQUESTED, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicEndpointConfig dynamicEndpointConfig =
+          CsdsService.buildDynamicEndpointConfig(metadata);
+      buildDynamicEndpointConfigNoData(dynamicEndpointConfig, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicEndpointConfig.getErrorState());
+    }
+
+    @Test
+    public void buildDynamicEndpointConfig_metadataNackedFromAcked() {
+      ResourceMetadata metadata = ResourceMetadata.newResourceMetadataNacked(
+          METADATA_ACKED_EDS, VERSION_NACK, NANOS_FAILED_UPDATE, ERROR);
+      DynamicEndpointConfig dynamicEndpointConfig =
+          CsdsService.buildDynamicEndpointConfig(metadata);
+      verifyDynamicEndpointConfigAccepted(dynamicEndpointConfig, ClientResourceStatus.NACKED);
+      verifyErrorState(dynamicEndpointConfig.getErrorState());
+    }
+
+    private void buildDynamicEndpointConfigNoData(
+        DynamicEndpointConfig dynamicEndpointConfig, ClientResourceStatus status) {
+      assertWithMessage("version_info").that(dynamicEndpointConfig.getVersionInfo()).isEmpty();
+      assertWithMessage("route_config").that(dynamicEndpointConfig.hasEndpointConfig()).isFalse();
+      assertWithMessage("last_updated").that(dynamicEndpointConfig.getLastUpdated())
+          .isEqualTo(TIMESTAMP_ZERO);
+      assertWithMessage("error_state").that(dynamicEndpointConfig.hasErrorState())
+          .isEqualTo(status.equals(ClientResourceStatus.NACKED));
+      assertWithMessage("client_status").that(dynamicEndpointConfig.getClientStatus())
+          .isEqualTo(status);
+    }
+
+    private void verifyDynamicEndpointConfigAccepted(
+        DynamicEndpointConfig dynamicEndpointConfig, ClientResourceStatus status) {
+      assertWithMessage("version_info").that(dynamicEndpointConfig.getVersionInfo())
+          .isEqualTo(VERSION_ACK);
+      assertWithMessage("route_config").that(dynamicEndpointConfig.hasEndpointConfig()).isTrue();
+      assertWithMessage("route_config").that(dynamicEndpointConfig.getEndpointConfig())
+          .isEqualTo(RAW_CLUSTER_LOAD_ASSIGNMENT);
+      assertWithMessage("last_updated").that(dynamicEndpointConfig.getLastUpdated())
+          .isEqualTo(TIMESTAMP_LAST_UPDATE);
+      assertWithMessage("error_state").that(dynamicEndpointConfig.hasErrorState())
+          .isEqualTo(status.equals(ClientResourceStatus.NACKED));
+      assertWithMessage("client_status").that(dynamicEndpointConfig.getClientStatus())
+          .isEqualTo(status);
+    }
+
+    /* Common methods. */
+
+    @Test
+    public void metadataStatusToClientStatus() {
+      assertThat(CsdsService.metadataStatusToClientStatus(ResourceMetadataStatus.UNKNOWN))
+          .isEqualTo(ClientResourceStatus.UNKNOWN);
+      assertThat(CsdsService.metadataStatusToClientStatus(ResourceMetadataStatus.DOES_NOT_EXIST))
+          .isEqualTo(ClientResourceStatus.DOES_NOT_EXIST);
+      assertThat(CsdsService.metadataStatusToClientStatus(ResourceMetadataStatus.REQUESTED))
+          .isEqualTo(ClientResourceStatus.REQUESTED);
+      assertThat(CsdsService.metadataStatusToClientStatus(ResourceMetadataStatus.ACKED))
+          .isEqualTo(ClientResourceStatus.ACKED);
+      assertThat(CsdsService.metadataStatusToClientStatus(ResourceMetadataStatus.NACKED))
+          .isEqualTo(ClientResourceStatus.NACKED);
+    }
+
+    @Test
+    public void getClientConfigForXdsClient_subscribedResourcesToPerXdsConfig() {
+      ClientConfig clientConfig = CsdsService.getClientConfigForXdsClient(new XdsClient() {
+        @Override
+        EnvoyProtoData.Node getNode() {
+          return BOOTSTRAP_NODE;
+        }
+
+        @Override
+        String getCurrentVersion(ResourceType type) {
+          return "getCurrentVersion." + type.name();
+        }
+
+        @Override
+        Map<String, ResourceMetadata> getSubscribedResourcesMetadata(ResourceType type) {
+          switch (type) {
+            case LDS:
+              return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_LDS);
+            case RDS:
+              return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_RDS);
+            case CDS:
+              return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_CDS);
+            case EDS:
+              return ImmutableMap.of("subscribedResourceName." + type.name(), METADATA_ACKED_EDS);
+            case UNKNOWN:
+            default:
+              throw new AssertionError("Unexpected resource name");
+          }
+        }
+      });
+
+      verifyClientConfigNode(clientConfig);
+
+      // Minimal verification to confirm that the data/metadata XdsClient provides,
+      // is propagated to the correct resource types.
+      assertThat(clientConfig.getXdsConfigCount()).isEqualTo(4);
+      EnumMap<ResourceType, PerXdsConfig> configDumps = mapConfigDumps(clientConfig);
+      assertThat(configDumps.keySet()).containsExactly(LDS, RDS, CDS, EDS);
+
+      // LDS.
+      // Both the version provided by XdsClient.getCurrentVersion(),
+      // and the resource name provided by XdsClient.getSubscribedResourcesMetadata() are used.
+      PerXdsConfig perXdsConfigLds = configDumps.get(LDS);
+      verifyPerXdsConfigEmptyFields(perXdsConfigLds);
+      ListenersConfigDump listenerConfig = perXdsConfigLds.getListenerConfig();
+      assertThat(listenerConfig.getVersionInfo()).isEqualTo("getCurrentVersion.LDS");
+      assertThat(listenerConfig.getDynamicListenersCount()).isEqualTo(1);
+      DynamicListener dynamicListener = listenerConfig.getDynamicListeners(0);
+      assertThat(dynamicListener.getName()).isEqualTo("subscribedResourceName.LDS");
+      assertThat(dynamicListener.getClientStatus()).isEqualTo(ClientResourceStatus.ACKED);
+      assertThat(dynamicListener.getActiveState().getVersionInfo()).isEqualTo(VERSION_ACK);
+
+      // RDS.
+      // Neither the version provided by XdsClient.getCurrentVersion(),
+      // nor the resource name provided by XdsClient.getSubscribedResourcesMetadata() are used.
+      PerXdsConfig perXdsConfigRds = configDumps.get(RDS);
+      verifyPerXdsConfigEmptyFields(perXdsConfigRds);
+      RoutesConfigDump routeConfig = perXdsConfigRds.getRouteConfig();
+      assertThat(routeConfig.getDynamicRouteConfigsCount()).isEqualTo(1);
+      DynamicRouteConfig dynamicRouteConfig = routeConfig.getDynamicRouteConfigs(0);
+      assertThat(dynamicRouteConfig.getClientStatus()).isEqualTo(ClientResourceStatus.ACKED);
+      assertThat(dynamicRouteConfig.getVersionInfo()).isEqualTo(VERSION_ACK);
+
+      // CDS.
+      // Only the version provided by XdsClient.getCurrentVersion() is used,
+      // the resource name provided by XdsClient.getSubscribedResourcesMetadata() is ignored.
+      PerXdsConfig perXdsConfigCds = configDumps.get(CDS);
+      verifyPerXdsConfigEmptyFields(perXdsConfigRds);
+      ClustersConfigDump clusterConfig = perXdsConfigCds.getClusterConfig();
+      assertThat(clusterConfig.getVersionInfo()).isEqualTo("getCurrentVersion.CDS");
+      assertThat(clusterConfig.getDynamicActiveClustersCount()).isEqualTo(1);
+      DynamicCluster dynamicCluster = clusterConfig.getDynamicActiveClusters(0);
+      assertThat(dynamicCluster.getClientStatus()).isEqualTo(ClientResourceStatus.ACKED);
+      assertThat(dynamicCluster.getVersionInfo()).isEqualTo(VERSION_ACK);
+
+      // RDS.
+      // Neither the version provided by XdsClient.getCurrentVersion(),
+      // nor the resource name provided by XdsClient.getSubscribedResourcesMetadata() are used.
+      PerXdsConfig perXdsConfigEds = configDumps.get(EDS);
+      verifyPerXdsConfigEmptyFields(perXdsConfigEds);
+      EndpointsConfigDump endpointConfig = perXdsConfigEds.getEndpointConfig();
+      assertThat(endpointConfig.getDynamicEndpointConfigsCount()).isEqualTo(1);
+      DynamicEndpointConfig dynamicEndpointConfig = endpointConfig.getDynamicEndpointConfigs(0);
+      assertThat(dynamicEndpointConfig.getClientStatus()).isEqualTo(ClientResourceStatus.ACKED);
+      assertThat(dynamicEndpointConfig.getVersionInfo()).isEqualTo(VERSION_ACK);
+    }
+
+    @Test
+    public void getClientConfigForXdsClient_noSubscribedResources() {
+      ClientConfig clientConfig = CsdsService.getClientConfigForXdsClient(XDS_CLIENT_NO_RESOURCES);
+      verifyClientConfigNode(clientConfig);
+      verifyClientConfigNoResources(clientConfig);
+    }
+
+    private void verifyErrorState(UpdateFailureState errorState) {
+      // failed_configuration currently not supported.
+      assertWithMessage("failed_configuration").that(errorState.hasFailedConfiguration()).isFalse();
+      assertWithMessage("last_update_attempt").that(errorState.getLastUpdateAttempt())
+          .isEqualTo(TIMESTAMP_FAILED_UPDATE);
+      assertWithMessage("details").that(errorState.getDetails()).isEqualTo(ERROR);
+      assertWithMessage("version_info").that(errorState.getVersionInfo()).isEqualTo(VERSION_NACK);
+    }
+  }
+
+  /**
+   * Assuming {@link MetadataToProtoTests} passes, and metadata converted to corresponding
+   * config dumps correctly, perform a minimal verification of the general shape of ClientConfig.
+   */
+  private static void verifyClientConfigNoResources(ClientConfig clientConfig) {
+    // Expect PerXdsConfig for all resource types to be present, but empty.
+    assertThat(clientConfig.getXdsConfigCount()).isEqualTo(4);
+    EnumMap<ResourceType, PerXdsConfig> configDumps = mapConfigDumps(clientConfig);
+    assertThat(configDumps.keySet()).containsExactly(LDS, RDS, CDS, EDS);
+
+    ListenersConfigDump listenerConfig = configDumps.get(LDS).getListenerConfig();
+    assertThat(listenerConfig.getVersionInfo()).isEqualTo("getCurrentVersion.LDS");
+    assertThat(listenerConfig.getDynamicListenersCount()).isEqualTo(0);
+
+    RoutesConfigDump routeConfig = configDumps.get(RDS).getRouteConfig();
+    assertThat(routeConfig.getDynamicRouteConfigsCount()).isEqualTo(0);
+
+    ClustersConfigDump clusterConfig = configDumps.get(CDS).getClusterConfig();
+    assertThat(clusterConfig.getVersionInfo()).isEqualTo("getCurrentVersion.CDS");
+    assertThat(clusterConfig.getDynamicActiveClustersCount()).isEqualTo(0);
+
+    EndpointsConfigDump endpointConfig = configDumps.get(EDS).getEndpointConfig();
+    assertThat(endpointConfig.getDynamicEndpointConfigsCount()).isEqualTo(0);
+  }
+
+  /**
+   * Assuming {@link io.grpc.xds.EnvoyProtoDataTest#convertNode} passes, perform a minimal check,
+   * just verify the node itself is the one we expect.
+   */
+  private static void verifyClientConfigNode(ClientConfig clientConfig) {
+    Node node = clientConfig.getNode();
+    assertThat(node.getId()).isEqualTo(NODE_ID);
+    assertThat(node).isEqualTo(BOOTSTRAP_NODE.toEnvoyProtoNode());
+  }
+
+  /** Verify PerXdsConfig fields that are expected to be omitted. */
+  private static void verifyPerXdsConfigEmptyFields(PerXdsConfig perXdsConfig) {
+    assertThat(perXdsConfig.getStatusValue()).isEqualTo(0);
+    @SuppressWarnings("deprecation")
+    int clientStatusValue = perXdsConfig.getClientStatusValue();
+    assertThat(clientStatusValue).isEqualTo(0);
+  }
+
+  private static EnumMap<ResourceType, PerXdsConfig> mapConfigDumps(ClientConfig config) {
+    EnumMap<ResourceType, PerXdsConfig> xdsConfigMap = new EnumMap<>(ResourceType.class);
+    for (PerXdsConfig perXdsConfig : config.getXdsConfigList()) {
+      ResourceType type = perXdsConfigToResourceType(perXdsConfig);
+      assertThat(type).isNotEqualTo(ResourceType.UNKNOWN);
+      assertThat(xdsConfigMap).doesNotContainKey(type);
+      xdsConfigMap.put(type, perXdsConfig);
+    }
+    return xdsConfigMap;
+  }
+
+  private static ResourceType perXdsConfigToResourceType(PerXdsConfig perXdsConfig) {
+    switch (perXdsConfig.getPerXdsConfigCase()) {
+      case LISTENER_CONFIG:
+        return LDS;
+      case CLUSTER_CONFIG:
+        return CDS;
+      case ROUTE_CONFIG:
+        return RDS;
+      case ENDPOINT_CONFIG:
+        return EDS;
+      default:
+        return ResourceType.UNKNOWN;
+    }
+  }
+
+  private static class FakeXdsClientPoolFactory implements XdsClientPoolFactory {
+    @Nullable private final XdsClient xdsClient;
+
+    private FakeXdsClientPoolFactory(@Nullable XdsClient xdsClient) {
+      this.xdsClient = xdsClient;
+    }
+
+    @Override
+    @Nullable
+    public ObjectPool<XdsClient> get() {
+      return new ObjectPool<XdsClient>() {
+        @Override
+        public XdsClient getObject() {
+          return xdsClient;
+        }
+
+        @Override
+        public XdsClient returnObject(Object object) {
+          return null;
+        }
+      };
+    }
+
+    @Override
+    public void setBootstrapOverride(Map<String, ?> bootstrap) {
+      throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
+      throw new UnsupportedOperationException("Should not be called");
+    }
+  }
+}
diff --git a/xds/src/test/java/io/grpc/xds/GoogleCloudToProdNameResolverTest.java b/xds/src/test/java/io/grpc/xds/GoogleCloudToProdNameResolverTest.java
index 6692f43..9df8953 100644
--- a/xds/src/test/java/io/grpc/xds/GoogleCloudToProdNameResolverTest.java
+++ b/xds/src/test/java/io/grpc/xds/GoogleCloudToProdNameResolverTest.java
@@ -49,6 +49,7 @@
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -250,7 +251,13 @@
     }
 
     @Override
-    public ObjectPool<XdsClient> getXdsClientPool() {
+    @Nullable
+    public ObjectPool<XdsClient> get() {
+      throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public ObjectPool<XdsClient> getOrCreate() {
       throw new UnsupportedOperationException("Should not be called");
     }
   }
diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java
index 53ad28f..1bab405 100644
--- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java
+++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java
@@ -62,7 +62,8 @@
     SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper);
     thrown.expect(XdsInitializationException.class);
     thrown.expectMessage("No xDS server provided");
-    provider.getXdsClientPool();
+    provider.getOrCreate();
+    assertThat(provider.get()).isNull();
   }
 
   @Test
@@ -73,9 +74,12 @@
     when(bootstrapper.bootstrap()).thenReturn(bootstrapInfo);
 
     SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper);
-    ObjectPool<XdsClient> xdsClientPool = provider.getXdsClientPool();
+    assertThat(provider.get()).isNull();
+    ObjectPool<XdsClient> xdsClientPool = provider.getOrCreate();
     verify(bootstrapper).bootstrap();
-    assertThat(provider.getXdsClientPool()).isSameInstanceAs(xdsClientPool);
+    assertThat(provider.getOrCreate()).isSameInstanceAs(xdsClientPool);
+    assertThat(provider.get()).isNotNull();
+    assertThat(provider.get()).isSameInstanceAs(xdsClientPool);
     verifyNoMoreInteractions(bootstrapper);
   }
 
diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
index 7aca0a1..46ed3da 100644
--- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
@@ -171,7 +171,13 @@
       }
 
       @Override
-      public ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException {
+      @Nullable
+      public ObjectPool<XdsClient> get() {
+        throw new UnsupportedOperationException("Should not be called");
+      }
+
+      @Override
+      public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
         throw new XdsInitializationException("Fail to read bootstrap file");
       }
     };
@@ -1432,7 +1438,13 @@
     }
 
     @Override
-    public ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException {
+    @Nullable
+    public ObjectPool<XdsClient> get() {
+      throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
       return new ObjectPool<XdsClient>() {
         @Override
         public XdsClient getObject() {