xds: cluster manager to delay picker updates (#9365)

Do not perform picker updates while handling new addresses even if child
LBs request it. Assure that a single picker update is done.
diff --git a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
index 0557f3a..8500987 100644
--- a/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
@@ -57,6 +57,8 @@
   private final SynchronizationContext syncContext;
   private final ScheduledExecutorService timeService;
   private final XdsLogger logger;
+  // Set to true if currently in the process of handling resolved addresses.
+  private boolean resolvingAddresses;
 
   ClusterManagerLoadBalancer(Helper helper) {
     this.helper = checkNotNull(helper, "helper");
@@ -69,6 +71,15 @@
 
   @Override
   public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+    try {
+      resolvingAddresses = true;
+      handleResolvedAddressesInternal(resolvedAddresses);
+    } finally {
+      resolvingAddresses = false;
+    }
+  }
+
+  public void handleResolvedAddressesInternal(ResolvedAddresses resolvedAddresses) {
     logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
     ClusterManagerConfig config = (ClusterManagerConfig)
         resolvedAddresses.getLoadBalancingPolicyConfig();
@@ -251,21 +262,18 @@
       @Override
       public void updateBalancingState(final ConnectivityState newState,
           final SubchannelPicker newPicker) {
-        syncContext.execute(new Runnable() {
-          @Override
-          public void run() {
-            if (!childLbStates.containsKey(name)) {
-              return;
-            }
-            // Subchannel picker and state are saved, but will only be propagated to the channel
-            // when the child instance exits deactivated state.
-            currentState = newState;
-            currentPicker = newPicker;
-            if (!deactivated) {
-              updateOverallBalancingState();
-            }
-          }
-        });
+        // If we are already in the process of resolving addresses, the overall balancing state
+        // will be updated at the end of it, and we don't need to trigger that update here.
+        if (resolvingAddresses || !childLbStates.containsKey(name)) {
+          return;
+        }
+        // Subchannel picker and state are saved, but will only be propagated to the channel
+        // when the child instance exits deactivated state.
+        currentState = newState;
+        currentPicker = newPicker;
+        if (!deactivated) {
+          updateOverallBalancingState();
+        }
       }
 
       @Override
diff --git a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
index 5890f6f..043c27e 100644
--- a/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/ClusterManagerLoadBalancerTest.java
@@ -17,6 +17,7 @@
 package io.grpc.xds;
 
 import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
@@ -52,6 +53,7 @@
 import io.grpc.internal.ServiceConfigUtil.PolicySelection;
 import io.grpc.testing.TestMethodDescriptors;
 import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
+import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -249,6 +251,15 @@
     assertThat(childBalancer2.upstreamError.getDescription()).isEqualTo("unknown error");
   }
 
+  @Test
+  public void noDuplicateOverallBalancingStateUpdate() {
+    deliverResolvedAddresses(ImmutableMap.of("childA", "policy_a", "childB", "policy_b"));
+
+    // The test child LBs would have triggered state updates, let's make sure the overall balancing
+    // state was only updated once.
+    verify(helper, times(1)).updateBalancingState(any(), any());
+  }
+
   private void deliverResolvedAddresses(final Map<String, String> childPolicies) {
     clusterManagerLoadBalancer.handleResolvedAddresses(
         ResolvedAddresses.newBuilder()
@@ -329,6 +340,10 @@
     @Override
     public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
       config = resolvedAddresses.getLoadBalancingPolicyConfig();
+
+      // Update balancing state here so that concurrent child state changes can be easily tested.
+      // Most tests ignore this and trigger separate child LB updates.
+      helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(Status.INTERNAL));
     }
 
     @Override