xds: Update logic so that an error being reported when stream is closed gets propagated to subscribers  (#9827)

* Stop setting waitForReady in XdsClient's AbstractXdsClient.
* Handle bad URL cleanly.  

Fix test cases to deal with asynchronous flow.
diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
index f6b8277..b538461 100644
--- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
@@ -61,6 +61,8 @@
  * the xDS RPC stream.
  */
 final class AbstractXdsClient {
+
+  public static final String CLOSED_BY_SERVER = "Closed by server";
   private final SynchronizationContext syncContext;
   private final InternalLogId logId;
   private final XdsLogger logger;
@@ -217,6 +219,11 @@
       return;
     }
 
+    if (isInBackoff()) {
+      rpcRetryTimer.cancel();
+      rpcRetryTimer = null;
+    }
+
     timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
   }
 
@@ -315,21 +322,25 @@
     }
 
     final void handleRpcCompleted() {
-      handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
+      handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
     }
 
     private void handleRpcStreamClosed(Status error) {
-      checkArgument(!error.isOk(), "unexpected OK status");
       if (closed) {
         return;
       }
+
+      checkArgument(!error.isOk(), "unexpected OK status");
+      String errorMsg = error.getDescription() != null
+          && error.getDescription().equals(CLOSED_BY_SERVER)
+              ? "ADS stream closed with status {0}: {1}. Cause: {2}"
+              : "ADS stream failed with status {0}: {1}. Cause: {2}";
       logger.log(
-          XdsLogLevel.ERROR,
-          "ADS stream closed with status {0}: {1}. Cause: {2}",
-          error.getCode(), error.getDescription(), error.getCause());
+          XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
       closed = true;
       xdsResponseHandler.handleStreamClosed(error);
       cleanUp();
+
       if (responseReceived || retryBackoffPolicy == null) {
         // Reset the backoff sequence if had received a response, or backoff sequence
         // has never been initialized.
@@ -423,7 +434,7 @@
           });
         }
       };
-      requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader);
+      requestWriter = stub.streamAggregatedResources(responseReader);
     }
 
     @Override
diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
index e7ed64a..e67bff1 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
@@ -514,11 +514,22 @@
       // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
       // is created but not yet requested because the client is in backoff.
       this.metadata = ResourceMetadata.newResourceMetadataUnknown();
-      maybeCreateXdsChannelWithLrs(serverInfo);
-      this.xdsChannel = serverChannelMap.get(serverInfo);
-      if (xdsChannel.isInBackoff()) {
+
+      AbstractXdsClient xdsChannelTemp = null;
+      try {
+        maybeCreateXdsChannelWithLrs(serverInfo);
+        xdsChannelTemp = serverChannelMap.get(serverInfo);
+        if (xdsChannelTemp.isInBackoff()) {
+          return;
+        }
+      } catch (IllegalArgumentException e) {
+        xdsChannelTemp = null;
+        this.errorDescription = "Bad configuration:  " + e.getMessage();
         return;
+      } finally {
+        this.xdsChannel = xdsChannelTemp;
       }
+
       restartTimer();
     }
 
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
index 3ee6c91..0f18d3d 100644
--- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
+++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
@@ -18,6 +18,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.Truth.assertWithMessage;
+import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
@@ -70,6 +71,7 @@
 import io.grpc.stub.StreamObserver;
 import io.grpc.testing.GrpcCleanupRule;
 import io.grpc.xds.Bootstrapper.AuthorityInfo;
+import io.grpc.xds.Bootstrapper.BootstrapInfo;
 import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
 import io.grpc.xds.Bootstrapper.ServerInfo;
 import io.grpc.xds.Endpoints.DropOverload;
@@ -114,6 +116,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.Mock;
@@ -3226,7 +3229,8 @@
 
     // Management server closes the RPC stream with an error.
     call.sendError(Status.UNKNOWN.asException());
-    verify(ldsResourceWatcher).onError(errorCaptor.capture());
+    verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
+        .onError(errorCaptor.capture());
     verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
     verify(rdsResourceWatcher).onError(errorCaptor.capture());
     verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
@@ -3336,7 +3340,8 @@
         RDS_RESOURCE, rdsResourceWatcher);
     DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
     call.sendError(Status.UNAVAILABLE.asException());
-    verify(ldsResourceWatcher).onError(errorCaptor.capture());
+    verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
+        .onError(errorCaptor.capture());
     verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
     verify(rdsResourceWatcher).onError(errorCaptor.capture());
     verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
@@ -3573,13 +3578,18 @@
               .build()
               .start());
       fakeClock.forwardTime(5, TimeUnit.SECONDS);
+      verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
+      fakeClock.forwardTime(20, TimeUnit.SECONDS); // Trigger rpcRetryTimer
       DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
+      if (call == null) { // The first rpcRetry may have happened before the channel was ready
+        fakeClock.forwardTime(50, TimeUnit.SECONDS);
+        call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
+      }
 
       // NOTE:  There is a ScheduledExecutorService that may get involved due to the reconnect
       // so you cannot rely on the logic being single threaded.  The timeout() in verifyRequest
       // is therefore necessary to avoid flakiness.
       // Send a response and do verifications
-      verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
       call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001");
       call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE);
       verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
@@ -3592,6 +3602,66 @@
     }
   }
 
+  @Test
+  public void sendToBadUrl() throws Exception {
+    // Setup xdsClient to fail on stream creation
+    XdsClientImpl client = createXdsClient("some. garbage");
+
+    client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
+    fakeClock.forwardTime(20, TimeUnit.SECONDS);
+    verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
+    client.shutdown();
+  }
+
+  @Test
+  public void sendToNonexistentHost() throws Exception {
+    // Setup xdsClient to fail on stream creation
+    XdsClientImpl client = createXdsClient("some.garbage");
+    client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
+    fakeClock.forwardTime(20, TimeUnit.SECONDS);
+
+    verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
+    fakeClock.forwardTime(50, TimeUnit.SECONDS); // Trigger rpcRetry if appropriate
+    assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
+    client.shutdown();
+  }
+
+  private XdsClientImpl createXdsClient(String serverUri) {
+    BootstrapInfo bootstrapInfo = buildBootStrap(serverUri);
+    return new XdsClientImpl(
+        DEFAULT_XDS_CHANNEL_FACTORY,
+        bootstrapInfo,
+        Context.ROOT,
+        fakeClock.getScheduledExecutorService(),
+        backoffPolicyProvider,
+        fakeClock.getStopwatchSupplier(),
+        timeProvider,
+        tlsContextManager);
+  }
+
+  private  BootstrapInfo buildBootStrap(String serverUri) {
+
+    ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
+        ignoreResourceDeletion());
+
+    return Bootstrapper.BootstrapInfo.builder()
+        .servers(Collections.singletonList(xdsServerInfo))
+        .node(NODE)
+        .authorities(ImmutableMap.of(
+            "authority.xds.com",
+            AuthorityInfo.create(
+                "xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
+                ImmutableList.of(Bootstrapper.ServerInfo.create(
+                    SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
+            "",
+            AuthorityInfo.create(
+                "xdstp:///envoy.config.listener.v3.Listener/%s",
+                ImmutableList.of(Bootstrapper.ServerInfo.create(
+                    SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
+        .certProviders(ImmutableMap.of("cert-instance-name",
+            CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
+        .build();
+  }
 
   private <T extends ResourceUpdate> DiscoveryRpcCall startResourceWatcher(
       XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {