xds: restructure XdsLoadBalancer Part 3: LookasideLb
diff --git a/xds/src/main/java/io/grpc/xds/LookasideLb.java b/xds/src/main/java/io/grpc/xds/LookasideLb.java
new file mode 100644
index 0000000..78722a6
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/LookasideLb.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Attributes;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.LoadBalancerRegistry;
+import io.grpc.NameResolver.ConfigOrError;
+import io.grpc.util.ForwardingLoadBalancer;
+import io.grpc.util.GracefulSwitchLoadBalancer;
+import io.grpc.xds.XdsComms.AdsStreamCallback;
+import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
+import java.util.Map;
+
+/** Lookaside load balancer that handles balancer name changes. */
+final class LookasideLb extends ForwardingLoadBalancer {
+
+ private final AdsStreamCallback adsCallback;
+ private final LookasideChannelLbFactory lookasideChannelLbFactory;
+ private final GracefulSwitchLoadBalancer lookasideChannelLb;
+ private final LoadBalancerRegistry lbRegistry;
+
+ private String balancerName;
+
+ // TODO(zdapeng): Add LookasideLb(Helper helper, AdsCallback adsCallback) with default factory
+ @VisibleForTesting
+ LookasideLb(
+ Helper lookasideLbHelper,
+ AdsStreamCallback adsCallback,
+ LookasideChannelLbFactory lookasideChannelLbFactory,
+ LoadBalancerRegistry lbRegistry) {
+ this.adsCallback = adsCallback;
+ this.lookasideChannelLbFactory = lookasideChannelLbFactory;
+ this.lbRegistry = lbRegistry;
+ this.lookasideChannelLb = new GracefulSwitchLoadBalancer(lookasideLbHelper);
+ }
+
+ @Override
+ protected LoadBalancer delegate() {
+ return lookasideChannelLb;
+ }
+
+ @Override
+ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+ // In the future, xdsConfig can be gotten directly by
+ // resolvedAddresses.getLoadBalancingPolicyConfig()
+ Attributes attributes = resolvedAddresses.getAttributes();
+ Map<String, ?> newRawLbConfig = checkNotNull(
+ attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available");
+ ConfigOrError cfg =
+ XdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(newRawLbConfig, lbRegistry);
+ if (cfg.getError() != null) {
+ throw cfg.getError().asRuntimeException();
+ }
+ XdsConfig xdsConfig = (XdsConfig) cfg.getConfig();
+
+ String newBalancerName = xdsConfig.balancerName;
+ if (!newBalancerName.equals(balancerName)) {
+ balancerName = newBalancerName; // cache the name and check next time for optimization
+ lookasideChannelLb.switchTo(newLookasideChannelLbProvider(newBalancerName));
+ }
+ lookasideChannelLb.handleResolvedAddresses(resolvedAddresses);
+ }
+
+ private LoadBalancerProvider newLookasideChannelLbProvider(final String balancerName) {
+ return new LoadBalancerProvider() {
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public int getPriority() {
+ return 5;
+ }
+
+ /**
+ * A synthetic policy name for LookasideChannelLb identified by balancerName. The
+ * implementation detail doesn't matter.
+ */
+ @Override
+ public String getPolicyName() {
+ return "xds_child_policy_balancer_name_" + balancerName;
+ }
+
+ @Override
+ public LoadBalancer newLoadBalancer(Helper helper) {
+ return lookasideChannelLbFactory.newLoadBalancer(helper, adsCallback, balancerName);
+ }
+ };
+ }
+
+ @VisibleForTesting
+ interface LookasideChannelLbFactory {
+ LoadBalancer newLoadBalancer(Helper helper, AdsStreamCallback adsCallback, String balancerName);
+ }
+}
diff --git a/xds/src/test/java/io/grpc/xds/LookasideLbTest.java b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java
new file mode 100644
index 0000000..a8a183f
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/LookasideLbTest.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.ConnectivityState.CONNECTING;
+import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.ResolvedAddresses;
+import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.LoadBalancerRegistry;
+import io.grpc.internal.JsonParser;
+import io.grpc.xds.LookasideLb.LookasideChannelLbFactory;
+import io.grpc.xds.XdsComms.AdsStreamCallback;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LookasideLb}.
+ */
+@RunWith(JUnit4.class)
+public class LookasideLbTest {
+
+ private final Helper helper = mock(Helper.class);
+ private final List<Helper> helpers = new ArrayList<>();
+ private final List<LoadBalancer> balancers = new ArrayList<>();
+ private final LookasideChannelLbFactory lookasideChannelLbFactory =
+ new LookasideChannelLbFactory() {
+ @Override
+ public LoadBalancer newLoadBalancer(
+ Helper helper, AdsStreamCallback adsCallback, String balancerName) {
+ // just return a mock and record helper and balancer.
+ helpers.add(helper);
+ LoadBalancer balancer = mock(LoadBalancer.class);
+ balancers.add(balancer);
+ return balancer;
+ }
+ };
+
+ private LoadBalancer lookasideLb = new LookasideLb(
+ helper, mock(AdsStreamCallback.class), lookasideChannelLbFactory, new LoadBalancerRegistry());
+
+
+ @Test
+ public void handleChildPolicyChangeThenBalancerNameChangeThenChildPolicyChange()
+ throws Exception {
+ assertThat(helpers).isEmpty();
+ assertThat(balancers).isEmpty();
+
+ List<EquivalentAddressGroup> eags = ImmutableList.of();
+ String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}";
+ @SuppressWarnings("unchecked")
+ Map<String, ?> lbConfig11 = (Map<String, ?>) JsonParser.parse(lbConfigRaw11);
+ ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
+ .setAddresses(eags)
+ .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig11).build())
+ .build();
+ lookasideLb.handleResolvedAddresses(resolvedAddresses);
+
+ assertThat(helpers).hasSize(1);
+ assertThat(balancers).hasSize(1);
+ Helper helper1 = helpers.get(0);
+ LoadBalancer balancer1 = balancers.get(0);
+ verify(balancer1).handleResolvedAddresses(resolvedAddresses);
+
+ SubchannelPicker picker1 = mock(SubchannelPicker.class);
+ helper1.updateBalancingState(CONNECTING, picker1);
+ verify(helper).updateBalancingState(CONNECTING, picker1);
+
+ String lbConfigRaw12 = "{"
+ + "\"balancerName\" : \"dns:///balancer1.example.com:8080\","
+ + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]"
+ + "}";
+ @SuppressWarnings("unchecked")
+ Map<String, ?> lbConfig12 = (Map<String, ?>) JsonParser.parse(lbConfigRaw12);
+ resolvedAddresses = ResolvedAddresses.newBuilder()
+ .setAddresses(eags)
+ .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig12).build())
+ .build();
+ lookasideLb.handleResolvedAddresses(resolvedAddresses);
+
+ verify(balancer1).handleResolvedAddresses(resolvedAddresses);
+
+ verify(balancer1, never()).shutdown();
+ assertThat(helpers).hasSize(1);
+ assertThat(balancers).hasSize(1);
+
+ // change balancer name policy to balancer2.example.com
+ String lbConfigRaw21 = "{\"balancerName\" : \"dns:///balancer2.example.com:8080\"}";
+ @SuppressWarnings("unchecked")
+ Map<String, ?> lbConfig21 = (Map<String, ?>) JsonParser.parse(lbConfigRaw21);
+ resolvedAddresses = ResolvedAddresses.newBuilder()
+ .setAddresses(eags)
+ .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig21).build())
+ .build();
+ lookasideLb.handleResolvedAddresses(resolvedAddresses);
+
+ verify(balancer1).shutdown();
+ assertThat(helpers).hasSize(2);
+ assertThat(balancers).hasSize(2);
+ Helper helper2 = helpers.get(1);
+ LoadBalancer balancer2 = balancers.get(1);
+ verify(balancer1, never()).handleResolvedAddresses(resolvedAddresses);
+ verify(balancer2).handleResolvedAddresses(resolvedAddresses);
+
+ picker1 = mock(SubchannelPicker.class);
+ helper1.updateBalancingState(CONNECTING, picker1);
+ verify(helper, never()).updateBalancingState(CONNECTING, picker1);
+ SubchannelPicker picker2 = mock(SubchannelPicker.class);
+ helper2.updateBalancingState(CONNECTING, picker2);
+ verify(helper).updateBalancingState(CONNECTING, picker2);
+
+ String lbConfigRaw22 = "{"
+ + "\"balancerName\" : \"dns:///balancer2.example.com:8080\","
+ + "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]"
+ + "}";
+ @SuppressWarnings("unchecked")
+ Map<String, ?> lbConfig22 = (Map<String, ?>) JsonParser.parse(lbConfigRaw22);
+ resolvedAddresses = ResolvedAddresses.newBuilder()
+ .setAddresses(eags)
+ .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig22).build())
+ .build();
+ lookasideLb.handleResolvedAddresses(resolvedAddresses);
+
+ verify(balancer2).handleResolvedAddresses(resolvedAddresses);
+
+ assertThat(helpers).hasSize(2);
+ assertThat(balancers).hasSize(2);
+
+ verify(balancer2, never()).shutdown();
+ lookasideLb.shutdown();
+ verify(balancer2).shutdown();
+ }
+}