xds: Update logic so that an error being reported when stream is closed gets propagated to subscribers (#9827)
* Stop setting waitForReady in XdsClient's AbstractXdsClient.
* Handle bad URL cleanly.
Fix test cases to deal with asynchronous flow.
diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
index f6b8277..b538461 100644
--- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
@@ -61,6 +61,8 @@
* the xDS RPC stream.
*/
final class AbstractXdsClient {
+
+ public static final String CLOSED_BY_SERVER = "Closed by server";
private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
@@ -217,6 +219,11 @@
return;
}
+ if (isInBackoff()) {
+ rpcRetryTimer.cancel();
+ rpcRetryTimer = null;
+ }
+
timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
}
@@ -315,21 +322,25 @@
}
final void handleRpcCompleted() {
- handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
+ handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
}
private void handleRpcStreamClosed(Status error) {
- checkArgument(!error.isOk(), "unexpected OK status");
if (closed) {
return;
}
+
+ checkArgument(!error.isOk(), "unexpected OK status");
+ String errorMsg = error.getDescription() != null
+ && error.getDescription().equals(CLOSED_BY_SERVER)
+ ? "ADS stream closed with status {0}: {1}. Cause: {2}"
+ : "ADS stream failed with status {0}: {1}. Cause: {2}";
logger.log(
- XdsLogLevel.ERROR,
- "ADS stream closed with status {0}: {1}. Cause: {2}",
- error.getCode(), error.getDescription(), error.getCause());
+ XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
closed = true;
xdsResponseHandler.handleStreamClosed(error);
cleanUp();
+
if (responseReceived || retryBackoffPolicy == null) {
// Reset the backoff sequence if had received a response, or backoff sequence
// has never been initialized.
@@ -423,7 +434,7 @@
});
}
};
- requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader);
+ requestWriter = stub.streamAggregatedResources(responseReader);
}
@Override
diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
index e7ed64a..e67bff1 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java
@@ -514,11 +514,22 @@
// Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
// is created but not yet requested because the client is in backoff.
this.metadata = ResourceMetadata.newResourceMetadataUnknown();
- maybeCreateXdsChannelWithLrs(serverInfo);
- this.xdsChannel = serverChannelMap.get(serverInfo);
- if (xdsChannel.isInBackoff()) {
+
+ AbstractXdsClient xdsChannelTemp = null;
+ try {
+ maybeCreateXdsChannelWithLrs(serverInfo);
+ xdsChannelTemp = serverChannelMap.get(serverInfo);
+ if (xdsChannelTemp.isInBackoff()) {
+ return;
+ }
+ } catch (IllegalArgumentException e) {
+ xdsChannelTemp = null;
+ this.errorDescription = "Bad configuration: " + e.getMessage();
return;
+ } finally {
+ this.xdsChannel = xdsChannelTemp;
}
+
restartTimer();
}
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
index 3ee6c91..0f18d3d 100644
--- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
+++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
@@ -18,6 +18,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
+import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
@@ -70,6 +71,7 @@
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
+import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
@@ -114,6 +116,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
@@ -3226,7 +3229,8 @@
// Management server closes the RPC stream with an error.
call.sendError(Status.UNKNOWN.asException());
- verify(ldsResourceWatcher).onError(errorCaptor.capture());
+ verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
+ .onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, "");
@@ -3336,7 +3340,8 @@
RDS_RESOURCE, rdsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.sendError(Status.UNAVAILABLE.asException());
- verify(ldsResourceWatcher).onError(errorCaptor.capture());
+ verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
+ .onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
verify(rdsResourceWatcher).onError(errorCaptor.capture());
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, "");
@@ -3573,13 +3578,18 @@
.build()
.start());
fakeClock.forwardTime(5, TimeUnit.SECONDS);
+ verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
+ fakeClock.forwardTime(20, TimeUnit.SECONDS); // Trigger rpcRetryTimer
DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
+ if (call == null) { // The first rpcRetry may have happened before the channel was ready
+ fakeClock.forwardTime(50, TimeUnit.SECONDS);
+ call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
+ }
// NOTE: There is a ScheduledExecutorService that may get involved due to the reconnect
// so you cannot rely on the logic being single threaded. The timeout() in verifyRequest
// is therefore necessary to avoid flakiness.
// Send a response and do verifications
- verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
@@ -3592,6 +3602,66 @@
}
}
+ @Test
+ public void sendToBadUrl() throws Exception {
+ // Setup xdsClient to fail on stream creation
+ XdsClientImpl client = createXdsClient("some. garbage");
+
+ client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
+ fakeClock.forwardTime(20, TimeUnit.SECONDS);
+ verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
+ client.shutdown();
+ }
+
+ @Test
+ public void sendToNonexistentHost() throws Exception {
+ // Setup xdsClient to fail on stream creation
+ XdsClientImpl client = createXdsClient("some.garbage");
+ client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
+ fakeClock.forwardTime(20, TimeUnit.SECONDS);
+
+ verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any());
+ fakeClock.forwardTime(50, TimeUnit.SECONDS); // Trigger rpcRetry if appropriate
+ assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
+ client.shutdown();
+ }
+
+ private XdsClientImpl createXdsClient(String serverUri) {
+ BootstrapInfo bootstrapInfo = buildBootStrap(serverUri);
+ return new XdsClientImpl(
+ DEFAULT_XDS_CHANNEL_FACTORY,
+ bootstrapInfo,
+ Context.ROOT,
+ fakeClock.getScheduledExecutorService(),
+ backoffPolicyProvider,
+ fakeClock.getStopwatchSupplier(),
+ timeProvider,
+ tlsContextManager);
+ }
+
+ private BootstrapInfo buildBootStrap(String serverUri) {
+
+ ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS,
+ ignoreResourceDeletion());
+
+ return Bootstrapper.BootstrapInfo.builder()
+ .servers(Collections.singletonList(xdsServerInfo))
+ .node(NODE)
+ .authorities(ImmutableMap.of(
+ "authority.xds.com",
+ AuthorityInfo.create(
+ "xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
+ ImmutableList.of(Bootstrapper.ServerInfo.create(
+ SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))),
+ "",
+ AuthorityInfo.create(
+ "xdstp:///envoy.config.listener.v3.Listener/%s",
+ ImmutableList.of(Bootstrapper.ServerInfo.create(
+ SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
+ .certProviders(ImmutableMap.of("cert-instance-name",
+ CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
+ .build();
+ }
private <T extends ResourceUpdate> DiscoveryRpcCall startResourceWatcher(
XdsResourceType<T> type, String name, ResourceWatcher<T> watcher) {