xds: fix ServerXdsClient to return subscribed resources only for LDS (#7689)
diff --git a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
index 2f27d62..1d8f48b 100644
--- a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
@@ -56,20 +56,25 @@
@Nullable
private ListenerWatcher listenerWatcher;
private int listenerPort = -1;
- private final boolean newServerApi;
+ private final boolean useNewApiForListenerQuery;
@Nullable private final String instanceIp;
private String grpcServerResourceId;
@Nullable
private ScheduledHandle ldsRespTimer;
- ServerXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
- BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier,
- boolean newServerApi, String instanceIp, String grpcServerResourceId) {
+ ServerXdsClient(
+ XdsChannel channel,
+ Node node,
+ ScheduledExecutorService timeService,
+ BackoffPolicy.Provider backoffPolicyProvider,
+ Supplier<Stopwatch> stopwatchSupplier,
+ boolean useNewApiForListenerQuery,
+ String instanceIp,
+ String grpcServerResourceId) {
super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
- this.newServerApi = channel.isUseProtocolV3() && newServerApi;
+ this.useNewApiForListenerQuery = channel.isUseProtocolV3() && useNewApiForListenerQuery;
this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0");
- this.grpcServerResourceId =
- (grpcServerResourceId != null) ? grpcServerResourceId : "grpc/server";
+ this.grpcServerResourceId = grpcServerResourceId != null ? grpcServerResourceId : "grpc/server";
}
@Override
@@ -78,7 +83,7 @@
listenerWatcher = checkNotNull(watcher, "watcher");
checkArgument(port > 0, "port needs to be > 0");
listenerPort = port;
- if (newServerApi) {
+ if (useNewApiForListenerQuery) {
String listeningAddress = instanceIp + ":" + listenerPort;
grpcServerResourceId =
grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress;
@@ -89,7 +94,7 @@
@Override
public void run() {
getLogger().log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
- if (!newServerApi) {
+ if (!useNewApiForListenerQuery) {
updateNodeMetadataForListenerRequest(port);
}
adjustResourceSubscription(ResourceType.LDS);
@@ -107,7 +112,10 @@
@Nullable
@Override
Collection<String> getSubscribedResources(ResourceType type) {
- if (newServerApi) {
+ if (type != ResourceType.LDS) {
+ return null;
+ }
+ if (useNewApiForListenerQuery) {
return ImmutableList.<String>of(grpcServerResourceId);
} else {
return Collections.emptyList();
@@ -175,17 +183,17 @@
}
private boolean isRequestedListener(Listener listener) {
- if (newServerApi) {
+ if (useNewApiForListenerQuery) {
return grpcServerResourceId.equals(listener.getName())
- && listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
- && isAddressMatching(listener.getAddress(), listenerPort);
+ && listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
+ && isAddressMatching(listener.getAddress(), listenerPort);
}
return isAddressMatching(listener.getAddress(), 15001)
&& hasMatchingFilter(listener.getFilterChainsList());
}
private boolean isAddressMatching(Address address, int portToMatch) {
- return address.hasSocketAddress() && (address.getSocketAddress().getPortValue() == portToMatch);
+ return address.hasSocketAddress() && address.getSocketAddress().getPortValue() == portToMatch;
}
private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
index 37b4adb..7534153 100644
--- a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
+++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
@@ -141,7 +141,7 @@
private ListenerWatcher listenerWatcher;
private ManagedChannel channel;
- private XdsClient xdsClient;
+ private ServerXdsClient xdsClient;
@Before
public void setUp() throws IOException {
@@ -531,6 +531,7 @@
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -551,6 +552,7 @@
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -571,6 +573,7 @@
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server sends back a LDS response.
response = buildDiscoveryResponse("1", listeners,
@@ -595,6 +598,7 @@
.onNext(eq(buildDiscoveryRequest(NODE, "1",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -616,7 +620,7 @@
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
- backoffPolicy2);
+ backoffPolicy2, requestObserver);
}
static Listener buildListenerWithFilterChain(String name, int portValue, String address,
diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
index 5730250..2f34e34 100644
--- a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
+++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
@@ -636,6 +636,7 @@
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
+ verifyNoMoreInteractions(requestObserver);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
@@ -675,6 +676,7 @@
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -694,6 +696,7 @@
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -713,6 +716,7 @@
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server sends back a LDS response.
response = buildDiscoveryResponseV2("1", listeners,
@@ -736,6 +740,7 @@
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
ResourceType.LDS.typeUrlV2(), "")));
+ verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
@@ -756,7 +761,7 @@
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
- backoffPolicy2);
+ backoffPolicy2, requestObserver);
}
static Listener buildListenerWithFilterChain(String name, int portValue, String address,