Move name resolution retry from managed channel to name resolver. (#9758)

This change has these main aspects to it:

1. Removal of any name resolution responsibility from ManagedChannelImpl
2. Creation of a new RetryScheduler to own generic retry logic
     - Can also be used outside the name resolution context
3. Creation of a new RetryingNameScheduler that can be used to wrap any
   polling name resolver to add retry capability
4. A new facility in NameResolver to allow implementations to notify
   listeners on the success of name resolution attempts
     - RetryingNameScheduler relies on this
diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java
index cfe2e93..acc146f 100644
--- a/api/src/main/java/io/grpc/NameResolver.java
+++ b/api/src/main/java/io/grpc/NameResolver.java
@@ -59,6 +59,10 @@
  */
 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
 public abstract class NameResolver {
+
+  // Outside listeners that get notified of the result of each name resolution.
+  private ArrayList<ResolutionResultListener> resolutionResultListeners = new ArrayList<>();
+
   /**
    * Returns the authority used to authenticate connections to servers.  It <strong>must</strong> be
    * from a trusted source, because if the authority is tampered with, RPCs may be sent to the
@@ -91,8 +95,9 @@
           }
 
           @Override
-          public void onResult(ResolutionResult resolutionResult) {
+          public boolean onResult(ResolutionResult resolutionResult) {
             listener.onAddresses(resolutionResult.getAddresses(), resolutionResult.getAttributes());
+            return true;
           }
       });
     }
@@ -132,6 +137,43 @@
   public void refresh() {}
 
   /**
+   * Adds a new {@link ResolutionResultListener} that will get notified of the outcome of each
+   * resolution.
+   *
+   * @since 1.53.0
+   */
+  public final void addResolutionResultListener(ResolutionResultListener listener) {
+    checkArgument(listener != null, "listener");
+    resolutionResultListeners.add(listener);
+  }
+
+  /**
+   * Removes an existing {@link ResolutionResultListener}.
+   *
+   * @return {@code true} if the listener was removed, otherwise {@code false}
+   * @since 1.53.0
+   */
+  public final boolean removeResolutionResultListener(ResolutionResultListener listener) {
+    checkArgument(listener != null);
+    return resolutionResultListeners.remove(listener);
+  }
+
+  /**
+   * Intended for extending classes to call when they know the result of a name resolution.
+   *
+   * <p>Note that while these listeners can be added to any {@link NameResolver}, only concrete
+   * implementations that call this method will actually support this facility.
+   *
+   * @param successful {@code true} if resolution was successful and the addresses were accepted.
+   * @since 1.53.0
+   */
+  protected final void fireResolutionResultEvent(boolean successful) {
+    for (ResolutionResultListener listener : resolutionResultListeners) {
+      listener.resolutionAttempted(successful);
+    }
+  }
+
+  /**
    * Factory that creates {@link NameResolver} instances.
    *
    * @since 1.0.0
@@ -225,9 +267,12 @@
      * {@link ResolutionResult#getAddresses()} is empty, {@link #onError(Status)} will be called.
      *
      * @param resolutionResult the resolved server addresses, attributes, and Service Config.
+     * @return {@code true} if the listener accepts the resolved addresses, otherwise {@code false}.
+     *         If the addresses are not accepted the {@link NameResolver} will refresh and retry
+     *         later if it uses polling.
      * @since 1.21.0
      */
-    public abstract void onResult(ResolutionResult resolutionResult);
+    public abstract boolean onResult(ResolutionResult resolutionResult);
 
     /**
      * Handles a name resolving error from the resolver. The listener is responsible for eventually
@@ -249,6 +294,24 @@
   @Documented
   public @interface ResolutionResultAttr {}
 
+
+  /**
+   * A callback interface called at the end of every resolve operation to indicate if the operation
+   * was successful. Success means that there were no problems with either the name resolution part
+   * nor with {@link Listener} accepting the resolution results.
+   */
+  public interface ResolutionResultListener {
+
+    /**
+     * Called after an attempt at name resolution.
+     *
+     * <p>Note! Implementations of this should return quickly and not throw exceptions.
+     *
+     * @param successful {@code true} if resolution was successful and the addresses were accepted.
+     */
+    void resolutionAttempted(boolean successful);
+  }
+
   /**
    * Information that a {@link Factory} uses to create a {@link NameResolver}.
    *
diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java
index 5418a0b..7e552b2 100644
--- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java
+++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java
@@ -318,6 +318,7 @@
           result = doResolve(false);
           if (result.error != null) {
             savedListener.onError(result.error);
+            fireResolutionResultEvent(false);
             return;
           }
           if (result.addresses != null) {
@@ -330,7 +331,7 @@
             resolutionResultBuilder.setAttributes(result.attributes);
           }
         }
-        savedListener.onResult(resolutionResultBuilder.build());
+        fireResolutionResultEvent(savedListener.onResult(resolutionResultBuilder.build()));
       } catch (IOException e) {
         savedListener.onError(
             Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java b/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
index 8078aa0..ef94811 100644
--- a/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
+++ b/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
@@ -47,19 +47,23 @@
   private static final String SCHEME = "dns";
 
   @Override
-  public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
+  public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
     if (SCHEME.equals(targetUri.getScheme())) {
       String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
       Preconditions.checkArgument(targetPath.startsWith("/"),
           "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
       String name = targetPath.substring(1);
-      return new DnsNameResolver(
-          targetUri.getAuthority(),
-          name,
-          args,
-          GrpcUtil.SHARED_CHANNEL_EXECUTOR,
-          Stopwatch.createUnstarted(),
-          InternalServiceProviders.isAndroid(getClass().getClassLoader()));
+      return new RetryingNameResolver(
+          new DnsNameResolver(
+              targetUri.getAuthority(),
+              name,
+              args,
+              GrpcUtil.SHARED_CHANNEL_EXECUTOR,
+              Stopwatch.createUnstarted(),
+              InternalServiceProviders.isAndroid(getClass().getClassLoader())),
+          new ExponentialBackoffPolicy.Provider(),
+          args.getScheduledExecutorService(),
+          args.getSynchronizationContext());
     } else {
       return null;
     }
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 0311a6d..f85eb00 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -280,6 +280,10 @@
   // Must be mutated and read from constructor or syncContext
   // used for channel tracing when value changed
   private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
+  // Must be mutated and read from constructor or syncContext
+  // Denotes if the last resolved addresses were accepted by the load balancer. A {@code null}
+  // value indicates no attempt has been made yet.
+  private Boolean lastAddressesAccepted;
 
   @Nullable
   private final ManagedChannelServiceConfig defaultServiceConfig;
@@ -367,7 +371,6 @@
       checkState(lbHelper != null, "lbHelper is null");
     }
     if (nameResolver != null) {
-      cancelNameResolverBackoff();
       nameResolver.shutdown();
       nameResolverStarted = false;
       if (channelIsActive) {
@@ -450,42 +453,10 @@
     idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
   }
 
-  // Run from syncContext
-  @VisibleForTesting
-  class DelayedNameResolverRefresh implements Runnable {
-    @Override
-    public void run() {
-      scheduledNameResolverRefresh = null;
-      refreshNameResolution();
-    }
-  }
-
-  // Must be used from syncContext
-  @Nullable private ScheduledHandle scheduledNameResolverRefresh;
-  // The policy to control backoff between name resolution attempts. Non-null when an attempt is
-  // scheduled. Must be used from syncContext
-  @Nullable private BackoffPolicy nameResolverBackoffPolicy;
-
-  // Must be run from syncContext
-  private void cancelNameResolverBackoff() {
-    syncContext.throwIfNotInThisSynchronizationContext();
-    if (scheduledNameResolverRefresh != null) {
-      scheduledNameResolverRefresh.cancel();
-      scheduledNameResolverRefresh = null;
-      nameResolverBackoffPolicy = null;
-    }
-  }
-
   /**
-   * Force name resolution refresh to happen immediately and reset refresh back-off. Must be run
+   * Force name resolution refresh to happen immediately. Must be run
    * from syncContext.
    */
-  private void refreshAndResetNameResolution() {
-    syncContext.throwIfNotInThisSynchronizationContext();
-    cancelNameResolverBackoff();
-    refreshNameResolution();
-  }
-
   private void refreshNameResolution() {
     syncContext.throwIfNotInThisSynchronizationContext();
     if (nameResolverStarted) {
@@ -1290,7 +1261,7 @@
   // Must be called from syncContext
   private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
     if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
-      refreshAndResetNameResolution();
+      refreshNameResolution();
     }
   }
 
@@ -1337,9 +1308,9 @@
         if (shutdown.get()) {
           return;
         }
-        if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
+        if (lastAddressesAccepted != null && !lastAddressesAccepted) {
           checkState(nameResolverStarted, "name resolver must be started");
-          refreshAndResetNameResolution();
+          refreshNameResolution();
         }
         for (InternalSubchannel subchannel : subchannels) {
           subchannel.resetConnectBackoff();
@@ -1495,7 +1466,7 @@
       final class LoadBalancerRefreshNameResolution implements Runnable {
         @Override
         public void run() {
-          refreshAndResetNameResolution();
+          ManagedChannelImpl.this.refreshNameResolution();
         }
       }
 
@@ -1736,7 +1707,7 @@
     }
 
     @Override
-    public void onResult(final ResolutionResult resolutionResult) {
+    public boolean onResult(final ResolutionResult resolutionResult) {
       final class NamesResolved implements Runnable {
 
         @SuppressWarnings("ReferenceEquality")
@@ -1745,6 +1716,7 @@
           if (ManagedChannelImpl.this.nameResolver != resolver) {
             return;
           }
+          lastAddressesAccepted = false;
 
           List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
           channelLogger.log(
@@ -1758,7 +1730,6 @@
             lastResolutionState = ResolutionState.SUCCESS;
           }
 
-          nameResolverBackoffPolicy = null;
           ConfigOrError configOrError = resolutionResult.getServiceConfig();
           InternalConfigSelector resolvedConfigSelector =
               resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
@@ -1816,6 +1787,7 @@
                 // we later check for these error codes when investigating pick results in
                 // GrpcUtil.getTransportFromPickResult().
                 onError(configOrError.getError());
+                lastAddressesAccepted = false;
                 return;
               } else {
                 effectiveServiceConfig = lastServiceConfig;
@@ -1859,21 +1831,24 @@
             }
             Attributes attributes = attrBuilder.build();
 
-            boolean addressesAccepted = helper.lb.tryAcceptResolvedAddresses(
+            lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
                 ResolvedAddresses.newBuilder()
                     .setAddresses(servers)
                     .setAttributes(attributes)
                     .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
                     .build());
-
-            if (!addressesAccepted) {
-              scheduleExponentialBackOffInSyncContext();
-            }
           }
         }
       }
 
       syncContext.execute(new NamesResolved());
+
+      // If NameResolved did not assign a value to lastAddressesAccepted, we assume there was an
+      // exception and set it to false.
+      if (lastAddressesAccepted == null) {
+        lastAddressesAccepted = false;
+      }
+      return lastAddressesAccepted;
     }
 
     @Override
@@ -1903,29 +1878,6 @@
       }
 
       helper.lb.handleNameResolutionError(error);
-
-      scheduleExponentialBackOffInSyncContext();
-    }
-
-    private void scheduleExponentialBackOffInSyncContext() {
-      if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
-        // The name resolver may invoke onError multiple times, but we only want to
-        // schedule one backoff attempt
-        // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
-        // want to reset the backoff interval upon repeated onError() calls
-        return;
-      }
-      if (nameResolverBackoffPolicy == null) {
-        nameResolverBackoffPolicy = backoffPolicyProvider.get();
-      }
-      long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
-      channelLogger.log(
-          ChannelLogLevel.DEBUG,
-          "Scheduling DNS resolution backoff for {0} ns", delayNanos);
-      scheduledNameResolverRefresh =
-          syncContext.schedule(
-              new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
-              transportFactory .getScheduledExecutorService());
     }
   }
 
diff --git a/core/src/main/java/io/grpc/internal/RetryScheduler.java b/core/src/main/java/io/grpc/internal/RetryScheduler.java
new file mode 100644
index 0000000..19b7958
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/RetryScheduler.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2022 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.internal;
+
+import io.grpc.SynchronizationContext;
+import io.grpc.SynchronizationContext.ScheduledHandle;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * Schedules a retry operation according to a {@link BackoffPolicy}. The retry is run within a
+ * {@link SynchronizationContext}. At most one retry is scheduled at a time.
+ */
+final class RetryScheduler {
+  private final Runnable retryOperation;
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final SynchronizationContext syncContext;
+  private final BackoffPolicy.Provider policyProvider;
+
+  private BackoffPolicy policy;
+  private ScheduledHandle scheduledHandle;
+
+  private static final Logger logger = Logger.getLogger(RetryScheduler.class.getName());
+
+  RetryScheduler(Runnable retryOperation, ScheduledExecutorService scheduledExecutorService,
+      SynchronizationContext syncContext, BackoffPolicy.Provider policyProvider) {
+    this.retryOperation = retryOperation;
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.syncContext = syncContext;
+    this.policyProvider = policyProvider;
+  }
+
+  /**
+   * Schedules a future retry operation. Only allows one retry to be scheduled at any given time.
+   *
+   * @return The delay in nanos before the operation fires or -1 if it was not scheduled.
+   */
+  long schedule() {
+    if (policy == null) {
+      policy = policyProvider.get();
+    }
+    // If a retry is already scheduled, take no further action.
+    if (scheduledHandle != null && scheduledHandle.isPending()) {
+      return -1;
+    }
+    long delayNanos = policy.nextBackoffNanos();
+    scheduledHandle = syncContext.schedule(retryOperation, delayNanos, TimeUnit.NANOSECONDS,
+        scheduledExecutorService);
+    logger.fine("Scheduling DNS resolution backoff for " + delayNanos + "ns");
+
+    return delayNanos;
+  }
+
+  /**
+   * Resets the {@link RetryScheduler} and cancels any pending retry task. The policy will be
+   * cleared thus also resetting any state associated with it (e.g. a backoff multiplier).
+   */
+  void reset() {
+    if (scheduledHandle != null && scheduledHandle.isPending()) {
+      scheduledHandle.cancel();
+    }
+    policy = null;
+  }
+
+}
diff --git a/core/src/main/java/io/grpc/internal/RetryingNameResolver.java b/core/src/main/java/io/grpc/internal/RetryingNameResolver.java
new file mode 100644
index 0000000..2077899
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/RetryingNameResolver.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2022 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.internal;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.NameResolver;
+import io.grpc.SynchronizationContext;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * This wrapper class can add retry capability to any polling {@link NameResolver} implementation
+ * that supports calling {@link ResolutionResultListener}s with the outcome of each resolution.
+ *
+ * <p>The {@link NameResolver} used with this
+ */
+final class RetryingNameResolver extends ForwardingNameResolver {
+
+  private final NameResolver retriedNameResolver;
+  private final RetryScheduler retryScheduler;
+
+  /**
+   * Creates a new {@link RetryingNameResolver}.
+   *
+   * @param retriedNameResolver A {@link NameResolver} that will have failed attempt retried.
+   * @param backoffPolicyProvider Provides the policy used to backoff from retry attempts
+   * @param scheduledExecutorService Executes any retry attempts
+   * @param syncContext All retries happen within the given {@code SyncContext}
+   */
+  RetryingNameResolver(NameResolver retriedNameResolver,
+      BackoffPolicy.Provider backoffPolicyProvider,
+      ScheduledExecutorService scheduledExecutorService,
+      SynchronizationContext syncContext) {
+    super(retriedNameResolver);
+    this.retriedNameResolver = retriedNameResolver;
+    this.retriedNameResolver.addResolutionResultListener(new RetryResolutionResultListener());
+    this.retryScheduler = new RetryScheduler(new DelayedNameResolverRefresh(),
+        scheduledExecutorService, syncContext, backoffPolicyProvider);
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    retryScheduler.reset();
+  }
+
+  /**
+   * @return The {@link NameResolver} that is getting its failed attempts retried.
+   */
+  public NameResolver getRetriedNameResolver() {
+    return retriedNameResolver;
+  }
+
+  @VisibleForTesting
+  class DelayedNameResolverRefresh implements Runnable {
+    @Override
+    public void run() {
+      refresh();
+    }
+  }
+
+  private class RetryResolutionResultListener implements ResolutionResultListener {
+
+    @Override
+    public void resolutionAttempted(boolean successful) {
+      if (successful) {
+        retryScheduler.reset();
+      } else {
+        retryScheduler.schedule();
+      }
+    }
+  }
+}
diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverProviderTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverProviderTest.java
index 5d127b7..b07d713 100644
--- a/core/src/test/java/io/grpc/internal/DnsNameResolverProviderTest.java
+++ b/core/src/test/java/io/grpc/internal/DnsNameResolverProviderTest.java
@@ -33,6 +33,8 @@
 /** Unit tests for {@link DnsNameResolverProvider}. */
 @RunWith(JUnit4.class)
 public class DnsNameResolverProviderTest {
+  private final FakeClock fakeClock = new FakeClock();
+
   private final SynchronizationContext syncContext = new SynchronizationContext(
       new Thread.UncaughtExceptionHandler() {
         @Override
@@ -46,6 +48,7 @@
       .setSynchronizationContext(syncContext)
       .setServiceConfigParser(mock(ServiceConfigParser.class))
       .setChannelLogger(mock(ChannelLogger.class))
+      .setScheduledExecutorService(fakeClock.getScheduledExecutorService())
       .build();
 
   private DnsNameResolverProvider provider = new DnsNameResolverProvider();
@@ -58,7 +61,9 @@
   @Test
   public void newNameResolver() {
     assertSame(DnsNameResolver.class,
-        provider.newNameResolver(URI.create("dns:///localhost:443"), args).getClass());
+        ((RetryingNameResolver) provider.newNameResolver(
+            URI.create("dns:///localhost:443"), args))
+            .getRetriedNameResolver().getClass());
     assertNull(
         provider.newNameResolver(URI.create("notdns:///localhost:443"), args));
   }
diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
index c7c0099..88a2b0c 100644
--- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
+++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
@@ -25,6 +25,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -111,17 +112,18 @@
           throw new AssertionError(e);
         }
       });
-  private final NameResolver.Args args = NameResolver.Args.newBuilder()
-      .setDefaultPort(DEFAULT_PORT)
-      .setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
-      .setSynchronizationContext(syncContext)
-      .setServiceConfigParser(mock(ServiceConfigParser.class))
-      .setChannelLogger(mock(ChannelLogger.class))
-      .build();
 
   private final DnsNameResolverProvider provider = new DnsNameResolverProvider();
   private final FakeClock fakeClock = new FakeClock();
   private final FakeClock fakeExecutor = new FakeClock();
+  private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
+      new FakeClock.TaskFilter() {
+        @Override
+        public boolean shouldAccept(Runnable command) {
+          return command.toString().contains(
+              RetryingNameResolver.DelayedNameResolverRefresh.class.getName());
+        }
+      };
 
   private final FakeExecutorResource fakeExecutorResource = new FakeExecutorResource();
 
@@ -138,6 +140,15 @@
     public void close(Executor instance) {}
   }
 
+  private final NameResolver.Args args = NameResolver.Args.newBuilder()
+      .setDefaultPort(DEFAULT_PORT)
+      .setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
+      .setSynchronizationContext(syncContext)
+      .setServiceConfigParser(mock(ServiceConfigParser.class))
+      .setChannelLogger(mock(ChannelLogger.class))
+      .setScheduledExecutorService(fakeExecutor.getScheduledExecutorService())
+      .build();
+
   @Mock
   private NameResolver.Listener2 mockListener;
   @Captor
@@ -149,18 +160,18 @@
   @Mock
   private RecordFetcher recordFetcher;
 
-  private DnsNameResolver newResolver(String name, int defaultPort) {
+  private RetryingNameResolver newResolver(String name, int defaultPort) {
     return newResolver(
         name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted());
   }
 
-  private DnsNameResolver newResolver(String name, int defaultPort, boolean isAndroid) {
+  private RetryingNameResolver newResolver(String name, int defaultPort, boolean isAndroid) {
     return newResolver(
         name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(),
         isAndroid);
   }
 
-  private DnsNameResolver newResolver(
+  private RetryingNameResolver newResolver(
       String name,
       int defaultPort,
       ProxyDetector proxyDetector,
@@ -168,7 +179,7 @@
     return newResolver(name, defaultPort, proxyDetector, stopwatch, false);
   }
 
-  private DnsNameResolver newResolver(
+  private RetryingNameResolver newResolver(
       String name,
       final int defaultPort,
       final ProxyDetector proxyDetector,
@@ -181,21 +192,29 @@
             .setSynchronizationContext(syncContext)
             .setServiceConfigParser(mock(ServiceConfigParser.class))
             .setChannelLogger(mock(ChannelLogger.class))
+            .setScheduledExecutorService(fakeExecutor.getScheduledExecutorService())
             .build();
     return newResolver(name, stopwatch, isAndroid, args);
   }
 
-  private DnsNameResolver newResolver(
+  private RetryingNameResolver newResolver(
       String name,
       Stopwatch stopwatch,
       boolean isAndroid,
       NameResolver.Args args) {
-    DnsNameResolver dnsResolver =
-        new DnsNameResolver(
-            null, name, args, fakeExecutorResource, stopwatch, isAndroid);
+    DnsNameResolver dnsResolver = new DnsNameResolver(null, name, args, fakeExecutorResource,
+        stopwatch, isAndroid);
     // By default, using the mocked ResourceResolver to avoid I/O
     dnsResolver.setResourceResolver(new JndiResourceResolver(recordFetcher));
-    return dnsResolver;
+
+    // In practice the DNS name resolver provider always wraps the resolver in a
+    // RetryingNameResolver which adds retry capabilities to it. We use the same setup here.
+    return new RetryingNameResolver(
+        dnsResolver,
+        new ExponentialBackoffPolicy.Provider(),
+        fakeExecutor.getScheduledExecutorService(),
+        syncContext
+        );
   }
 
   @Before
@@ -203,6 +222,9 @@
     DnsNameResolver.enableJndi = true;
     networkaddressCacheTtlPropertyValue =
         System.getProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY);
+
+    // By default the mock listener processes the result successfully.
+    when(mockListener.onResult(isA(ResolutionResult.class))).thenReturn(true);
   }
 
   @After
@@ -216,12 +238,6 @@
     }
   }
 
-  @After
-  public void noMorePendingTasks() {
-    assertEquals(0, fakeClock.numPendingTasks());
-    assertEquals(0, fakeExecutor.numPendingTasks());
-  }
-
   @Test
   public void invalidDnsName() throws Exception {
     testInvalidUri(new URI("dns", null, "/[invalid]", null));
@@ -287,10 +303,11 @@
     final List<InetAddress> answer2 = createAddressList(1);
     String name = "foo.googleapis.com";
 
-    DnsNameResolver resolver = newResolver(name, 81, isAndroid);
+    RetryingNameResolver resolver = newResolver(name, 81, isAndroid);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockResolver = mock(AddressResolver.class);
     when(mockResolver.resolveAddress(anyString())).thenReturn(answer1).thenReturn(answer2);
-    resolver.setAddressResolver(mockResolver);
+    dnsResolver.setAddressResolver(mockResolver);
 
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
@@ -303,6 +320,7 @@
     verify(mockListener, times(2)).onResult(resultCaptor.capture());
     assertAnswerMatches(answer2, 81, resultCaptor.getValue());
     assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
 
     resolver.shutdown();
 
@@ -313,16 +331,18 @@
   public void testExecutor_default() throws Exception {
     final List<InetAddress> answer = createAddressList(2);
 
-    DnsNameResolver resolver = newResolver("foo.googleapis.com", 81);
+    RetryingNameResolver resolver = newResolver("foo.googleapis.com", 81);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockResolver = mock(AddressResolver.class);
     when(mockResolver.resolveAddress(anyString())).thenReturn(answer);
-    resolver.setAddressResolver(mockResolver);
+    dnsResolver.setAddressResolver(mockResolver);
 
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
     verify(mockListener).onResult(resultCaptor.capture());
     assertAnswerMatches(answer, 81, resultCaptor.getValue());
     assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
 
     resolver.shutdown();
 
@@ -341,6 +361,7 @@
             .setSynchronizationContext(syncContext)
             .setServiceConfigParser(mock(ServiceConfigParser.class))
             .setChannelLogger(mock(ChannelLogger.class))
+            .setScheduledExecutorService(fakeExecutor.getScheduledExecutorService())
             .setOffloadExecutor(
                 new Executor() {
                   @Override
@@ -351,17 +372,19 @@
                 })
             .build();
 
-    DnsNameResolver resolver =
-        newResolver("foo.googleapis.com", Stopwatch.createUnstarted(), false, args);
+    RetryingNameResolver resolver = newResolver(
+        "foo.googleapis.com", Stopwatch.createUnstarted(), false, args);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockResolver = mock(AddressResolver.class);
     when(mockResolver.resolveAddress(anyString())).thenReturn(answer);
-    resolver.setAddressResolver(mockResolver);
+    dnsResolver.setAddressResolver(mockResolver);
 
     resolver.start(mockListener);
     assertEquals(0, fakeExecutor.runDueTasks());
     verify(mockListener).onResult(resultCaptor.capture());
     assertAnswerMatches(answer, 81, resultCaptor.getValue());
     assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
 
     resolver.shutdown();
 
@@ -376,13 +399,14 @@
     String name = "foo.googleapis.com";
     FakeTicker fakeTicker = new FakeTicker();
 
-    DnsNameResolver resolver =
-        newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    RetryingNameResolver resolver = newResolver(
+        name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockResolver = mock(AddressResolver.class);
     when(mockResolver.resolveAddress(anyString()))
         .thenReturn(answer1)
         .thenThrow(new AssertionError("should not called twice"));
-    resolver.setAddressResolver(mockResolver);
+    dnsResolver.setAddressResolver(mockResolver);
 
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
@@ -409,13 +433,14 @@
     String name = "foo.googleapis.com";
     FakeTicker fakeTicker = new FakeTicker();
 
-    DnsNameResolver resolver =
-        newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    RetryingNameResolver resolver = newResolver(
+        name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockResolver = mock(AddressResolver.class);
     when(mockResolver.resolveAddress(anyString()))
         .thenReturn(answer)
         .thenThrow(new AssertionError("should not reach here."));
-    resolver.setAddressResolver(mockResolver);
+    dnsResolver.setAddressResolver(mockResolver);
 
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
@@ -444,12 +469,13 @@
     String name = "foo.googleapis.com";
     FakeTicker fakeTicker = new FakeTicker();
 
-    DnsNameResolver resolver =
-        newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    RetryingNameResolver resolver = newResolver(
+        name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockResolver = mock(AddressResolver.class);
     when(mockResolver.resolveAddress(anyString())).thenReturn(answer1)
         .thenReturn(answer2);
-    resolver.setAddressResolver(mockResolver);
+    dnsResolver.setAddressResolver(mockResolver);
 
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
@@ -463,6 +489,7 @@
     verify(mockListener, times(2)).onResult(resultCaptor.capture());
     assertAnswerMatches(answer2, 81, resultCaptor.getValue());
     assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
 
     resolver.shutdown();
 
@@ -487,11 +514,12 @@
     String name = "foo.googleapis.com";
     FakeTicker fakeTicker = new FakeTicker();
 
-    DnsNameResolver resolver =
-        newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    RetryingNameResolver resolver = newResolver(
+        name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockResolver = mock(AddressResolver.class);
     when(mockResolver.resolveAddress(anyString())).thenReturn(answer1).thenReturn(answer2);
-    resolver.setAddressResolver(mockResolver);
+    dnsResolver.setAddressResolver(mockResolver);
 
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
@@ -511,6 +539,7 @@
     verify(mockListener, times(2)).onResult(resultCaptor.capture());
     assertAnswerMatches(answer2, 81, resultCaptor.getValue());
     assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
 
     resolver.shutdown();
 
@@ -520,8 +549,9 @@
   @Test
   public void resolve_emptyResult() throws Exception {
     DnsNameResolver.enableTxt = true;
-    DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443);
-    nr.setAddressResolver(new AddressResolver() {
+    RetryingNameResolver resolver = newResolver("dns:///addr.fake:1234", 443);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+    dnsResolver.setAddressResolver(new AddressResolver() {
       @Override
       public List<InetAddress> resolveAddress(String host) throws Exception {
         return Collections.emptyList();
@@ -531,9 +561,9 @@
     when(mockResourceResolver.resolveTxt(anyString()))
         .thenReturn(Collections.<String>emptyList());
 
-    nr.setResourceResolver(mockResourceResolver);
+    dnsResolver.setResourceResolver(mockResourceResolver);
 
-    nr.start(mockListener);
+    resolver.start(mockListener);
     assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
 
     ArgumentCaptor<ResolutionResult> ac = ArgumentCaptor.forClass(ResolutionResult.class);
@@ -543,6 +573,45 @@
     assertThat(ac.getValue().getAttributes()).isEqualTo(Attributes.EMPTY);
     assertThat(ac.getValue().getServiceConfig()).isNull();
     verify(mockResourceResolver, never()).resolveSrv(anyString());
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
+  }
+
+  // Load balancer rejects the empty addresses.
+  @Test
+  public void resolve_emptyResult_notAccepted() throws Exception {
+    when(mockListener.onResult(isA(ResolutionResult.class))).thenReturn(false);
+
+    DnsNameResolver.enableTxt = true;
+    RetryingNameResolver resolver = newResolver("dns:///addr.fake:1234", 443);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+    dnsResolver.setAddressResolver(new AddressResolver() {
+      @Override
+      public List<InetAddress> resolveAddress(String host) throws Exception {
+        return Collections.emptyList();
+      }
+    });
+    ResourceResolver mockResourceResolver = mock(ResourceResolver.class);
+    when(mockResourceResolver.resolveTxt(anyString()))
+        .thenReturn(Collections.<String>emptyList());
+
+    dnsResolver.setResourceResolver(mockResourceResolver);
+
+    resolver.start(mockListener);
+    assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
+
+    ArgumentCaptor<ResolutionResult> ac = ArgumentCaptor.forClass(ResolutionResult.class);
+    verify(mockListener).onResult(ac.capture());
+    verifyNoMoreInteractions(mockListener);
+    assertThat(ac.getValue().getAddresses()).isEmpty();
+    assertThat(ac.getValue().getAttributes()).isEqualTo(Attributes.EMPTY);
+    assertThat(ac.getValue().getServiceConfig()).isNull();
+    verify(mockResourceResolver, never()).resolveSrv(anyString());
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    // A retry should be scheduled
+    assertThat(fakeExecutor.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)).isEqualTo(1);
   }
 
   @Test
@@ -554,9 +623,10 @@
         .thenReturn(Collections.singletonList(backendAddr));
     String name = "foo.googleapis.com";
 
-    DnsNameResolver resolver = newResolver(name, 81);
-    resolver.setAddressResolver(mockAddressResolver);
-    resolver.setResourceResolver(null);
+    RetryingNameResolver resolver = newResolver(name, 81);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+    dnsResolver.setAddressResolver(mockAddressResolver);
+    dnsResolver.setResourceResolver(null);
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
     verify(mockListener).onResult(resultCaptor.capture());
@@ -568,6 +638,9 @@
     verify(mockAddressResolver).resolveAddress(name);
     assertThat(result.getAttributes()).isEqualTo(Attributes.EMPTY);
     assertThat(result.getServiceConfig()).isNull();
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
   }
 
   @Test
@@ -578,15 +651,20 @@
         .thenThrow(new IOException("no addr"));
     String name = "foo.googleapis.com";
 
-    DnsNameResolver resolver = newResolver(name, 81);
-    resolver.setAddressResolver(mockAddressResolver);
-    resolver.setResourceResolver(null);
+    RetryingNameResolver resolver = newResolver(name, 81);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+    dnsResolver.setAddressResolver(mockAddressResolver);
+    dnsResolver.setResourceResolver(null);
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
     verify(mockListener).onError(errorCaptor.capture());
     Status errorStatus = errorCaptor.getValue();
     assertThat(errorStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
     assertThat(errorStatus.getCause()).hasMessageThat().contains("no addr");
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    // A retry should be scheduled
+    assertThat(fakeExecutor.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)).isEqualTo(1);
   }
 
   @Test
@@ -613,12 +691,14 @@
             .setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR)
             .setSynchronizationContext(syncContext)
             .setServiceConfigParser(serviceConfigParser)
+            .setScheduledExecutorService(fakeClock.getScheduledExecutorService())
             .build();
 
     String name = "foo.googleapis.com";
-    DnsNameResolver resolver = newResolver(name, Stopwatch.createUnstarted(), false, args);
-    resolver.setAddressResolver(mockAddressResolver);
-    resolver.setResourceResolver(mockResourceResolver);
+    RetryingNameResolver resolver = newResolver(name, Stopwatch.createUnstarted(), false, args);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+    dnsResolver.setAddressResolver(mockAddressResolver);
+    dnsResolver.setResourceResolver(mockResourceResolver);
 
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
@@ -631,6 +711,9 @@
     assertThat(result.getServiceConfig().getConfig()).isNotNull();
     verify(mockAddressResolver).resolveAddress(name);
     verify(mockResourceResolver).resolveTxt("_grpc_config." + name);
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
   }
 
   @Test
@@ -642,9 +725,10 @@
     String name = "foo.googleapis.com";
 
     ResourceResolver mockResourceResolver = mock(ResourceResolver.class);
-    DnsNameResolver resolver = newResolver(name, 81);
-    resolver.setAddressResolver(mockAddressResolver);
-    resolver.setResourceResolver(mockResourceResolver);
+    NameResolver resolver = newResolver(name, 81).getRetriedNameResolver();
+    DnsNameResolver dnsResolver = (DnsNameResolver)resolver;
+    dnsResolver.setAddressResolver(mockAddressResolver);
+    dnsResolver.setResourceResolver(mockResourceResolver);
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
     verify(mockListener).onError(errorCaptor.capture());
@@ -652,6 +736,10 @@
     assertThat(errorStatus.getCode()).isEqualTo(Code.UNAVAILABLE);
     assertThat(errorStatus.getCause()).hasMessageThat().contains("no addr");
     verify(mockResourceResolver, never()).resolveTxt(anyString());
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    // A retry should be scheduled
+    assertThat(fakeExecutor.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER)).isEqualTo(1);
   }
 
   @Test
@@ -666,9 +754,10 @@
     when(mockResourceResolver.resolveTxt(anyString()))
         .thenThrow(new Exception("something like javax.naming.NamingException"));
 
-    DnsNameResolver resolver = newResolver(name, 81);
-    resolver.setAddressResolver(mockAddressResolver);
-    resolver.setResourceResolver(mockResourceResolver);
+    RetryingNameResolver resolver = newResolver(name, 81);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+    dnsResolver.setAddressResolver(mockAddressResolver);
+    dnsResolver.setResourceResolver(mockResourceResolver);
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
     verify(mockListener).onResult(resultCaptor.capture());
@@ -681,6 +770,9 @@
     assertThat(result.getAttributes()).isEqualTo(Attributes.EMPTY);
     assertThat(result.getServiceConfig()).isNull();
     verify(mockResourceResolver).resolveTxt(anyString());
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
   }
 
   @Test
@@ -695,9 +787,10 @@
     when(mockResourceResolver.resolveTxt(anyString()))
         .thenReturn(Collections.singletonList("grpc_config=something invalid"));
 
-    DnsNameResolver resolver = newResolver(name, 81);
-    resolver.setAddressResolver(mockAddressResolver);
-    resolver.setResourceResolver(mockResourceResolver);
+    RetryingNameResolver resolver = newResolver(name, 81);
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
+    dnsResolver.setAddressResolver(mockAddressResolver);
+    dnsResolver.setResourceResolver(mockResourceResolver);
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
     verify(mockListener).onResult(resultCaptor.capture());
@@ -711,6 +804,9 @@
     assertThat(result.getServiceConfig()).isNotNull();
     assertThat(result.getServiceConfig().getError()).isNotNull();
     verify(mockResourceResolver).resolveTxt(anyString());
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
   }
 
   @Test
@@ -757,11 +853,12 @@
               .setPassword("password").build();
         }
       };
-    DnsNameResolver resolver =
-        newResolver(name, port, alwaysDetectProxy, Stopwatch.createUnstarted());
+    RetryingNameResolver resolver = newResolver(
+        name, port, alwaysDetectProxy, Stopwatch.createUnstarted());
+    DnsNameResolver dnsResolver = (DnsNameResolver) resolver.getRetriedNameResolver();
     AddressResolver mockAddressResolver = mock(AddressResolver.class);
     when(mockAddressResolver.resolveAddress(anyString())).thenThrow(new AssertionError());
-    resolver.setAddressResolver(mockAddressResolver);
+    dnsResolver.setAddressResolver(mockAddressResolver);
     resolver.start(mockListener);
     assertEquals(1, fakeExecutor.runDueTasks());
 
@@ -777,6 +874,9 @@
     assertEquals("username", socketAddress.getUsername());
     assertEquals("password", socketAddress.getPassword());
     assertTrue(socketAddress.getTargetAddress().isUnresolved());
+
+    assertEquals(0, fakeClock.numPendingTasks());
+    assertEquals(0, fakeExecutor.numPendingTasks());
   }
 
   @Test
@@ -1185,7 +1285,8 @@
   }
 
   private void testValidUri(URI uri, String exportedAuthority, int expectedPort) {
-    DnsNameResolver resolver = provider.newNameResolver(uri, args);
+    DnsNameResolver resolver = (DnsNameResolver) ((RetryingNameResolver) provider.newNameResolver(
+        uri, args)).getRetriedNameResolver();
     assertNotNull(resolver);
     assertEquals(expectedPort, resolver.getPort());
     assertEquals(exportedAuthority, resolver.getServiceAuthority());
diff --git a/core/src/test/java/io/grpc/internal/ForwardingNameResolverTest.java b/core/src/test/java/io/grpc/internal/ForwardingNameResolverTest.java
index f8f9ed2..8aca8c6 100644
--- a/core/src/test/java/io/grpc/internal/ForwardingNameResolverTest.java
+++ b/core/src/test/java/io/grpc/internal/ForwardingNameResolverTest.java
@@ -80,8 +80,8 @@
   public void start_observer() {
     NameResolver.Listener2 listener = new NameResolver.Listener2() {
       @Override
-      public void onResult(ResolutionResult result) {
-
+      public boolean onResult(ResolutionResult result) {
+        return true;
       }
 
       @Override
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 09be9a9..3fe6e8c 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -55,7 +55,6 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
@@ -205,14 +204,6 @@
   private final FakeClock timer = new FakeClock();
   private final FakeClock executor = new FakeClock();
   private final FakeClock balancerRpcExecutor = new FakeClock();
-  private static final FakeClock.TaskFilter NAME_RESOLVER_REFRESH_TASK_FILTER =
-      new FakeClock.TaskFilter() {
-        @Override
-        public boolean shouldAccept(Runnable command) {
-          return command.toString().contains(
-              ManagedChannelImpl.DelayedNameResolverRefresh.class.getName());
-        }
-      };
 
   private final InternalChannelz channelz = new InternalChannelz();
 
@@ -309,10 +300,6 @@
         numExpectedTasks += 1;
       }
 
-      if (getNameResolverRefresh() != null) {
-        numExpectedTasks += 1;
-      }
-
       assertEquals(numExpectedTasks, timer.numPendingTasks());
 
       ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
@@ -1061,139 +1048,6 @@
   }
 
   @Test
-  public void nameResolutionFailed() {
-    Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
-    FakeNameResolverFactory nameResolverFactory =
-        new FakeNameResolverFactory.Builder(expectedUri)
-            .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
-            .setError(error)
-            .build();
-    channelBuilder.nameResolverFactory(nameResolverFactory);
-    // Name resolution is started as soon as channel is created.
-    createChannel();
-    FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
-    verify(mockLoadBalancer).handleNameResolutionError(same(error));
-    assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
-
-    timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
-    assertEquals(0, resolver.refreshCalled);
-
-    timer.forwardNanos(1);
-    assertEquals(1, resolver.refreshCalled);
-    verify(mockLoadBalancer, times(2)).handleNameResolutionError(same(error));
-
-    // Verify an additional name resolution failure does not schedule another timer
-    resolver.refresh();
-    verify(mockLoadBalancer, times(3)).handleNameResolutionError(same(error));
-    assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
-
-    // Allow the next refresh attempt to succeed
-    resolver.error = null;
-
-    // For the second attempt, the backoff should occur at RECONNECT_BACKOFF_INTERVAL_NANOS * 2
-    timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS * 2 - 1);
-    assertEquals(2, resolver.refreshCalled);
-    timer.forwardNanos(1);
-    assertEquals(3, resolver.refreshCalled);
-    assertEquals(0, timer.numPendingTasks());
-
-    // Verify that the successful resolution reset the backoff policy
-    resolver.listener.onError(error);
-    timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS - 1);
-    assertEquals(3, resolver.refreshCalled);
-    timer.forwardNanos(1);
-    assertEquals(4, resolver.refreshCalled);
-    assertEquals(0, timer.numPendingTasks());
-  }
-
-  @Test
-  public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
-    Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
-
-    FakeNameResolverFactory nameResolverFactory =
-        new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
-    channelBuilder.nameResolverFactory(nameResolverFactory);
-    // Name resolution is started as soon as channel is created.
-    createChannel();
-    verify(mockLoadBalancer).handleNameResolutionError(same(error));
-
-    FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
-    assertNotNull(nameResolverBackoff);
-    assertFalse(nameResolverBackoff.isCancelled());
-
-    // Add a pending call to the delayed transport
-    ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
-    Metadata headers = new Metadata();
-    call.start(mockCallListener, headers);
-
-    // The pending call on the delayed transport stops the name resolver backoff from cancelling
-    channel.shutdown();
-    assertFalse(nameResolverBackoff.isCancelled());
-
-    // Notify that a subchannel is ready, which drains the delayed transport
-    SubchannelPicker picker = mock(SubchannelPicker.class);
-    Status status = Status.UNAVAILABLE.withDescription("for test");
-    when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
-        .thenReturn(PickResult.withDrop(status));
-    updateBalancingStateSafely(helper, READY, picker);
-    executor.runDueTasks();
-    verify(mockCallListener).onClose(same(status), any(Metadata.class));
-
-    assertTrue(nameResolverBackoff.isCancelled());
-  }
-
-  @Test
-  public void nameResolverReturnsEmptySubLists_resolutionRetry() throws Exception {
-    // The mock LB is set to reject the addresses.
-    when(mockLoadBalancer.acceptResolvedAddresses(isA(ResolvedAddresses.class))).thenReturn(false);
-
-    // Pass a FakeNameResolverFactory with an empty list and LB config
-    FakeNameResolverFactory nameResolverFactory =
-        new FakeNameResolverFactory.Builder(expectedUri).build();
-    Map<String, Object> rawServiceConfig =
-        parseConfig("{\"loadBalancingConfig\": [ {\"mock_lb\": { \"setting1\": \"high\" } } ] }");
-    ManagedChannelServiceConfig parsedServiceConfig =
-        createManagedChannelServiceConfig(rawServiceConfig, null);
-    nameResolverFactory.nextConfigOrError.set(ConfigOrError.fromConfig(parsedServiceConfig));
-    channelBuilder.nameResolverFactory(nameResolverFactory);
-    createChannel();
-
-    // A resolution retry has been scheduled
-    assertEquals(1, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
-  }
-
-  @Test
-  public void nameResolverReturnsEmptySubLists_optionallyAllowed() throws Exception {
-    // Pass a FakeNameResolverFactory with an empty list and LB config
-    FakeNameResolverFactory nameResolverFactory =
-        new FakeNameResolverFactory.Builder(expectedUri).build();
-    String rawLbConfig = "{ \"setting1\": \"high\" }";
-    Object parsedLbConfig = new Object();
-    Map<String, Object> rawServiceConfig =
-        parseConfig("{\"loadBalancingConfig\": [ {\"mock_lb\": " + rawLbConfig + " } ] }");
-    ManagedChannelServiceConfig parsedServiceConfig =
-        createManagedChannelServiceConfig(
-            rawServiceConfig,
-            new PolicySelection(
-                mockLoadBalancerProvider,
-                parsedLbConfig));
-    nameResolverFactory.nextConfigOrError.set(ConfigOrError.fromConfig(parsedServiceConfig));
-    channelBuilder.nameResolverFactory(nameResolverFactory);
-    createChannel();
-
-    // LoadBalancer received the empty list and the LB config
-    verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
-    ArgumentCaptor<ResolvedAddresses> resultCaptor =
-        ArgumentCaptor.forClass(ResolvedAddresses.class);
-    verify(mockLoadBalancer).acceptResolvedAddresses(resultCaptor.capture());
-    assertThat(resultCaptor.getValue().getAddresses()).isEmpty();
-    assertThat(resultCaptor.getValue().getLoadBalancingPolicyConfig()).isEqualTo(parsedLbConfig);
-
-    // A no resolution retry
-    assertEquals(0, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
-  }
-
-  @Test
   public void loadBalancerThrowsInHandleResolvedAddresses() {
     RuntimeException ex = new RuntimeException("simulated");
     // Delay the success of name resolution until allResolved() is called
@@ -3017,36 +2871,6 @@
   }
 
   @Test
-  public void resetConnectBackoff() {
-    // Start with a name resolution failure to trigger backoff attempts
-    Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
-    FakeNameResolverFactory nameResolverFactory =
-        new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
-    channelBuilder.nameResolverFactory(nameResolverFactory);
-    // Name resolution is started as soon as channel is created.
-    createChannel();
-    FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
-    verify(mockLoadBalancer).handleNameResolutionError(same(error));
-
-    FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
-    assertNotNull("There should be a name resolver backoff task", nameResolverBackoff);
-    assertEquals(0, resolver.refreshCalled);
-
-    // Verify resetConnectBackoff() calls refresh and cancels the scheduled backoff
-    channel.resetConnectBackoff();
-    assertEquals(1, resolver.refreshCalled);
-    assertTrue(nameResolverBackoff.isCancelled());
-
-    // Simulate a race between cancel and the task scheduler. Should be a no-op.
-    nameResolverBackoff.command.run();
-    assertEquals(1, resolver.refreshCalled);
-
-    // Verify that the reconnect policy was recreated and the backoff multiplier reset to 1
-    timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS);
-    assertEquals(2, resolver.refreshCalled);
-  }
-
-  @Test
   public void resetConnectBackoff_noOpWithoutPendingResolverBackoff() {
     FakeNameResolverFactory nameResolverFactory =
         new FakeNameResolverFactory.Builder(expectedUri)
@@ -4514,10 +4338,6 @@
     return instrumented.getStats().get();
   }
 
-  private FakeClock.ScheduledTask getNameResolverRefresh() {
-    return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
-  }
-
   // Helper methods to call methods from SynchronizationContext
   private static Subchannel createSubchannelSafely(
       final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs,
diff --git a/core/src/test/java/io/grpc/internal/ServiceConfigErrorHandlingTest.java b/core/src/test/java/io/grpc/internal/ServiceConfigErrorHandlingTest.java
index f592ebc..0baa617 100644
--- a/core/src/test/java/io/grpc/internal/ServiceConfigErrorHandlingTest.java
+++ b/core/src/test/java/io/grpc/internal/ServiceConfigErrorHandlingTest.java
@@ -98,7 +98,7 @@
         @Override
         public boolean shouldAccept(Runnable command) {
           return command.toString().contains(
-              ManagedChannelImpl.DelayedNameResolverRefresh.class.getName());
+              RetryingNameResolver.DelayedNameResolverRefresh.class.getName());
         }
       };
 
@@ -542,7 +542,7 @@
     final URI expectedUri;
     final List<EquivalentAddressGroup> servers;
     final boolean resolvedAtStart;
-    final ArrayList<FakeNameResolver> resolvers = new ArrayList<>();
+    final ArrayList<RetryingNameResolver> resolvers = new ArrayList<>();
     final AtomicReference<Map<String, ?>> nextRawServiceConfig = new AtomicReference<>();
     final AtomicReference<Attributes> nextAttributes = new AtomicReference<>(Attributes.EMPTY);
 
@@ -561,7 +561,11 @@
         return null;
       }
       assertEquals(DEFAULT_PORT, args.getDefaultPort());
-      FakeNameResolver resolver = new FakeNameResolver(args.getServiceConfigParser());
+      RetryingNameResolver resolver = new RetryingNameResolver(
+          new FakeNameResolver(args.getServiceConfigParser()),
+          new FakeBackoffPolicyProvider(),
+          args.getScheduledExecutorService(),
+          args.getSynchronizationContext());
       resolvers.add(resolver);
       return resolver;
     }
@@ -572,8 +576,8 @@
     }
 
     void allResolved() {
-      for (FakeNameResolver resolver : resolvers) {
-        resolver.resolved();
+      for (RetryingNameResolver resolver : resolvers) {
+        ((FakeNameResolver)resolver.getRetriedNameResolver()).resolved();
       }
     }
 
@@ -613,7 +617,7 @@
               .setServiceConfig(serviceConfigParser.parseServiceConfig(rawServiceConfig));
         }
 
-        listener.onResult(builder.build());
+        fireResolutionResultEvent(listener.onResult(builder.build()));
       }
 
       @Override public void shutdown() {
@@ -647,7 +651,8 @@
   }
 
   private FakeClock.ScheduledTask getNameResolverRefresh() {
-    return Iterables.getOnlyElement(timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
+    return Iterables.getOnlyElement(
+       timer.getPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER), null);
   }
 
   private static class FakeLoadBalancer extends LoadBalancer {
diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
index 3af58ef..00673ef 100644
--- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
@@ -602,7 +602,7 @@
 
       private class NameResolverListener extends NameResolver.Listener2 {
         @Override
-        public void onResult(final ResolutionResult resolutionResult) {
+        public boolean onResult(final ResolutionResult resolutionResult) {
           class NameResolved implements Runnable {
             @Override
             public void run() {
@@ -634,6 +634,7 @@
           }
 
           syncContext.execute(new NameResolved());
+          return true;
         }
 
         @Override