[xDS] add support for multiple addresses per endpoint (#34506)

Co-authored-by: markdroth <markdroth@users.noreply.github.com>
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index a8d33f0..3740f9b 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -17430,6 +17430,7 @@
   run: false
   language: c++
   headers:
+  - test/core/util/scoped_env_var.h
   - test/cpp/end2end/connection_attempt_injector.h
   - test/cpp/end2end/counted_service.h
   - test/cpp/end2end/test_service_impl.h
@@ -17831,7 +17832,8 @@
   gtest: true
   build: test
   language: c++
-  headers: []
+  headers:
+  - test/core/util/scoped_env_var.h
   src:
   - src/proto/grpc/testing/xds/v3/address.proto
   - src/proto/grpc/testing/xds/v3/base.proto
diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc
index 9274cab..2fb3bab 100644
--- a/src/core/ext/xds/xds_endpoint.cc
+++ b/src/core/ext/xds/xds_endpoint.cc
@@ -50,11 +50,26 @@
 #include "src/core/lib/address_utils/sockaddr_utils.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/env.h"
 #include "src/core/lib/gprpp/validation_errors.h"
 #include "src/core/lib/iomgr/resolved_address.h"
 
 namespace grpc_core {
 
+namespace {
+
+// TODO(roth): Remove this once dualstack support is stable.
+bool XdsDualstackEndpointsEnabled() {
+  auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+  if (!value.has_value()) return false;
+  bool parsed_value;
+  bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
+  return parse_succeeded && parsed_value;
+}
+
+}  // namespace
+
 //
 // XdsEndpointResource
 //
@@ -150,6 +165,38 @@
   }
 }
 
+absl::optional<grpc_resolved_address> ParseCoreAddress(
+    const envoy_config_core_v3_Address* address, ValidationErrors* errors) {
+  if (address == nullptr) {
+    errors->AddError("field not present");
+    return absl::nullopt;
+  }
+  ValidationErrors::ScopedField field(errors, ".socket_address");
+  const envoy_config_core_v3_SocketAddress* socket_address =
+      envoy_config_core_v3_Address_socket_address(address);
+  if (socket_address == nullptr) {
+    errors->AddError("field not present");
+    return absl::nullopt;
+  }
+  std::string address_str = UpbStringToStdString(
+      envoy_config_core_v3_SocketAddress_address(socket_address));
+  uint32_t port;
+  {
+    ValidationErrors::ScopedField field(errors, ".port_value");
+    port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
+    if (GPR_UNLIKELY(port >> 16) != 0) {
+      errors->AddError("invalid port");
+      return absl::nullopt;
+    }
+  }
+  auto addr = StringToSockaddr(address_str, port);
+  if (!addr.ok()) {
+    errors->AddError(addr.status().message());
+    return absl::nullopt;
+  }
+  return *addr;
+}
+
 absl::optional<EndpointAddresses> EndpointAddressesParse(
     const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
     ValidationErrors* errors) {
@@ -172,8 +219,7 @@
     }
   }
   // endpoint
-  // TODO(roth): add support for multiple addresses per endpoint
-  grpc_resolved_address grpc_address;
+  std::vector<grpc_resolved_address> addresses;
   {
     ValidationErrors::ScopedField field(errors, ".endpoint");
     const envoy_config_endpoint_v3_Endpoint* endpoint =
@@ -182,43 +228,34 @@
       errors->AddError("field not present");
       return absl::nullopt;
     }
-    ValidationErrors::ScopedField field2(errors, ".address");
-    const envoy_config_core_v3_Address* address =
-        envoy_config_endpoint_v3_Endpoint_address(endpoint);
-    if (address == nullptr) {
-      errors->AddError("field not present");
-      return absl::nullopt;
-    }
-    ValidationErrors::ScopedField field3(errors, ".socket_address");
-    const envoy_config_core_v3_SocketAddress* socket_address =
-        envoy_config_core_v3_Address_socket_address(address);
-    if (socket_address == nullptr) {
-      errors->AddError("field not present");
-      return absl::nullopt;
-    }
-    std::string address_str = UpbStringToStdString(
-        envoy_config_core_v3_SocketAddress_address(socket_address));
-    uint32_t port;
     {
-      ValidationErrors::ScopedField field(errors, ".port_value");
-      port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
-      if (GPR_UNLIKELY(port >> 16) != 0) {
-        errors->AddError("invalid port");
-        return absl::nullopt;
+      ValidationErrors::ScopedField field(errors, ".address");
+      auto address = ParseCoreAddress(
+          envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
+      if (address.has_value()) addresses.push_back(*address);
+    }
+    if (XdsDualstackEndpointsEnabled()) {
+      size_t size;
+      auto* additional_addresses =
+          envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
+                                                                 &size);
+      for (size_t i = 0; i < size; ++i) {
+        ValidationErrors::ScopedField field(
+            errors, absl::StrCat(".additional_addresses[", i, "].address"));
+        auto address = ParseCoreAddress(
+            envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
+                additional_addresses[i]),
+            errors);
+        if (address.has_value()) addresses.push_back(*address);
       }
     }
-    auto addr = StringToSockaddr(address_str, port);
-    if (!addr.ok()) {
-      errors->AddError(addr.status().message());
-    } else {
-      grpc_address = *addr;
-    }
   }
+  if (addresses.empty()) return absl::nullopt;
   // Convert to EndpointAddresses.
   return EndpointAddresses(
-      grpc_address, ChannelArgs()
-                        .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
-                        .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
+      addresses, ChannelArgs()
+                     .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
+                     .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
 }
 
 struct ParsedLocality {
diff --git a/src/proto/grpc/testing/xds/v3/endpoint.proto b/src/proto/grpc/testing/xds/v3/endpoint.proto
index 7886fb3..1d01a9c 100644
--- a/src/proto/grpc/testing/xds/v3/endpoint.proto
+++ b/src/proto/grpc/testing/xds/v3/endpoint.proto
@@ -29,6 +29,11 @@
 
 // Upstream host identifier.
 message Endpoint {
+  message AdditionalAddress {
+    // Additional address that is associated with the endpoint.
+    core.v3.Address address = 1;
+  }
+
   // The upstream host address.
   //
   // .. attention::
@@ -39,6 +44,13 @@
   //   in the Address). For LOGICAL or STRICT DNS, it is expected to be hostname,
   //   and will be resolved via DNS.
   core.v3.Address address = 1;
+
+  // An ordered list of addresses that together with `address` comprise the
+  // list of addresses for an endpoint. The address given in the `address` is
+  // prepended to this list. It is assumed that the list must already be
+  // sorted by preference order of the addresses. This will only be supported
+  // for STATIC and EDS clusters.
+  repeated AdditionalAddress additional_addresses = 4;
 }
 
 // An Endpoint that Envoy can route traffic to.
diff --git a/test/core/xds/BUILD b/test/core/xds/BUILD
index d2f53db..4dc271f 100644
--- a/test/core/xds/BUILD
+++ b/test/core/xds/BUILD
@@ -328,5 +328,6 @@
         "//src/core:grpc_xds_client",
         "//src/proto/grpc/testing/xds/v3:endpoint_proto",
         "//test/core/util:grpc_test_util",
+        "//test/core/util:scoped_env_var",
     ],
 )
diff --git a/test/core/xds/xds_endpoint_resource_type_test.cc b/test/core/xds/xds_endpoint_resource_type_test.cc
index d77b559..820e7b1 100644
--- a/test/core/xds/xds_endpoint_resource_type_test.cc
+++ b/test/core/xds/xds_endpoint_resource_type_test.cc
@@ -22,6 +22,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include <google/protobuf/wrappers.pb.h>
 
@@ -54,6 +55,7 @@
 #include "src/proto/grpc/testing/xds/v3/endpoint.pb.h"
 #include "src/proto/grpc/testing/xds/v3/health_check.pb.h"
 #include "src/proto/grpc/testing/xds/v3/percent.pb.h"
+#include "test/core/util/scoped_env_var.h"
 #include "test/core/util/test_config.h"
 
 using envoy::config::endpoint::v3::ClusterLoadAssignment;
@@ -489,6 +491,254 @@
       << decode_result.resource.status();
 }
 
+TEST_F(XdsEndpointTest, MultipleAddressesPerEndpoint) {
+  testing::ScopedExperimentalEnvVar env(
+      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+  ClusterLoadAssignment cla;
+  cla.set_cluster_name("foo");
+  auto* locality = cla.add_endpoints();
+  locality->mutable_load_balancing_weight()->set_value(1);
+  auto* locality_name = locality->mutable_locality();
+  locality_name->set_region("myregion");
+  locality_name->set_zone("myzone");
+  locality_name->set_sub_zone("mysubzone");
+  auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+  auto* socket_address = ep->mutable_address()->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(443);
+  socket_address = ep->add_additional_addresses()
+                       ->mutable_address()
+                       ->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(444);
+  std::string serialized_resource;
+  ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+  auto* resource_type = XdsEndpointResourceType::Get();
+  auto decode_result =
+      resource_type->Decode(decode_context_, serialized_resource);
+  ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
+  ASSERT_TRUE(decode_result.name.has_value());
+  EXPECT_EQ(*decode_result.name, "foo");
+  auto& resource =
+      static_cast<const XdsEndpointResource&>(**decode_result.resource);
+  ASSERT_EQ(resource.priorities.size(), 1);
+  const auto& priority = resource.priorities[0];
+  ASSERT_EQ(priority.localities.size(), 1);
+  const auto& p = *priority.localities.begin();
+  ASSERT_EQ(p.first, p.second.name.get());
+  EXPECT_EQ(p.first->region(), "myregion");
+  EXPECT_EQ(p.first->zone(), "myzone");
+  EXPECT_EQ(p.first->sub_zone(), "mysubzone");
+  EXPECT_EQ(p.second.lb_weight, 1);
+  ASSERT_EQ(p.second.endpoints.size(), 1);
+  const auto& endpoint = p.second.endpoints.front();
+  ASSERT_EQ(endpoint.addresses().size(), 2);
+  auto addr =
+      grpc_sockaddr_to_string(&endpoint.addresses()[0], /*normalize=*/false);
+  ASSERT_TRUE(addr.ok()) << addr.status();
+  EXPECT_EQ(*addr, "127.0.0.1:443");
+  addr = grpc_sockaddr_to_string(&endpoint.addresses()[1], /*normalize=*/false);
+  ASSERT_TRUE(addr.ok()) << addr.status();
+  EXPECT_EQ(*addr, "127.0.0.1:444");
+  EXPECT_EQ(endpoint.args(), ChannelArgs()
+                                 .Set(GRPC_ARG_ADDRESS_WEIGHT, 1)
+                                 .Set(GRPC_ARG_XDS_HEALTH_STATUS,
+                                      XdsHealthStatus::HealthStatus::kUnknown));
+  ASSERT_NE(resource.drop_config, nullptr);
+  EXPECT_TRUE(resource.drop_config->drop_category_list().empty());
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesMissingAddress) {
+  testing::ScopedExperimentalEnvVar env(
+      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+  ClusterLoadAssignment cla;
+  cla.set_cluster_name("foo");
+  auto* locality = cla.add_endpoints();
+  locality->mutable_load_balancing_weight()->set_value(1);
+  auto* locality_name = locality->mutable_locality();
+  locality_name->set_region("myregion");
+  locality_name->set_zone("myzone");
+  locality_name->set_sub_zone("mysubzone");
+  auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+  auto* socket_address = ep->mutable_address()->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(443);
+  ep->add_additional_addresses();
+  std::string serialized_resource;
+  ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+  auto* resource_type = XdsEndpointResourceType::Get();
+  auto decode_result =
+      resource_type->Decode(decode_context_, serialized_resource);
+  ASSERT_TRUE(decode_result.name.has_value());
+  EXPECT_EQ(*decode_result.name, "foo");
+  EXPECT_EQ(decode_result.resource.status().code(),
+            absl::StatusCode::kInvalidArgument);
+  EXPECT_EQ(decode_result.resource.status().message(),
+            "errors parsing EDS resource: ["
+            "field:endpoints[0].lb_endpoints[0].endpoint"
+            ".additional_addresses[0].address error:field not present]")
+      << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesMissingSocketAddress) {
+  testing::ScopedExperimentalEnvVar env(
+      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+  ClusterLoadAssignment cla;
+  cla.set_cluster_name("foo");
+  auto* locality = cla.add_endpoints();
+  locality->mutable_load_balancing_weight()->set_value(1);
+  auto* locality_name = locality->mutable_locality();
+  locality_name->set_region("myregion");
+  locality_name->set_zone("myzone");
+  locality_name->set_sub_zone("mysubzone");
+  auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+  auto* socket_address = ep->mutable_address()->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(443);
+  ep->add_additional_addresses()->mutable_address();
+  std::string serialized_resource;
+  ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+  auto* resource_type = XdsEndpointResourceType::Get();
+  auto decode_result =
+      resource_type->Decode(decode_context_, serialized_resource);
+  ASSERT_TRUE(decode_result.name.has_value());
+  EXPECT_EQ(*decode_result.name, "foo");
+  EXPECT_EQ(decode_result.resource.status().code(),
+            absl::StatusCode::kInvalidArgument);
+  EXPECT_EQ(decode_result.resource.status().message(),
+            "errors parsing EDS resource: ["
+            "field:endpoints[0].lb_endpoints[0].endpoint"
+            ".additional_addresses[0].address.socket_address "
+            "error:field not present]")
+      << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesInvalidPort) {
+  testing::ScopedExperimentalEnvVar env(
+      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+  ClusterLoadAssignment cla;
+  cla.set_cluster_name("foo");
+  auto* locality = cla.add_endpoints();
+  locality->mutable_load_balancing_weight()->set_value(1);
+  auto* locality_name = locality->mutable_locality();
+  locality_name->set_region("myregion");
+  locality_name->set_zone("myzone");
+  locality_name->set_sub_zone("mysubzone");
+  auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+  auto* socket_address = ep->mutable_address()->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(443);
+  socket_address = ep->add_additional_addresses()
+                       ->mutable_address()
+                       ->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(65537);
+  std::string serialized_resource;
+  ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+  auto* resource_type = XdsEndpointResourceType::Get();
+  auto decode_result =
+      resource_type->Decode(decode_context_, serialized_resource);
+  ASSERT_TRUE(decode_result.name.has_value());
+  EXPECT_EQ(*decode_result.name, "foo");
+  EXPECT_EQ(decode_result.resource.status().code(),
+            absl::StatusCode::kInvalidArgument);
+  EXPECT_EQ(decode_result.resource.status().message(),
+            "errors parsing EDS resource: ["
+            "field:endpoints[0].lb_endpoints[0].endpoint"
+            ".additional_addresses[0].address.socket_address.port_value "
+            "error:invalid port]")
+      << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesInvalidAddress) {
+  testing::ScopedExperimentalEnvVar env(
+      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+  ClusterLoadAssignment cla;
+  cla.set_cluster_name("foo");
+  auto* locality = cla.add_endpoints();
+  locality->mutable_load_balancing_weight()->set_value(1);
+  auto* locality_name = locality->mutable_locality();
+  locality_name->set_region("myregion");
+  locality_name->set_zone("myzone");
+  locality_name->set_sub_zone("mysubzone");
+  auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+  auto* socket_address = ep->mutable_address()->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(443);
+  socket_address = ep->add_additional_addresses()
+                       ->mutable_address()
+                       ->mutable_socket_address();
+  socket_address->set_address("not_an_ip_address");
+  socket_address->set_port_value(444);
+  std::string serialized_resource;
+  ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+  auto* resource_type = XdsEndpointResourceType::Get();
+  auto decode_result =
+      resource_type->Decode(decode_context_, serialized_resource);
+  ASSERT_TRUE(decode_result.name.has_value());
+  EXPECT_EQ(*decode_result.name, "foo");
+  EXPECT_EQ(decode_result.resource.status().code(),
+            absl::StatusCode::kInvalidArgument);
+  EXPECT_EQ(decode_result.resource.status().message(),
+            "errors parsing EDS resource: ["
+            "field:endpoints[0].lb_endpoints[0].endpoint"
+            ".additional_addresses[0].address.socket_address error:"
+            "Failed to parse address:not_an_ip_address:444]")
+      << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, IgnoresMultipleAddressesPerEndpointWhenNotEnabled) {
+  ClusterLoadAssignment cla;
+  cla.set_cluster_name("foo");
+  auto* locality = cla.add_endpoints();
+  locality->mutable_load_balancing_weight()->set_value(1);
+  auto* locality_name = locality->mutable_locality();
+  locality_name->set_region("myregion");
+  locality_name->set_zone("myzone");
+  locality_name->set_sub_zone("mysubzone");
+  auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+  auto* socket_address = ep->mutable_address()->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(443);
+  socket_address = ep->add_additional_addresses()
+                       ->mutable_address()
+                       ->mutable_socket_address();
+  socket_address->set_address("127.0.0.1");
+  socket_address->set_port_value(444);
+  std::string serialized_resource;
+  ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+  auto* resource_type = XdsEndpointResourceType::Get();
+  auto decode_result =
+      resource_type->Decode(decode_context_, serialized_resource);
+  ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
+  ASSERT_TRUE(decode_result.name.has_value());
+  EXPECT_EQ(*decode_result.name, "foo");
+  auto& resource =
+      static_cast<const XdsEndpointResource&>(**decode_result.resource);
+  ASSERT_EQ(resource.priorities.size(), 1);
+  const auto& priority = resource.priorities[0];
+  ASSERT_EQ(priority.localities.size(), 1);
+  const auto& p = *priority.localities.begin();
+  ASSERT_EQ(p.first, p.second.name.get());
+  EXPECT_EQ(p.first->region(), "myregion");
+  EXPECT_EQ(p.first->zone(), "myzone");
+  EXPECT_EQ(p.first->sub_zone(), "mysubzone");
+  EXPECT_EQ(p.second.lb_weight, 1);
+  ASSERT_EQ(p.second.endpoints.size(), 1);
+  const auto& endpoint = p.second.endpoints.front();
+  ASSERT_EQ(endpoint.addresses().size(), 1);
+  auto addr =
+      grpc_sockaddr_to_string(&endpoint.addresses()[0], /*normalize=*/false);
+  ASSERT_TRUE(addr.ok()) << addr.status();
+  EXPECT_EQ(*addr, "127.0.0.1:443");
+  EXPECT_EQ(endpoint.args(), ChannelArgs()
+                                 .Set(GRPC_ARG_ADDRESS_WEIGHT, 1)
+                                 .Set(GRPC_ARG_XDS_HEALTH_STATUS,
+                                      XdsHealthStatus::HealthStatus::kUnknown));
+  ASSERT_NE(resource.drop_config, nullptr);
+  EXPECT_TRUE(resource.drop_config->drop_category_list().empty());
+}
+
 TEST_F(XdsEndpointTest, MissingEndpoint) {
   ClusterLoadAssignment cla;
   cla.set_cluster_name("foo");
diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD
index 2e2f31d..863e1d2 100644
--- a/test/cpp/end2end/xds/BUILD
+++ b/test/cpp/end2end/xds/BUILD
@@ -156,6 +156,7 @@
         "//:grpc",
         "//:grpc++",
         "//test/core/util:grpc_test_util",
+        "//test/core/util:scoped_env_var",
         "//test/cpp/end2end:connection_attempt_injector",
     ],
 )
diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc
index 68a1f3e..a658f31 100644
--- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc
+++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc
@@ -26,7 +26,9 @@
 #include "src/core/ext/filters/client_channel/backup_poller.h"
 #include "src/core/lib/address_utils/sockaddr_utils.h"
 #include "src/core/lib/config/config_vars.h"
+#include "src/core/lib/experiments/experiments.h"
 #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
+#include "test/core/util/scoped_env_var.h"
 #include "test/cpp/end2end/connection_attempt_injector.h"
 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
 
@@ -411,6 +413,43 @@
             channel_->GetLoadBalancingPolicyName());
 }
 
+TEST_P(EdsTest, MultipleAddressesPerEndpoint) {
+  if (!grpc_core::IsRoundRobinDelegateToPickFirstEnabled()) return;
+  grpc_core::testing::ScopedExperimentalEnvVar env(
+      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+  const size_t kNumRpcsPerAddress = 10;
+  // Create 3 backends, but leave backend 0 unstarted.
+  CreateBackends(3);
+  StartBackend(1);
+  StartBackend(2);
+  // The first endpoint is backends 0 and 1, the second endpoint is backend 2.
+  EdsResourceArgs args({
+      {"locality0",
+       {CreateEndpoint(0, HealthStatus::UNKNOWN, 1, {1}), CreateEndpoint(2)}},
+  });
+  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
+  // Initially, backend 0 is offline, so the first endpoint should
+  // connect to backend 1 instead.  Traffic should round-robin across
+  // backends 1 and 2.
+  WaitForAllBackends(DEBUG_LOCATION, 1);  // Wait for backends 1 and 2.
+  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
+  EXPECT_EQ(kNumRpcsPerAddress,
+            backends_[1]->backend_service()->request_count());
+  EXPECT_EQ(kNumRpcsPerAddress,
+            backends_[2]->backend_service()->request_count());
+  // Now start backend 0 and shutdown backend 1.
+  StartBackend(0);
+  ShutdownBackend(1);
+  // Wait for traffic to go to backend 0.
+  WaitForBackend(DEBUG_LOCATION, 0);
+  // Traffic should now round-robin across backends 0 and 2.
+  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
+  EXPECT_EQ(kNumRpcsPerAddress,
+            backends_[0]->backend_service()->request_count());
+  EXPECT_EQ(kNumRpcsPerAddress,
+            backends_[2]->backend_service()->request_count());
+}
+
 TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
   CreateAndStartBackends(2);
   const size_t kNumRpcsPerAddress = 100;
@@ -435,32 +474,6 @@
   }
 }
 
-TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
-  CreateAndStartBackends(1);
-  // Initial EDS resource has one locality with no endpoints.
-  EdsResourceArgs::Locality empty_locality("locality0", {});
-  EdsResourceArgs args({std::move(empty_locality)});
-  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  // RPCs should fail.
-  constexpr char kErrorMessage[] =
-      "no children in weighted_target policy: "
-      "EDS resource eds_service_name contains empty localities: "
-      "\\[\\{region=\"xds_default_locality_region\", "
-      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
-  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
-  // Send EDS resource that has an endpoint.
-  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
-  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
-  // RPCs should eventually succeed.
-  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
-    if (!result.status.ok()) {
-      EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
-      EXPECT_THAT(result.status.error_message(),
-                  ::testing::MatchesRegex(kErrorMessage));
-    }
-  });
-}
-
 // This tests the bug described in https://github.com/grpc/grpc/issues/32486.
 TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
   CreateAndStartBackends(1);
@@ -650,7 +663,7 @@
 
 // Tests that the localities in a locality map are picked according to their
 // weights.
-TEST_P(EdsTest, WeightedRoundRobin) {
+TEST_P(EdsTest, LocalityWeights) {
   CreateAndStartBackends(2);
   const int kLocalityWeight0 = 2;
   const int kLocalityWeight1 = 8;
@@ -724,6 +737,32 @@
               ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
 }
 
+TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
+  CreateAndStartBackends(1);
+  // Initial EDS resource has one locality with no endpoints.
+  EdsResourceArgs::Locality empty_locality("locality0", {});
+  EdsResourceArgs args({std::move(empty_locality)});
+  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
+  // RPCs should fail.
+  constexpr char kErrorMessage[] =
+      "no children in weighted_target policy: "
+      "EDS resource eds_service_name contains empty localities: "
+      "\\[\\{region=\"xds_default_locality_region\", "
+      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
+  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
+  // Send EDS resource that has an endpoint.
+  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
+  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
+  // RPCs should eventually succeed.
+  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
+    if (!result.status.ok()) {
+      EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
+      EXPECT_THAT(result.status.error_message(),
+                  ::testing::MatchesRegex(kErrorMessage));
+    }
+  });
+}
+
 // Tests that we correctly handle a locality containing no endpoints.
 TEST_P(EdsTest, LocalityContainingNoEndpoints) {
   CreateAndStartBackends(2);
diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
index 43eb44c..80e0859 100644
--- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc
+++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
@@ -641,22 +641,28 @@
     endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
     endpoints->mutable_locality()->set_sub_zone(locality.sub_zone);
     for (size_t i = 0; i < locality.endpoints.size(); ++i) {
-      const int& port = locality.endpoints[i].port;
+      const auto& endpoint = locality.endpoints[i];
       auto* lb_endpoints = endpoints->add_lb_endpoints();
       if (locality.endpoints.size() > i &&
           locality.endpoints[i].health_status != HealthStatus::UNKNOWN) {
-        lb_endpoints->set_health_status(locality.endpoints[i].health_status);
+        lb_endpoints->set_health_status(endpoint.health_status);
       }
-      if (locality.endpoints.size() > i &&
-          locality.endpoints[i].lb_weight >= 1) {
+      if (locality.endpoints.size() > i && endpoint.lb_weight >= 1) {
         lb_endpoints->mutable_load_balancing_weight()->set_value(
-            locality.endpoints[i].lb_weight);
+            endpoint.lb_weight);
       }
-      auto* endpoint = lb_endpoints->mutable_endpoint();
-      auto* address = endpoint->mutable_address();
-      auto* socket_address = address->mutable_socket_address();
+      auto* endpoint_proto = lb_endpoints->mutable_endpoint();
+      auto* socket_address =
+          endpoint_proto->mutable_address()->mutable_socket_address();
       socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
-      socket_address->set_port_value(port);
+      socket_address->set_port_value(endpoint.port);
+      for (int port : endpoint.additional_ports) {
+        socket_address = endpoint_proto->add_additional_addresses()
+                             ->mutable_address()
+                             ->mutable_socket_address();
+        socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
+        socket_address->set_port_value(port);
+      }
     }
   }
   if (!args.drop_categories.empty()) {
diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h
index 0955be9..683b133 100644
--- a/test/cpp/end2end/xds/xds_end2end_test_lib.h
+++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h
@@ -592,12 +592,17 @@
       explicit Endpoint(int port,
                         ::envoy::config::core::v3::HealthStatus health_status =
                             ::envoy::config::core::v3::HealthStatus::UNKNOWN,
-                        int lb_weight = 1)
-          : port(port), health_status(health_status), lb_weight(lb_weight) {}
+                        int lb_weight = 1,
+                        std::vector<int> additional_ports = {})
+          : port(port),
+            health_status(health_status),
+            lb_weight(lb_weight),
+            additional_ports(std::move(additional_ports)) {}
 
       int port;
       ::envoy::config::core::v3::HealthStatus health_status;
       int lb_weight;
+      std::vector<int> additional_ports;
     };
 
     // A locality.
@@ -632,9 +637,15 @@
       size_t backend_idx,
       ::envoy::config::core::v3::HealthStatus health_status =
           ::envoy::config::core::v3::HealthStatus::UNKNOWN,
-      int lb_weight = 1) {
+      int lb_weight = 1, std::vector<size_t> additional_backend_indxees = {}) {
+    std::vector<int> additional_ports;
+    additional_ports.reserve(additional_backend_indxees.size());
+    for (size_t idx : additional_backend_indxees) {
+      additional_ports.push_back(backends_[idx]->port());
+    }
     return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(),
-                                     health_status, lb_weight);
+                                     health_status, lb_weight,
+                                     additional_ports);
   }
 
   // Creates a vector of endpoints for a specified range of backends,