services: client-side health checking main implementation (#5014)
Spec: https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md
This comes in the form of a wrapper LoadBalancerFactory. The public wrapping utility and the wrapping of RoundRobinLoadBalancer will come in follow-up changes.
diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
index 989077e..b6ab74d 100644
--- a/core/src/main/java/io/grpc/LoadBalancer.java
+++ b/core/src/main/java/io/grpc/LoadBalancer.java
@@ -158,11 +158,6 @@
*/
public abstract void shutdown();
- @Override
- public String toString() {
- return getClass().getSimpleName();
- }
-
/**
* The main balancing logic. It <strong>must be thread-safe</strong>. Typically it should only
* synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
index 811ef86..fa89dc1 100644
--- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
+++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
@@ -122,7 +122,8 @@
delegate = delegateProvider.newLoadBalancer(helper);
if (channelTracer != null) {
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
- .setDescription("Load balancer changed from " + old + " to " + delegate)
+ .setDescription("Load balancer changed from " + old.getClass().getSimpleName()
+ + " to " + delegate.getClass().getSimpleName())
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timeProvider.currentTimeNanos())
.build());
diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
index 5295327..f112e71 100644
--- a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
+++ b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
@@ -62,6 +62,32 @@
private ServiceConfigUtil() {}
+ /**
+ * Fetch the health-checked service name from service config. {@code null} if can't find one.
+ */
+ @Nullable
+ public static String getHealthCheckedServiceName(@Nullable Map<String, Object> serviceConfig) {
+ String healthCheckKey = "healthCheckConfig";
+ String serviceNameKey = "serviceName";
+ if (serviceConfig == null || !serviceConfig.containsKey(healthCheckKey)) {
+ return null;
+ }
+
+ /* schema as follows
+ {
+ "healthCheckConfig": {
+ // Service name to use in the health-checking request.
+ "serviceName": string
+ }
+ }
+ */
+ Map<String, Object> healthCheck = getObject(serviceConfig, healthCheckKey);
+ if (!healthCheck.containsKey(serviceNameKey)) {
+ return null;
+ }
+ return getString(healthCheck, "serviceName");
+ }
+
@Nullable
static Throttle getThrottlePolicy(@Nullable Map<String, Object> serviceConfig) {
String retryThrottlingKey = "retryThrottling";
diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java
new file mode 100644
index 0000000..5be9926
--- /dev/null
+++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import com.google.common.base.MoreObjects;
+import io.grpc.Attributes;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.ExperimentalApi;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer;
+import io.grpc.NameResolver;
+import io.grpc.Status;
+import java.util.List;
+
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
+public abstract class ForwardingLoadBalancer extends LoadBalancer {
+ /**
+ * Returns the underlying balancer.
+ */
+ protected abstract LoadBalancer delegate();
+
+ @Override
+ public void handleResolvedAddressGroups(
+ List<EquivalentAddressGroup> servers,
+ @NameResolver.ResolutionResultAttr Attributes attributes) {
+ delegate().handleResolvedAddressGroups(servers, attributes);
+ }
+
+ @Override
+ public void handleNameResolutionError(Status error) {
+ delegate().handleNameResolutionError(error);
+ }
+
+ @Override
+ public void handleSubchannelState(
+ Subchannel subchannel, ConnectivityStateInfo stateInfo) {
+ delegate().handleSubchannelState(subchannel, stateInfo);
+ }
+
+ @Override
+ public void shutdown() {
+ delegate().shutdown();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
+ }
+}
diff --git a/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java
new file mode 100644
index 0000000..be6c7f7
--- /dev/null
+++ b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static org.mockito.Mockito.mock;
+
+import io.grpc.ForwardingTestUtil;
+import io.grpc.LoadBalancer;
+import java.lang.reflect.Method;
+import java.net.SocketAddress;
+import java.util.Collections;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ForwardingLoadBalancer}. */
+@RunWith(JUnit4.class)
+public class ForwardingLoadBalancerTest {
+ private final LoadBalancer mockDelegate = mock(LoadBalancer.class);
+
+ private final class TestBalancer extends ForwardingLoadBalancer {
+ @Override
+ protected LoadBalancer delegate() {
+ return mockDelegate;
+ }
+ }
+
+ @Test
+ public void allMethodsForwarded() throws Exception {
+ final SocketAddress mockAddr = mock(SocketAddress.class);
+ ForwardingTestUtil.testMethodsForwarded(
+ LoadBalancer.class,
+ mockDelegate,
+ new TestBalancer(),
+ Collections.<Method>emptyList());
+ }
+}
diff --git a/services/build.gradle b/services/build.gradle
index 1679de9..a22b9fd 100644
--- a/services/build.gradle
+++ b/services/build.gradle
@@ -28,7 +28,8 @@
compileOnly libraries.javax_annotation
testCompile project(':grpc-testing'),
- libraries.netty_epoll // for DomainSocketAddress
+ libraries.netty_epoll, // for DomainSocketAddress
+ project(':grpc-core').sourceSets.test.output // for FakeClock
testCompileOnly libraries.javax_annotation
signature "org.codehaus.mojo.signature:java17:1.0@signature"
}
diff --git a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
new file mode 100644
index 0000000..e5f15d0
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
@@ -0,0 +1,434 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static io.grpc.ConnectivityState.CONNECTING;
+import static io.grpc.ConnectivityState.IDLE;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.SHUTDOWN;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import io.grpc.Attributes;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.Factory;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.SynchronizationContext;
+import io.grpc.SynchronizationContext.ScheduledHandle;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.internal.BackoffPolicy;
+import io.grpc.internal.GrpcAttributes;
+import io.grpc.internal.ServiceConfigUtil;
+import io.grpc.internal.TimeProvider;
+import io.grpc.util.ForwardingLoadBalancer;
+import io.grpc.util.ForwardingLoadBalancerHelper;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Wraps a {@link LoadBalancer} and implements the client-side health-checking
+ * (https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md). The
+ * Subchannel received by the states wrapped LoadBalancer will be determined by health-checking.
+ *
+ * <p>Note the original LoadBalancer must call {@code Helper.createSubchannel()} from the
+ * SynchronizationContext, or it will throw.
+ */
+final class HealthCheckingLoadBalancerFactory extends Factory {
+ private static final Attributes.Key<HealthCheckState> KEY_HEALTH_CHECK_STATE =
+ Attributes.Key.create("io.grpc.services.HealthCheckingLoadBalancerFactory.healthCheckState");
+
+ private final Factory delegateFactory;
+ private final BackoffPolicy.Provider backoffPolicyProvider;
+ private final TimeProvider time;
+
+ public HealthCheckingLoadBalancerFactory(
+ Factory delegateFactory, BackoffPolicy.Provider backoffPolicyProvider, TimeProvider time) {
+ this.delegateFactory = checkNotNull(delegateFactory, "delegateFactory");
+ this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
+ this.time = checkNotNull(time, "time");
+ }
+
+ @Override
+ public LoadBalancer newLoadBalancer(Helper helper) {
+ HelperImpl wrappedHelper = new HelperImpl(helper);
+ LoadBalancer delegateBalancer = delegateFactory.newLoadBalancer(wrappedHelper);
+ wrappedHelper.init(delegateBalancer);
+ return new LoadBalancerImpl(wrappedHelper, delegateBalancer);
+ }
+
+ private final class HelperImpl extends ForwardingLoadBalancerHelper {
+ private final Helper delegate;
+ private final SynchronizationContext syncContext;
+
+ private LoadBalancer delegateBalancer;
+ @Nullable String healthCheckedService;
+
+ final HashSet<HealthCheckState> hcStates = new HashSet<HealthCheckState>();
+
+ HelperImpl(Helper delegate) {
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext");
+ }
+
+ void init(LoadBalancer delegateBalancer) {
+ checkState(this.delegateBalancer == null, "init() already called");
+ this.delegateBalancer = checkNotNull(delegateBalancer, "delegateBalancer");
+ }
+
+ @Override
+ protected Helper delegate() {
+ return delegate;
+ }
+
+ @Override
+ public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
+ // HealthCheckState is not thread-safe, we are requiring the original LoadBalancer calls
+ // createSubchannel() from the SynchronizationContext.
+ syncContext.throwIfNotInThisSynchronizationContext();
+ HealthCheckState hcState = new HealthCheckState(
+ delegateBalancer, syncContext, delegate.getScheduledExecutorService());
+ hcStates.add(hcState);
+ Subchannel subchannel = super.createSubchannel(
+ addrs, attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, hcState).build());
+ hcState.init(subchannel);
+ if (healthCheckedService != null) {
+ hcState.setServiceName(healthCheckedService);
+ }
+ return subchannel;
+ }
+
+ void setHealthCheckedService(@Nullable String service) {
+ healthCheckedService = service;
+ for (HealthCheckState hcState : hcStates) {
+ hcState.setServiceName(service);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
+ }
+ }
+
+ private static final class LoadBalancerImpl extends ForwardingLoadBalancer {
+ final LoadBalancer delegate;
+ final HelperImpl helper;
+ final SynchronizationContext syncContext;
+ final ScheduledExecutorService timerService;
+
+ LoadBalancerImpl(HelperImpl helper, LoadBalancer delegate) {
+ this.helper = checkNotNull(helper, "helper");
+ this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
+ this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
+ this.delegate = checkNotNull(delegate, "delegate");
+ }
+
+ @Override
+ protected LoadBalancer delegate() {
+ return delegate;
+ }
+
+ @Override
+ public void handleResolvedAddressGroups(
+ List<EquivalentAddressGroup> servers, Attributes attributes) {
+ Map<String, Object> serviceConfig =
+ attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
+ String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig);
+ helper.setHealthCheckedService(serviceName);
+ super.handleResolvedAddressGroups(servers, attributes);
+ }
+
+ @Override
+ public void handleSubchannelState(
+ Subchannel subchannel, ConnectivityStateInfo stateInfo) {
+ HealthCheckState hcState =
+ checkNotNull(subchannel.getAttributes().get(KEY_HEALTH_CHECK_STATE), "hcState");
+ hcState.updateRawState(stateInfo);
+
+ if (Objects.equal(stateInfo.getState(), SHUTDOWN)) {
+ helper.hcStates.remove(hcState);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
+ }
+ }
+
+
+ // All methods are run from syncContext
+ private final class HealthCheckState {
+ private final Runnable retryTask = new Runnable() {
+ @Override
+ public void run() {
+ startRpc();
+ }
+ };
+
+ private final LoadBalancer delegate;
+ private final SynchronizationContext syncContext;
+ private final ScheduledExecutorService timerService;
+
+ private Subchannel subchannel;
+
+ // Set when RPC started. Cleared when the RPC has closed or abandoned.
+ @Nullable
+ private HcStream activeRpc;
+
+ // The service name that should be used for health checking
+ private String serviceName;
+ private BackoffPolicy backoffPolicy;
+ // The state from the underlying Subchannel
+ private ConnectivityStateInfo rawState = ConnectivityStateInfo.forNonError(IDLE);
+ // The state concluded from health checking
+ private ConnectivityStateInfo concludedState = ConnectivityStateInfo.forNonError(IDLE);
+ // true if a health check stream should be kept. When true, either there is an active RPC, or a
+ // retry is pending.
+ private boolean running;
+ // true if server returned UNIMPLEMENTED
+ private boolean disabled;
+ private ScheduledHandle retryTimer;
+
+ HealthCheckState(
+ LoadBalancer delegate, SynchronizationContext syncContext,
+ ScheduledExecutorService timerService) {
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.syncContext = checkNotNull(syncContext, "syncContext");
+ this.timerService = checkNotNull(timerService, "timerService");
+ }
+
+ void init(Subchannel subchannel) {
+ checkState(this.subchannel == null, "init() already called");
+ this.subchannel = checkNotNull(subchannel, "subchannel");
+ }
+
+ void setServiceName(@Nullable String newServiceName) {
+ if (Objects.equal(newServiceName, serviceName)) {
+ return;
+ }
+ serviceName = newServiceName;
+ // If service name has changed while there is active RPC, cancel it so that
+ // a new call will be made with the new name.
+ String cancelMsg =
+ serviceName == null ? "Health check disabled by service config"
+ : "Switching to new service name: " + newServiceName;
+ stopRpc(cancelMsg);
+ adjustHealthCheck();
+ }
+
+ void updateRawState(ConnectivityStateInfo rawState) {
+ if (Objects.equal(this.rawState.getState(), READY)
+ && !Objects.equal(rawState.getState(), READY)) {
+ // A connection was lost. We will reset disabled flag because health check
+ // may be available on the new connection.
+ disabled = false;
+ // TODO(zhangkun83): record this to channel tracer
+ }
+ this.rawState = rawState;
+ adjustHealthCheck();
+ }
+
+ private boolean isRetryTimerPending() {
+ return retryTimer != null && retryTimer.isPending();
+ }
+
+ // Start or stop health check according to the current states.
+ private void adjustHealthCheck() {
+ if (!disabled && serviceName != null && Objects.equal(rawState.getState(), READY)) {
+ running = true;
+ if (activeRpc == null && !isRetryTimerPending()) {
+ startRpc();
+ }
+ } else {
+ running = false;
+ // Prerequisites for health checking not met.
+ // Make sure it's stopped.
+ stopRpc("Client stops health check");
+ backoffPolicy = null;
+ gotoState(rawState);
+ }
+ }
+
+ private void startRpc() {
+ checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up");
+ checkState(subchannel != null, "init() not called");
+ // Optimization suggested by @markroth: if we are already READY and starting the health
+ // checking RPC, either because health check is just enabled or has switched to a new service
+ // name, we don't go to CONNECTING, otherwise there will be artificial delays on RPCs
+ // waiting for the health check to respond.
+ if (!Objects.equal(concludedState.getState(), READY)) {
+ gotoState(ConnectivityStateInfo.forNonError(CONNECTING));
+ }
+ activeRpc = new HcStream();
+ activeRpc.start();
+ }
+
+ private void stopRpc(String msg) {
+ if (activeRpc != null) {
+ activeRpc.cancel(msg);
+ // Abandon this RPC. We are not interested in anything from this RPC any more.
+ activeRpc = null;
+ }
+ if (retryTimer != null) {
+ retryTimer.cancel();
+ retryTimer = null;
+ }
+ }
+
+ private void gotoState(ConnectivityStateInfo newState) {
+ checkState(subchannel != null, "init() not called");
+ if (!Objects.equal(concludedState, newState)) {
+ concludedState = newState;
+ delegate.handleSubchannelState(subchannel, concludedState);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("running", running)
+ .add("disabled", disabled)
+ .add("activeRpc", activeRpc)
+ .add("serviceName", serviceName)
+ .add("rawState", rawState)
+ .add("concludedState", concludedState)
+ .toString();
+ }
+
+ private class HcStream extends ClientCall.Listener<HealthCheckResponse> {
+ private final ClientCall<HealthCheckRequest, HealthCheckResponse> call;
+ private final String callServiceName;
+ private final long callCreationNanos;
+ private boolean callHasResponded;
+
+ HcStream() {
+ callCreationNanos = time.currentTimeNanos();
+ callServiceName = serviceName;
+ call = subchannel.asChannel().newCall(HealthGrpc.getWatchMethod(), CallOptions.DEFAULT);
+ }
+
+ void start() {
+ call.start(this, new Metadata());
+ call.sendMessage(HealthCheckRequest.newBuilder().setService(serviceName).build());
+ call.halfClose();
+ call.request(1);
+ }
+
+ void cancel(String msg) {
+ call.cancel(msg, null);
+ }
+
+ @Override
+ public void onMessage(final HealthCheckResponse response) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (activeRpc == HcStream.this) {
+ handleResponse(response);
+ }
+ }
+ });
+ }
+
+ @Override
+ public void onClose(final Status status, Metadata trailers) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (activeRpc == HcStream.this) {
+ activeRpc = null;
+ handleStreamClosed(status);
+ }
+ }
+ });
+ }
+
+ void handleResponse(HealthCheckResponse response) {
+ callHasResponded = true;
+ backoffPolicy = null;
+ // running == true means the Subchannel's state (rawState) is READY
+ if (Objects.equal(response.getStatus(), ServingStatus.SERVING)) {
+ gotoState(ConnectivityStateInfo.forNonError(READY));
+ } else {
+ gotoState(
+ ConnectivityStateInfo.forTransientFailure(
+ Status.UNAVAILABLE.withDescription(
+ "Health-check service responded "
+ + response.getStatus() + " for '" + callServiceName + "'")));
+ }
+ call.request(1);
+ }
+
+ void handleStreamClosed(Status status) {
+ if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
+ // TODO(zhangkun83): record this to channel tracer
+ disabled = true;
+ gotoState(rawState);
+ return;
+ }
+ long delayNanos = 0;
+ gotoState(
+ ConnectivityStateInfo.forTransientFailure(
+ Status.UNAVAILABLE.withDescription(
+ "Health-check stream unexpectedly closed with "
+ + status + " for '" + callServiceName + "'")));
+ // Use backoff only when server has not responded for the previous call
+ if (!callHasResponded) {
+ if (backoffPolicy == null) {
+ backoffPolicy = backoffPolicyProvider.get();
+ }
+ delayNanos =
+ callCreationNanos + backoffPolicy.nextBackoffNanos() - time.currentTimeNanos();
+ }
+ if (delayNanos <= 0) {
+ startRpc();
+ } else {
+ checkState(!isRetryTimerPending(), "Retry double scheduled");
+ retryTimer = syncContext.schedule(
+ retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("callStarted", call != null)
+ .add("serviceName", callServiceName)
+ .add("hasResponded", callHasResponded)
+ .toString();
+ }
+ }
+ }
+}
diff --git a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
new file mode 100644
index 0000000..68cf840
--- /dev/null
+++ b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
@@ -0,0 +1,1110 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.ConnectivityState.CONNECTING;
+import static io.grpc.ConnectivityState.IDLE;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.SHUTDOWN;
+import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
+import static org.junit.Assert.fail;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Attributes;
+import io.grpc.Channel;
+import io.grpc.ConnectivityState;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.Context;
+import io.grpc.Context.CancellationListener;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.Factory;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.ManagedChannel;
+import io.grpc.NameResolver;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.SynchronizationContext;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.BackoffPolicy;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.GrpcAttributes;
+import io.grpc.internal.ServiceConfigUtil;
+import io.grpc.stub.StreamObserver;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link HealthCheckingLoadBalancerFactory}. */
+@RunWith(JUnit4.class)
+public class HealthCheckingLoadBalancerFactoryTest {
+ private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
+ Attributes.Key.create("subchannel-attr-for-test");
+
+ // We use in-process channels for Subchannel.asChannel(), so that we make sure we are making RPCs
+ // correctly. Mocking Channel and ClientCall is a bad idea because it can easily be done wrong.
+ // Each Channel goes to a different server, so that we can verify the health check activity on
+ // each Subchannel.
+ private static final int NUM_SUBCHANNELS = 2;
+ private final EquivalentAddressGroup[] eags = new EquivalentAddressGroup[NUM_SUBCHANNELS];
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private final List<EquivalentAddressGroup>[] eagLists = new List[NUM_SUBCHANNELS];
+ private List<EquivalentAddressGroup> resolvedAddressList;
+ private final Subchannel[] subchannels = new Subchannel[NUM_SUBCHANNELS];
+ private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS];
+ private final Server[] servers = new Server[NUM_SUBCHANNELS];
+ private final HealthImpl[] healthImpls = new HealthImpl[NUM_SUBCHANNELS];
+
+ private final SynchronizationContext syncContext = new SynchronizationContext(
+ new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ throw new AssertionError(e);
+ }
+ });
+ private final FakeClock clock = new FakeClock();
+ private final Helper origHelper = mock(Helper.class, delegatesTo(new FakeHelper()));
+ // The helper seen by the origLb
+ private Helper wrappedHelper;
+ private final Factory origLbFactory =
+ mock(Factory.class, delegatesTo(new Factory() {
+ @Override
+ public LoadBalancer newLoadBalancer(Helper helper) {
+ checkState(wrappedHelper == null, "LoadBalancer already created");
+ wrappedHelper = helper;
+ return origLb;
+ }
+ }));
+
+ @Mock
+ private LoadBalancer origLb;
+ @Captor
+ ArgumentCaptor<Attributes> attrsCaptor;
+ @Mock
+ private BackoffPolicy.Provider backoffPolicyProvider;
+ @Mock
+ private BackoffPolicy backoffPolicy1;
+ @Mock
+ private BackoffPolicy backoffPolicy2;
+
+ private HealthCheckingLoadBalancerFactory hcLbFactory;
+ private LoadBalancer hcLbEventDelivery;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setup() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+ HealthImpl healthImpl = new HealthImpl();
+ healthImpls[i] = healthImpl;
+ Server server =
+ InProcessServerBuilder.forName("health-check-test-" + i)
+ .addService(healthImpl).directExecutor().build().start();
+ servers[i] = server;
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName("health-check-test-" + i).directExecutor().build();
+ channels[i] = channel;
+
+ EquivalentAddressGroup eag =
+ new EquivalentAddressGroup(new FakeSocketAddress("address-" + i));
+ eags[i] = eag;
+ List<EquivalentAddressGroup> eagList = Arrays.asList(eag);
+ eagLists[i] = eagList;
+ }
+ resolvedAddressList = Arrays.asList(eags);
+
+ when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
+ when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L, 31L);
+ when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L, 32L);
+
+ hcLbFactory = new HealthCheckingLoadBalancerFactory(
+ origLbFactory, backoffPolicyProvider, clock.getTimeProvider());
+ final LoadBalancer hcLb = hcLbFactory.newLoadBalancer(origHelper);
+ // Make sure all calls into the hcLb is from the syncContext
+ hcLbEventDelivery = new LoadBalancer() {
+ @Override
+ public void handleResolvedAddressGroups(
+ final List<EquivalentAddressGroup> servers, final Attributes attributes) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ hcLb.handleResolvedAddressGroups(servers, attributes);
+ }
+ });
+ }
+
+ @Override
+ public void handleSubchannelState(
+ final Subchannel subchannel, final ConnectivityStateInfo stateInfo) {
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ hcLb.handleSubchannelState(subchannel, stateInfo);
+ }
+ });
+ }
+
+ @Override
+ public void handleNameResolutionError(Status error) {
+ throw new AssertionError("Not supposed to be called");
+ }
+
+ @Override
+ public void shutdown() {
+ throw new AssertionError("Not supposed to be called");
+ }
+ };
+ verify(origLbFactory).newLoadBalancer(any(Helper.class));
+ }
+
+ @After
+ public void teardown() throws Exception {
+ // All scheduled tasks have been accounted for
+ assertThat(clock.getPendingTasks()).isEmpty();
+ // Health-check streams are usually not closed in the tests because handleSubchannelState() is
+ // faked. Force closing for clean up.
+ for (Server server : servers) {
+ server.shutdownNow();
+ assertThat(server.awaitTermination(1, TimeUnit.SECONDS)).isTrue();
+ }
+ for (ManagedChannel channel : channels) {
+ channel.shutdownNow();
+ assertThat(channel.awaitTermination(1, TimeUnit.SECONDS)).isTrue();
+ }
+ for (HealthImpl impl : healthImpls) {
+ assertThat(impl.checkCalled).isFalse();
+ }
+ }
+
+ @Test
+ public void createSubchannelThrowsIfCalledOutsideSynchronizationContext() {
+ try {
+ wrappedHelper.createSubchannel(eagLists[0], Attributes.EMPTY);
+ fail("Should throw");
+ } catch (IllegalStateException e) {
+ assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext");
+ }
+ }
+
+ @Test
+ public void typicalWorkflow() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("FooService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verify(origHelper, atLeast(0)).getSynchronizationContext();
+ verify(origHelper, atLeast(0)).getScheduledExecutorService();
+ verifyNoMoreInteractions(origHelper);
+ verifyNoMoreInteractions(origLb);
+
+ // Simulate that the orignal LB creates Subchannels
+ for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+ // Subchannel attributes set by origLb are correctly plumbed in
+ String subchannelAttrValue = "eag attr " + i;
+ Attributes attrs = Attributes.newBuilder()
+ .set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
+ // We don't wrap Subchannels, thus origLb gets the original Subchannels.
+ assertThat(createSubchannel(i, attrs)).isSameAs(subchannels[i]);
+ verify(origHelper).createSubchannel(same(eagLists[i]), attrsCaptor.capture());
+ assertThat(attrsCaptor.getValue().get(SUBCHANNEL_ATTR_KEY)).isEqualTo(subchannelAttrValue);
+ }
+
+ for (int i = NUM_SUBCHANNELS - 1; i >= 0; i--) {
+ // Not starting health check until underlying Subchannel is READY
+ Subchannel subchannel = subchannels[i];
+ HealthImpl healthImpl = healthImpls[i];
+ InOrder inOrder = inOrder(origLb);
+ hcLbEventDelivery.handleSubchannelState(
+ subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
+ hcLbEventDelivery.handleSubchannelState(
+ subchannel, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
+ hcLbEventDelivery.handleSubchannelState(
+ subchannel, ConnectivityStateInfo.forNonError(IDLE));
+
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(IDLE)));
+ verifyNoMoreInteractions(origLb);
+
+ assertThat(healthImpl.calls).isEmpty();
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ assertThat(healthImpl.calls).hasSize(1);
+ ServerSideCall serverCall = healthImpl.calls.peek();
+ assertThat(serverCall.request).isEqualTo(makeRequest("FooService"));
+
+ // Starting the health check will make the Subchannel appear CONNECTING to the origLb.
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ verifyNoMoreInteractions(origLb);
+
+ // Simulate a series of responses.
+ for (ServingStatus servingStatus :
+ new ServingStatus[] {
+ ServingStatus.UNKNOWN, ServingStatus.NOT_SERVING, ServingStatus.SERVICE_UNKNOWN,
+ ServingStatus.SERVING, ServingStatus.NOT_SERVING, ServingStatus.SERVING}) {
+ serverCall.responseObserver.onNext(makeResponse(servingStatus));
+ // SERVING is mapped to READY, while other statuses are mapped to TRANSIENT_FAILURE
+ if (servingStatus == ServingStatus.SERVING) {
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+ } else {
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),unavailableStateWithMsg(
+ "Health-check service responded " + servingStatus + " for 'FooService'"));
+ }
+ verifyNoMoreInteractions(origLb);
+ }
+ }
+
+ // origLb shuts down Subchannels
+ for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+ Subchannel subchannel = subchannels[i];
+
+ ServerSideCall serverCall = healthImpls[i].calls.peek();
+ assertThat(serverCall.cancelled).isFalse();
+ verifyNoMoreInteractions(origLb);
+
+ // Subchannel enters SHUTDOWN state as a response to shutdown(), and that will cancel the
+ // health check RPC
+ subchannel.shutdown();
+ assertThat(serverCall.cancelled).isTrue();
+ verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(SHUTDOWN)));
+ }
+
+ for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+ assertThat(healthImpls[i].calls).hasSize(1);
+ }
+
+ verifyZeroInteractions(backoffPolicyProvider);
+ }
+
+ @Test
+ public void healthCheckDisabledWhenServiceNotImplemented() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("BarService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ // We create 2 Subchannels. One of them connects to a server that doesn't implement health check
+ for (int i = 0; i < 2; i++) {
+ createSubchannel(i, Attributes.EMPTY);
+ }
+
+ InOrder inOrder = inOrder(origLb);
+
+ for (int i = 0; i < 2; i++) {
+ hcLbEventDelivery.handleSubchannelState(
+ subchannels[i], ConnectivityStateInfo.forNonError(READY));
+ assertThat(healthImpls[i].calls).hasSize(1);
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannels[i]), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ }
+
+ ServerSideCall serverCall0 = healthImpls[0].calls.poll();
+ ServerSideCall serverCall1 = healthImpls[1].calls.poll();
+
+ // subchannels[0] gets UNIMPLEMENTED for health checking, which will disable health
+ // checking and it'll use the original state, which is currently READY.
+ // In reality UNIMPLEMENTED is generated by GRPC server library, but the client can't tell
+ // whether it's the server library or the service implementation that returned this status.
+ serverCall0.responseObserver.onError(Status.UNIMPLEMENTED.asException());
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ // subchannels[1] has normal health checking
+ serverCall1.responseObserver.onNext(makeResponse(ServingStatus.NOT_SERVING));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannels[1]),
+ unavailableStateWithMsg("Health-check service responded NOT_SERVING for 'BarService'"));
+
+ // Without health checking, states from underlying Subchannel are delivered directly to origLb
+ hcLbEventDelivery.handleSubchannelState(
+ subchannels[0], ConnectivityStateInfo.forNonError(IDLE));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(IDLE)));
+
+ // Re-connecting on a Subchannel will reset the "disabled" flag.
+ assertThat(healthImpls[0].calls).hasSize(0);
+ hcLbEventDelivery.handleSubchannelState(
+ subchannels[0], ConnectivityStateInfo.forNonError(READY));
+ assertThat(healthImpls[0].calls).hasSize(1);
+ serverCall0 = healthImpls[0].calls.poll();
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+ // Health check now works as normal
+ serverCall0.responseObserver.onNext(makeResponse(ServingStatus.SERVICE_UNKNOWN));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannels[0]),
+ unavailableStateWithMsg("Health-check service responded SERVICE_UNKNOWN for 'BarService'"));
+
+ verifyNoMoreInteractions(origLb);
+ verifyZeroInteractions(backoffPolicyProvider);
+ }
+
+ @Test
+ public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
+
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ HealthImpl healthImpl = healthImpls[0];
+ assertThat(healthImpl.calls).hasSize(1);
+ assertThat(clock.getPendingTasks()).isEmpty();
+
+ // Server closes the health checking RPC without any response
+ healthImpl.calls.poll().responseObserver.onCompleted();
+
+ // which results in TRANSIENT_FAILURE
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),
+ unavailableStateWithMsg(
+ "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+
+ // Retry with backoff is scheduled
+ inOrder.verify(backoffPolicyProvider).get();
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(clock.getPendingTasks()).hasSize(1);
+
+ verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11);
+ assertThat(clock.getPendingTasks()).isEmpty();
+
+ // Server closes the health checking RPC without any response
+ healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException());
+
+ // which also results in TRANSIENT_FAILURE, with a different description
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),
+ unavailableStateWithMsg(
+ "Health-check stream unexpectedly closed with "
+ + Status.CANCELLED + " for 'TeeService'"));
+
+ // Retry with backoff
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+
+ verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 21);
+
+ // Server responds this time
+ healthImpl.calls.poll().responseObserver.onNext(makeResponse(ServingStatus.SERVING));
+
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ verifyNoMoreInteractions(origLb, backoffPolicyProvider, backoffPolicy1);
+ }
+
+ @Test
+ public void serverRespondResetsBackoff() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
+
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ HealthImpl healthImpl = healthImpls[0];
+ assertThat(healthImpl.calls).hasSize(1);
+ assertThat(clock.getPendingTasks()).isEmpty();
+
+ // Server closes the health checking RPC without any response
+ healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException());
+
+ // which results in TRANSIENT_FAILURE
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),
+ unavailableStateWithMsg(
+ "Health-check stream unexpectedly closed with "
+ + Status.CANCELLED + " for 'TeeService'"));
+
+ // Retry with backoff is scheduled
+ inOrder.verify(backoffPolicyProvider).get();
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertThat(clock.getPendingTasks()).hasSize(1);
+
+ verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11);
+ assertThat(clock.getPendingTasks()).isEmpty();
+
+ // Server responds
+ healthImpl.calls.peek().responseObserver.onNext(makeResponse(ServingStatus.SERVING));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ verifyNoMoreInteractions(origLb);
+
+ // then closes the stream
+ healthImpl.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),
+ unavailableStateWithMsg(
+ "Health-check stream unexpectedly closed with "
+ + Status.UNAVAILABLE + " for 'TeeService'"));
+
+ // Because server has responded, the first retry is not subject to backoff.
+ // But the backoff policy has been reset. A new backoff policy will be used for
+ // the next backed-off retry.
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ assertThat(healthImpl.calls).hasSize(1);
+ assertThat(clock.getPendingTasks()).isEmpty();
+ inOrder.verifyNoMoreInteractions();
+
+ // then closes the stream for this retry
+ healthImpl.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException());
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),
+ unavailableStateWithMsg(
+ "Health-check stream unexpectedly closed with "
+ + Status.UNAVAILABLE + " for 'TeeService'"));
+
+ // New backoff policy is used
+ inOrder.verify(backoffPolicyProvider).get();
+ // Retry with a new backoff policy
+ inOrder.verify(backoffPolicy2).nextBackoffNanos();
+
+ verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 12);
+ }
+
+ private void verifyRetryAfterNanos(
+ InOrder inOrder, Subchannel subchannel, HealthImpl impl, long nanos) {
+ assertThat(impl.calls).isEmpty();
+ clock.forwardNanos(nanos - 1);
+ assertThat(impl.calls).isEmpty();
+ inOrder.verifyNoMoreInteractions();
+ verifyNoMoreInteractions(origLb);
+ clock.forwardNanos(1);
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ assertThat(impl.calls).hasSize(1);
+ }
+
+ @Test
+ public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() {
+ // No service config, thus no health check.
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(Attributes.EMPTY));
+ verifyNoMoreInteractions(origLb);
+
+ // First, create Subchannels 0
+ createSubchannel(0, Attributes.EMPTY);
+
+ // No health check activity. Underlying Subchannel states are directly propagated
+ hcLbEventDelivery.handleSubchannelState(
+ subchannels[0], ConnectivityStateInfo.forNonError(READY));
+ assertThat(healthImpls[0].calls).isEmpty();
+ verify(origLb).handleSubchannelState(
+ same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ verifyNoMoreInteractions(origLb);
+
+ // Service config enables health check
+ Attributes resolutionAttrs = attrsWithHealthCheckService("FooService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+ verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(resolutionAttrs));
+
+ // Health check started on existing Subchannel
+ assertThat(healthImpls[0].calls).hasSize(1);
+
+ // State stays in READY, instead of switching to CONNECTING.
+ verifyNoMoreInteractions(origLb);
+
+ // Start Subchannel 1, which will have health check
+ createSubchannel(1, Attributes.EMPTY);
+ assertThat(healthImpls[1].calls).isEmpty();
+ hcLbEventDelivery.handleSubchannelState(
+ subchannels[1], ConnectivityStateInfo.forNonError(READY));
+ assertThat(healthImpls[1].calls).hasSize(1);
+ }
+
+ @Test
+ public void serviceConfigDisablesHealthCheckWhenRpcActive() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb);
+
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ inOrder.verifyNoMoreInteractions();
+ HealthImpl healthImpl = healthImpls[0];
+ assertThat(healthImpl.calls).hasSize(1);
+ ServerSideCall serverCall = healthImpl.calls.poll();
+ assertThat(serverCall.cancelled).isFalse();
+
+ // NameResolver gives an update without service config, thus health check will be disabled
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+ // Health check RPC cancelled.
+ assertThat(serverCall.cancelled).isTrue();
+ // Subchannel uses original state
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(Attributes.EMPTY));
+
+ verifyNoMoreInteractions(origLb);
+ assertThat(healthImpl.calls).isEmpty();
+ }
+
+ @Test
+ public void serviceConfigDisablesHealthCheckWhenRetryPending() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb);
+
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+ inOrder.verifyNoMoreInteractions();
+ HealthImpl healthImpl = healthImpls[0];
+ assertThat(healthImpl.calls).hasSize(1);
+
+ // Server closes the stream without responding. Client in retry backoff
+ assertThat(clock.getPendingTasks()).isEmpty();
+ healthImpl.calls.poll().responseObserver.onCompleted();
+ assertThat(clock.getPendingTasks()).hasSize(1);
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),
+ unavailableStateWithMsg(
+ "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+
+ // NameResolver gives an update without service config, thus health check will be disabled
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+ // Retry timer is cancelled
+ assertThat(clock.getPendingTasks()).isEmpty();
+
+ // No retry was attempted
+ assertThat(healthImpl.calls).isEmpty();
+
+ // Subchannel uses original state
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(Attributes.EMPTY));
+
+ verifyNoMoreInteractions(origLb);
+ }
+
+ @Test
+ public void serviceConfigDisablesHealthCheckWhenRpcInactive() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb);
+
+ // Underlying subchannel is not READY initially
+ ConnectivityStateInfo underlyingErrorState =
+ ConnectivityStateInfo.forTransientFailure(
+ Status.UNAVAILABLE.withDescription("connection refused"));
+ hcLbEventDelivery.handleSubchannelState(subchannel, underlyingErrorState);
+ inOrder.verify(origLb).handleSubchannelState(same(subchannel), same(underlyingErrorState));
+ inOrder.verifyNoMoreInteractions();
+
+ // NameResolver gives an update without service config, thus health check will be disabled
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(Attributes.EMPTY));
+
+ // Underlying subchannel is now ready
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+
+ // Since health check is disabled, READY state is propagated directly.
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ // and there is no health check activity.
+ assertThat(healthImpls[0].calls).isEmpty();
+
+ verifyNoMoreInteractions(origLb);
+ }
+
+ @Test
+ public void serviceConfigChangesServiceNameWhenRpcActive() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb);
+
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+ HealthImpl healthImpl = healthImpls[0];
+ assertThat(healthImpl.calls).hasSize(1);
+ ServerSideCall serverCall = healthImpl.calls.poll();
+ assertThat(serverCall.cancelled).isFalse();
+ assertThat(serverCall.request).isEqualTo(makeRequest("TeeService"));
+
+ // Health check responded
+ serverCall.responseObserver.onNext(makeResponse(ServingStatus.SERVING));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+ // Service config returns with the same health check name.
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+ // It's delivered to origLb, but nothing else happens
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ // Service config returns a different health check name.
+ resolutionAttrs = attrsWithHealthCheckService("FooService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(resolutionAttrs));
+
+ // Current health check RPC cancelled.
+ assertThat(serverCall.cancelled).isTrue();
+
+ // A second RPC is started immediately
+ assertThat(healthImpl.calls).hasSize(1);
+ serverCall = healthImpl.calls.poll();
+ // with the new service name
+ assertThat(serverCall.request).isEqualTo(makeRequest("FooService"));
+
+ // State stays in READY, instead of switching to CONNECTING.
+ verifyNoMoreInteractions(origLb);
+ }
+
+ @Test
+ public void serviceConfigChangesServiceNameWhenRetryPending() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb);
+
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+ HealthImpl healthImpl = healthImpls[0];
+ assertThat(healthImpl.calls).hasSize(1);
+ ServerSideCall serverCall = healthImpl.calls.poll();
+ assertThat(serverCall.cancelled).isFalse();
+ assertThat(serverCall.request).isEqualTo(makeRequest("TeeService"));
+
+ // Health check stream closed without responding. Client in retry backoff.
+ assertThat(clock.getPendingTasks()).isEmpty();
+ serverCall.responseObserver.onCompleted();
+ assertThat(clock.getPendingTasks()).hasSize(1);
+ assertThat(healthImpl.calls).isEmpty();
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel),
+ unavailableStateWithMsg(
+ "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+
+ // Service config returns with the same health check name.
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+ // It's delivered to origLb, but nothing else happens
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+ assertThat(clock.getPendingTasks()).hasSize(1);
+ assertThat(healthImpl.calls).isEmpty();
+
+ // Service config returns a different health check name.
+ resolutionAttrs = attrsWithHealthCheckService("FooService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ // Concluded CONNECTING state
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(resolutionAttrs));
+
+ // Current retry timer cancelled
+ assertThat(clock.getPendingTasks()).isEmpty();
+
+ // A second RPC is started immediately
+ assertThat(healthImpl.calls).hasSize(1);
+ serverCall = healthImpl.calls.poll();
+ // with the new service name
+ assertThat(serverCall.request).isEqualTo(makeRequest("FooService"));
+
+ verifyNoMoreInteractions(origLb);
+ }
+
+ @Test
+ public void serviceConfigChangesServiceNameWhenRpcInactive() {
+ Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+ verifyNoMoreInteractions(origLb);
+
+ Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+ assertThat(subchannel).isSameAs(subchannels[0]);
+ InOrder inOrder = inOrder(origLb);
+ HealthImpl healthImpl = healthImpls[0];
+
+ // Underlying subchannel is not READY initially
+ ConnectivityStateInfo underlyingErrorState =
+ ConnectivityStateInfo.forTransientFailure(
+ Status.UNAVAILABLE.withDescription("connection refused"));
+ hcLbEventDelivery.handleSubchannelState(subchannel, underlyingErrorState);
+ inOrder.verify(origLb).handleSubchannelState(same(subchannel), same(underlyingErrorState));
+ inOrder.verifyNoMoreInteractions();
+
+ // Service config returns with the same health check name.
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+ // It's delivered to origLb, but nothing else happens
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(resolutionAttrs));
+ assertThat(healthImpl.calls).isEmpty();
+ verifyNoMoreInteractions(origLb);
+
+ // Service config returns a different health check name.
+ resolutionAttrs = attrsWithHealthCheckService("FooService");
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+ inOrder.verify(origLb).handleResolvedAddressGroups(
+ same(resolvedAddressList), same(resolutionAttrs));
+
+ // Underlying subchannel is now ready
+ hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+
+ // Concluded CONNECTING state
+ inOrder.verify(origLb).handleSubchannelState(
+ same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+ // Health check RPC is started
+ assertThat(healthImpl.calls).hasSize(1);
+ // with the new service name
+ assertThat(healthImpl.calls.poll().request).isEqualTo(makeRequest("FooService"));
+
+ verifyNoMoreInteractions(origLb);
+ }
+
+ @Test
+ public void getHealthCheckedServiceName_nullServiceConfig() {
+ assertThat(ServiceConfigUtil.getHealthCheckedServiceName(null)).isNull();
+ }
+
+ @Test
+ public void getHealthCheckedServiceName_noHealthCheckConfig() {
+ assertThat(ServiceConfigUtil.getHealthCheckedServiceName(new HashMap<String, Object>()))
+ .isNull();
+ }
+
+ @Test
+ public void getHealthCheckedServiceName_healthCheckConfigMissingServiceName() {
+ HashMap<String, Object> serviceConfig = new HashMap<String, Object>();
+ HashMap<String, Object> hcConfig = new HashMap<String, Object>();
+ serviceConfig.put("healthCheckConfig", hcConfig);
+ assertThat(ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig)).isNull();
+ }
+
+ @Test
+ public void getHealthCheckedServiceName_healthCheckConfigHasServiceName() {
+ HashMap<String, Object> serviceConfig = new HashMap<String, Object>();
+ HashMap<String, Object> hcConfig = new HashMap<String, Object>();
+ hcConfig.put("serviceName", "FooService");
+ serviceConfig.put("healthCheckConfig", hcConfig);
+ assertThat(ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig))
+ .isEqualTo("FooService");
+ }
+
+ private Attributes attrsWithHealthCheckService(@Nullable String serviceName) {
+ HashMap<String, Object> serviceConfig = new HashMap<String, Object>();
+ HashMap<String, Object> hcConfig = new HashMap<String, Object>();
+ hcConfig.put("serviceName", serviceName);
+ serviceConfig.put("healthCheckConfig", hcConfig);
+ return Attributes.newBuilder()
+ .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
+ }
+
+ private HealthCheckRequest makeRequest(String service) {
+ return HealthCheckRequest.newBuilder().setService(service).build();
+ }
+
+ private HealthCheckResponse makeResponse(ServingStatus status) {
+ return HealthCheckResponse.newBuilder().setStatus(status).build();
+ }
+
+ private ConnectivityStateInfo unavailableStateWithMsg(final String expectedMsg) {
+ return argThat(new org.hamcrest.BaseMatcher<ConnectivityStateInfo>() {
+ @Override
+ public boolean matches(Object item) {
+ if (!(item instanceof ConnectivityStateInfo)) {
+ return false;
+ }
+ ConnectivityStateInfo info = (ConnectivityStateInfo) item;
+ if (!info.getState().equals(TRANSIENT_FAILURE)) {
+ return false;
+ }
+ Status error = info.getStatus();
+ if (!error.getCode().equals(Code.UNAVAILABLE)) {
+ return false;
+ }
+ if (!error.getDescription().equals(expectedMsg)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void describeTo(org.hamcrest.Description desc) {
+ desc.appendText("Matches unavailable state with msg='" + expectedMsg + "'");
+ }
+ });
+ }
+
+ private static class HealthImpl extends HealthGrpc.HealthImplBase {
+ boolean isImplemented = true;
+ boolean checkCalled;
+ final LinkedList<ServerSideCall> calls = new LinkedList<ServerSideCall>();
+
+ @Override
+ public void check(HealthCheckRequest request,
+ StreamObserver<HealthCheckResponse> responseObserver) {
+ responseObserver.onError(new UnsupportedOperationException("Should never be called"));
+ checkCalled = true;
+ }
+
+ @Override
+ public void watch(HealthCheckRequest request,
+ StreamObserver<HealthCheckResponse> responseObserver) {
+ final ServerSideCall call = new ServerSideCall(request, responseObserver);
+ Context.current().addListener(
+ new CancellationListener() {
+ @Override
+ public void cancelled(Context ctx) {
+ call.cancelled = true;
+ }
+ }, MoreExecutors.directExecutor());
+ calls.add(call);
+ }
+ }
+
+ private static class ServerSideCall {
+ final HealthCheckRequest request;
+ final StreamObserver<HealthCheckResponse> responseObserver;
+ boolean cancelled;
+
+ ServerSideCall(
+ HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
+ this.request = request;
+ this.responseObserver = responseObserver;
+ }
+ }
+
+ private class FakeSubchannel extends Subchannel {
+ final List<EquivalentAddressGroup> eagList;
+ final Attributes attrs;
+ final Channel channel;
+
+ FakeSubchannel(List<EquivalentAddressGroup> eagList, Attributes attrs, Channel channel) {
+ this.eagList = Collections.unmodifiableList(eagList);
+ this.attrs = checkNotNull(attrs);
+ this.channel = checkNotNull(channel);
+ }
+
+ @Override
+ public void shutdown() {
+ hcLbEventDelivery.handleSubchannelState(this, ConnectivityStateInfo.forNonError(SHUTDOWN));
+ }
+
+ @Override
+ public void requestConnection() {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public List<EquivalentAddressGroup> getAllAddresses() {
+ return eagList;
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return attrs;
+ }
+
+ @Override
+ public Channel asChannel() {
+ return channel;
+ }
+ }
+
+ private class FakeHelper extends Helper {
+ @Override
+ public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
+ int index = -1;
+ for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+ if (eagLists[i] == addrs) {
+ index = i;
+ break;
+ }
+ }
+ checkState(index >= 0, "addrs " + addrs + " not found");
+ Subchannel subchannel = new FakeSubchannel(addrs, attrs, channels[index]);
+ checkState(subchannels[index] == null, "subchannels[" + index + "] already created");
+ subchannels[index] = subchannel;
+ return subchannel;
+ }
+
+ @Override
+ public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public SynchronizationContext getSynchronizationContext() {
+ return syncContext;
+ }
+
+ @Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return clock.getScheduledExecutorService();
+ }
+
+ @Override
+ public NameResolver.Factory getNameResolverFactory() {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public String getAuthority() {
+ throw new AssertionError("Should not be called");
+ }
+
+ @Override
+ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
+ throw new AssertionError("Should not be called");
+ }
+ }
+
+ private static class FakeSocketAddress extends SocketAddress {
+ final String name;
+
+ FakeSocketAddress(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ // In reality wrappedHelper.createSubchannel() is always called from syncContext.
+ // Make sure it's the case in the test too.
+ private Subchannel createSubchannel(final int index, final Attributes attrs) {
+ final AtomicReference<Subchannel> returnedSubchannel = new AtomicReference<Subchannel>();
+ syncContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ returnedSubchannel.set(wrappedHelper.createSubchannel(eagLists[index], attrs));
+ }
+ });
+ return returnedSubchannel.get();
+ }
+}