xds: implement ignore_resource_deletion server feature (#9339)

As defined in the gRFC [A53: Option for Ignoring xDS Resource Deletion](https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md).

This includes semi-related changes:
* Refactor ClientXdsClientTestBase: extract verify methods for golden resources
* Parameterize ClientXdsClientV2Test and ClientXdsClientV3Test with ignoreResourceDeletion enabled and disabled
* Add FORCE_INFO and FORCE_WARNING levels to XdsLogLevel
diff --git a/xds/src/main/java/io/grpc/xds/Bootstrapper.java b/xds/src/main/java/io/grpc/xds/Bootstrapper.java
index 57cdc6f..9d0a65a 100644
--- a/xds/src/main/java/io/grpc/xds/Bootstrapper.java
+++ b/xds/src/main/java/io/grpc/xds/Bootstrapper.java
@@ -62,10 +62,21 @@
 
     abstract boolean useProtocolV3();
 
+    abstract boolean ignoreResourceDeletion();
+
     @VisibleForTesting
     static ServerInfo create(
         String target, ChannelCredentials channelCredentials, boolean useProtocolV3) {
-      return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3);
+      return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
+          false);
+    }
+
+    @VisibleForTesting
+    static ServerInfo create(
+        String target, ChannelCredentials channelCredentials, boolean useProtocolV3,
+        boolean ignoreResourceDeletion) {
+      return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
+          ignoreResourceDeletion);
     }
   }
 
diff --git a/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java b/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
index e858d9e..9082df3 100644
--- a/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
+++ b/xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
@@ -54,15 +54,22 @@
   private static final String BOOTSTRAP_CONFIG_SYS_PROPERTY = "io.grpc.xds.bootstrapConfig";
   @VisibleForTesting
   static String bootstrapConfigFromSysProp = System.getProperty(BOOTSTRAP_CONFIG_SYS_PROPERTY);
-  private static final String XDS_V3_SERVER_FEATURE = "xds_v3";
+
+  // Feature-gating environment variables.
+  static boolean enableFederation =
+      !Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
+          && Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));
+
+  // Client features.
   @VisibleForTesting
   static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
       "envoy.lb.does_not_support_overprovisioning";
   @VisibleForTesting
   static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
-  static boolean enableFederation =
-      !Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
-          && Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));
+
+  // Server features.
+  private static final String SERVER_FEATURE_XDS_V3 = "xds_v3";
+  private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";
 
   private final XdsLogger logger;
   private FileReader reader = LocalFileReader.INSTANCE;
@@ -275,12 +282,15 @@
       }
 
       boolean useProtocolV3 = false;
+      boolean ignoreResourceDeletion = false;
       List<String> serverFeatures = JsonUtil.getListOfStrings(serverConfig, "server_features");
       if (serverFeatures != null) {
         logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
-        useProtocolV3 = serverFeatures.contains(XDS_V3_SERVER_FEATURE);
+        useProtocolV3 = serverFeatures.contains(SERVER_FEATURE_XDS_V3);
+        ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
       }
-      servers.add(ServerInfo.create(serverUri, channelCredentials, useProtocolV3));
+      servers.add(
+          ServerInfo.create(serverUri, channelCredentials, useProtocolV3, ignoreResourceDeletion));
     }
     return servers.build();
   }
diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
index f7f0d9e..c448a82 100644
--- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java
@@ -2156,8 +2156,7 @@
         ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
         subscriber.removeWatcher(watcher);
         if (!subscriber.isWatched()) {
-          subscriber.stopTimer();
-          logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName);
+          subscriber.cancelResourceWatch();
           ldsResourceSubscribers.remove(resourceName);
           if (subscriber.xdsChannel != null) {
             subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS);
@@ -2194,8 +2193,7 @@
         ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
         subscriber.removeWatcher(watcher);
         if (!subscriber.isWatched()) {
-          subscriber.stopTimer();
-          logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName);
+          subscriber.cancelResourceWatch();
           rdsResourceSubscribers.remove(resourceName);
           if (subscriber.xdsChannel != null) {
             subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS);
@@ -2232,8 +2230,7 @@
         ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
         subscriber.removeWatcher(watcher);
         if (!subscriber.isWatched()) {
-          subscriber.stopTimer();
-          logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
+          subscriber.cancelResourceWatch();
           cdsResourceSubscribers.remove(resourceName);
           if (subscriber.xdsChannel != null) {
             subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS);
@@ -2270,8 +2267,7 @@
         ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
         subscriber.removeWatcher(watcher);
         if (!subscriber.isWatched()) {
-          subscriber.stopTimer();
-          logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
+          subscriber.cancelResourceWatch();
           edsResourceSubscribers.remove(resourceName);
           if (subscriber.xdsChannel != null) {
             subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS);
@@ -2320,7 +2316,7 @@
   Bootstrapper.BootstrapInfo getBootstrapInfo() {
     return bootstrapInfo;
   }
-  
+
   @Override
   public String toString() {
     return logId.toString();
@@ -2370,29 +2366,18 @@
       } else if (type == ResourceType.LDS || type == ResourceType.CDS) {
         if (subscriber.data != null && invalidResources.contains(resourceName)) {
           // Update is rejected but keep using the cached data.
-          if (type == ResourceType.LDS) {
-            LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
-            io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
-            if (hcm != null) {
-              String rdsName = hcm.rdsName();
-              if (rdsName != null) {
-                retainedResources.add(rdsName);
-              }
-            }
-          } else {
-            CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
-            String edsName = cdsUpdate.edsServiceName();
-            if (edsName == null) {
-              edsName = cdsUpdate.clusterName();
-            }
-            retainedResources.add(edsName);
-          }
+          retainDependentResource(subscriber, retainedResources);
         } else if (invalidResources.contains(resourceName)) {
           subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
         } else {
           // For State of the World services, notify watchers when their watched resource is missing
           // from the ADS update.
           subscriber.onAbsent();
+          // Retain any dependent resources if the resource deletion is ignored
+          // per bootstrap ignore_resource_deletion server feature.
+          if (!subscriber.absent) {
+            retainDependentResource(subscriber, retainedResources);
+          }
         }
       }
     }
@@ -2409,6 +2394,28 @@
     }
   }
 
+  private void retainDependentResource(
+      ResourceSubscriber subscriber, Set<String> retainedResources) {
+    if (subscriber.data == null) {
+      return;
+    }
+    String resourceName = null;
+    if (subscriber.type == ResourceType.LDS) {
+      LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
+      io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
+      if (hcm != null) {
+        resourceName = hcm.rdsName();
+      }
+    } else if (subscriber.type == ResourceType.CDS) {
+      CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
+      resourceName = cdsUpdate.edsServiceName();
+    }
+
+    if (resourceName != null) {
+      retainedResources.add(resourceName);
+    }
+  }
+
   private static final class ParsedResource {
     private final ResourceUpdate resourceUpdate;
     private final Any rawResource;
@@ -2431,15 +2438,18 @@
    * Tracks a single subscribed resource.
    */
   private final class ResourceSubscriber {
-    private final ServerInfo serverInfo;
+    @Nullable private final ServerInfo serverInfo;
     @Nullable private final AbstractXdsClient xdsChannel;
     private final ResourceType type;
     private final String resource;
     private final Set<ResourceWatcher> watchers = new HashSet<>();
-    private ResourceUpdate data;
+    @Nullable private ResourceUpdate data;
     private boolean absent;
-    private ScheduledHandle respTimer;
-    private ResourceMetadata metadata;
+    // Tracks whether the deletion has been ignored per bootstrap server feature.
+    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
+    private boolean resourceDeletionIgnored;
+    @Nullable private ScheduledHandle respTimer;
+    @Nullable private ResourceMetadata metadata;
     @Nullable private String errorDescription;
 
     ResourceSubscriber(ResourceType type, String resource) {
@@ -2533,6 +2543,21 @@
       }
     }
 
+    void cancelResourceWatch() {
+      if (isWatched()) {
+        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
+      }
+      stopTimer();
+      String message = "Unsubscribing {0} resource {1} from server {2}";
+      XdsLogLevel logLevel = XdsLogLevel.INFO;
+      if (resourceDeletionIgnored) {
+        message += " for which we previously ignored a deletion";
+        logLevel = XdsLogLevel.FORCE_INFO;
+      }
+      logger.log(logLevel, message, type, resource,
+          serverInfo != null ? serverInfo.target() : "unknown");
+    }
+
     boolean isWatched() {
       return !watchers.isEmpty();
     }
@@ -2547,6 +2572,12 @@
       ResourceUpdate oldData = this.data;
       this.data = parsedResource.getResourceUpdate();
       absent = false;
+      if (resourceDeletionIgnored) {
+        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
+                + "of resource for which we previously ignored a deletion: type {1} name {2}",
+            serverInfo != null ? serverInfo.target() : "unknown", type, resource);
+        resourceDeletionIgnored = false;
+      }
       if (!Objects.equals(oldData, data)) {
         for (ResourceWatcher watcher : watchers) {
           notifyWatcher(watcher, data);
@@ -2558,6 +2589,22 @@
       if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
         return;
       }
+
+      // Ignore deletion of State of the World resources when this feature is on,
+      // and the resource is reusable.
+      boolean ignoreResourceDeletionEnabled =
+          serverInfo != null && serverInfo.ignoreResourceDeletion();
+      boolean isStateOfTheWorld = (type == ResourceType.LDS || type == ResourceType.CDS);
+      if (ignoreResourceDeletionEnabled && isStateOfTheWorld && data != null) {
+        if (!resourceDeletionIgnored) {
+          logger.log(XdsLogLevel.FORCE_WARNING,
+              "xds server {0}: ignoring deletion for resource type {1} name {2}}",
+              serverInfo.target(), type, resource);
+          resourceDeletionIgnored = true;
+        }
+        return;
+      }
+
       logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
       if (!absent) {
         data = null;
diff --git a/xds/src/main/java/io/grpc/xds/XdsLogger.java b/xds/src/main/java/io/grpc/xds/XdsLogger.java
index 6166025..7bcf190 100644
--- a/xds/src/main/java/io/grpc/xds/XdsLogger.java
+++ b/xds/src/main/java/io/grpc/xds/XdsLogger.java
@@ -81,6 +81,10 @@
         return Level.FINE;
       case INFO:
         return Level.FINER;
+      case FORCE_INFO:
+        return Level.INFO;
+      case FORCE_WARNING:
+        return Level.WARNING;
       default:
         return Level.FINEST;
     }
@@ -89,6 +93,11 @@
   /**
    * Log levels. See the table below for the mapping from the XdsLogger levels to
    * Java logger levels.
+   *
+   * <p><b>NOTE:</b>
+   *   Please use {@code FORCE_} levels with care, only when the message is expected to be
+   *   surfaced to the library user. Normally libraries should minimize the usage
+   *   of highly visible logs.
    * <pre>
    * +---------------------+-------------------+
    * | XdsLogger Level     | Java Logger Level |
@@ -97,6 +106,8 @@
    * | INFO                | FINER             |
    * | WARNING             | FINE              |
    * | ERROR               | FINE              |
+   * | FORCE_INFO          | INFO              |
+   * | FORCE_WARNING       | WARNING           |
    * +---------------------+-------------------+
    * </pre>
    */
@@ -104,6 +115,8 @@
     DEBUG,
     INFO,
     WARNING,
-    ERROR
+    ERROR,
+    FORCE_INFO,
+    FORCE_WARNING,
   }
 }
diff --git a/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java b/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java
index f3a1af3..d6ecda1 100644
--- a/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java
+++ b/xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java
@@ -578,6 +578,8 @@
     ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
     assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
     assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
+    assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
+    // xds v2: xds v3 disabled
     assertThat(serverInfo.useProtocolV3()).isFalse();
   }
 
@@ -600,10 +602,60 @@
     ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
     assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
     assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
+    assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
+    // xds_v3 enabled
     assertThat(serverInfo.useProtocolV3()).isTrue();
   }
 
   @Test
+  public void serverFeatureIgnoreResourceDeletion() throws XdsInitializationException {
+    String rawData = "{\n"
+        + "  \"xds_servers\": [\n"
+        + "    {\n"
+        + "      \"server_uri\": \"" + SERVER_URI + "\",\n"
+        + "      \"channel_creds\": [\n"
+        + "        {\"type\": \"insecure\"}\n"
+        + "      ],\n"
+        + "      \"server_features\": [\"ignore_resource_deletion\"]\n"
+        + "    }\n"
+        + "  ]\n"
+        + "}";
+
+    bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
+    BootstrapInfo info = bootstrapper.bootstrap();
+    ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
+    assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
+    assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
+    // Only ignore_resource_deletion feature enabled: confirm it's on, and xds_v3 is off.
+    assertThat(serverInfo.useProtocolV3()).isFalse();
+    assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
+  }
+
+  @Test
+  public void serverFeatureIgnoreResourceDeletion_xdsV3() throws XdsInitializationException {
+    String rawData = "{\n"
+        + "  \"xds_servers\": [\n"
+        + "    {\n"
+        + "      \"server_uri\": \"" + SERVER_URI + "\",\n"
+        + "      \"channel_creds\": [\n"
+        + "        {\"type\": \"insecure\"}\n"
+        + "      ],\n"
+        + "      \"server_features\": [\"xds_v3\", \"ignore_resource_deletion\"]\n"
+        + "    }\n"
+        + "  ]\n"
+        + "}";
+
+    bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
+    BootstrapInfo info = bootstrapper.bootstrap();
+    ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
+    assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
+    assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
+    // xds_v3 and ignore_resource_deletion features enabled: confirm both are on.
+    assertThat(serverInfo.useProtocolV3()).isTrue();
+    assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
+  }
+
+  @Test
   public void notFound() {
     BootstrapperImpl.bootstrapPathFromEnvVar = null;
     BootstrapperImpl.bootstrapPathFromSysProp = null;
diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
index 1aee33a..53b4027 100644
--- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
+++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
@@ -133,8 +133,9 @@
   private static final Node NODE = Node.newBuilder().setId(NODE_ID).build();
   private static final Any FAILING_ANY = MessageFactory.FAILING_ANY;
   private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create();
-  private final ServerInfo lrsServerInfo =
-      ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3());
+
+  // xDS control plane server info.
+  private ServerInfo xdsServerInfo;
 
   private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER =
       new FakeClock.TaskFilter() {
@@ -316,10 +317,11 @@
       }
     };
 
+    xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3(),
+        ignoreResourceDeletion());
     Bootstrapper.BootstrapInfo bootstrapInfo =
         Bootstrapper.BootstrapInfo.builder()
-            .servers(Arrays.asList(
-                Bootstrapper.ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3())))
+            .servers(Collections.singletonList(xdsServerInfo))
             .node(NODE)
             .authorities(ImmutableMap.of(
                 "authority.xds.com",
@@ -365,6 +367,9 @@
 
   protected abstract boolean useProtocolV3();
 
+  /** Whether ignore_resource_deletion server feature is enabled for the given test. */
+  protected abstract boolean ignoreResourceDeletion();
+
   protected abstract BindableService createAdsService();
 
   protected abstract BindableService createLrsService();
@@ -484,10 +489,73 @@
   }
 
   /**
-   * Helper method to validate {@link XdsClient.EdsUpdate} created for the test CDS resource
-   * {@link ClientXdsClientTestBase#testClusterLoadAssignment}.
+   * Verifies the LDS update against the golden Listener with vhosts {@link #testListenerVhosts}.
    */
-  private void validateTestClusterLoadAssigment(EdsUpdate edsUpdate) {
+  private void verifyGoldenListenerVhosts(LdsUpdate ldsUpdate) {
+    assertThat(ldsUpdate.listener()).isNull();
+    HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
+    assertThat(hcm.rdsName()).isNull();
+    assertThat(hcm.virtualHosts()).hasSize(VHOST_SIZE);
+    verifyGoldenHcm(hcm);
+  }
+
+  /**
+   * Verifies the LDS update against the golden Listener with RDS name {@link #testListenerRds}.
+   */
+  private void verifyGoldenListenerRds(LdsUpdate ldsUpdate) {
+    assertThat(ldsUpdate.listener()).isNull();
+    HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
+    assertThat(hcm.rdsName()).isEqualTo(RDS_RESOURCE);
+    assertThat(hcm.virtualHosts()).isNull();
+    verifyGoldenHcm(hcm);
+  }
+
+  private void verifyGoldenHcm(HttpConnectionManager hcm) {
+    if (useProtocolV3()) {
+      // The last configured filter has to be a terminal filter.
+      assertThat(hcm.httpFilterConfigs()).isNotNull();
+      assertThat(hcm.httpFilterConfigs()).hasSize(1);
+      assertThat(hcm.httpFilterConfigs().get(0).name).isEqualTo("terminal");
+      assertThat(hcm.httpFilterConfigs().get(0).filterConfig).isEqualTo(RouterFilter.ROUTER_CONFIG);
+    } else {
+      assertThat(hcm.httpFilterConfigs()).isNull();
+    }
+  }
+
+  /**
+   * Verifies the RDS update against the golden route config {@link #testRouteConfig}.
+   */
+  private void verifyGoldenRouteConfig(RdsUpdate rdsUpdate) {
+    assertThat(rdsUpdate.virtualHosts).hasSize(VHOST_SIZE);
+    for (VirtualHost vhost : rdsUpdate.virtualHosts) {
+      assertThat(vhost.name()).contains("do not care");
+      assertThat(vhost.domains()).hasSize(1);
+      assertThat(vhost.routes()).hasSize(1);
+    }
+  }
+
+  /**
+   * Verifies the CDS update against the golden Round Robin Cluster {@link #testClusterRoundRobin}.
+   */
+  private void verifyGoldenClusterRoundRobin(CdsUpdate cdsUpdate) {
+    assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
+    assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
+    assertThat(cdsUpdate.edsServiceName()).isNull();
+    LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
+    assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
+    List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
+        JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
+    assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
+    assertThat(cdsUpdate.lrsServerInfo()).isNull();
+    assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
+    assertThat(cdsUpdate.upstreamTlsContext()).isNull();
+  }
+
+  /**
+   * Verifies the EDS update against the golden Cluster with load assignment
+   * {@link #testClusterLoadAssignment}.
+   */
+  private void validateGoldenClusterLoadAssignment(EdsUpdate edsUpdate) {
     assertThat(edsUpdate.clusterName).isEqualTo(EDS_RESOURCE);
     assertThat(edsUpdate.dropPolicies)
         .containsExactly(
@@ -620,7 +688,12 @@
     verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
     verifyResourceMetadataNacked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
         VERSION_2, TIME_INCREMENT * 2, errorsV2);
-    verifyResourceMetadataDoesNotExist(LDS, "C");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(LDS, "C");
+    } else {
+      // When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
+      verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
+    }
     call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
 
     // LDS -> {B, C} version 3
@@ -630,7 +703,12 @@
     call.sendResponse(LDS, resourcesV3.values().asList(), VERSION_3, "0002");
     // {A} -> does not exist
     // {B, C} -> ACK, version 3
-    verifyResourceMetadataDoesNotExist(LDS, "A");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(LDS, "A");
+    } else {
+      // When resource deletion is disabled, {A} stays ACKed in the previous version VERSION_2.
+      verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
+    }
     verifyResourceMetadataAcked(LDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
     verifyResourceMetadataAcked(LDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3);
     call.verifyRequest(LDS, subscribedResourceNames, VERSION_3, "0002", NODE);
@@ -638,7 +716,7 @@
   }
 
   @Test
-  public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscriptioin() {
+  public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscription() {
     List<String> subscribedResourceNames = ImmutableList.of("A", "B", "C");
     xdsClient.watchLdsResource("A", ldsResourceWatcher);
     xdsClient.watchRdsResource("A.1", rdsResourceWatcher);
@@ -694,14 +772,26 @@
     verifyResourceMetadataNacked(
         LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3,
         errorsV2);
-    verifyResourceMetadataDoesNotExist(LDS, "C");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(LDS, "C");
+    } else {
+      // When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
+      verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
+    }
     call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
-    // {A.1} -> does not exist
+    // {A.1} -> does not exist, missing from {A}
     // {B.1} -> version 1
-    // {C.1} -> does not exist
+    // {C.1} -> does not exist because {C} does not exist
     verifyResourceMetadataDoesNotExist(RDS, "A.1");
     verifyResourceMetadataAcked(RDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
-    verifyResourceMetadataDoesNotExist(RDS, "C.1");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(RDS, "C.1");
+    } else {
+      // When resource deletion is disabled, {C.1} is not deleted when {C} is deleted.
+      // Verify {C.1} stays in the previous version VERSION_1.
+      verifyResourceMetadataAcked(RDS, "C.1", resourcesV11.get("C.1"), VERSION_1,
+          TIME_INCREMENT * 2);
+    }
   }
 
   @Test
@@ -712,8 +802,7 @@
     call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
     call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@@ -727,8 +816,7 @@
     call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0000");
     call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@@ -742,8 +830,7 @@
     // Client sends an ACK LDS request.
     call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
-        .isEqualTo(RDS_RESOURCE);
+    verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
     assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@@ -760,8 +847,7 @@
     LdsResourceWatcher watcher = mock(LdsResourceWatcher.class);
     xdsClient.watchLdsResource(LDS_RESOURCE, watcher);
     verify(watcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
-        .isEqualTo(RDS_RESOURCE);
+    verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
     call.verifyNoMoreRequest();
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@@ -790,16 +876,14 @@
     call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
     call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
 
     // Updated LDS response.
     call.sendResponse(LDS, testListenerRds, VERSION_2, "0001");
     call.verifyRequest(LDS, LDS_RESOURCE, VERSION_2, "0001", NODE);
     verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
-        .isEqualTo(RDS_RESOURCE);
+    verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_2, TIME_INCREMENT * 2);
     verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
     assertThat(channelForCustomAuthority).isNull();
@@ -821,8 +905,7 @@
     call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
     call.verifyRequest(LDS, ldsResourceName, VERSION_1, "0000", NODE);
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(
         LDS, ldsResourceName, testListenerVhosts, VERSION_1, TIME_INCREMENT);
   }
@@ -842,8 +925,7 @@
     call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
     call.verifyRequest(LDS, ldsResourceName, VERSION_1, "0000", NODE);
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(
         LDS, ldsResourceName, testListenerVhosts, VERSION_1, TIME_INCREMENT);
   }
@@ -1072,6 +1154,8 @@
 
   @Test
   public void ldsResourceDeleted() {
+    Assume.assumeFalse(ignoreResourceDeletion());
+
     DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
     verifyResourceMetadataRequested(LDS, LDS_RESOURCE);
 
@@ -1079,8 +1163,7 @@
     call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
     call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
 
@@ -1092,6 +1175,46 @@
     verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
   }
 
+  /**
+   * When ignore_resource_deletion server feature is on, xDS client should keep the deleted listener
+   * on empty response, and resume the normal work when LDS contains the listener again.
+   * */
+  @Test
+  public void ldsResourceDeleted_ignoreResourceDeletion() {
+    Assume.assumeTrue(ignoreResourceDeletion());
+
+    DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
+    verifyResourceMetadataRequested(LDS, LDS_RESOURCE);
+
+    // Initial LDS response.
+    call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
+    call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
+    verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
+    verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
+    verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
+
+    // Empty LDS response does not delete the listener.
+    call.sendResponse(LDS, Collections.emptyList(), VERSION_2, "0001");
+    call.verifyRequest(LDS, LDS_RESOURCE, VERSION_2, "0001", NODE);
+    // The resource is still ACKED at VERSION_1 (no changes).
+    verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
+    verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
+    // onResourceDoesNotExist not called
+    verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
+
+    // Next update is correct, and contains the listener again.
+    call.sendResponse(LDS, testListenerVhosts, VERSION_3, "0003");
+    call.verifyRequest(LDS, LDS_RESOURCE, VERSION_3, "0003", NODE);
+    verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
+    // LDS is now ACKEd at VERSION_3.
+    verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_3,
+        TIME_INCREMENT * 3);
+    verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
+    verifyNoMoreInteractions(ldsResourceWatcher);
+  }
+
   @Test
   public void multipleLdsWatchers() {
     String ldsResourceTwo = "bar.googleapis.com";
@@ -1119,17 +1242,14 @@
     call.sendResponse(LDS, ImmutableList.of(testListenerVhosts, listenerTwo), VERSION_1, "0000");
     // ldsResourceWatcher called with listenerVhosts.
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     // watcher1 called with listenerTwo.
     verify(watcher1).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
-        .isEqualTo(RDS_RESOURCE);
+    verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
     assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts()).isNull();
     // watcher2 called with listenerTwo.
     verify(watcher2).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
-        .isEqualTo(RDS_RESOURCE);
+    verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
     assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts()).isNull();
     // Metadata of both listeners is stored.
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
@@ -1265,7 +1385,7 @@
     // Client sends an ACK RDS request.
     call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
-    assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
+    verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
     assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
     verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
@@ -1279,7 +1399,7 @@
     // Client sends an ACK RDS request.
     call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
-    assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
+    verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
     assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
     verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
@@ -1296,7 +1416,7 @@
     RdsResourceWatcher watcher = mock(RdsResourceWatcher.class);
     xdsClient.watchRdsResource(RDS_RESOURCE, watcher);
     verify(watcher).onChanged(rdsUpdateCaptor.capture());
-    assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
+    verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
     call.verifyNoMoreRequest();
     verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
@@ -1325,7 +1445,7 @@
     call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
     call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
-    assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
+    verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
 
     // Updated RDS response.
@@ -1353,23 +1473,25 @@
     DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
     call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
     verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
-        .isEqualTo(RDS_RESOURCE);
+    verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
     verifyResourceMetadataRequested(RDS, RDS_RESOURCE);
     verifySubscribedResourcesMetadataSizes(1, 0, 1, 0);
 
     call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
     verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
-    assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
+    verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
     verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT * 2);
     verifySubscribedResourcesMetadataSizes(1, 0, 1, 0);
 
+    // The Listener is getting replaced configured with an RDS name, to the one configured with
+    // vhosts. Expect the RDS resources to be discarded.
+    // Note that this must work the same despite the ignore_resource_deletion feature is on.
+    // This happens because the Listener is getting replaced, and not deleted.
     call.sendResponse(LDS, testListenerVhosts, VERSION_2, "0001");
     verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture());
-    assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
-        .hasSize(VHOST_SIZE);
+    verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
     verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE);
     verifyResourceMetadataDoesNotExist(RDS, RDS_RESOURCE);
     verifyResourceMetadataAcked(
@@ -1413,11 +1535,13 @@
     // Simulates receiving the requested RDS resource.
     call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
     verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
-    assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
+    verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT * 2);
 
     // Simulates receiving an updated version of the requested LDS resource as a TCP listener
     // with a filter chain containing inlined RouteConfiguration.
+    // Note that this must work the same despite the ignore_resource_deletion feature is on.
+    // This happens because the Listener is getting replaced, and not deleted.
     hcmFilter = mf.buildHttpConnectionManagerFilter(
         null,
         mf.buildRouteConfiguration(
@@ -1466,7 +1590,7 @@
 
     call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
     verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
-    assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
+    verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
     verifyNoMoreInteractions(watcher1, watcher2);
     verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
     verifyResourceMetadataDoesNotExist(RDS, rdsResourceTwo);
@@ -1598,7 +1722,12 @@
     verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
     verifyResourceMetadataNacked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
         VERSION_2, TIME_INCREMENT * 2, errorsV2);
-    verifyResourceMetadataDoesNotExist(CDS, "C");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(CDS, "C");
+    } else {
+      // When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
+      verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
+    }
     call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
 
     // CDS -> {B, C} version 3
@@ -1612,7 +1741,12 @@
     call.sendResponse(CDS, resourcesV3.values().asList(), VERSION_3, "0002");
     // {A} -> does not exit
     // {B, C} -> ACK, version 3
-    verifyResourceMetadataDoesNotExist(CDS, "A");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(CDS, "A");
+    } else {
+      // When resource deletion is disabled, {A} stays ACKed in the previous version VERSION_2.
+      verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
+    }
     verifyResourceMetadataAcked(CDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
     verifyResourceMetadataAcked(CDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3);
     call.verifyRequest(CDS, subscribedResourceNames, VERSION_3, "0002", NODE);
@@ -1684,14 +1818,26 @@
     verifyResourceMetadataNacked(
         CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3,
         errorsV2);
-    verifyResourceMetadataDoesNotExist(CDS, "C");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(CDS, "C");
+    } else {
+      // When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
+      verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
+    }
     call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
-    // {A.1} -> does not exist
+    // {A.1} -> does not exist, missing from {A}
     // {B.1} -> version 1
-    // {C.1} -> does not exist
+    // {C.1} -> does not exist because {C} does not exist
     verifyResourceMetadataDoesNotExist(EDS, "A.1");
     verifyResourceMetadataAcked(EDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
-    verifyResourceMetadataDoesNotExist(EDS, "C.1");
+    if (!ignoreResourceDeletion()) {
+      verifyResourceMetadataDoesNotExist(EDS, "C.1");
+    } else {
+      // When resource deletion is disabled, {C.1} is not deleted when {C} is deleted.
+      // Verify {C.1} stays in the previous version VERSION_1.
+      verifyResourceMetadataAcked(EDS, "C.1", resourcesV11.get("C.1"), VERSION_1,
+          TIME_INCREMENT * 2);
+    }
   }
 
   @Test
@@ -1702,18 +1848,7 @@
     // Client sent an ACK CDS request.
     call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
-    CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
-    assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
-    assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
-    assertThat(cdsUpdate.edsServiceName()).isNull();
-    LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
-    assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
-    List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
-        JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
-    assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
-    assertThat(cdsUpdate.lrsServerInfo()).isNull();
-    assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
-    assertThat(cdsUpdate.upstreamTlsContext()).isNull();
+    verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
     assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
     verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
         TIME_INCREMENT);
@@ -1728,18 +1863,7 @@
     // Client sent an ACK CDS request.
     call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
-    CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
-    assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
-    assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
-    assertThat(cdsUpdate.edsServiceName()).isNull();
-    LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
-    assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
-    List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
-        JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
-    assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
-    assertThat(cdsUpdate.lrsServerInfo()).isNull();
-    assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
-    assertThat(cdsUpdate.upstreamTlsContext()).isNull();
+    verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
     assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
     verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
         TIME_INCREMENT);
@@ -1992,18 +2116,7 @@
     CdsResourceWatcher watcher = mock(CdsResourceWatcher.class);
     xdsClient.watchCdsResource(CDS_RESOURCE, watcher);
     verify(watcher).onChanged(cdsUpdateCaptor.capture());
-    CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
-    assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
-    assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
-    assertThat(cdsUpdate.edsServiceName()).isNull();
-    LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
-    assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
-    List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
-        JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
-    assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
-    assertThat(cdsUpdate.lrsServerInfo()).isNull();
-    assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
-    assertThat(cdsUpdate.upstreamTlsContext()).isNull();
+    verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
     call.verifyNoMoreRequest();
     verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
         TIME_INCREMENT);
@@ -2070,7 +2183,7 @@
     childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
         JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
     assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
-    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
+    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
     verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterEds, VERSION_2, TIME_INCREMENT * 2);
@@ -2126,6 +2239,8 @@
 
   @Test
   public void cdsResourceDeleted() {
+    Assume.assumeFalse(ignoreResourceDeletion());
+
     DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
     verifyResourceMetadataRequested(CDS, CDS_RESOURCE);
 
@@ -2133,18 +2248,7 @@
     call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000");
     call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
-    CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
-    assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
-    assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
-    assertThat(cdsUpdate.edsServiceName()).isNull();
-    LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
-    assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
-    List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
-        JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
-    assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
-    assertThat(cdsUpdate.lrsServerInfo()).isNull();
-    assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
-    assertThat(cdsUpdate.upstreamTlsContext()).isNull();
+    verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
         TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
@@ -2157,6 +2261,48 @@
     verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
   }
 
+  /**
+   * When ignore_resource_deletion server feature is on, xDS client should keep the deleted cluster
+   * on empty response, and resume the normal work when CDS contains the cluster again.
+   * */
+  @Test
+  public void cdsResourceDeleted_ignoreResourceDeletion() {
+    Assume.assumeTrue(ignoreResourceDeletion());
+
+    DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
+    verifyResourceMetadataRequested(CDS, CDS_RESOURCE);
+
+    // Initial CDS response.
+    call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000");
+    call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
+    verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
+    verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
+    verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
+        TIME_INCREMENT);
+    verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
+
+    // Empty LDS response does not delete the cluster.
+    call.sendResponse(CDS, Collections.emptyList(), VERSION_2, "0001");
+    call.verifyRequest(CDS, CDS_RESOURCE, VERSION_2, "0001", NODE);
+
+    // The resource is still ACKED at VERSION_1 (no changes).
+    verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
+        TIME_INCREMENT);
+    verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
+    // onResourceDoesNotExist must not be called.
+    verify(cdsResourceWatcher, never()).onResourceDoesNotExist(CDS_RESOURCE);
+
+    // Next update is correct, and contains the cluster again.
+    call.sendResponse(CDS, testClusterRoundRobin, VERSION_3, "0003");
+    call.verifyRequest(CDS, CDS_RESOURCE, VERSION_3, "0003", NODE);
+    verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
+    verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
+    verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_3,
+        TIME_INCREMENT * 3);
+    verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
+    verifyNoMoreInteractions(cdsResourceWatcher);
+  }
+
   @Test
   public void multipleCdsWatchers() {
     String cdsResourceTwo = "cluster-bar.googleapis.com";
@@ -2211,7 +2357,7 @@
     childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
         JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
     assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
-    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
+    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
     verify(watcher2).onChanged(cdsUpdateCaptor.capture());
@@ -2224,7 +2370,7 @@
     childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
         JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
     assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
-    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
+    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
     assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
     assertThat(cdsUpdate.upstreamTlsContext()).isNull();
     // Metadata of both clusters is stored.
@@ -2369,7 +2515,7 @@
     // Client sent an ACK EDS request.
     call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
-    validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
+    validateGoldenClusterLoadAssignment(edsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
         TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
@@ -2383,7 +2529,7 @@
     // Client sent an ACK EDS request.
     call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
-    validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
+    validateGoldenClusterLoadAssignment(edsUpdateCaptor.getValue());
     verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
         TIME_INCREMENT);
     verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
@@ -2400,7 +2546,7 @@
     EdsResourceWatcher watcher = mock(EdsResourceWatcher.class);
     xdsClient.watchEdsResource(EDS_RESOURCE, watcher);
     verify(watcher).onChanged(edsUpdateCaptor.capture());
-    validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
+    validateGoldenClusterLoadAssignment(edsUpdateCaptor.getValue());
     call.verifyNoMoreRequest();
     verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
         TIME_INCREMENT);
@@ -2430,7 +2576,7 @@
     call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
     verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
     EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
-    validateTestClusterLoadAssigment(edsUpdate);
+    validateGoldenClusterLoadAssignment(edsUpdate);
     verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
         TIME_INCREMENT);
 
@@ -2505,7 +2651,7 @@
     verify(cdsWatcher).onChanged(cdsUpdateCaptor.capture());
     CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
     assertThat(cdsUpdate.edsServiceName()).isEqualTo(null);
-    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
+    assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
     verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
     cdsUpdate = cdsUpdateCaptor.getValue();
     assertThat(cdsUpdate.edsServiceName()).isEqualTo(EDS_RESOURCE);
@@ -2553,7 +2699,10 @@
     call.sendResponse(CDS, clusters, VERSION_2, "0001");
     verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture());
     assertThat(cdsUpdateCaptor.getValue().edsServiceName()).isNull();
+    // Note that the endpoint must be deleted even if the ignore_resource_deletion feature.
+    // This happens because the cluster CDS_RESOURCE is getting replaced, and not deleted.
     verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE);
+    verify(edsResourceWatcher, never()).onResourceDoesNotExist(resource);
     verifyNoMoreInteractions(cdsWatcher, edsWatcher);
     verifyResourceMetadataDoesNotExist(EDS, EDS_RESOURCE);
     verifyResourceMetadataAcked(
@@ -2588,7 +2737,7 @@
     call.sendResponse(EDS, testClusterLoadAssignment, VERSION_1, "0000");
     verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
     EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
-    validateTestClusterLoadAssigment(edsUpdate);
+    validateGoldenClusterLoadAssignment(edsUpdate);
     verifyNoMoreInteractions(watcher1, watcher2);
     verifyResourceMetadataAcked(
         EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, TIME_INCREMENT);
@@ -2848,7 +2997,7 @@
   public void reportLoadStatsToServer() {
     xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher);
     String clusterName = "cluster-foo.googleapis.com";
-    ClusterDropStats dropStats = xdsClient.addClusterDropStats(lrsServerInfo, clusterName, null);
+    ClusterDropStats dropStats = xdsClient.addClusterDropStats(xdsServerInfo, clusterName, null);
     LrsRpcCall lrsCall = loadReportCalls.poll();
     lrsCall.verifyNextReportClusters(Collections.<String[]>emptyList()); // initial LRS request
 
diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java
index b47193e..b302b56 100644
--- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java
+++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java
@@ -100,16 +100,27 @@
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.ArgumentMatcher;
 import org.mockito.InOrder;
 
 /**
  * Tests for {@link ClientXdsClient} with protocol version v2.
  */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
 
+  /** Parameterized test cases. */
+  @Parameters(name = "ignoreResourceDeletion={0}")
+  public static Iterable<? extends Boolean> data() {
+    return ImmutableList.of(false, true);
+  }
+
+  @Parameter
+  public boolean ignoreResourceDeletion;
+
   @Override
   protected BindableService createAdsService() {
     return new AggregatedDiscoveryServiceImplBase() {
@@ -168,6 +179,11 @@
     return false;
   }
 
+  @Override
+  protected boolean ignoreResourceDeletion() {
+    return ignoreResourceDeletion;
+  }
+
   private static class DiscoveryRpcCallV2 extends DiscoveryRpcCall {
     StreamObserver<DiscoveryRequest> requestObserver;
     StreamObserver<DiscoveryResponse> responseObserver;
diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java
index 432cadb..4f86b17 100644
--- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java
+++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java
@@ -108,16 +108,27 @@
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.ArgumentMatcher;
 import org.mockito.InOrder;
 
 /**
  * Tests for {@link ClientXdsClient} with protocol version v3.
  */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
 
+  /** Parameterized test cases. */
+  @Parameters(name = "ignoreResourceDeletion={0}")
+  public static Iterable<? extends Boolean> data() {
+    return ImmutableList.of(false, true);
+  }
+
+  @Parameter
+  public boolean ignoreResourceDeletion;
+
   @Override
   protected BindableService createAdsService() {
     return new AggregatedDiscoveryServiceImplBase() {
@@ -176,6 +187,11 @@
     return true;
   }
 
+  @Override
+  protected boolean ignoreResourceDeletion() {
+    return ignoreResourceDeletion;
+  }
+
   private static class DiscoveryRpcCallV3 extends DiscoveryRpcCall {
     StreamObserver<DiscoveryRequest> requestObserver;
     StreamObserver<DiscoveryResponse> responseObserver;