xds: implement the new v3 and old fallback server xDS API (#7553)

diff --git a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
index 0a33d4b..ae746b7 100644
--- a/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
+++ b/xds/src/main/java/io/grpc/xds/ServerXdsClient.java
@@ -23,9 +23,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.envoyproxy.envoy.config.core.v3.Address;
+import io.envoyproxy.envoy.config.core.v3.TrafficDirection;
 import io.envoyproxy.envoy.config.listener.v3.FilterChain;
 import io.envoyproxy.envoy.config.listener.v3.FilterChainMatch;
 import io.envoyproxy.envoy.config.listener.v3.Listener;
@@ -56,14 +58,18 @@
   @Nullable
   private ListenerWatcher listenerWatcher;
   private int listenerPort = -1;
+  private final boolean newServerApi;
+  @Nullable private final String instanceIp;
   @Nullable
   private ScheduledHandle ldsRespTimer;
 
   ServerXdsClient(XdsChannel channel, Node node, SynchronizationContext syncContext,
       ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
-      Supplier<Stopwatch> stopwatchSupplier) {
+      Supplier<Stopwatch> stopwatchSupplier, boolean newServerApi, String instanceIp) {
     super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
     this.syncContext = checkNotNull(syncContext, "syncContext");
+    this.newServerApi = channel.isUseProtocolV3() && newServerApi;
+    this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0");
   }
 
   @Override
@@ -73,7 +79,9 @@
     checkArgument(port > 0, "port needs to be > 0");
     this.listenerPort = port;
     getLogger().log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
-    updateNodeMetadataForListenerRequest(port);
+    if (!newServerApi) {
+      updateNodeMetadataForListenerRequest(port);
+    }
     adjustResourceSubscription(ResourceType.LDS);
     if (!isInBackoff()) {
       ldsRespTimer =
@@ -87,10 +95,13 @@
   @Nullable
   @Override
   Collection<String> getSubscribedResources(ResourceType type) {
-    if (type != ResourceType.LDS || listenerWatcher == null) {
-      return null;
+    if (newServerApi) {
+      String listeningAddress = instanceIp + ":" + listenerPort;
+      String resourceName = "grpc/server?udpa.resource.listening_address=" + listeningAddress;
+      return ImmutableList.<String>of(resourceName);
+    } else {
+      return Collections.emptyList();
     }
-    return Collections.emptyList();
   }
 
   /** In case of Listener watcher metadata to be updated to include port. */
@@ -99,12 +110,10 @@
     if (node.getMetadata() != null) {
       newMetadata.putAll(node.getMetadata());
     }
-    newMetadata.put("TRAFFICDIRECTOR_PROXYLESS", "1");
-    // TODO(sanjaypujare): eliminate usage of listening_addresses.
-    EnvoyProtoData.Address listeningAddress =
-        new EnvoyProtoData.Address("0.0.0.0", port);
-    node =
-        node.toBuilder().setMetadata(newMetadata).addListeningAddresses(listeningAddress).build();
+    newMetadata.put("TRAFFICDIRECTOR_INBOUND_INTERCEPTION_PORT", "15001");
+    newMetadata.put("TRAFFICDIRECTOR_INBOUND_BACKEND_PORTS", "" + port);
+    newMetadata.put("INSTANCE_IP", instanceIp);
+    node = node.toBuilder().setMetadata(newMetadata).build();
   }
 
   @Override
@@ -156,15 +165,18 @@
   }
 
   private boolean isRequestedListener(Listener listener) {
-    // TODO(sanjaypujare): check listener.getName() once we know what xDS server returns
+    if (newServerApi) {
+      return "TRAFFICDIRECTOR_INBOUND_LISTENER".equals(listener.getName())
+              && listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
+              && hasMatchingFilter(listener.getFilterChainsList());
+    }
     return isAddressMatching(listener.getAddress())
         && hasMatchingFilter(listener.getFilterChainsList());
   }
 
   private boolean isAddressMatching(Address address) {
-    // TODO(sanjaypujare): check IP address once we know xDS server will include it
-    return address.hasSocketAddress()
-        && (address.getSocketAddress().getPortValue() == listenerPort);
+    return newServerApi || (address.hasSocketAddress()
+            && (address.getSocketAddress().getPortValue() == 15001));
   }
 
   private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {
diff --git a/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java
index eaefbb6..60547ec 100644
--- a/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java
+++ b/xds/src/main/java/io/grpc/xds/XdsClientWrapperForServerSds.java
@@ -39,6 +39,7 @@
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
+import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -68,6 +69,10 @@
   private static final TimeServiceResource timeServiceResource =
       new TimeServiceResource("GrpcServerXdsClient");
 
+  @VisibleForTesting
+  static boolean experimentalNewServerApiEnvVar = Boolean.parseBoolean(
+          System.getenv("GRPC_XDS_EXPERIMENTAL_NEW_SERVER_API"));
+
   private EnvoyServerProtoData.Listener curListener;
   @SuppressWarnings("unused")
   @Nullable private XdsClient xdsClient;
@@ -135,6 +140,12 @@
     }
     Node node = bootstrapInfo.getNode();
     timeService = SharedResourceHolder.get(timeServiceResource);
+    String instanceIp;
+    try {
+      instanceIp = Inet4Address.getLocalHost().getHostAddress();
+    } catch (UnknownHostException e) {
+      instanceIp = "0.0.0.0";
+    }
     XdsClient xdsClientImpl =
         new ServerXdsClient(
             channel,
@@ -142,7 +153,9 @@
             createSynchronizationContext(),
             timeService,
             new ExponentialBackoffPolicy.Provider(),
-            GrpcUtil.STOPWATCH_SUPPLIER);
+            GrpcUtil.STOPWATCH_SUPPLIER,
+            experimentalNewServerApiEnvVar,
+            instanceIp);
     start(xdsClientImpl);
   }
 
diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
new file mode 100644
index 0000000..dba035c
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientNewServerApiTest.java
@@ -0,0 +1,690 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.xds.XdsClientTestHelper.buildDiscoveryResponse;
+import static io.grpc.xds.XdsClientTestHelper.buildListener;
+import static io.grpc.xds.XdsClientTestHelper.buildRouteConfiguration;
+import static io.grpc.xds.XdsClientTestHelper.buildVirtualHost;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UInt32Value;
+import io.envoyproxy.envoy.config.core.v3.CidrRange;
+import io.envoyproxy.envoy.config.core.v3.SocketAddress;
+import io.envoyproxy.envoy.config.core.v3.TrafficDirection;
+import io.envoyproxy.envoy.config.core.v3.TransportSocket;
+import io.envoyproxy.envoy.config.listener.v3.Filter;
+import io.envoyproxy.envoy.config.listener.v3.FilterChain;
+import io.envoyproxy.envoy.config.listener.v3.FilterChainMatch;
+import io.envoyproxy.envoy.config.listener.v3.Listener;
+import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
+import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
+import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext;
+import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
+import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
+import io.grpc.Context;
+import io.grpc.Context.CancellationListener;
+import io.grpc.ManagedChannel;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.SynchronizationContext;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.BackoffPolicy;
+import io.grpc.internal.FakeClock;
+import io.grpc.internal.FakeClock.TaskFilter;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.xds.AbstractXdsClient.ResourceType;
+import io.grpc.xds.EnvoyProtoData.Node;
+import io.grpc.xds.XdsClient.ListenerUpdate;
+import io.grpc.xds.XdsClient.ListenerWatcher;
+import io.grpc.xds.XdsClient.XdsChannel;
+import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link ServerXdsClient for server side Listeners using newServerApi}.
+ */
+@RunWith(JUnit4.class)
+public class ServerXdsClientNewServerApiTest {
+
+  private static final int PORT = 7000;
+  private static final String LOCAL_IP = "192.168.3.5";
+  private static final String INSTANCE_IP = "192.168.3.7";
+  private static final String TYPE_URL_HCM =
+      "type.googleapis.com/"
+          + "envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager";
+
+  private static final Node NODE = Node.newBuilder().build();
+  private static final TaskFilter RPC_RETRY_TASK_FILTER =
+      new TaskFilter() {
+        @Override
+        public boolean shouldAccept(Runnable command) {
+          return command.toString().contains(AbstractXdsClient.RpcRetryTask.class.getSimpleName());
+        }
+      };
+  private static final TaskFilter LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER =
+      new TaskFilter() {
+        @Override
+        public boolean shouldAccept(Runnable command) {
+          return command.toString()
+              .contains(ServerXdsClient.ListenerResourceFetchTimeoutTask.class.getSimpleName());
+        }
+      };
+  private static final String LISTENER_NAME = "TRAFFICDIRECTOR_INBOUND_LISTENER";
+
+  @Rule
+  public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
+
+  private final SynchronizationContext syncContext = new SynchronizationContext(
+      new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          throw new AssertionError(e);
+        }
+      });
+  private final FakeClock fakeClock = new FakeClock();
+
+  private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();
+  private final Queue<StreamObserver<DiscoveryRequest>> requestObservers = new ArrayDeque<>();
+  private final AtomicBoolean callEnded = new AtomicBoolean(true);
+
+  @Mock
+  private AggregatedDiscoveryServiceImplBase mockedDiscoveryService;
+  @Mock
+  private BackoffPolicy.Provider backoffPolicyProvider;
+  @Mock
+  private BackoffPolicy backoffPolicy1;
+  @Mock
+  private BackoffPolicy backoffPolicy2;
+  @Mock
+  private ListenerWatcher listenerWatcher;
+
+  private ManagedChannel channel;
+  private XdsClient xdsClient;
+
+  @Before
+  public void setUp() throws IOException {
+    MockitoAnnotations.initMocks(this);
+    when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
+    when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
+    when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L);
+
+    final String serverName = InProcessServerBuilder.generateName();
+    AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() {
+      @Override
+      public StreamObserver<DiscoveryRequest> streamAggregatedResources(
+          final StreamObserver<DiscoveryResponse> responseObserver) {
+        assertThat(callEnded.get()).isTrue();  // ensure previous call was ended
+        callEnded.set(false);
+        Context.current().addListener(
+            new CancellationListener() {
+              @Override
+              public void cancelled(Context context) {
+                callEnded.set(true);
+              }
+            }, MoreExecutors.directExecutor());
+        responseObservers.offer(responseObserver);
+        @SuppressWarnings("unchecked")
+        StreamObserver<DiscoveryRequest> requestObserver = mock(StreamObserver.class);
+        requestObservers.offer(requestObserver);
+        return requestObserver;
+      }
+    };
+    mockedDiscoveryService =
+        mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(adsServiceImpl));
+
+    cleanupRule.register(
+        InProcessServerBuilder
+            .forName(serverName)
+            .addService(mockedDiscoveryService)
+            .directExecutor()
+            .build()
+            .start());
+    channel =
+        cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
+
+    xdsClient =
+        new ServerXdsClient(new XdsChannel(channel, /* useProtocolV3= */ true), NODE,
+            syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
+            fakeClock.getStopwatchSupplier(), true, INSTANCE_IP);
+    // Only the connection to management server is established, no RPC request is sent until at
+    // least one watcher is registered.
+    assertThat(responseObservers).isEmpty();
+    assertThat(requestObservers).isEmpty();
+  }
+
+  @After
+  public void tearDown() {
+    xdsClient.shutdown();
+    assertThat(callEnded.get()).isTrue();
+    assertThat(channel.isShutdown()).isTrue();
+    assertThat(fakeClock.getPendingTasks()).isEmpty();
+  }
+
+  private static DiscoveryRequest buildDiscoveryRequest(
+          Node node, String versionInfo, List<String> resourceNames, String typeUrl, String nonce) {
+    return DiscoveryRequest.newBuilder()
+        .setVersionInfo(versionInfo)
+        .setNode(node.toEnvoyProtoNode())
+        .setTypeUrl(typeUrl)
+        .setResponseNonce(nonce)
+        .addAllResourceNames(resourceNames)
+        .build();
+  }
+
+  /**
+   * Client receives an LDS response that contains listener with no match i.e. no port match.
+   */
+  @Test
+  public void ldsResponse_nonMatchingFilterChain_notFoundError() {
+    xdsClient.watchListenerData(PORT, listenerWatcher);
+    StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
+    StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
+
+    // Client sends an LDS request with null in lds resource name
+    verify(requestObserver)
+        .onNext(eq(XdsClientTestHelper.buildDiscoveryRequest(NODE, "",
+                ImmutableList.of("grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                ResourceType.LDS.typeUrl(), "")));
+    assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
+
+    List<Any> listeners = ImmutableList.of(
+        Any.pack(buildListener("bar.googleapis.com",
+            Any.pack(HttpConnectionManager.newBuilder()
+                .setRouteConfig(
+                    buildRouteConfiguration("route-bar.googleapis.com",
+                        ImmutableList.of(
+                            buildVirtualHost(
+                                ImmutableList.of("bar.googleapis.com"),
+                                "cluster-bar.googleapis.com"))))
+                .build()))),
+        Any.pack(buildListener(LISTENER_NAME,
+            Any.pack(HttpConnectionManager.newBuilder()
+                .setRouteConfig(
+                    buildRouteConfiguration("route-baz.googleapis.com",
+                        ImmutableList.of(
+                            buildVirtualHost(
+                                ImmutableList.of("baz.googleapis.com"),
+                                "cluster-baz.googleapis.com"))))
+                .build()))));
+    DiscoveryResponse response =
+        buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
+    responseObserver.onNext(response);
+
+    // Client sends an ACK LDS request.
+    verify(requestObserver)
+        .onNext(eq(XdsClientTestHelper.buildDiscoveryRequest(NODE, "0",
+                ImmutableList.of("grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                ResourceType.LDS.typeUrl(), "0000")));
+
+    verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
+    verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
+    verify(listenerWatcher, never()).onError(any(Status.class));
+    fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
+    verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
+    assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
+  }
+
+  /** Client receives a Listener with all match. */
+  @Test
+  public void ldsResponseWith_matchingListenerFound() throws InvalidProtocolBufferException {
+    xdsClient.watchListenerData(PORT, listenerWatcher);
+    StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
+    StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
+
+    // Client sends an LDS request with null in lds resource name
+    verify(requestObserver)
+        .onNext(
+            eq(
+                XdsClientTestHelper.buildDiscoveryRequest(
+                    NODE,
+                    "",
+                    ImmutableList.of(
+                        "grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                    ResourceType.LDS.typeUrl(),
+                    "")));
+    assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(1);
+
+    final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
+    final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
+        CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
+            .setPrefixLen(UInt32Value.of(32)).build()),
+        CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
+            "ROOTCA"),
+        buildTestFilter("envoy.http_connection_manager"));
+    List<Any> listeners = ImmutableList.of(
+        Any.pack(buildListener("bar.googleapis.com",
+            Any.pack(HttpConnectionManager.newBuilder()
+                .setRouteConfig(
+                    buildRouteConfiguration("route-bar.googleapis.com",
+                        ImmutableList.of(
+                            buildVirtualHost(
+                                ImmutableList.of("bar.googleapis.com"),
+                                "cluster-bar.googleapis.com"))))
+                .build()))),
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
+            filterChainOutbound,
+            filterChainInbound
+        )));
+    DiscoveryResponse response =
+        buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
+    responseObserver.onNext(response);
+
+    // Client sends an ACK LDS request.
+    verify(requestObserver)
+        .onNext(
+            eq(
+                XdsClientTestHelper.buildDiscoveryRequest(
+                    NODE,
+                    "0",
+                    ImmutableList.of(
+                        "grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                    ResourceType.LDS.typeUrl(),
+                    "0000")));
+
+    ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
+    verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
+    ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
+    EnvoyServerProtoData.Listener listener = configUpdate.getListener();
+    assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
+    assertThat(listener.getAddress()).isEqualTo("0.0.0.0:15001");
+    assertThat(listener.getFilterChains()).hasSize(2);
+    EnvoyServerProtoData.FilterChain filterChainOutboundInListenerUpdate
+        = listener.getFilterChains().get(0);
+    assertThat(filterChainOutboundInListenerUpdate.getFilterChainMatch().getDestinationPort())
+        .isEqualTo(8000);
+    EnvoyServerProtoData.FilterChain filterChainInboundInListenerUpdate
+        = listener.getFilterChains().get(1);
+    EnvoyServerProtoData.FilterChainMatch inBoundfilterChainMatch =
+        filterChainInboundInListenerUpdate.getFilterChainMatch();
+    assertThat(inBoundfilterChainMatch.getDestinationPort()).isEqualTo(PORT);
+    assertThat(inBoundfilterChainMatch.getPrefixRanges()).containsExactly(
+        new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32));
+    CommonTlsContext downstreamCommonTlsContext =
+        filterChainInboundInListenerUpdate.getDownstreamTlsContext().getCommonTlsContext();
+    assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName())
+        .isEqualTo("google-sds-config-default");
+    assertThat(
+            downstreamCommonTlsContext
+                .getCombinedValidationContext()
+                .getValidationContextSdsSecretConfig()
+                .getName())
+        .isEqualTo("ROOTCA");
+    assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
+  }
+
+  /** Client receives LDS responses for updating Listener previously received. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void notifyUpdatedListener() throws InvalidProtocolBufferException {
+    xdsClient.watchListenerData(PORT, listenerWatcher);
+    StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
+    StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
+
+    final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
+    final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
+        CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
+            .setPrefixLen(UInt32Value.of(32)).build()),
+        CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
+            "ROOTCA"),
+        buildTestFilter("envoy.http_connection_manager"));
+    List<Any> listeners = ImmutableList.of(
+        Any.pack(buildListener("bar.googleapis.com",
+            Any.pack(HttpConnectionManager.newBuilder()
+                .setRouteConfig(
+                    buildRouteConfiguration("route-bar.googleapis.com",
+                        ImmutableList.of(
+                            buildVirtualHost(
+                                ImmutableList.of("bar.googleapis.com"),
+                                "cluster-bar.googleapis.com"))))
+                .build()))),
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
+            filterChainOutbound,
+            filterChainInbound
+        )));
+    DiscoveryResponse response =
+        buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
+    responseObserver.onNext(response);
+
+    ArgumentCaptor<ListenerUpdate> listenerUpdateCaptor = ArgumentCaptor.forClass(null);
+    verify(listenerWatcher, times(1)).onListenerChanged(listenerUpdateCaptor.capture());
+
+    reset(requestObserver);
+    // Management server sends another LDS response with updates for Listener.
+    final FilterChain filterChainNewInbound = buildFilterChain(buildFilterChainMatch(PORT,
+        CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
+            .setPrefixLen(UInt32Value.of(32)).build()),
+        CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default1",
+            "ROOTCA2"),
+        buildTestFilter("envoy.http_connection_manager"));
+    List<Any> listeners1 =
+        ImmutableList.of(
+            Any.pack(
+                buildListenerWithFilterChain(
+                    LISTENER_NAME, 15001, "0.0.0.0", filterChainNewInbound)));
+    DiscoveryResponse response1 =
+        buildDiscoveryResponse("1", listeners1, ResourceType.LDS.typeUrl(), "0001");
+    responseObserver.onNext(response1);
+
+    // Client sends an ACK LDS request.
+    verify(requestObserver)
+        .onNext(
+            eq(
+                XdsClientTestHelper.buildDiscoveryRequest(
+                    NODE,
+                    "1",
+                    ImmutableList.of(
+                        "grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                    ResourceType.LDS.typeUrl(),
+                    "0001")));
+
+    // Updated listener is notified to listener watcher.
+    listenerUpdateCaptor = ArgumentCaptor.forClass(null);
+    verify(listenerWatcher, times(2)).onListenerChanged(listenerUpdateCaptor.capture());
+    ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
+    EnvoyServerProtoData.Listener listener = configUpdate.getListener();
+    assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
+    assertThat(listener.getFilterChains()).hasSize(1);
+    EnvoyServerProtoData.FilterChain filterChain =
+        Iterables.getOnlyElement(listener.getFilterChains());
+    EnvoyServerProtoData.FilterChainMatch filterChainMatch = filterChain.getFilterChainMatch();
+    assertThat(filterChainMatch.getDestinationPort()).isEqualTo(PORT);
+    assertThat(filterChainMatch.getPrefixRanges()).containsExactly(
+        new EnvoyServerProtoData.CidrRange(LOCAL_IP, 32));
+    CommonTlsContext downstreamCommonTlsContext =
+        filterChain.getDownstreamTlsContext().getCommonTlsContext();
+    assertThat(downstreamCommonTlsContext.getTlsCertificateSdsSecretConfigs(0).getName())
+        .isEqualTo("google-sds-config-default1");
+    assertThat(
+            downstreamCommonTlsContext
+                .getCombinedValidationContext()
+                .getValidationContextSdsSecretConfig()
+                .getName())
+        .isEqualTo("ROOTCA2");
+  }
+
+  /** Client receives LDS response containing non-matching port in the filterMatch. */
+  @Test
+  public void ldsResponse_nonMatchingPort() {
+    xdsClient.watchListenerData(PORT, listenerWatcher);
+    StreamObserver<DiscoveryResponse> responseObserver = responseObservers.poll();
+    requestObservers.poll();
+
+    final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(8000), null);
+    final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(
+        PORT + 1,  // add 1 to mismatch
+        CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
+            .setPrefixLen(UInt32Value.of(32)).build()),
+
+        CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
+            "ROOTCA"),
+        buildTestFilter("envoy.http_connection_manager"));
+    List<Any> listeners = ImmutableList.of(
+        Any.pack(buildListener("bar.googleapis.com",
+            Any.pack(HttpConnectionManager.newBuilder()
+                .setRouteConfig(
+                    buildRouteConfiguration("route-bar.googleapis.com",
+                        ImmutableList.of(
+                            buildVirtualHost(
+                                ImmutableList.of("bar.googleapis.com"),
+                                "cluster-bar.googleapis.com"))))
+                .build()))),
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
+            filterChainInbound,
+            filterChainOutbound
+        )));
+    DiscoveryResponse response =
+        buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
+    responseObserver.onNext(response);
+
+    verify(listenerWatcher, never()).onListenerChanged(any(ListenerUpdate.class));
+    verify(listenerWatcher, never()).onResourceDoesNotExist(":" + PORT);
+    verify(listenerWatcher, never()).onError(any(Status.class));
+    fakeClock.forwardTime(ServerXdsClient.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS);
+    verify(listenerWatcher).onResourceDoesNotExist(":" + PORT);
+    assertThat(fakeClock.getPendingTasks(LISTENER_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
+  }
+
+  /**
+   * RPC stream close and retry while there is listener watcher registered.
+   */
+  @Test
+  public void streamClosedAndRetry() {
+    InOrder inOrder =
+        Mockito.inOrder(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+            backoffPolicy2);
+    xdsClient.watchListenerData(PORT, listenerWatcher);
+
+    ArgumentCaptor<StreamObserver<DiscoveryResponse>> responseObserverCaptor =
+        ArgumentCaptor.forClass(null);
+    inOrder.verify(mockedDiscoveryService)
+        .streamAggregatedResources(responseObserverCaptor.capture());
+    StreamObserver<DiscoveryResponse> responseObserver =
+        responseObserverCaptor.getValue();  // same as responseObservers.poll()
+    requestObservers.poll();
+
+    final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
+    final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
+        CidrRange.newBuilder().setAddressPrefix(LOCAL_IP)
+            .setPrefixLen(UInt32Value.of(32)).build()),
+        CommonTlsContextTestsUtil.buildTestDownstreamTlsContext("google-sds-config-default",
+            "ROOTCA"),
+        buildTestFilter("envoy.http_connection_manager"));
+    List<Any> listeners = ImmutableList.of(
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
+            filterChainOutbound,
+            filterChainInbound
+        )));
+    DiscoveryResponse response =
+        buildDiscoveryResponse("0", listeners, ResourceType.LDS.typeUrl(), "0000");
+    responseObserver.onNext(response);
+
+    // Client sent an ACK CDS request (Omitted).
+
+    ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(null);
+
+    // Management server closes the RPC stream with an error.
+    responseObserver.onError(Status.UNKNOWN.asException());
+    verify(listenerWatcher).onError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNKNOWN);
+
+    // Resets backoff and retry immediately.
+    inOrder.verify(backoffPolicyProvider).get();
+    fakeClock.runDueTasks();
+    inOrder.verify(mockedDiscoveryService)
+        .streamAggregatedResources(responseObserverCaptor.capture());
+    responseObserver = responseObserverCaptor.getValue();
+    StreamObserver<DiscoveryRequest> requestObserver = requestObservers.poll();
+
+    // Retry resumes requests for all wanted resources.
+    verify(requestObserver)
+        .onNext(eq(buildDiscoveryRequest(NODE, "0",
+                ImmutableList.of("grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                ResourceType.LDS.typeUrl(), "")));
+
+    // Management server becomes unreachable.
+    responseObserver.onError(Status.UNAVAILABLE.asException());
+    verify(listenerWatcher, times(2)).onError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
+    inOrder.verify(backoffPolicy1).nextBackoffNanos();
+    assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+    // Retry after backoff.
+    fakeClock.forwardNanos(9L);
+    assertThat(requestObservers).isEmpty();
+    fakeClock.forwardNanos(1L);
+    inOrder.verify(mockedDiscoveryService)
+        .streamAggregatedResources(responseObserverCaptor.capture());
+    responseObserver = responseObserverCaptor.getValue();
+    requestObserver = requestObservers.poll();
+    verify(requestObserver)
+        .onNext(eq(buildDiscoveryRequest(NODE, "0",
+                ImmutableList.of("grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                ResourceType.LDS.typeUrl(), "")));
+
+    // Management server is still not reachable.
+    responseObserver.onError(Status.UNAVAILABLE.asException());
+    verify(listenerWatcher, times(3)).onError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
+    inOrder.verify(backoffPolicy1).nextBackoffNanos();
+    assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+    // Retry after backoff.
+    fakeClock.forwardNanos(99L);
+    assertThat(requestObservers).isEmpty();
+    fakeClock.forwardNanos(1L);
+    inOrder.verify(mockedDiscoveryService)
+        .streamAggregatedResources(responseObserverCaptor.capture());
+    responseObserver = responseObserverCaptor.getValue();
+    requestObserver = requestObservers.poll();
+    verify(requestObserver)
+        .onNext(eq(buildDiscoveryRequest(NODE, "0",
+                ImmutableList.of("grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                ResourceType.LDS.typeUrl(), "")));
+
+    // Management server sends back a LDS response.
+    response = buildDiscoveryResponse("1", listeners,
+        ResourceType.LDS.typeUrl(), "0001");
+    responseObserver.onNext(response);
+
+    // Client sent an LDS ACK request (Omitted).
+
+    // Management server closes the RPC stream.
+    responseObserver.onCompleted();
+    verify(listenerWatcher, times(4)).onError(any(Status.class));
+
+    // Resets backoff and retry immediately
+    inOrder.verify(backoffPolicyProvider).get();
+    fakeClock.runDueTasks();
+    inOrder.verify(mockedDiscoveryService)
+        .streamAggregatedResources(responseObserverCaptor.capture());
+    responseObserver = responseObserverCaptor.getValue();
+    requestObserver = requestObservers.poll();
+
+    verify(requestObserver)
+        .onNext(eq(buildDiscoveryRequest(NODE, "1",
+                ImmutableList.of("grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                ResourceType.LDS.typeUrl(), "")));
+
+    // Management server becomes unreachable again.
+    responseObserver.onError(Status.UNAVAILABLE.asException());
+    verify(listenerWatcher, times(5)).onError(statusCaptor.capture());
+    assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
+    inOrder.verify(backoffPolicy2).nextBackoffNanos();
+    assertThat(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)).hasSize(1);
+
+    // Retry after backoff.
+    fakeClock.forwardNanos(19L);
+    assertThat(requestObservers).isEmpty();
+    fakeClock.forwardNanos(1L);
+    inOrder.verify(mockedDiscoveryService)
+        .streamAggregatedResources(responseObserverCaptor.capture());
+    requestObserver = requestObservers.poll();
+    verify(requestObserver)
+        .onNext(eq(buildDiscoveryRequest(NODE, "1",
+                ImmutableList.of("grpc/server?udpa.resource.listening_address=192.168.3.7:7000"),
+                ResourceType.LDS.typeUrl(), "")));
+
+    verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
+        backoffPolicy2);
+  }
+
+  static Listener buildListenerWithFilterChain(String name, int portValue, String address,
+                                               FilterChain... filterChains) {
+    io.envoyproxy.envoy.config.core.v3.Address listenerAddress =
+            io.envoyproxy.envoy.config.core.v3.Address.newBuilder()
+            .setSocketAddress(
+                SocketAddress.newBuilder().setPortValue(portValue).setAddress(address))
+            .build();
+    return
+        Listener.newBuilder()
+            .setName(name)
+            .setAddress(listenerAddress)
+            .addAllFilterChains(Arrays.asList(filterChains))
+            .setTrafficDirection(TrafficDirection.INBOUND)
+            .build();
+  }
+
+  @SuppressWarnings("deprecation")
+  static FilterChain buildFilterChain(FilterChainMatch filterChainMatch,
+                                      DownstreamTlsContext tlsContext, Filter...filters) {
+    return FilterChain.newBuilder()
+        .setFilterChainMatch(filterChainMatch)
+        .setTransportSocket(
+            tlsContext == null
+                ? TransportSocket.getDefaultInstance()
+                : TransportSocket.newBuilder()
+                    .setName("envoy.transport_sockets.tls")
+                    .setTypedConfig(Any.pack(tlsContext))
+                    .build())
+        .addAllFilters(Arrays.asList(filters))
+        .build();
+  }
+
+  static FilterChainMatch buildFilterChainMatch(int destPort, CidrRange...prefixRanges) {
+    return
+        FilterChainMatch.newBuilder()
+            .setDestinationPort(UInt32Value.of(destPort))
+            .addAllPrefixRanges(Arrays.asList(prefixRanges))
+            .build();
+  }
+
+  static Filter buildTestFilter(String name) {
+    return
+        Filter.newBuilder()
+            .setName(name)
+            .setTypedConfig(
+                Any.newBuilder()
+                .setTypeUrl(TYPE_URL_HCM))
+            .build();
+  }
+}
diff --git a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
index 94cf6d9..587d32e 100644
--- a/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
+++ b/xds/src/test/java/io/grpc/xds/ServerXdsClientTest.java
@@ -65,7 +65,6 @@
 import io.grpc.stub.StreamObserver;
 import io.grpc.testing.GrpcCleanupRule;
 import io.grpc.xds.AbstractXdsClient.ResourceType;
-import io.grpc.xds.EnvoyProtoData.Address;
 import io.grpc.xds.EnvoyProtoData.Node;
 import io.grpc.xds.XdsClient.ListenerUpdate;
 import io.grpc.xds.XdsClient.ListenerWatcher;
@@ -102,6 +101,7 @@
   private static final int PORT = 7000;
   private static final String LOCAL_IP = "192.168.3.5";
   private static final String DIFFERENT_IP = "192.168.3.6";
+  private static final String INSTANCE_IP = "192.168.3.7";
   private static final String TYPE_URL_HCM =
       "type.googleapis.com/"
           + "envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager";
@@ -198,7 +198,7 @@
     xdsClient =
         new ServerXdsClient(new XdsChannel(channel, /* useProtocolV3= */ false), NODE,
             syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
-            fakeClock.getStopwatchSupplier());
+            fakeClock.getStopwatchSupplier(), false, INSTANCE_IP);
     // Only the connection to management server is established, no RPC request is sent until at
     // least one watcher is registered.
     assertThat(responseObservers).isEmpty();
@@ -218,11 +218,11 @@
     if (NODE.getMetadata() != null) {
       newMetadata.putAll(NODE.getMetadata());
     }
-    newMetadata.put("TRAFFICDIRECTOR_PROXYLESS", "1");
-    Address listeningAddress = new Address("0.0.0.0", PORT);
+    newMetadata.put("TRAFFICDIRECTOR_INBOUND_INTERCEPTION_PORT", "15001");
+    newMetadata.put("TRAFFICDIRECTOR_INBOUND_BACKEND_PORTS", "" + PORT);
+    newMetadata.put("INSTANCE_IP", INSTANCE_IP);
     return NODE.toBuilder()
         .setMetadata(newMetadata)
-        .addListeningAddresses(listeningAddress)
         .build();
   }
 
@@ -332,7 +332,7 @@
                                 ImmutableList.of("bar.googleapis.com"),
                                 "cluster-bar.googleapis.com"))))
                 .build()))),
-        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15002, "0.0.0.0",
             filterChainOutbound,
             filterChainInbound
         )));
@@ -384,7 +384,7 @@
                                 ImmutableList.of("bar.googleapis.com"),
                                 "cluster-bar.googleapis.com"))))
                 .build()))),
-        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
             filterChainOutbound,
             filterChainInbound
         )));
@@ -402,7 +402,7 @@
     ListenerUpdate configUpdate = listenerUpdateCaptor.getValue();
     EnvoyServerProtoData.Listener listener = configUpdate.getListener();
     assertThat(listener.getName()).isEqualTo(LISTENER_NAME);
-    assertThat(listener.getAddress()).isEqualTo("0.0.0.0:" + PORT);
+    assertThat(listener.getAddress()).isEqualTo("0.0.0.0:15001");
     assertThat(listener.getFilterChains()).hasSize(2);
     EnvoyServerProtoData.FilterChain filterChainOutboundInListenerUpdate
         = listener.getFilterChains().get(0);
@@ -459,7 +459,7 @@
                                 ImmutableList.of("bar.googleapis.com"),
                                 "cluster-bar.googleapis.com"))))
                 .build()))),
-        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
             filterChainOutbound,
             filterChainInbound
         )));
@@ -483,7 +483,7 @@
             "ROOTCA2"),
         buildTestFilter("envoy.http_connection_manager"));
     List<Any> listeners1 = ImmutableList.of(
-        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, PORT, "0.0.0.0",
+        Any.pack(buildListenerWithFilterChain(LISTENER_NAME, 15001, "0.0.0.0",
             filterChainNewInbound
         )));
     DiscoveryResponse response1 =
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java
index 66cce2b..2167a84 100644
--- a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java
+++ b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java
@@ -268,7 +268,7 @@
     if (upstreamTlsContext != null) {
       clusterBuilder.setTransportSocket(
           io.envoyproxy.envoy.api.v2.core.TransportSocket.newBuilder()
-              .setName("tls").setTypedConfig(Any.pack(upstreamTlsContext)));
+              .setName("envoy.transport_sockets.tls").setTypedConfig(Any.pack(upstreamTlsContext)));
     }
     return clusterBuilder.build();
   }