xds: fix ServerXdsClient to return subscribed resources only for LDS (#7689)

diff --git a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
index 2f27d62..1d8f48b 100644
--- a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
@@ -56,20 +56,25 @@
   @Nullable
   private ListenerWatcher listenerWatcher;
   private int listenerPort = -1;
-  private final boolean newServerApi;
+  private final boolean useNewApiForListenerQuery;
   @Nullable private final String instanceIp;
   private String grpcServerResourceId;
   @Nullable
   private ScheduledHandle ldsRespTimer;
 
-  ServerXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
-      BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier,
-      boolean newServerApi, String instanceIp, String grpcServerResourceId) {
+  ServerXdsClient(
+      XdsChannel channel,
+      Node node,
+      ScheduledExecutorService timeService,
+      BackoffPolicy.Provider backoffPolicyProvider,
+      Supplier<Stopwatch> stopwatchSupplier,
+      boolean useNewApiForListenerQuery,
+      String instanceIp,
+      String grpcServerResourceId) {
     super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
-    this.newServerApi = channel.isUseProtocolV3() && newServerApi;
+    this.useNewApiForListenerQuery = channel.isUseProtocolV3() && useNewApiForListenerQuery;
     this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0");
-    this.grpcServerResourceId =
-        (grpcServerResourceId != null) ? grpcServerResourceId : "grpc/server";
+    this.grpcServerResourceId = grpcServerResourceId != null ? grpcServerResourceId : "grpc/server";
   }
 
   @Override
@@ -78,7 +83,7 @@
     listenerWatcher = checkNotNull(watcher, "watcher");
     checkArgument(port > 0, "port needs to be > 0");
     listenerPort = port;
-    if (newServerApi) {
+    if (useNewApiForListenerQuery) {
       String listeningAddress = instanceIp + ":" + listenerPort;
       grpcServerResourceId =
           grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress;
@@ -89,7 +94,7 @@
       @Override
       public void run() {
         getLogger().log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
-        if (!newServerApi) {
+        if (!useNewApiForListenerQuery) {
           updateNodeMetadataForListenerRequest(port);
         }
         adjustResourceSubscription(ResourceType.LDS);
@@ -107,7 +112,10 @@
   @Nullable
   @Override
   Collection<String> getSubscribedResources(ResourceType type) {
-    if (newServerApi) {
+    if (type != ResourceType.LDS) {
+      return null;
+    }
+    if (useNewApiForListenerQuery) {
       return ImmutableList.<String>of(grpcServerResourceId);
     } else {
       return Collections.emptyList();
@@ -175,17 +183,17 @@
   }
 
   private boolean isRequestedListener(Listener listener) {
-    if (newServerApi) {
+    if (useNewApiForListenerQuery) {
       return grpcServerResourceId.equals(listener.getName())
-              && listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
-              && isAddressMatching(listener.getAddress(), listenerPort);
+          && listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
+          && isAddressMatching(listener.getAddress(), listenerPort);
     }
     return isAddressMatching(listener.getAddress(), 15001)
         && hasMatchingFilter(listener.getFilterChainsList());
   }
 
   private boolean isAddressMatching(Address address, int portToMatch) {
-    return address.hasSocketAddress() && (address.getSocketAddress().getPortValue() == portToMatch);
+    return address.hasSocketAddress() && address.getSocketAddress().getPortValue() == portToMatch;
   }
 
   private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
index 37b4adb..7534153 100644
--- a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
+++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
@@ -141,7 +141,7 @@
   private ListenerWatcher listenerWatcher;
 
   private ManagedChannel channel;
-  private XdsClient xdsClient;
+  private ServerXdsClient xdsClient;
 
   @Before
   public void setUp() throws IOException {
@@ -531,6 +531,7 @@
         .onNext(eq(buildDiscoveryRequest(NODE, "0",
                 ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
                 ResourceType.LDS.typeUrl(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server becomes unreachable.
     responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -551,6 +552,7 @@
         .onNext(eq(buildDiscoveryRequest(NODE, "0",
                 ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
                 ResourceType.LDS.typeUrl(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server is still not reachable.
     responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -571,6 +573,7 @@
         .onNext(eq(buildDiscoveryRequest(NODE, "0",
                 ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
                 ResourceType.LDS.typeUrl(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server sends back a LDS response.
     response = buildDiscoveryResponse("1", listeners,
@@ -595,6 +598,7 @@
         .onNext(eq(buildDiscoveryRequest(NODE, "1",
                 ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
                 ResourceType.LDS.typeUrl(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server becomes unreachable again.
     responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -616,7 +620,7 @@
                 ResourceType.LDS.typeUrl(), "")));
 
     verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
-        backoffPolicy2);
+        backoffPolicy2, requestObserver);
   }
 
   static Listener buildListenerWithFilterChain(String name, int portValue, String address,
diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
index 5730250..2f34e34 100644
--- a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
+++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
@@ -636,6 +636,7 @@
     verify(requestObserver)
         .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
             ResourceType.LDS.typeUrlV2(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
     final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
@@ -675,6 +676,7 @@
     verify(requestObserver)
         .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
             ResourceType.LDS.typeUrlV2(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server becomes unreachable.
     responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -694,6 +696,7 @@
     verify(requestObserver)
         .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
             ResourceType.LDS.typeUrlV2(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server is still not reachable.
     responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -713,6 +716,7 @@
     verify(requestObserver)
         .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
             ResourceType.LDS.typeUrlV2(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server sends back a LDS response.
     response = buildDiscoveryResponseV2("1", listeners,
@@ -736,6 +740,7 @@
     verify(requestObserver)
         .onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
             ResourceType.LDS.typeUrlV2(), "")));
+    verifyNoMoreInteractions(requestObserver);
 
     // Management server becomes unreachable again.
     responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -756,7 +761,7 @@
             ResourceType.LDS.typeUrlV2(), "")));
 
     verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
-        backoffPolicy2);
+        backoffPolicy2, requestObserver);
   }
 
   static Listener buildListenerWithFilterChain(String name, int portValue, String address,