xds: avoid pushing duplicate (CDS) resource data to watchers (#7143)
De-duplicate cluster update information pushed to cluster watchers.
This only applies to CDS as the management server sends a response with all requested clusters while only some of. them have changed (or newly been subscribed).
This does not apply to EDS as the protocol is incremental and each EDS response will only contain ClusterLoadAssignments for clusters whose endpoints have changed.
This does not apply to LDS and RDS as at any time we will subscribe to a single resource and our TD implementation will not send extra (unrequested) resources. So each time, the received responses always contain updated resource information.
diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java
index 8eb9068..befb826 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClient.java
@@ -182,6 +182,28 @@
.toString();
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClusterUpdate that = (ClusterUpdate) o;
+ return Objects.equals(clusterName, that.clusterName)
+ && Objects.equals(edsServiceName, that.edsServiceName)
+ && Objects.equals(lbPolicy, that.lbPolicy)
+ && Objects.equals(lrsServerName, that.lrsServerName)
+ && Objects.equals(upstreamTlsContext, that.upstreamTlsContext);
+ }
+
static Builder newBuilder() {
return new Builder();
}
@@ -287,9 +309,9 @@
return false;
}
EndpointUpdate that = (EndpointUpdate) o;
- return clusterName.equals(that.clusterName)
- && localityLbEndpointsMap.equals(that.localityLbEndpointsMap)
- && dropPolicies.equals(that.dropPolicies);
+ return Objects.equals(clusterName, that.clusterName)
+ && Objects.equals(localityLbEndpointsMap, that.localityLbEndpointsMap)
+ && Objects.equals(dropPolicies, that.dropPolicies);
}
@Override
diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
index ef16e6e..c5c90c8 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
@@ -1022,13 +1022,15 @@
// Update local CDS cache with data in this response.
absentCdsResources.removeAll(clusterUpdates.keySet());
- for (String clusterName : clusterNamesToClusterUpdates.keySet()) {
- if (!clusterUpdates.containsKey(clusterName)) {
+ for (Map.Entry<String, ClusterUpdate> entry : clusterNamesToClusterUpdates.entrySet()) {
+ if (!clusterUpdates.containsKey(entry.getKey())) {
// Some previously existing resource no longer exists.
- absentCdsResources.add(clusterName);
+ absentCdsResources.add(entry.getKey());
+ } else if (clusterUpdates.get(entry.getKey()).equals(entry.getValue())) {
+ clusterUpdates.remove(entry.getKey());
}
}
- clusterNamesToClusterUpdates.clear();
+ clusterNamesToClusterUpdates.keySet().removeAll(absentCdsResources);
clusterNamesToClusterUpdates.putAll(clusterUpdates);
// Remove EDS cache entries for ClusterLoadAssignments not referenced by this CDS response.
@@ -1056,12 +1058,13 @@
// Notify watchers if clusters interested in present in this CDS response.
for (Map.Entry<String, Set<ClusterWatcher>> entry : clusterWatchers.entrySet()) {
String clusterName = entry.getKey();
- if (clusterUpdates.containsKey(clusterName)) {
+ if (clusterUpdates.containsKey(entry.getKey())) {
ClusterUpdate clusterUpdate = clusterUpdates.get(clusterName);
for (ClusterWatcher watcher : entry.getValue()) {
watcher.onClusterChanged(clusterUpdate);
}
- } else if (!cdsRespTimers.containsKey(clusterName)) {
+ } else if (!clusterNamesToClusterUpdates.containsKey(entry.getKey())
+ && !cdsRespTimers.containsKey(clusterName)) {
// Update for previously present resource being removed.
for (ClusterWatcher watcher : entry.getValue()) {
watcher.onResourceDoesNotExist(entry.getKey());
@@ -1192,6 +1195,8 @@
absentEdsResources.removeAll(endpointUpdates.keySet());
// Notify watchers waiting for updates of endpoint information received in this EDS response.
+ // Based on xDS protocol, the management server should not send endpoint data again if
+ // nothing has changed.
for (Map.Entry<String, EndpointUpdate> entry : endpointUpdates.entrySet()) {
String clusterName = entry.getKey();
// Cancel and delete response timeout timer.
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
index b9fb294..6b5e2a3 100644
--- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
+++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java
@@ -1573,24 +1573,7 @@
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
- // All watchers received notification for cluster update.
- verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
- clusterUpdate1 = clusterUpdateCaptor1.getValue();
- assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
- assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
- assertThat(clusterUpdate1.getEdsServiceName()).isNull();
- assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
- assertThat(clusterUpdate1.getLrsServerName()).isNull();
-
- clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
- verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture());
- clusterUpdate2 = clusterUpdateCaptor2.getValue();
- assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
- assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
- assertThat(clusterUpdate2.getEdsServiceName()).isNull();
- assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
- assertThat(clusterUpdate2.getLrsServerName()).isNull();
-
+ verifyNoMoreInteractions(watcher1, watcher2); // resource has no change
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
@@ -1728,14 +1711,7 @@
new DiscoveryRequestMatcher("1",
ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"),
XdsClientImpl.ADS_TYPE_URL_CDS, "0001")));
-
- verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
- clusterUpdate1 = clusterUpdateCaptor1.getValue();
- assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
- assertThat(clusterUpdate1.getEdsServiceName()).isNull();
- assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
- assertThat(clusterUpdate1.getLrsServerName()).isNull();
-
+ verifyNoMoreInteractions(watcher1); // resource has no change
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();