xds: cluster manager to delay picker updates (#9365)
Do not perform picker updates while handling new addresses even if child
LBs request it. Assure that a single picker update is done.
diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
index 0557f3a..8500987 100644
--- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
@@ -57,6 +57,8 @@
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final XdsLogger logger;
+ // Set to true if currently in the process of handling resolved addresses.
+ private boolean resolvingAddresses;
ClusterManagerLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
@@ -69,6 +71,15 @@
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+ try {
+ resolvingAddresses = true;
+ handleResolvedAddressesInternal(resolvedAddresses);
+ } finally {
+ resolvingAddresses = false;
+ }
+ }
+
+ public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
ClusterManagerConfig config = (ClusterManagerConfig)
resolvedAddresses.getLoadBalancingPolicyConfig();
@@ -251,21 +262,18 @@
@Override
public void updateBalancingState(final ConnectivityState newState,
final SubchannelPicker newPicker) {
- syncContext.execute(new Runnable() {
- @Override
- public void run() {
- if (!childLbStates.containsKey(name)) {
- return;
- }
- // Subchannel picker and state are saved, but will only be propagated to the channel
- // when the child instance exits deactivated state.
- currentState = newState;
- currentPicker = newPicker;
- if (!deactivated) {
- updateOverallBalancingState();
- }
- }
- });
+ // If we are already in the process of resolving addresses, the overall balancing state
+ // will be updated at the end of it, and we don't need to trigger that update here.
+ if (resolvingAddresses || !childLbStates.containsKey(name)) {
+ return;
+ }
+ // Subchannel picker and state are saved, but will only be propagated to the channel
+ // when the child instance exits deactivated state.
+ currentState = newState;
+ currentPicker = newPicker;
+ if (!deactivated) {
+ updateOverallBalancingState();
+ }
}
@Override
diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
index 5890f6f..043c27e 100644
--- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
@@ -17,6 +17,7 @@
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
@@ -52,6 +53,7 @@
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.testing.TestMethodDescriptors;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
+import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -249,6 +251,15 @@
assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("unknown error");
}
+ @Test
+ public void noDuplicateOverallBalancingStateUpdate() {
+ deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
+
+ // The test child LBs would have triggered state updates, let's make sure the overall balancing
+ // state was only updated once.
+ verify(helper, times(1)).updateBalancingState(any(), any());
+ }
+
private void deliverResolvedAddresses(final Map<String, String> childPolicies) {
clusterManagerLoadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
@@ -329,6 +340,10 @@
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig();
+
+ // Update balancing state here so that concurrent child state changes can be easily tested.
+ // Most tests ignore this and trigger separate child LB updates.
+ helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
}
@Override