xds: throw away subchannel references after ring_hash is shutdown (#8140)
Similar to 368c43aec4bbf73999fab8ab05edddbd7a4a2b0d.
Clean up subchannels after the RingHashLoadBalancer itself is shutdown to prevent further balancing state updates being propagated to the upstream.
Note this should not be considered as a fix for any problem anybody is noticing. Upstreams of RingHashLoadBalancer should not rely on this, it should still have its own logic for maintaining the lifecycle of downstream LB and ignore invalid upcalls when necessary.
diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
index 6ca61f1..260b45e 100644
--- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
@@ -188,6 +188,7 @@
for (Subchannel subchannel : subchannels.values()) {
shutdownSubchannel(subchannel);
}
+ subchannels.clear();
}
private void updateBalancingState() {
diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
index 5243ff8..eaa2160 100644
--- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
@@ -20,6 +20,7 @@
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -337,6 +338,26 @@
}
@Test
+ public void ignoreShutdownSubchannelStateChange() {
+ RingHashConfig config = new RingHashConfig(10, 100);
+ List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1);
+ loadBalancer.handleResolvedAddresses(
+ ResolvedAddresses.newBuilder()
+ .setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
+ verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
+ verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));
+
+ loadBalancer.shutdown();
+ for (Subchannel sc : subchannels.values()) {
+ verify(sc).shutdown();
+ // When the subchannel is being shut down, a SHUTDOWN connectivity state is delivered
+ // back to the subchannel state listener.
+ deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(SHUTDOWN));
+ }
+ verifyNoMoreInteractions(helper);
+ }
+
+ @Test
public void deterministicPickWithHostsPartiallyRemoved() {
RingHashConfig config = new RingHashConfig(10, 100);
List<EquivalentAddressGroup> servers = createWeightedServerAddrs(1, 1, 1, 1, 1);