xds: implement XdsRoutingLoadBalancer
diff --git a/core/src/test/java/io/grpc/internal/TestUtils.java b/core/src/test/java/io/grpc/internal/TestUtils.java
index bb07c84..8ea9afc 100644
--- a/core/src/test/java/io/grpc/internal/TestUtils.java
+++ b/core/src/test/java/io/grpc/internal/TestUtils.java
@@ -24,6 +24,11 @@
import io.grpc.CallOptions;
import io.grpc.ChannelLogger;
import io.grpc.InternalLogId;
+import io.grpc.LoadBalancer.PickResult;
+import io.grpc.LoadBalancer.PickSubchannelArgs;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.LoadBalancerProvider;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.net.SocketAddress;
@@ -36,7 +41,41 @@
/**
* Common utility methods for tests.
*/
-final class TestUtils {
+public final class TestUtils {
+
+ /** Base class for a standard LoadBalancerProvider implementation. */
+ public abstract static class StandardLoadBalancerProvider extends LoadBalancerProvider {
+ private final String policyName;
+
+ public StandardLoadBalancerProvider(String policyName) {
+ this.policyName = policyName;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public int getPriority() {
+ return 5;
+ }
+
+ @Override
+ public final String getPolicyName() {
+ return policyName;
+ }
+ }
+
+ /** Creates a {@link SubchannelPicker} that returns the given {@link Subchannel} on every pick. */
+ public static SubchannelPicker pickerOf(final Subchannel subchannel) {
+ return new SubchannelPicker() {
+ @Override
+ public PickResult pickSubchannel(PickSubchannelArgs args) {
+ return PickResult.withSubchannel(subchannel);
+ }
+ };
+ }
static class MockClientTransportInfo {
/**
diff --git a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java
index 5fad2d4..1005061 100644
--- a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancer.java
@@ -16,18 +16,205 @@
package io.grpc.xds;
-import io.grpc.LoadBalancer;
-import io.grpc.Status;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static io.grpc.ConnectivityState.CONNECTING;
+import static io.grpc.ConnectivityState.IDLE;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
+import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
-// TODO(zdapeng): Implementation.
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.grpc.ConnectivityState;
+import io.grpc.InternalLogId;
+import io.grpc.LoadBalancer;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import io.grpc.util.ForwardingLoadBalancerHelper;
+import io.grpc.util.GracefulSwitchLoadBalancer;
+import io.grpc.xds.XdsLogger.XdsLogLevel;
+import io.grpc.xds.XdsRoutingLoadBalancerProvider.MethodName;
+import io.grpc.xds.XdsRoutingLoadBalancerProvider.Route;
+import io.grpc.xds.XdsRoutingLoadBalancerProvider.XdsRoutingConfig;
+import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
/** Load balancer for xds_routing policy. */
final class XdsRoutingLoadBalancer extends LoadBalancer {
+ private final XdsLogger logger;
+ private final Helper helper;
+ private final Map<String, GracefulSwitchLoadBalancer> routeBalancers = new HashMap<>();
+ private final Map<String, RouteHelper> routeHelpers = new HashMap<>();
+
+ private Map<String, PolicySelection> actions = ImmutableMap.of();
+ private List<Route> routes = ImmutableList.of();
+
+ XdsRoutingLoadBalancer(Helper helper) {
+ this.helper = checkNotNull(helper, "helper");
+ logger = XdsLogger.withLogId(
+ InternalLogId.allocate("xds-routing-lb", helper.getAuthority()));
+ logger.log(XdsLogLevel.INFO, "Created");
+ }
+
+ @Override
+ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+ logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
+ XdsRoutingConfig xdsRoutingConfig =
+ (XdsRoutingConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
+ checkNotNull(xdsRoutingConfig, "Missing xds_routing lb config");
+
+ Map<String, PolicySelection> newActions = xdsRoutingConfig.actions;
+ for (String actionName : newActions.keySet()) {
+ PolicySelection action = newActions.get(actionName);
+ if (!actions.containsKey(actionName)) {
+ RouteHelper routeHelper = new RouteHelper();
+ GracefulSwitchLoadBalancer routeBalancer = new GracefulSwitchLoadBalancer(routeHelper);
+ routeBalancer.switchTo(action.getProvider());
+ routeHelpers.put(actionName, routeHelper);
+ routeBalancers.put(actionName, routeBalancer);
+ } else if (!action.getProvider().equals(actions.get(actionName).getProvider())) {
+ routeBalancers.get(actionName).switchTo(action.getProvider());
+ }
+ }
+
+ this.routes = xdsRoutingConfig.routes;
+ this.actions = newActions;
+
+ for (String actionName : actions.keySet()) {
+ routeBalancers.get(actionName).handleResolvedAddresses(
+ resolvedAddresses.toBuilder()
+ .setLoadBalancingPolicyConfig(actions.get(actionName).getConfig())
+ .build());
+ }
+
+ // Cleanup removed actions.
+ // TODO(zdapeng): cache removed actions for 15 minutes.
+ for (String actionName : routeBalancers.keySet()) {
+ if (!actions.containsKey(actionName)) {
+ routeBalancers.get(actionName).shutdown();
+ }
+ }
+ routeBalancers.keySet().retainAll(actions.keySet());
+ routeHelpers.keySet().retainAll(actions.keySet());
+ }
+
@Override
public void handleNameResolutionError(Status error) {
+ logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
+ if (routeBalancers.isEmpty()) {
+ helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
+ }
+ for (LoadBalancer routeBalancer : routeBalancers.values()) {
+ routeBalancer.handleNameResolutionError(error);
+ }
}
@Override
public void shutdown() {
+ logger.log(XdsLogLevel.INFO, "Shutdown");
+ for (LoadBalancer routeBalancer : routeBalancers.values()) {
+ routeBalancer.shutdown();
+ }
+ }
+
+ @Override
+ public boolean canHandleEmptyAddressListFromNameResolution() {
+ return true;
+ }
+
+ private void updateOverallBalancingState() {
+ ConnectivityState overallState = null;
+ // Use LinkedHashMap to preserve the order of routes.
+ Map<MethodName, SubchannelPicker> routePickers = new LinkedHashMap<>();
+ for (Route route : routes) {
+ RouteHelper routeHelper = routeHelpers.get(route.actionName);
+ routePickers.put(route.methodName, routeHelper.currentPicker);
+ ConnectivityState routeState = routeHelper.currentState;
+ overallState = aggregateState(overallState, routeState);
+ }
+ if (overallState != null) {
+ SubchannelPicker picker = new PathMatchingSubchannelPicker(routePickers);
+ helper.updateBalancingState(overallState, picker);
+ }
+ }
+
+ @Nullable
+ private static ConnectivityState aggregateState(
+ @Nullable ConnectivityState overallState, ConnectivityState childState) {
+ if (overallState == null) {
+ return childState;
+ }
+ if (overallState == READY || childState == READY) {
+ return READY;
+ }
+ if (overallState == CONNECTING || childState == CONNECTING) {
+ return CONNECTING;
+ }
+ if (overallState == IDLE || childState == IDLE) {
+ return IDLE;
+ }
+ return overallState;
+ }
+
+ /**
+ * The lb helper for a single route balancer.
+ */
+ private final class RouteHelper extends ForwardingLoadBalancerHelper {
+ ConnectivityState currentState = CONNECTING;
+ SubchannelPicker currentPicker = BUFFER_PICKER;
+
+ @Override
+ public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
+ currentState = newState;
+ currentPicker = newPicker;
+ updateOverallBalancingState();
+ }
+
+ @Override
+ protected Helper delegate() {
+ return helper;
+ }
+ }
+
+ private static final class PathMatchingSubchannelPicker extends SubchannelPicker {
+
+ final Map<MethodName, SubchannelPicker> routePickers;
+
+ /**
+ * Constructs a picker that will match the path of PickSubchannelArgs with the given map.
+ * The order of the map entries matters. First match will be picked even if second match is an
+ * exact (service + method) path match.
+ */
+ PathMatchingSubchannelPicker(Map<MethodName, SubchannelPicker> routePickers) {
+ this.routePickers = routePickers;
+ }
+
+ @Override
+ public PickResult pickSubchannel(PickSubchannelArgs args) {
+ for (MethodName methodName : routePickers.keySet()) {
+ if (match(args.getMethodDescriptor(), methodName)) {
+ return routePickers.get(methodName).pickSubchannel(args);
+ }
+ }
+ // At least the default route should match, otherwise there is a bug.
+ throw new IllegalStateException("PathMatchingSubchannelPicker: error in matching path");
+ }
+
+ boolean match(MethodDescriptor<?, ?> methodDescriptor, MethodName methodName) {
+ if (methodName.service.isEmpty() && methodName.method.isEmpty()) {
+ return true;
+ }
+ if (methodName.method.isEmpty()) {
+ return methodName.service.equals(methodDescriptor.getServiceName());
+ }
+ return (methodName.service + '/' + methodName.method)
+ .equals(methodDescriptor.getFullMethodName());
+ }
}
}
diff --git a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java
index f824873..2a08ad0 100644
--- a/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java
+++ b/xds/src/main/java/io/grpc/xds/XdsRoutingLoadBalancerProvider.java
@@ -64,10 +64,6 @@
this.lbRegistry = lbRegistry;
}
- private LoadBalancerRegistry loadBalancerRegistry() {
- return lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : lbRegistry;
- }
-
@Override
public boolean isAvailable() {
return true;
@@ -85,8 +81,7 @@
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
- // TODO(zdapeng): pass helper and loadBalancerRegistry() to constructor args.
- return new XdsRoutingLoadBalancer();
+ return new XdsRoutingLoadBalancer(helper);
}
@Override
@@ -112,8 +107,10 @@
+ rawConfig));
}
+ LoadBalancerRegistry lbRegistry =
+ this.lbRegistry == null ? LoadBalancerRegistry.getDefaultRegistry() : this.lbRegistry;
ConfigOrError selectedConfigOrError =
- ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, loadBalancerRegistry());
+ ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, lbRegistry);
if (selectedConfigOrError.getError() != null) {
return selectedConfigOrError;
}
diff --git a/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java
new file mode 100644
index 0000000..a22fa4a
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/XdsRoutingLoadBalancerTest.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import io.grpc.Attributes;
+import io.grpc.Attributes.Key;
+import io.grpc.CallOptions;
+import io.grpc.ChannelLogger;
+import io.grpc.ConnectivityState;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.PickSubchannelArgs;
+import io.grpc.LoadBalancer.ResolvedAddresses;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodDescriptor.MethodType;
+import io.grpc.Status;
+import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import io.grpc.internal.TestUtils;
+import io.grpc.internal.TestUtils.StandardLoadBalancerProvider;
+import io.grpc.testing.TestMethodDescriptors;
+import io.grpc.xds.XdsRoutingLoadBalancerProvider.MethodName;
+import io.grpc.xds.XdsRoutingLoadBalancerProvider.Route;
+import io.grpc.xds.XdsRoutingLoadBalancerProvider.XdsRoutingConfig;
+import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link XdsRoutingLoadBalancer}. */
+public class XdsRoutingLoadBalancerTest {
+
+ private final List<LoadBalancer> fooBalancers = new ArrayList<>();
+ private final List<LoadBalancer> barBalancers = new ArrayList<>();
+ private final List<LoadBalancer> bazBalancers = new ArrayList<>();
+ private final List<Helper> fooHelpers = new ArrayList<>();
+ private final List<Helper> barHelpers = new ArrayList<>();
+ private final List<Helper> bazHelpers = new ArrayList<>();
+
+ private final LoadBalancerProvider fooLbProvider =
+ new StandardLoadBalancerProvider("foo_policy") {
+ @Override
+ public LoadBalancer newLoadBalancer(Helper helper) {
+ LoadBalancer lb = mock(LoadBalancer.class);
+ fooBalancers.add(lb);
+ fooHelpers.add(helper);
+ return lb;
+ }
+ };
+ private final LoadBalancerProvider barLbProvider =
+ new StandardLoadBalancerProvider("bar_policy") {
+ @Override
+ public LoadBalancer newLoadBalancer(Helper helper) {
+ LoadBalancer lb = mock(LoadBalancer.class);
+ barBalancers.add(lb);
+ barHelpers.add(helper);
+ return lb;
+ }
+ };
+ private final LoadBalancerProvider bazLbProvider =
+ new StandardLoadBalancerProvider("baz_policy") {
+ @Override
+ public LoadBalancer newLoadBalancer(Helper helper) {
+ LoadBalancer lb = mock(LoadBalancer.class);
+ bazBalancers.add(lb);
+ bazHelpers.add(helper);
+ return lb;
+ }
+ };
+
+ @Mock
+ private Helper helper;
+ @Mock
+ private ChannelLogger channelLogger;
+
+ private LoadBalancer xdsRoutingLb;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ doReturn(channelLogger).when(helper).getChannelLogger();
+ xdsRoutingLb = new XdsRoutingLoadBalancer(helper);
+ }
+
+ @After
+ public void tearDown() {
+ xdsRoutingLb.shutdown();
+
+ for (LoadBalancer balancer : Iterables.concat(fooBalancers, barBalancers, bazBalancers)) {
+ verify(balancer).shutdown();
+ }
+ }
+
+ @Test
+ public void typicalWorkflow() {
+ // Resolution error.
+ xdsRoutingLb.handleNameResolutionError(Status.UNAUTHENTICATED);
+ verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
+
+ // Config update.
+ Attributes attributes =
+ Attributes.newBuilder().set(Key.<String>create("fakeKey"), "fakeVal").build();
+ Object fooConfig1 = new Object();
+ Object barConfig1 = new Object();
+ Object bazConfig1 = new Object();
+ Object fooConfig2 = new Object();
+ XdsRoutingConfig xdsRoutingConfig = new XdsRoutingConfig(
+ ImmutableList.of(
+ new Route("foo_action", new MethodName("service1", "method1")),
+ new Route("foo_action", new MethodName("service2", "method2")),
+ new Route("bar_action", new MethodName("service1", "hello")),
+ new Route("bar_action", new MethodName("service2", "hello")),
+ new Route("foo_action_2", new MethodName("service2", "")),
+ new Route("baz_action", new MethodName("", ""))),
+ ImmutableMap.of(
+ "foo_action",
+ new PolicySelection(fooLbProvider, null, fooConfig1),
+ "foo_action_2",
+ new PolicySelection(fooLbProvider, null, fooConfig2),
+ "bar_action",
+ new PolicySelection(barLbProvider, null, barConfig1),
+ "baz_action",
+ new PolicySelection(bazLbProvider, null, bazConfig1)));
+ xdsRoutingLb.handleResolvedAddresses(
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
+ .setAttributes(attributes)
+ .setLoadBalancingPolicyConfig(xdsRoutingConfig).build());
+ assertThat(fooBalancers).hasSize(2);
+ ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor = ArgumentCaptor.forClass(null);
+ verify(fooBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ ResolvedAddresses resolvedAddressesFoo0 = resolvedAddressesCaptor.getValue();
+ verify(fooBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ ResolvedAddresses resolvedAddressesFoo1 = resolvedAddressesCaptor.getValue();
+ assertThat(barBalancers).hasSize(1);
+ verify(barBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ ResolvedAddresses resolvedAddressesBar = resolvedAddressesCaptor.getValue();
+ assertThat(bazBalancers).hasSize(1);
+ verify(bazBalancers.get(0)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ ResolvedAddresses resolvedAddressesBaz = resolvedAddressesCaptor.getValue();
+ assertThat(resolvedAddressesFoo0.getAttributes()).isEqualTo(attributes);
+ assertThat(resolvedAddressesFoo1.getAttributes()).isEqualTo(attributes);
+ assertThat(resolvedAddressesBar.getAttributes()).isEqualTo(attributes);
+ assertThat(resolvedAddressesBaz.getAttributes()).isEqualTo(attributes);
+ assertThat(
+ Arrays.asList(
+ resolvedAddressesFoo0.getLoadBalancingPolicyConfig(),
+ resolvedAddressesFoo1.getLoadBalancingPolicyConfig()))
+ .containsExactly(fooConfig1, fooConfig2);
+ LoadBalancer fooBalancer1;
+ Helper fooHelper1;
+ Helper fooHelper2;
+ if (resolvedAddressesFoo0.getLoadBalancingPolicyConfig().equals(fooConfig1)) {
+ fooBalancer1 = fooBalancers.get(0);
+ fooHelper1 = fooHelpers.get(0);
+ fooHelper2 = fooHelpers.get(1);
+ } else {
+ fooBalancer1 = fooBalancers.get(1);
+ fooHelper1 = fooHelpers.get(1);
+ fooHelper2 = fooHelpers.get(0);
+ }
+ assertThat(resolvedAddressesBar.getLoadBalancingPolicyConfig()).isEqualTo(barConfig1);
+ assertThat(resolvedAddressesBaz.getLoadBalancingPolicyConfig()).isEqualTo(bazConfig1);
+ Helper barHelper = barHelpers.get(0);
+ Helper bazHelper = bazHelpers.get(0);
+
+ // State update.
+ Subchannel subchannelFoo1 = mock(Subchannel.class);
+ Subchannel subchannelFoo2 = mock(Subchannel.class);
+ fooHelper1.updateBalancingState(READY, TestUtils.pickerOf(subchannelFoo1));
+ fooHelper2.updateBalancingState(READY, TestUtils.pickerOf(subchannelFoo2));
+ barHelper.updateBalancingState(
+ TRANSIENT_FAILURE, new ErrorPicker(Status.ABORTED.withDescription("abort bar")));
+ bazHelper.updateBalancingState(
+ TRANSIENT_FAILURE, new ErrorPicker(Status.DATA_LOSS.withDescription("data loss baz")));
+ ArgumentCaptor<ConnectivityState> connectivityStateCaptor = ArgumentCaptor.forClass(null);
+ ArgumentCaptor<SubchannelPicker> subchannelPickerCaptor = ArgumentCaptor.forClass(null);
+ verify(helper, atLeastOnce()).updateBalancingState(
+ connectivityStateCaptor.capture(), subchannelPickerCaptor.capture());
+ assertThat(connectivityStateCaptor.getValue()).isEqualTo(READY);
+ SubchannelPicker picker = subchannelPickerCaptor.getValue();
+ assertPickerRoutePathToSubchannel(picker, "service1", "method1", subchannelFoo1);
+ assertPickerRoutePathToSubchannel(picker, "service2", "method2", subchannelFoo1);
+ assertPickerRoutePathToError(
+ picker, "service1", "hello", Status.ABORTED.withDescription("abort bar"));
+ assertPickerRoutePathToError(
+ picker, "service2", "hello", Status.ABORTED.withDescription("abort bar"));
+ assertPickerRoutePathToSubchannel(picker, "service2", "otherMethod", subchannelFoo2);
+ assertPickerRoutePathToError(
+ picker, "otherService", "hello", Status.DATA_LOSS.withDescription("data loss baz"));
+
+ // Resolution error.
+ Status error = Status.UNAVAILABLE.withDescription("fake unavailable");
+ xdsRoutingLb.handleNameResolutionError(error);
+ for (LoadBalancer lb : Iterables.concat(fooBalancers, barBalancers, bazBalancers)) {
+ verify(lb).handleNameResolutionError(error);
+ }
+
+ // New config update.
+ Object fooConfig3 = new Object();
+ Object barConfig2 = new Object();
+ Object barConfig3 = new Object();
+ Object bazConfig2 = new Object();
+ xdsRoutingConfig = new XdsRoutingConfig(
+ ImmutableList.of(
+ new Route("foo_action", new MethodName("service1", "method1")),
+ new Route("foo_action", new MethodName("service2", "method3")),
+ new Route("bar_action", new MethodName("service1", "hello")),
+ new Route("bar_action_2", new MethodName("service2", "hello")),
+ new Route("baz_action", new MethodName("", ""))),
+ ImmutableMap.of(
+ "foo_action",
+ new PolicySelection(fooLbProvider, null, fooConfig3),
+ "bar_action",
+ new PolicySelection(barLbProvider, null, barConfig2),
+ "bar_action_2",
+ new PolicySelection(barLbProvider, null, barConfig3),
+ "baz_action",
+ new PolicySelection(bazLbProvider, null, bazConfig2)));
+ xdsRoutingLb.handleResolvedAddresses(
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
+ .setLoadBalancingPolicyConfig(xdsRoutingConfig)
+ .build());
+ verify(fooBalancer1, times(2)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
+ .isEqualTo(fooConfig3);
+ assertThat(barBalancers).hasSize(2);
+ verify(barBalancers.get(0), times(2))
+ .handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
+ .isEqualTo(barConfig2);
+ verify(barBalancers.get(1)).handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
+ .isEqualTo(barConfig3);
+ verify(bazBalancers.get(0), times(2))
+ .handleResolvedAddresses(resolvedAddressesCaptor.capture());
+ assertThat(resolvedAddressesCaptor.getValue().getLoadBalancingPolicyConfig())
+ .isEqualTo(bazConfig2);
+
+ // New status update.
+ Subchannel subchannelBar2 = mock(Subchannel.class);
+ Helper barHelper2 = barHelpers.get(1);
+ barHelper2.updateBalancingState(READY, TestUtils.pickerOf(subchannelBar2));
+ verify(helper, atLeastOnce()).updateBalancingState(
+ connectivityStateCaptor.capture(), subchannelPickerCaptor.capture());
+ assertThat(connectivityStateCaptor.getValue()).isEqualTo(READY);
+ picker = subchannelPickerCaptor.getValue();
+ assertPickerRoutePathToSubchannel(picker, "service1", "method1", subchannelFoo1);
+ assertPickerRoutePathToError(
+ picker, "service1", "method2", Status.DATA_LOSS.withDescription("data loss baz"));
+ assertPickerRoutePathToSubchannel(picker, "service2", "method3", subchannelFoo1);
+ assertPickerRoutePathToError(
+ picker, "service1", "hello", Status.ABORTED.withDescription("abort bar"));
+ assertPickerRoutePathToSubchannel(picker, "service2", "hello", subchannelBar2);
+ }
+
+ private static PickSubchannelArgs pickSubchannelArgsForMethod(
+ final String service, final String method) {
+ return new PickSubchannelArgs() {
+
+ @Override
+ public CallOptions getCallOptions() {
+ return CallOptions.DEFAULT;
+ }
+
+ @Override
+ public Metadata getHeaders() {
+ return new Metadata();
+ }
+
+ @Override
+ public MethodDescriptor<Void, Void> getMethodDescriptor() {
+ return MethodDescriptor.<Void, Void>newBuilder()
+ .setType(MethodType.UNARY)
+ .setFullMethodName(service + "/" + method)
+ .setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
+ .setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
+ .build();
+ }
+ };
+ }
+
+ private static void assertPickerRoutePathToSubchannel(
+ SubchannelPicker picker, String service, String method, Subchannel expectedSubchannel) {
+ Subchannel actualSubchannel =
+ picker.pickSubchannel(pickSubchannelArgsForMethod(service, method)).getSubchannel();
+ assertThat(actualSubchannel).isEqualTo(expectedSubchannel);
+ }
+
+ private static void assertPickerRoutePathToError(
+ SubchannelPicker picker, String service, String method, Status expectedStatus) {
+ Status actualStatus =
+ picker.pickSubchannel(pickSubchannelArgsForMethod(service, method)).getStatus();
+ assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode());
+ assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());
+ }
+}