services: client-side health checking main implementation (#5014)

Spec: https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md

This comes in the form of a wrapper LoadBalancerFactory. The public wrapping utility and the wrapping of RoundRobinLoadBalancer will come in follow-up changes.
diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
index 989077e..b6ab74d 100644
--- a/core/src/main/java/io/grpc/LoadBalancer.java
+++ b/core/src/main/java/io/grpc/LoadBalancer.java
@@ -158,11 +158,6 @@
    */
   public abstract void shutdown();
 
-  @Override
-  public String toString() {
-    return getClass().getSimpleName();
-  }
-
   /**
    * The main balancing logic.  It <strong>must be thread-safe</strong>. Typically it should only
    * synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
index 811ef86..fa89dc1 100644
--- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
+++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
@@ -122,7 +122,8 @@
         delegate = delegateProvider.newLoadBalancer(helper);
         if (channelTracer != null) {
           channelTracer.reportEvent(new ChannelTrace.Event.Builder()
-              .setDescription("Load balancer changed from " + old + " to " + delegate)
+              .setDescription("Load balancer changed from " + old.getClass().getSimpleName()
+                  + " to " + delegate.getClass().getSimpleName())
               .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
               .setTimestampNanos(timeProvider.currentTimeNanos())
               .build());
diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
index 5295327..f112e71 100644
--- a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
+++ b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
@@ -62,6 +62,32 @@
 
   private ServiceConfigUtil() {}
 
+  /**
+   * Fetch the health-checked service name from service config. {@code null} if can't find one.
+   */
+  @Nullable
+  public static String getHealthCheckedServiceName(@Nullable Map<String, Object> serviceConfig) {
+    String healthCheckKey = "healthCheckConfig";
+    String serviceNameKey = "serviceName";
+    if (serviceConfig == null || !serviceConfig.containsKey(healthCheckKey)) {
+      return null;
+    }
+
+    /* schema as follows
+    {
+      "healthCheckConfig": {
+        // Service name to use in the health-checking request.
+        "serviceName": string
+      }
+    }
+    */
+    Map<String, Object> healthCheck = getObject(serviceConfig, healthCheckKey);
+    if (!healthCheck.containsKey(serviceNameKey)) {
+      return null;
+    }
+    return getString(healthCheck, "serviceName");
+  }
+
   @Nullable
   static Throttle getThrottlePolicy(@Nullable Map<String, Object> serviceConfig) {
     String retryThrottlingKey = "retryThrottling";
diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java
new file mode 100644
index 0000000..5be9926
--- /dev/null
+++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import com.google.common.base.MoreObjects;
+import io.grpc.Attributes;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.ExperimentalApi;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer;
+import io.grpc.NameResolver;
+import io.grpc.Status;
+import java.util.List;
+
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
+public abstract class ForwardingLoadBalancer extends LoadBalancer {
+  /**
+   * Returns the underlying balancer.
+   */
+  protected abstract LoadBalancer delegate();
+
+  @Override
+  public void handleResolvedAddressGroups(
+        List<EquivalentAddressGroup> servers,
+        @NameResolver.ResolutionResultAttr Attributes attributes) {
+    delegate().handleResolvedAddressGroups(servers, attributes);
+  }
+
+  @Override
+  public void handleNameResolutionError(Status error) {
+    delegate().handleNameResolutionError(error);
+  }
+
+  @Override
+  public void handleSubchannelState(
+      Subchannel subchannel, ConnectivityStateInfo stateInfo) {
+    delegate().handleSubchannelState(subchannel, stateInfo);
+  }
+
+  @Override
+  public void shutdown() {
+    delegate().shutdown();
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
+  }
+}
diff --git a/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java
new file mode 100644
index 0000000..be6c7f7
--- /dev/null
+++ b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static org.mockito.Mockito.mock;
+
+import io.grpc.ForwardingTestUtil;
+import io.grpc.LoadBalancer;
+import java.lang.reflect.Method;
+import java.net.SocketAddress;
+import java.util.Collections;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ForwardingLoadBalancer}. */
+@RunWith(JUnit4.class)
+public class ForwardingLoadBalancerTest {
+  private final LoadBalancer mockDelegate = mock(LoadBalancer.class);
+
+  private final class TestBalancer extends ForwardingLoadBalancer {
+    @Override
+    protected LoadBalancer delegate() {
+      return mockDelegate;
+    }
+  }
+
+  @Test
+  public void allMethodsForwarded() throws Exception {
+    final SocketAddress mockAddr = mock(SocketAddress.class);
+    ForwardingTestUtil.testMethodsForwarded(
+        LoadBalancer.class,
+        mockDelegate,
+        new TestBalancer(),
+        Collections.<Method>emptyList());
+  }
+}
diff --git a/services/build.gradle b/services/build.gradle
index 1679de9..a22b9fd 100644
--- a/services/build.gradle
+++ b/services/build.gradle
@@ -28,7 +28,8 @@
 
     compileOnly libraries.javax_annotation
     testCompile project(':grpc-testing'),
-            libraries.netty_epoll // for DomainSocketAddress
+            libraries.netty_epoll, // for DomainSocketAddress
+            project(':grpc-core').sourceSets.test.output  // for FakeClock
     testCompileOnly libraries.javax_annotation
     signature "org.codehaus.mojo.signature:java17:1.0@signature"
 }
diff --git a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
new file mode 100644
index 0000000..e5f15d0
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java
@@ -0,0 +1,434 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static io.grpc.ConnectivityState.CONNECTING;
+import static io.grpc.ConnectivityState.IDLE;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.SHUTDOWN;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import io.grpc.Attributes;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.Factory;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.SynchronizationContext;
+import io.grpc.SynchronizationContext.ScheduledHandle;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.internal.BackoffPolicy;
+import io.grpc.internal.GrpcAttributes;
+import io.grpc.internal.ServiceConfigUtil;
+import io.grpc.internal.TimeProvider;
+import io.grpc.util.ForwardingLoadBalancer;
+import io.grpc.util.ForwardingLoadBalancerHelper;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Wraps a {@link LoadBalancer} and implements the client-side health-checking
+ * (https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md).  The
+ * Subchannel received by the states wrapped LoadBalancer will be determined by health-checking.
+ *
+ * <p>Note the original LoadBalancer must call {@code Helper.createSubchannel()} from the
+ * SynchronizationContext, or it will throw.
+ */
+final class HealthCheckingLoadBalancerFactory extends Factory {
+  private static final Attributes.Key<HealthCheckState> KEY_HEALTH_CHECK_STATE =
+      Attributes.Key.create("io.grpc.services.HealthCheckingLoadBalancerFactory.healthCheckState");
+
+  private final Factory delegateFactory;
+  private final BackoffPolicy.Provider backoffPolicyProvider;
+  private final TimeProvider time;
+
+  public HealthCheckingLoadBalancerFactory(
+      Factory delegateFactory, BackoffPolicy.Provider backoffPolicyProvider, TimeProvider time) {
+    this.delegateFactory = checkNotNull(delegateFactory, "delegateFactory");
+    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
+    this.time = checkNotNull(time, "time");
+  }
+
+  @Override
+  public LoadBalancer newLoadBalancer(Helper helper) {
+    HelperImpl wrappedHelper = new HelperImpl(helper);
+    LoadBalancer delegateBalancer = delegateFactory.newLoadBalancer(wrappedHelper);
+    wrappedHelper.init(delegateBalancer);
+    return new LoadBalancerImpl(wrappedHelper, delegateBalancer);
+  }
+
+  private final class HelperImpl extends ForwardingLoadBalancerHelper {
+    private final Helper delegate;
+    private final SynchronizationContext syncContext;
+    
+    private LoadBalancer delegateBalancer;
+    @Nullable String healthCheckedService;
+
+    final HashSet<HealthCheckState> hcStates = new HashSet<HealthCheckState>();
+
+    HelperImpl(Helper delegate) {
+      this.delegate = checkNotNull(delegate, "delegate");
+      this.syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext");
+    }
+
+    void init(LoadBalancer delegateBalancer) {
+      checkState(this.delegateBalancer == null, "init() already called");
+      this.delegateBalancer = checkNotNull(delegateBalancer, "delegateBalancer");
+    }
+
+    @Override
+    protected Helper delegate() {
+      return delegate;
+    }
+
+    @Override
+    public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
+      // HealthCheckState is not thread-safe, we are requiring the original LoadBalancer calls
+      // createSubchannel() from the SynchronizationContext.
+      syncContext.throwIfNotInThisSynchronizationContext();
+      HealthCheckState hcState = new HealthCheckState(
+          delegateBalancer, syncContext, delegate.getScheduledExecutorService());
+      hcStates.add(hcState);
+      Subchannel subchannel = super.createSubchannel(
+          addrs, attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, hcState).build());
+      hcState.init(subchannel);
+      if (healthCheckedService != null) {
+        hcState.setServiceName(healthCheckedService);
+      }
+      return subchannel;
+    }
+
+    void setHealthCheckedService(@Nullable String service) {
+      healthCheckedService = service;
+      for (HealthCheckState hcState : hcStates) {
+        hcState.setServiceName(service);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
+    }
+  }
+
+  private static final class LoadBalancerImpl extends ForwardingLoadBalancer {
+    final LoadBalancer delegate;
+    final HelperImpl helper;
+    final SynchronizationContext syncContext;
+    final ScheduledExecutorService timerService;
+
+    LoadBalancerImpl(HelperImpl helper, LoadBalancer delegate) {
+      this.helper = checkNotNull(helper, "helper");
+      this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
+      this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
+      this.delegate = checkNotNull(delegate, "delegate");
+    }
+
+    @Override
+    protected LoadBalancer delegate() {
+      return delegate;
+    }
+
+    @Override
+    public void handleResolvedAddressGroups(
+        List<EquivalentAddressGroup> servers, Attributes attributes) {
+      Map<String, Object> serviceConfig =
+          attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
+      String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig);
+      helper.setHealthCheckedService(serviceName);
+      super.handleResolvedAddressGroups(servers, attributes);
+    }
+
+    @Override
+    public void handleSubchannelState(
+        Subchannel subchannel, ConnectivityStateInfo stateInfo) {
+      HealthCheckState hcState =
+          checkNotNull(subchannel.getAttributes().get(KEY_HEALTH_CHECK_STATE), "hcState");
+      hcState.updateRawState(stateInfo);
+
+      if (Objects.equal(stateInfo.getState(), SHUTDOWN)) {
+        helper.hcStates.remove(hcState);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
+    }
+  }
+
+  
+  // All methods are run from syncContext
+  private final class HealthCheckState {
+    private final Runnable retryTask = new Runnable() {
+        @Override
+        public void run() {
+          startRpc();
+        }
+      };
+
+    private final LoadBalancer delegate;
+    private final SynchronizationContext syncContext;
+    private final ScheduledExecutorService timerService;
+
+    private Subchannel subchannel;
+
+    // Set when RPC started. Cleared when the RPC has closed or abandoned.
+    @Nullable
+    private HcStream activeRpc;
+
+    // The service name that should be used for health checking
+    private String serviceName;
+    private BackoffPolicy backoffPolicy;
+    // The state from the underlying Subchannel
+    private ConnectivityStateInfo rawState = ConnectivityStateInfo.forNonError(IDLE);
+    // The state concluded from health checking
+    private ConnectivityStateInfo concludedState = ConnectivityStateInfo.forNonError(IDLE);
+    // true if a health check stream should be kept.  When true, either there is an active RPC, or a
+    // retry is pending.
+    private boolean running;
+    // true if server returned UNIMPLEMENTED
+    private boolean disabled;
+    private ScheduledHandle retryTimer;
+
+    HealthCheckState(
+        LoadBalancer delegate, SynchronizationContext syncContext,
+        ScheduledExecutorService timerService) {
+      this.delegate = checkNotNull(delegate, "delegate");
+      this.syncContext = checkNotNull(syncContext, "syncContext");
+      this.timerService = checkNotNull(timerService, "timerService");
+    }
+
+    void init(Subchannel subchannel) {
+      checkState(this.subchannel == null, "init() already called");
+      this.subchannel = checkNotNull(subchannel, "subchannel");
+    }
+
+    void setServiceName(@Nullable String newServiceName) {
+      if (Objects.equal(newServiceName, serviceName)) {
+        return;
+      }
+      serviceName = newServiceName;
+      // If service name has changed while there is active RPC, cancel it so that
+      // a new call will be made with the new name.
+      String cancelMsg =
+          serviceName == null ? "Health check disabled by service config"
+          : "Switching to new service name: " + newServiceName;
+      stopRpc(cancelMsg);
+      adjustHealthCheck();
+    }
+
+    void updateRawState(ConnectivityStateInfo rawState) {
+      if (Objects.equal(this.rawState.getState(), READY)
+          && !Objects.equal(rawState.getState(), READY)) {
+        // A connection was lost.  We will reset disabled flag because health check
+        // may be available on the new connection.
+        disabled = false;
+        // TODO(zhangkun83): record this to channel tracer
+      }
+      this.rawState = rawState;
+      adjustHealthCheck();
+    }
+
+    private boolean isRetryTimerPending() {
+      return retryTimer != null && retryTimer.isPending();
+    }
+
+    // Start or stop health check according to the current states.
+    private void adjustHealthCheck() {
+      if (!disabled && serviceName != null && Objects.equal(rawState.getState(), READY)) {
+        running = true;
+        if (activeRpc == null && !isRetryTimerPending()) {
+          startRpc();
+        }
+      } else {
+        running = false;
+        // Prerequisites for health checking not met.
+        // Make sure it's stopped.
+        stopRpc("Client stops health check");
+        backoffPolicy = null;
+        gotoState(rawState);
+      }
+    }
+
+    private void startRpc() {
+      checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up");
+      checkState(subchannel != null, "init() not called");
+      // Optimization suggested by @markroth: if we are already READY and starting the health
+      // checking RPC, either because health check is just enabled or has switched to a new service
+      // name, we don't go to CONNECTING, otherwise there will be artificial delays on RPCs
+      // waiting for the health check to respond.
+      if (!Objects.equal(concludedState.getState(), READY)) {
+        gotoState(ConnectivityStateInfo.forNonError(CONNECTING));
+      }
+      activeRpc = new HcStream();
+      activeRpc.start();
+    }
+
+    private void stopRpc(String msg) {
+      if (activeRpc != null) {
+        activeRpc.cancel(msg);
+        // Abandon this RPC.  We are not interested in anything from this RPC any more.
+        activeRpc = null;
+      }
+      if (retryTimer != null) {
+        retryTimer.cancel();
+        retryTimer = null;
+      }
+    }
+
+    private void gotoState(ConnectivityStateInfo newState) {
+      checkState(subchannel != null, "init() not called");
+      if (!Objects.equal(concludedState, newState)) {
+        concludedState = newState;
+        delegate.handleSubchannelState(subchannel, concludedState);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("running", running)
+          .add("disabled", disabled)
+          .add("activeRpc", activeRpc)
+          .add("serviceName", serviceName)
+          .add("rawState", rawState)
+          .add("concludedState", concludedState)
+          .toString();
+    }
+
+    private class HcStream extends ClientCall.Listener<HealthCheckResponse> {
+      private final ClientCall<HealthCheckRequest, HealthCheckResponse> call;
+      private final String callServiceName;
+      private final long callCreationNanos;
+      private boolean callHasResponded;
+
+      HcStream() {
+        callCreationNanos = time.currentTimeNanos();
+        callServiceName = serviceName;
+        call = subchannel.asChannel().newCall(HealthGrpc.getWatchMethod(), CallOptions.DEFAULT);
+      }
+
+      void start() {
+        call.start(this, new Metadata());
+        call.sendMessage(HealthCheckRequest.newBuilder().setService(serviceName).build());
+        call.halfClose();
+        call.request(1);
+      }
+
+      void cancel(String msg) {
+        call.cancel(msg, null);
+      }
+
+      @Override
+      public void onMessage(final HealthCheckResponse response) {
+        syncContext.execute(new Runnable() {
+            @Override
+            public void run() {
+              if (activeRpc == HcStream.this) {
+                handleResponse(response);
+              }
+            }
+          });
+      }
+
+      @Override
+      public void onClose(final Status status, Metadata trailers) {
+        syncContext.execute(new Runnable() {
+            @Override
+            public void run() {
+              if (activeRpc == HcStream.this) {
+                activeRpc = null;
+                handleStreamClosed(status);
+              }
+            }
+          });
+      }
+
+      void handleResponse(HealthCheckResponse response) {
+        callHasResponded = true;
+        backoffPolicy = null;
+        // running == true means the Subchannel's state (rawState) is READY
+        if (Objects.equal(response.getStatus(), ServingStatus.SERVING)) {
+          gotoState(ConnectivityStateInfo.forNonError(READY));
+        } else {
+          gotoState(
+              ConnectivityStateInfo.forTransientFailure(
+                  Status.UNAVAILABLE.withDescription(
+                      "Health-check service responded "
+                      + response.getStatus() + " for '" + callServiceName + "'")));
+        }
+        call.request(1);
+      }
+
+      void handleStreamClosed(Status status) {
+        if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
+          // TODO(zhangkun83): record this to channel tracer
+          disabled = true;
+          gotoState(rawState);
+          return;
+        }
+        long delayNanos = 0;
+        gotoState(
+            ConnectivityStateInfo.forTransientFailure(
+                Status.UNAVAILABLE.withDescription(
+                    "Health-check stream unexpectedly closed with "
+                    + status + " for '" + callServiceName + "'")));
+        // Use backoff only when server has not responded for the previous call
+        if (!callHasResponded) {
+          if (backoffPolicy == null) {
+            backoffPolicy = backoffPolicyProvider.get();
+          }
+          delayNanos =
+              callCreationNanos + backoffPolicy.nextBackoffNanos() - time.currentTimeNanos();
+        }
+        if (delayNanos <= 0) {
+          startRpc();
+        } else {
+          checkState(!isRetryTimerPending(), "Retry double scheduled");
+          retryTimer = syncContext.schedule(
+              retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService);
+        }
+      }
+
+      @Override
+      public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("callStarted", call != null)
+            .add("serviceName", callServiceName)
+            .add("hasResponded", callHasResponded)
+            .toString();
+      }
+    }
+  }
+}
diff --git a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
new file mode 100644
index 0000000..68cf840
--- /dev/null
+++ b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java
@@ -0,0 +1,1110 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.ConnectivityState.CONNECTING;
+import static io.grpc.ConnectivityState.IDLE;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.SHUTDOWN;
+import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
+import static org.junit.Assert.fail;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Attributes;
+import io.grpc.Channel;
+import io.grpc.ConnectivityState;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.Context;
+import io.grpc.Context.CancellationListener;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.Factory;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.ManagedChannel;
+import io.grpc.NameResolver;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.SynchronizationContext;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
+import io.grpc.health.v1.HealthGrpc;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.BackoffPolicy;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.GrpcAttributes;
+import io.grpc.internal.ServiceConfigUtil;
+import io.grpc.stub.StreamObserver;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link HealthCheckingLoadBalancerFactory}. */
+@RunWith(JUnit4.class)
+public class HealthCheckingLoadBalancerFactoryTest {
+  private static final Attributes.Key<String> SUBCHANNEL_ATTR_KEY =
+      Attributes.Key.create("subchannel-attr-for-test");
+
+  // We use in-process channels for Subchannel.asChannel(), so that we make sure we are making RPCs
+  // correctly.  Mocking Channel and ClientCall is a bad idea because it can easily be done wrong.
+  // Each Channel goes to a different server, so that we can verify the health check activity on
+  // each Subchannel.
+  private static final int NUM_SUBCHANNELS = 2;
+  private final EquivalentAddressGroup[] eags = new EquivalentAddressGroup[NUM_SUBCHANNELS];
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private final List<EquivalentAddressGroup>[] eagLists = new List[NUM_SUBCHANNELS];
+  private List<EquivalentAddressGroup> resolvedAddressList;
+  private final Subchannel[] subchannels = new Subchannel[NUM_SUBCHANNELS];
+  private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS];
+  private final Server[] servers = new Server[NUM_SUBCHANNELS];
+  private final HealthImpl[] healthImpls = new HealthImpl[NUM_SUBCHANNELS];
+
+  private final SynchronizationContext syncContext = new SynchronizationContext(
+      new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          throw new AssertionError(e);
+        }
+      });
+  private final FakeClock clock = new FakeClock();
+  private final Helper origHelper = mock(Helper.class, delegatesTo(new FakeHelper()));
+  // The helper seen by the origLb
+  private Helper wrappedHelper;
+  private final Factory origLbFactory =
+      mock(Factory.class, delegatesTo(new Factory() {
+          @Override
+          public LoadBalancer newLoadBalancer(Helper helper) {
+            checkState(wrappedHelper == null, "LoadBalancer already created");
+            wrappedHelper = helper;
+            return origLb;
+          }
+        }));
+
+  @Mock
+  private LoadBalancer origLb;
+  @Captor
+  ArgumentCaptor<Attributes> attrsCaptor;
+  @Mock
+  private BackoffPolicy.Provider backoffPolicyProvider;
+  @Mock
+  private BackoffPolicy backoffPolicy1;
+  @Mock
+  private BackoffPolicy backoffPolicy2;
+
+  private HealthCheckingLoadBalancerFactory hcLbFactory;
+  private LoadBalancer hcLbEventDelivery;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+
+    for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+      HealthImpl healthImpl = new HealthImpl();
+      healthImpls[i] = healthImpl;
+      Server server =
+          InProcessServerBuilder.forName("health-check-test-" + i)
+          .addService(healthImpl).directExecutor().build().start();
+      servers[i] = server;
+      ManagedChannel channel =
+          InProcessChannelBuilder.forName("health-check-test-" + i).directExecutor().build();
+      channels[i] = channel;
+
+      EquivalentAddressGroup eag =
+          new EquivalentAddressGroup(new FakeSocketAddress("address-" + i));
+      eags[i] = eag;
+      List<EquivalentAddressGroup> eagList = Arrays.asList(eag);
+      eagLists[i] = eagList;
+    }
+    resolvedAddressList = Arrays.asList(eags);
+    
+    when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
+    when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L, 31L);
+    when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L, 32L);
+
+    hcLbFactory = new HealthCheckingLoadBalancerFactory(
+        origLbFactory, backoffPolicyProvider, clock.getTimeProvider());
+    final LoadBalancer hcLb = hcLbFactory.newLoadBalancer(origHelper);
+    // Make sure all calls into the hcLb is from the syncContext
+    hcLbEventDelivery = new LoadBalancer() {
+        @Override
+        public void handleResolvedAddressGroups(
+            final List<EquivalentAddressGroup> servers, final Attributes attributes) {
+          syncContext.execute(new Runnable() {
+              @Override
+              public void run() {
+                hcLb.handleResolvedAddressGroups(servers, attributes);
+              }
+            });
+        }
+
+        @Override
+        public void handleSubchannelState(
+            final Subchannel subchannel, final ConnectivityStateInfo stateInfo) {
+          syncContext.execute(new Runnable() {
+              @Override
+              public void run() {
+                hcLb.handleSubchannelState(subchannel, stateInfo);
+              }
+            });
+        }
+
+        @Override
+        public void handleNameResolutionError(Status error) {
+          throw new AssertionError("Not supposed to be called");
+        }
+
+        @Override
+        public void shutdown() {
+          throw new AssertionError("Not supposed to be called");
+        }
+      };
+    verify(origLbFactory).newLoadBalancer(any(Helper.class));
+  }
+
+  @After
+  public void teardown() throws Exception {
+    // All scheduled tasks have been accounted for
+    assertThat(clock.getPendingTasks()).isEmpty();
+    // Health-check streams are usually not closed in the tests because handleSubchannelState() is
+    // faked.  Force closing for clean up.
+    for (Server server : servers) {
+      server.shutdownNow();
+      assertThat(server.awaitTermination(1, TimeUnit.SECONDS)).isTrue();
+    }
+    for (ManagedChannel channel : channels) {
+      channel.shutdownNow();
+      assertThat(channel.awaitTermination(1, TimeUnit.SECONDS)).isTrue();
+    }
+    for (HealthImpl impl : healthImpls) {
+      assertThat(impl.checkCalled).isFalse();
+    }
+  }
+
+  @Test
+  public void createSubchannelThrowsIfCalledOutsideSynchronizationContext() {
+    try {
+      wrappedHelper.createSubchannel(eagLists[0], Attributes.EMPTY);
+      fail("Should throw");
+    } catch (IllegalStateException e) {
+      assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext");
+    }
+  }
+
+  @Test
+  public void typicalWorkflow() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("FooService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verify(origHelper, atLeast(0)).getSynchronizationContext();
+    verify(origHelper, atLeast(0)).getScheduledExecutorService();
+    verifyNoMoreInteractions(origHelper);
+    verifyNoMoreInteractions(origLb);
+
+    // Simulate that the orignal LB creates Subchannels
+    for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+      // Subchannel attributes set by origLb are correctly plumbed in
+      String subchannelAttrValue = "eag attr " + i;
+      Attributes attrs = Attributes.newBuilder()
+          .set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
+      // We don't wrap Subchannels, thus origLb gets the original Subchannels.
+      assertThat(createSubchannel(i, attrs)).isSameAs(subchannels[i]);
+      verify(origHelper).createSubchannel(same(eagLists[i]), attrsCaptor.capture());
+      assertThat(attrsCaptor.getValue().get(SUBCHANNEL_ATTR_KEY)).isEqualTo(subchannelAttrValue);
+    }
+
+    for (int i = NUM_SUBCHANNELS - 1; i >= 0; i--) {
+      // Not starting health check until underlying Subchannel is READY
+      Subchannel subchannel = subchannels[i];
+      HealthImpl healthImpl = healthImpls[i];
+      InOrder inOrder = inOrder(origLb);
+      hcLbEventDelivery.handleSubchannelState(
+          subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
+      hcLbEventDelivery.handleSubchannelState(
+          subchannel, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
+      hcLbEventDelivery.handleSubchannelState(
+          subchannel, ConnectivityStateInfo.forNonError(IDLE));
+
+      inOrder.verify(origLb).handleSubchannelState(
+          same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+      inOrder.verify(origLb).handleSubchannelState(
+          same(subchannel), eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)));
+      inOrder.verify(origLb).handleSubchannelState(
+          same(subchannel), eq(ConnectivityStateInfo.forNonError(IDLE)));
+      verifyNoMoreInteractions(origLb);
+
+      assertThat(healthImpl.calls).isEmpty();
+      hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+      assertThat(healthImpl.calls).hasSize(1);
+      ServerSideCall serverCall = healthImpl.calls.peek();
+      assertThat(serverCall.request).isEqualTo(makeRequest("FooService"));
+
+      // Starting the health check will make the Subchannel appear CONNECTING to the origLb.
+      inOrder.verify(origLb).handleSubchannelState(
+          same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+      verifyNoMoreInteractions(origLb);
+
+      // Simulate a series of responses.
+      for (ServingStatus servingStatus :
+               new ServingStatus[] {
+                 ServingStatus.UNKNOWN, ServingStatus.NOT_SERVING, ServingStatus.SERVICE_UNKNOWN,
+                 ServingStatus.SERVING, ServingStatus.NOT_SERVING, ServingStatus.SERVING}) {
+        serverCall.responseObserver.onNext(makeResponse(servingStatus));
+        // SERVING is mapped to READY, while other statuses are mapped to TRANSIENT_FAILURE
+        if (servingStatus == ServingStatus.SERVING) {
+          inOrder.verify(origLb).handleSubchannelState(
+              same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+        } else {
+          inOrder.verify(origLb).handleSubchannelState(
+              same(subchannel),unavailableStateWithMsg(
+                  "Health-check service responded " + servingStatus + " for 'FooService'"));
+        }
+        verifyNoMoreInteractions(origLb);
+      }
+    }
+
+    // origLb shuts down Subchannels
+    for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+      Subchannel subchannel = subchannels[i];
+
+      ServerSideCall serverCall = healthImpls[i].calls.peek();
+      assertThat(serverCall.cancelled).isFalse();
+      verifyNoMoreInteractions(origLb);
+
+      // Subchannel enters SHUTDOWN state as a response to shutdown(), and that will cancel the
+      // health check RPC
+      subchannel.shutdown();
+      assertThat(serverCall.cancelled).isTrue();
+      verify(origLb).handleSubchannelState(
+          same(subchannel), eq(ConnectivityStateInfo.forNonError(SHUTDOWN)));
+    }
+
+    for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+      assertThat(healthImpls[i].calls).hasSize(1);
+    }
+
+    verifyZeroInteractions(backoffPolicyProvider);
+  }
+
+  @Test
+  public void healthCheckDisabledWhenServiceNotImplemented() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("BarService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    // We create 2 Subchannels. One of them connects to a server that doesn't implement health check
+    for (int i = 0; i < 2; i++) {
+      createSubchannel(i, Attributes.EMPTY);
+    }
+
+    InOrder inOrder = inOrder(origLb);
+
+    for (int i = 0; i < 2; i++) {
+      hcLbEventDelivery.handleSubchannelState(
+          subchannels[i], ConnectivityStateInfo.forNonError(READY));
+      assertThat(healthImpls[i].calls).hasSize(1);
+      inOrder.verify(origLb).handleSubchannelState(
+          same(subchannels[i]), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+    }
+
+    ServerSideCall serverCall0 = healthImpls[0].calls.poll();
+    ServerSideCall serverCall1 = healthImpls[1].calls.poll();
+
+    // subchannels[0] gets UNIMPLEMENTED for health checking, which will disable health
+    // checking and it'll use the original state, which is currently READY.
+    // In reality UNIMPLEMENTED is generated by GRPC server library, but the client can't tell
+    // whether it's the server library or the service implementation that returned this status.
+    serverCall0.responseObserver.onError(Status.UNIMPLEMENTED.asException());
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    // subchannels[1] has normal health checking
+    serverCall1.responseObserver.onNext(makeResponse(ServingStatus.NOT_SERVING));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannels[1]),
+        unavailableStateWithMsg("Health-check service responded NOT_SERVING for 'BarService'"));
+
+    // Without health checking, states from underlying Subchannel are delivered directly to origLb
+    hcLbEventDelivery.handleSubchannelState(
+        subchannels[0], ConnectivityStateInfo.forNonError(IDLE));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(IDLE)));
+
+    // Re-connecting on a Subchannel will reset the "disabled" flag.
+    assertThat(healthImpls[0].calls).hasSize(0);
+    hcLbEventDelivery.handleSubchannelState(
+        subchannels[0], ConnectivityStateInfo.forNonError(READY));
+    assertThat(healthImpls[0].calls).hasSize(1);
+    serverCall0 = healthImpls[0].calls.poll();
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+    // Health check now works as normal
+    serverCall0.responseObserver.onNext(makeResponse(ServingStatus.SERVICE_UNKNOWN));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannels[0]),
+        unavailableStateWithMsg("Health-check service responded SERVICE_UNKNOWN for 'BarService'"));
+
+    verifyNoMoreInteractions(origLb);
+    verifyZeroInteractions(backoffPolicyProvider);
+  }
+
+  @Test
+  public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
+
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+    HealthImpl healthImpl = healthImpls[0];
+    assertThat(healthImpl.calls).hasSize(1);
+    assertThat(clock.getPendingTasks()).isEmpty();
+
+    // Server closes the health checking RPC without any response
+    healthImpl.calls.poll().responseObserver.onCompleted();
+
+    // which results in TRANSIENT_FAILURE
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel),
+        unavailableStateWithMsg(
+            "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+
+    // Retry with backoff is scheduled
+    inOrder.verify(backoffPolicyProvider).get();
+    inOrder.verify(backoffPolicy1).nextBackoffNanos();
+    assertThat(clock.getPendingTasks()).hasSize(1);
+
+    verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11);
+    assertThat(clock.getPendingTasks()).isEmpty();
+    
+    // Server closes the health checking RPC without any response
+    healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException());
+
+    // which also results in TRANSIENT_FAILURE, with a different description
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel),
+        unavailableStateWithMsg(
+            "Health-check stream unexpectedly closed with "
+            + Status.CANCELLED + " for 'TeeService'"));
+
+    // Retry with backoff
+    inOrder.verify(backoffPolicy1).nextBackoffNanos();
+
+    verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 21);
+    
+    // Server responds this time
+    healthImpl.calls.poll().responseObserver.onNext(makeResponse(ServingStatus.SERVING));
+
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    verifyNoMoreInteractions(origLb, backoffPolicyProvider, backoffPolicy1);
+  }
+
+  @Test
+  public void serverRespondResetsBackoff() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
+
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+    HealthImpl healthImpl = healthImpls[0];
+    assertThat(healthImpl.calls).hasSize(1);
+    assertThat(clock.getPendingTasks()).isEmpty();
+
+    // Server closes the health checking RPC without any response
+    healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException());
+
+    // which results in TRANSIENT_FAILURE
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel),
+        unavailableStateWithMsg(
+            "Health-check stream unexpectedly closed with "
+            + Status.CANCELLED + " for 'TeeService'"));
+
+    // Retry with backoff is scheduled
+    inOrder.verify(backoffPolicyProvider).get();
+    inOrder.verify(backoffPolicy1).nextBackoffNanos();
+    assertThat(clock.getPendingTasks()).hasSize(1);
+
+    verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11);
+    assertThat(clock.getPendingTasks()).isEmpty();
+
+    // Server responds
+    healthImpl.calls.peek().responseObserver.onNext(makeResponse(ServingStatus.SERVING));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    verifyNoMoreInteractions(origLb);
+
+    // then closes the stream
+    healthImpl.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException());
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel),
+        unavailableStateWithMsg(
+            "Health-check stream unexpectedly closed with "
+            + Status.UNAVAILABLE + " for 'TeeService'"));
+
+    // Because server has responded, the first retry is not subject to backoff.
+    // But the backoff policy has been reset.  A new backoff policy will be used for
+    // the next backed-off retry.
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+    assertThat(healthImpl.calls).hasSize(1);
+    assertThat(clock.getPendingTasks()).isEmpty();
+    inOrder.verifyNoMoreInteractions();
+
+    // then closes the stream for this retry
+    healthImpl.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException());
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel),
+        unavailableStateWithMsg(
+            "Health-check stream unexpectedly closed with "
+            + Status.UNAVAILABLE + " for 'TeeService'"));
+
+    // New backoff policy is used
+    inOrder.verify(backoffPolicyProvider).get();
+    // Retry with a new backoff policy
+    inOrder.verify(backoffPolicy2).nextBackoffNanos();
+
+    verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 12);
+  }
+
+  private void verifyRetryAfterNanos(
+      InOrder inOrder, Subchannel subchannel, HealthImpl impl, long nanos) {
+    assertThat(impl.calls).isEmpty();
+    clock.forwardNanos(nanos - 1);
+    assertThat(impl.calls).isEmpty();
+    inOrder.verifyNoMoreInteractions();
+    verifyNoMoreInteractions(origLb);
+    clock.forwardNanos(1);
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+    assertThat(impl.calls).hasSize(1);
+  }
+
+  @Test
+  public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() {
+    // No service config, thus no health check.
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(Attributes.EMPTY));
+    verifyNoMoreInteractions(origLb);
+
+    // First, create Subchannels 0
+    createSubchannel(0, Attributes.EMPTY);
+
+    // No health check activity.  Underlying Subchannel states are directly propagated
+    hcLbEventDelivery.handleSubchannelState(
+        subchannels[0], ConnectivityStateInfo.forNonError(READY));
+    assertThat(healthImpls[0].calls).isEmpty();
+    verify(origLb).handleSubchannelState(
+        same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    verifyNoMoreInteractions(origLb);
+
+    // Service config enables health check
+    Attributes resolutionAttrs = attrsWithHealthCheckService("FooService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+    verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(resolutionAttrs));
+
+    // Health check started on existing Subchannel
+    assertThat(healthImpls[0].calls).hasSize(1);
+
+    // State stays in READY, instead of switching to CONNECTING.
+    verifyNoMoreInteractions(origLb);
+
+    // Start Subchannel 1, which will have health check
+    createSubchannel(1, Attributes.EMPTY);
+    assertThat(healthImpls[1].calls).isEmpty();
+    hcLbEventDelivery.handleSubchannelState(
+        subchannels[1], ConnectivityStateInfo.forNonError(READY));
+    assertThat(healthImpls[1].calls).hasSize(1);
+  }
+
+  @Test
+  public void serviceConfigDisablesHealthCheckWhenRpcActive() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb);
+
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+    inOrder.verifyNoMoreInteractions();
+    HealthImpl healthImpl = healthImpls[0];
+    assertThat(healthImpl.calls).hasSize(1);
+    ServerSideCall serverCall = healthImpl.calls.poll();
+    assertThat(serverCall.cancelled).isFalse();
+
+    // NameResolver gives an update without service config, thus health check will be disabled
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+    // Health check RPC cancelled.
+    assertThat(serverCall.cancelled).isTrue();
+    // Subchannel uses original state
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(Attributes.EMPTY));
+
+    verifyNoMoreInteractions(origLb);
+    assertThat(healthImpl.calls).isEmpty();
+  }
+
+  @Test
+  public void serviceConfigDisablesHealthCheckWhenRetryPending() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb);
+
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+    inOrder.verifyNoMoreInteractions();
+    HealthImpl healthImpl = healthImpls[0];
+    assertThat(healthImpl.calls).hasSize(1);
+
+    // Server closes the stream without responding.  Client in retry backoff
+    assertThat(clock.getPendingTasks()).isEmpty();
+    healthImpl.calls.poll().responseObserver.onCompleted();
+    assertThat(clock.getPendingTasks()).hasSize(1);
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel),
+        unavailableStateWithMsg(
+            "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+
+    // NameResolver gives an update without service config, thus health check will be disabled
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+    // Retry timer is cancelled
+    assertThat(clock.getPendingTasks()).isEmpty();
+
+    // No retry was attempted
+    assertThat(healthImpl.calls).isEmpty();
+
+    // Subchannel uses original state
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(Attributes.EMPTY));
+
+    verifyNoMoreInteractions(origLb);
+  }
+
+  @Test
+  public void serviceConfigDisablesHealthCheckWhenRpcInactive() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb);
+
+    // Underlying subchannel is not READY initially
+    ConnectivityStateInfo underlyingErrorState =
+        ConnectivityStateInfo.forTransientFailure(
+            Status.UNAVAILABLE.withDescription("connection refused"));
+    hcLbEventDelivery.handleSubchannelState(subchannel, underlyingErrorState);
+    inOrder.verify(origLb).handleSubchannelState(same(subchannel), same(underlyingErrorState));
+    inOrder.verifyNoMoreInteractions();
+
+    // NameResolver gives an update without service config, thus health check will be disabled
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
+
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(Attributes.EMPTY));
+
+    // Underlying subchannel is now ready
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+
+    // Since health check is disabled, READY state is propagated directly.
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    // and there is no health check activity.
+    assertThat(healthImpls[0].calls).isEmpty();
+
+    verifyNoMoreInteractions(origLb);
+  }
+
+  @Test
+  public void serviceConfigChangesServiceNameWhenRpcActive() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb);
+
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+    HealthImpl healthImpl = healthImpls[0];
+    assertThat(healthImpl.calls).hasSize(1);
+    ServerSideCall serverCall = healthImpl.calls.poll();
+    assertThat(serverCall.cancelled).isFalse();
+    assertThat(serverCall.request).isEqualTo(makeRequest("TeeService"));
+
+    // Health check responded
+    serverCall.responseObserver.onNext(makeResponse(ServingStatus.SERVING));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
+
+    // Service config returns with the same health check name.
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+    // It's delivered to origLb, but nothing else happens
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    // Service config returns a different health check name.
+    resolutionAttrs = attrsWithHealthCheckService("FooService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(resolutionAttrs));
+
+    // Current health check RPC cancelled.
+    assertThat(serverCall.cancelled).isTrue();
+
+    // A second RPC is started immediately
+    assertThat(healthImpl.calls).hasSize(1);
+    serverCall = healthImpl.calls.poll();
+    // with the new service name
+    assertThat(serverCall.request).isEqualTo(makeRequest("FooService"));
+
+    // State stays in READY, instead of switching to CONNECTING.
+    verifyNoMoreInteractions(origLb);
+  }
+
+  @Test
+  public void serviceConfigChangesServiceNameWhenRetryPending() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb);
+
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+    HealthImpl healthImpl = healthImpls[0];
+    assertThat(healthImpl.calls).hasSize(1);
+    ServerSideCall serverCall = healthImpl.calls.poll();
+    assertThat(serverCall.cancelled).isFalse();
+    assertThat(serverCall.request).isEqualTo(makeRequest("TeeService"));
+
+    // Health check stream closed without responding.  Client in retry backoff.
+    assertThat(clock.getPendingTasks()).isEmpty();
+    serverCall.responseObserver.onCompleted();
+    assertThat(clock.getPendingTasks()).hasSize(1);
+    assertThat(healthImpl.calls).isEmpty();
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel),
+        unavailableStateWithMsg(
+            "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
+
+    // Service config returns with the same health check name.
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+    // It's delivered to origLb, but nothing else happens
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+    assertThat(clock.getPendingTasks()).hasSize(1);
+    assertThat(healthImpl.calls).isEmpty();
+
+    // Service config returns a different health check name.
+    resolutionAttrs = attrsWithHealthCheckService("FooService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    // Concluded CONNECTING state
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(resolutionAttrs));
+
+    // Current retry timer cancelled
+    assertThat(clock.getPendingTasks()).isEmpty();
+
+    // A second RPC is started immediately
+    assertThat(healthImpl.calls).hasSize(1);
+    serverCall = healthImpl.calls.poll();
+    // with the new service name
+    assertThat(serverCall.request).isEqualTo(makeRequest("FooService"));
+
+    verifyNoMoreInteractions(origLb);
+  }
+
+  @Test
+  public void serviceConfigChangesServiceNameWhenRpcInactive() {
+    Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
+    verifyNoMoreInteractions(origLb);
+
+    Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
+    assertThat(subchannel).isSameAs(subchannels[0]);
+    InOrder inOrder = inOrder(origLb);
+    HealthImpl healthImpl = healthImpls[0];
+
+    // Underlying subchannel is not READY initially
+    ConnectivityStateInfo underlyingErrorState =
+        ConnectivityStateInfo.forTransientFailure(
+            Status.UNAVAILABLE.withDescription("connection refused"));
+    hcLbEventDelivery.handleSubchannelState(subchannel, underlyingErrorState);
+    inOrder.verify(origLb).handleSubchannelState(same(subchannel), same(underlyingErrorState));
+    inOrder.verifyNoMoreInteractions();
+
+    // Service config returns with the same health check name.
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+    // It's delivered to origLb, but nothing else happens
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(resolutionAttrs));
+    assertThat(healthImpl.calls).isEmpty();
+    verifyNoMoreInteractions(origLb);
+
+    // Service config returns a different health check name.
+    resolutionAttrs = attrsWithHealthCheckService("FooService");
+    hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
+
+    inOrder.verify(origLb).handleResolvedAddressGroups(
+        same(resolvedAddressList), same(resolutionAttrs));
+
+    // Underlying subchannel is now ready
+    hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+
+    // Concluded CONNECTING state
+    inOrder.verify(origLb).handleSubchannelState(
+        same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
+
+    // Health check RPC is started
+    assertThat(healthImpl.calls).hasSize(1);
+    // with the new service name
+    assertThat(healthImpl.calls.poll().request).isEqualTo(makeRequest("FooService"));
+
+    verifyNoMoreInteractions(origLb);
+  }
+
+  @Test
+  public void getHealthCheckedServiceName_nullServiceConfig() {
+    assertThat(ServiceConfigUtil.getHealthCheckedServiceName(null)).isNull();
+  }
+
+  @Test
+  public void getHealthCheckedServiceName_noHealthCheckConfig() {
+    assertThat(ServiceConfigUtil.getHealthCheckedServiceName(new HashMap<String, Object>()))
+        .isNull();
+  }
+
+  @Test
+  public void getHealthCheckedServiceName_healthCheckConfigMissingServiceName() {
+    HashMap<String, Object> serviceConfig = new HashMap<String, Object>();
+    HashMap<String, Object> hcConfig = new HashMap<String, Object>();
+    serviceConfig.put("healthCheckConfig", hcConfig);
+    assertThat(ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig)).isNull();
+  }
+
+  @Test
+  public void getHealthCheckedServiceName_healthCheckConfigHasServiceName() {
+    HashMap<String, Object> serviceConfig = new HashMap<String, Object>();
+    HashMap<String, Object> hcConfig = new HashMap<String, Object>();
+    hcConfig.put("serviceName", "FooService");
+    serviceConfig.put("healthCheckConfig", hcConfig);
+    assertThat(ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig))
+        .isEqualTo("FooService");
+  }
+
+  private Attributes attrsWithHealthCheckService(@Nullable String serviceName) {
+    HashMap<String, Object> serviceConfig = new HashMap<String, Object>();
+    HashMap<String, Object> hcConfig = new HashMap<String, Object>();
+    hcConfig.put("serviceName", serviceName);
+    serviceConfig.put("healthCheckConfig", hcConfig);
+    return Attributes.newBuilder()
+        .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
+  }
+
+  private HealthCheckRequest makeRequest(String service) {
+    return HealthCheckRequest.newBuilder().setService(service).build();
+  }
+
+  private HealthCheckResponse makeResponse(ServingStatus status) {
+    return HealthCheckResponse.newBuilder().setStatus(status).build();
+  }
+
+  private ConnectivityStateInfo unavailableStateWithMsg(final String expectedMsg) {
+    return argThat(new org.hamcrest.BaseMatcher<ConnectivityStateInfo>() {
+        @Override
+        public boolean matches(Object item) {
+          if (!(item instanceof ConnectivityStateInfo)) {
+            return false;
+          }
+          ConnectivityStateInfo info = (ConnectivityStateInfo) item;
+          if (!info.getState().equals(TRANSIENT_FAILURE)) {
+            return false;
+          }
+          Status error = info.getStatus();
+          if (!error.getCode().equals(Code.UNAVAILABLE)) {
+            return false;
+          }
+          if (!error.getDescription().equals(expectedMsg)) {
+            return false;
+          }
+          return true;
+        }
+
+        @Override
+        public void describeTo(org.hamcrest.Description desc) {
+          desc.appendText("Matches unavailable state with msg='" + expectedMsg + "'");
+        }
+      });
+  }
+
+  private static class HealthImpl extends HealthGrpc.HealthImplBase {
+    boolean isImplemented = true;
+    boolean checkCalled;
+    final LinkedList<ServerSideCall> calls = new LinkedList<ServerSideCall>();
+
+    @Override
+    public void check(HealthCheckRequest request,
+        StreamObserver<HealthCheckResponse> responseObserver) {
+      responseObserver.onError(new UnsupportedOperationException("Should never be called"));
+      checkCalled = true;
+    }
+
+    @Override
+    public void watch(HealthCheckRequest request,
+        StreamObserver<HealthCheckResponse> responseObserver) {
+      final ServerSideCall call = new ServerSideCall(request, responseObserver);
+      Context.current().addListener(
+          new CancellationListener() {
+            @Override
+            public void cancelled(Context ctx) {
+              call.cancelled = true;
+            }
+          }, MoreExecutors.directExecutor());
+      calls.add(call);
+    }
+  }
+
+  private static class ServerSideCall {
+    final HealthCheckRequest request;
+    final StreamObserver<HealthCheckResponse> responseObserver;
+    boolean cancelled;
+
+    ServerSideCall(
+        HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
+      this.request = request;
+      this.responseObserver = responseObserver;
+    }
+  }
+
+  private class FakeSubchannel extends Subchannel {
+    final List<EquivalentAddressGroup> eagList;
+    final Attributes attrs;
+    final Channel channel;
+
+    FakeSubchannel(List<EquivalentAddressGroup> eagList, Attributes attrs, Channel channel) {
+      this.eagList = Collections.unmodifiableList(eagList);
+      this.attrs = checkNotNull(attrs);
+      this.channel = checkNotNull(channel);
+    }
+
+    @Override
+    public void shutdown() {
+      hcLbEventDelivery.handleSubchannelState(this, ConnectivityStateInfo.forNonError(SHUTDOWN));
+    }
+
+    @Override
+    public void requestConnection() {
+      throw new AssertionError("Should not be called");
+    }
+
+    @Override
+    public List<EquivalentAddressGroup> getAllAddresses() {
+      return eagList;
+    }
+
+    @Override
+    public Attributes getAttributes() {
+      return attrs;
+    }
+
+    @Override
+    public Channel asChannel() {
+      return channel;
+    }
+  }
+
+  private class FakeHelper extends Helper {
+    @Override
+    public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
+      int index = -1;
+      for (int i = 0; i < NUM_SUBCHANNELS; i++) {
+        if (eagLists[i] == addrs) {
+          index = i;
+          break;
+        }
+      }
+      checkState(index >= 0, "addrs " + addrs + " not found");
+      Subchannel subchannel = new FakeSubchannel(addrs, attrs, channels[index]);
+      checkState(subchannels[index] == null, "subchannels[" + index + "] already created");
+      subchannels[index] = subchannel;
+      return subchannel;
+    }
+
+    @Override
+    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
+      throw new AssertionError("Should not be called");
+    }
+
+    @Override
+    public SynchronizationContext getSynchronizationContext() {
+      return syncContext;
+    }
+
+    @Override
+    public ScheduledExecutorService getScheduledExecutorService() {
+      return clock.getScheduledExecutorService();
+    }
+
+    @Override
+    public NameResolver.Factory getNameResolverFactory() {
+      throw new AssertionError("Should not be called");
+    }
+
+    @Override
+    public String getAuthority() {
+      throw new AssertionError("Should not be called");
+    }
+
+    @Override
+    public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
+      throw new AssertionError("Should not be called");
+    }
+  }
+
+  private static class FakeSocketAddress extends SocketAddress {
+    final String name;
+
+    FakeSocketAddress(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
+
+  // In reality wrappedHelper.createSubchannel() is always called from syncContext.
+  // Make sure it's the case in the test too.
+  private Subchannel createSubchannel(final int index, final Attributes attrs) {
+    final AtomicReference<Subchannel> returnedSubchannel = new AtomicReference<Subchannel>();
+    syncContext.execute(new Runnable() {
+        @Override
+        public void run() {
+          returnedSubchannel.set(wrappedHelper.createSubchannel(eagLists[index], attrs));
+        }
+      });
+    return returnedSubchannel.get();
+  }
+}