[xDS] add support for multiple addresses per endpoint (#34506)
Co-authored-by: markdroth <markdroth@users.noreply.github.com>
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index a8d33f0..3740f9b 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -17430,6 +17430,7 @@
run: false
language: c++
headers:
+ - test/core/util/scoped_env_var.h
- test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
@@ -17831,7 +17832,8 @@
gtest: true
build: test
language: c++
- headers: []
+ headers:
+ - test/core/util/scoped_env_var.h
src:
- src/proto/grpc/testing/xds/v3/address.proto
- src/proto/grpc/testing/xds/v3/base.proto
diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc
index 9274cab..2fb3bab 100644
--- a/src/core/ext/xds/xds_endpoint.cc
+++ b/src/core/ext/xds/xds_endpoint.cc
@@ -50,11 +50,26 @@
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_core {
+namespace {
+
+// TODO(roth): Remove this once dualstack support is stable.
+bool XdsDualstackEndpointsEnabled() {
+ auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ if (!value.has_value()) return false;
+ bool parsed_value;
+ bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
+ return parse_succeeded && parsed_value;
+}
+
+} // namespace
+
//
// XdsEndpointResource
//
@@ -150,6 +165,38 @@
}
}
+absl::optional<grpc_resolved_address> ParseCoreAddress(
+ const envoy_config_core_v3_Address* address, ValidationErrors* errors) {
+ if (address == nullptr) {
+ errors->AddError("field not present");
+ return absl::nullopt;
+ }
+ ValidationErrors::ScopedField field(errors, ".socket_address");
+ const envoy_config_core_v3_SocketAddress* socket_address =
+ envoy_config_core_v3_Address_socket_address(address);
+ if (socket_address == nullptr) {
+ errors->AddError("field not present");
+ return absl::nullopt;
+ }
+ std::string address_str = UpbStringToStdString(
+ envoy_config_core_v3_SocketAddress_address(socket_address));
+ uint32_t port;
+ {
+ ValidationErrors::ScopedField field(errors, ".port_value");
+ port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
+ if (GPR_UNLIKELY(port >> 16) != 0) {
+ errors->AddError("invalid port");
+ return absl::nullopt;
+ }
+ }
+ auto addr = StringToSockaddr(address_str, port);
+ if (!addr.ok()) {
+ errors->AddError(addr.status().message());
+ return absl::nullopt;
+ }
+ return *addr;
+}
+
absl::optional<EndpointAddresses> EndpointAddressesParse(
const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
ValidationErrors* errors) {
@@ -172,8 +219,7 @@
}
}
// endpoint
- // TODO(roth): add support for multiple addresses per endpoint
- grpc_resolved_address grpc_address;
+ std::vector<grpc_resolved_address> addresses;
{
ValidationErrors::ScopedField field(errors, ".endpoint");
const envoy_config_endpoint_v3_Endpoint* endpoint =
@@ -182,43 +228,34 @@
errors->AddError("field not present");
return absl::nullopt;
}
- ValidationErrors::ScopedField field2(errors, ".address");
- const envoy_config_core_v3_Address* address =
- envoy_config_endpoint_v3_Endpoint_address(endpoint);
- if (address == nullptr) {
- errors->AddError("field not present");
- return absl::nullopt;
- }
- ValidationErrors::ScopedField field3(errors, ".socket_address");
- const envoy_config_core_v3_SocketAddress* socket_address =
- envoy_config_core_v3_Address_socket_address(address);
- if (socket_address == nullptr) {
- errors->AddError("field not present");
- return absl::nullopt;
- }
- std::string address_str = UpbStringToStdString(
- envoy_config_core_v3_SocketAddress_address(socket_address));
- uint32_t port;
{
- ValidationErrors::ScopedField field(errors, ".port_value");
- port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
- if (GPR_UNLIKELY(port >> 16) != 0) {
- errors->AddError("invalid port");
- return absl::nullopt;
+ ValidationErrors::ScopedField field(errors, ".address");
+ auto address = ParseCoreAddress(
+ envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
+ if (address.has_value()) addresses.push_back(*address);
+ }
+ if (XdsDualstackEndpointsEnabled()) {
+ size_t size;
+ auto* additional_addresses =
+ envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
+ &size);
+ for (size_t i = 0; i < size; ++i) {
+ ValidationErrors::ScopedField field(
+ errors, absl::StrCat(".additional_addresses[", i, "].address"));
+ auto address = ParseCoreAddress(
+ envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
+ additional_addresses[i]),
+ errors);
+ if (address.has_value()) addresses.push_back(*address);
}
}
- auto addr = StringToSockaddr(address_str, port);
- if (!addr.ok()) {
- errors->AddError(addr.status().message());
- } else {
- grpc_address = *addr;
- }
}
+ if (addresses.empty()) return absl::nullopt;
// Convert to EndpointAddresses.
return EndpointAddresses(
- grpc_address, ChannelArgs()
- .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
- .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
+ addresses, ChannelArgs()
+ .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
+ .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
}
struct ParsedLocality {
diff --git a/src/proto/grpc/testing/xds/v3/endpoint.proto b/src/proto/grpc/testing/xds/v3/endpoint.proto
index 7886fb3..1d01a9c 100644
--- a/src/proto/grpc/testing/xds/v3/endpoint.proto
+++ b/src/proto/grpc/testing/xds/v3/endpoint.proto
@@ -29,6 +29,11 @@
// Upstream host identifier.
message Endpoint {
+ message AdditionalAddress {
+ // Additional address that is associated with the endpoint.
+ core.v3.Address address = 1;
+ }
+
// The upstream host address.
//
// .. attention::
@@ -39,6 +44,13 @@
// in the Address). For LOGICAL or STRICT DNS, it is expected to be hostname,
// and will be resolved via DNS.
core.v3.Address address = 1;
+
+ // An ordered list of addresses that together with `address` comprise the
+ // list of addresses for an endpoint. The address given in the `address` is
+ // prepended to this list. It is assumed that the list must already be
+ // sorted by preference order of the addresses. This will only be supported
+ // for STATIC and EDS clusters.
+ repeated AdditionalAddress additional_addresses = 4;
}
// An Endpoint that Envoy can route traffic to.
diff --git a/test/core/xds/BUILD b/test/core/xds/BUILD
index d2f53db..4dc271f 100644
--- a/test/core/xds/BUILD
+++ b/test/core/xds/BUILD
@@ -328,5 +328,6 @@
"//src/core:grpc_xds_client",
"//src/proto/grpc/testing/xds/v3:endpoint_proto",
"//test/core/util:grpc_test_util",
+ "//test/core/util:scoped_env_var",
],
)
diff --git a/test/core/xds/xds_endpoint_resource_type_test.cc b/test/core/xds/xds_endpoint_resource_type_test.cc
index d77b559..820e7b1 100644
--- a/test/core/xds/xds_endpoint_resource_type_test.cc
+++ b/test/core/xds/xds_endpoint_resource_type_test.cc
@@ -22,6 +22,7 @@
#include <memory>
#include <string>
#include <utility>
+#include <vector>
#include <google/protobuf/wrappers.pb.h>
@@ -54,6 +55,7 @@
#include "src/proto/grpc/testing/xds/v3/endpoint.pb.h"
#include "src/proto/grpc/testing/xds/v3/health_check.pb.h"
#include "src/proto/grpc/testing/xds/v3/percent.pb.h"
+#include "test/core/util/scoped_env_var.h"
#include "test/core/util/test_config.h"
using envoy::config::endpoint::v3::ClusterLoadAssignment;
@@ -489,6 +491,254 @@
<< decode_result.resource.status();
}
+TEST_F(XdsEndpointTest, MultipleAddressesPerEndpoint) {
+ testing::ScopedExperimentalEnvVar env(
+ "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ ClusterLoadAssignment cla;
+ cla.set_cluster_name("foo");
+ auto* locality = cla.add_endpoints();
+ locality->mutable_load_balancing_weight()->set_value(1);
+ auto* locality_name = locality->mutable_locality();
+ locality_name->set_region("myregion");
+ locality_name->set_zone("myzone");
+ locality_name->set_sub_zone("mysubzone");
+ auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+ auto* socket_address = ep->mutable_address()->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(443);
+ socket_address = ep->add_additional_addresses()
+ ->mutable_address()
+ ->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(444);
+ std::string serialized_resource;
+ ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+ auto* resource_type = XdsEndpointResourceType::Get();
+ auto decode_result =
+ resource_type->Decode(decode_context_, serialized_resource);
+ ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
+ ASSERT_TRUE(decode_result.name.has_value());
+ EXPECT_EQ(*decode_result.name, "foo");
+ auto& resource =
+ static_cast<const XdsEndpointResource&>(**decode_result.resource);
+ ASSERT_EQ(resource.priorities.size(), 1);
+ const auto& priority = resource.priorities[0];
+ ASSERT_EQ(priority.localities.size(), 1);
+ const auto& p = *priority.localities.begin();
+ ASSERT_EQ(p.first, p.second.name.get());
+ EXPECT_EQ(p.first->region(), "myregion");
+ EXPECT_EQ(p.first->zone(), "myzone");
+ EXPECT_EQ(p.first->sub_zone(), "mysubzone");
+ EXPECT_EQ(p.second.lb_weight, 1);
+ ASSERT_EQ(p.second.endpoints.size(), 1);
+ const auto& endpoint = p.second.endpoints.front();
+ ASSERT_EQ(endpoint.addresses().size(), 2);
+ auto addr =
+ grpc_sockaddr_to_string(&endpoint.addresses()[0], /*normalize=*/false);
+ ASSERT_TRUE(addr.ok()) << addr.status();
+ EXPECT_EQ(*addr, "127.0.0.1:443");
+ addr = grpc_sockaddr_to_string(&endpoint.addresses()[1], /*normalize=*/false);
+ ASSERT_TRUE(addr.ok()) << addr.status();
+ EXPECT_EQ(*addr, "127.0.0.1:444");
+ EXPECT_EQ(endpoint.args(), ChannelArgs()
+ .Set(GRPC_ARG_ADDRESS_WEIGHT, 1)
+ .Set(GRPC_ARG_XDS_HEALTH_STATUS,
+ XdsHealthStatus::HealthStatus::kUnknown));
+ ASSERT_NE(resource.drop_config, nullptr);
+ EXPECT_TRUE(resource.drop_config->drop_category_list().empty());
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesMissingAddress) {
+ testing::ScopedExperimentalEnvVar env(
+ "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ ClusterLoadAssignment cla;
+ cla.set_cluster_name("foo");
+ auto* locality = cla.add_endpoints();
+ locality->mutable_load_balancing_weight()->set_value(1);
+ auto* locality_name = locality->mutable_locality();
+ locality_name->set_region("myregion");
+ locality_name->set_zone("myzone");
+ locality_name->set_sub_zone("mysubzone");
+ auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+ auto* socket_address = ep->mutable_address()->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(443);
+ ep->add_additional_addresses();
+ std::string serialized_resource;
+ ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+ auto* resource_type = XdsEndpointResourceType::Get();
+ auto decode_result =
+ resource_type->Decode(decode_context_, serialized_resource);
+ ASSERT_TRUE(decode_result.name.has_value());
+ EXPECT_EQ(*decode_result.name, "foo");
+ EXPECT_EQ(decode_result.resource.status().code(),
+ absl::StatusCode::kInvalidArgument);
+ EXPECT_EQ(decode_result.resource.status().message(),
+ "errors parsing EDS resource: ["
+ "field:endpoints[0].lb_endpoints[0].endpoint"
+ ".additional_addresses[0].address error:field not present]")
+ << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesMissingSocketAddress) {
+ testing::ScopedExperimentalEnvVar env(
+ "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ ClusterLoadAssignment cla;
+ cla.set_cluster_name("foo");
+ auto* locality = cla.add_endpoints();
+ locality->mutable_load_balancing_weight()->set_value(1);
+ auto* locality_name = locality->mutable_locality();
+ locality_name->set_region("myregion");
+ locality_name->set_zone("myzone");
+ locality_name->set_sub_zone("mysubzone");
+ auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+ auto* socket_address = ep->mutable_address()->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(443);
+ ep->add_additional_addresses()->mutable_address();
+ std::string serialized_resource;
+ ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+ auto* resource_type = XdsEndpointResourceType::Get();
+ auto decode_result =
+ resource_type->Decode(decode_context_, serialized_resource);
+ ASSERT_TRUE(decode_result.name.has_value());
+ EXPECT_EQ(*decode_result.name, "foo");
+ EXPECT_EQ(decode_result.resource.status().code(),
+ absl::StatusCode::kInvalidArgument);
+ EXPECT_EQ(decode_result.resource.status().message(),
+ "errors parsing EDS resource: ["
+ "field:endpoints[0].lb_endpoints[0].endpoint"
+ ".additional_addresses[0].address.socket_address "
+ "error:field not present]")
+ << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesInvalidPort) {
+ testing::ScopedExperimentalEnvVar env(
+ "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ ClusterLoadAssignment cla;
+ cla.set_cluster_name("foo");
+ auto* locality = cla.add_endpoints();
+ locality->mutable_load_balancing_weight()->set_value(1);
+ auto* locality_name = locality->mutable_locality();
+ locality_name->set_region("myregion");
+ locality_name->set_zone("myzone");
+ locality_name->set_sub_zone("mysubzone");
+ auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+ auto* socket_address = ep->mutable_address()->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(443);
+ socket_address = ep->add_additional_addresses()
+ ->mutable_address()
+ ->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(65537);
+ std::string serialized_resource;
+ ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+ auto* resource_type = XdsEndpointResourceType::Get();
+ auto decode_result =
+ resource_type->Decode(decode_context_, serialized_resource);
+ ASSERT_TRUE(decode_result.name.has_value());
+ EXPECT_EQ(*decode_result.name, "foo");
+ EXPECT_EQ(decode_result.resource.status().code(),
+ absl::StatusCode::kInvalidArgument);
+ EXPECT_EQ(decode_result.resource.status().message(),
+ "errors parsing EDS resource: ["
+ "field:endpoints[0].lb_endpoints[0].endpoint"
+ ".additional_addresses[0].address.socket_address.port_value "
+ "error:invalid port]")
+ << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, AdditionalAddressesInvalidAddress) {
+ testing::ScopedExperimentalEnvVar env(
+ "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ ClusterLoadAssignment cla;
+ cla.set_cluster_name("foo");
+ auto* locality = cla.add_endpoints();
+ locality->mutable_load_balancing_weight()->set_value(1);
+ auto* locality_name = locality->mutable_locality();
+ locality_name->set_region("myregion");
+ locality_name->set_zone("myzone");
+ locality_name->set_sub_zone("mysubzone");
+ auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+ auto* socket_address = ep->mutable_address()->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(443);
+ socket_address = ep->add_additional_addresses()
+ ->mutable_address()
+ ->mutable_socket_address();
+ socket_address->set_address("not_an_ip_address");
+ socket_address->set_port_value(444);
+ std::string serialized_resource;
+ ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+ auto* resource_type = XdsEndpointResourceType::Get();
+ auto decode_result =
+ resource_type->Decode(decode_context_, serialized_resource);
+ ASSERT_TRUE(decode_result.name.has_value());
+ EXPECT_EQ(*decode_result.name, "foo");
+ EXPECT_EQ(decode_result.resource.status().code(),
+ absl::StatusCode::kInvalidArgument);
+ EXPECT_EQ(decode_result.resource.status().message(),
+ "errors parsing EDS resource: ["
+ "field:endpoints[0].lb_endpoints[0].endpoint"
+ ".additional_addresses[0].address.socket_address error:"
+ "Failed to parse address:not_an_ip_address:444]")
+ << decode_result.resource.status();
+}
+
+TEST_F(XdsEndpointTest, IgnoresMultipleAddressesPerEndpointWhenNotEnabled) {
+ ClusterLoadAssignment cla;
+ cla.set_cluster_name("foo");
+ auto* locality = cla.add_endpoints();
+ locality->mutable_load_balancing_weight()->set_value(1);
+ auto* locality_name = locality->mutable_locality();
+ locality_name->set_region("myregion");
+ locality_name->set_zone("myzone");
+ locality_name->set_sub_zone("mysubzone");
+ auto* ep = locality->add_lb_endpoints()->mutable_endpoint();
+ auto* socket_address = ep->mutable_address()->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(443);
+ socket_address = ep->add_additional_addresses()
+ ->mutable_address()
+ ->mutable_socket_address();
+ socket_address->set_address("127.0.0.1");
+ socket_address->set_port_value(444);
+ std::string serialized_resource;
+ ASSERT_TRUE(cla.SerializeToString(&serialized_resource));
+ auto* resource_type = XdsEndpointResourceType::Get();
+ auto decode_result =
+ resource_type->Decode(decode_context_, serialized_resource);
+ ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
+ ASSERT_TRUE(decode_result.name.has_value());
+ EXPECT_EQ(*decode_result.name, "foo");
+ auto& resource =
+ static_cast<const XdsEndpointResource&>(**decode_result.resource);
+ ASSERT_EQ(resource.priorities.size(), 1);
+ const auto& priority = resource.priorities[0];
+ ASSERT_EQ(priority.localities.size(), 1);
+ const auto& p = *priority.localities.begin();
+ ASSERT_EQ(p.first, p.second.name.get());
+ EXPECT_EQ(p.first->region(), "myregion");
+ EXPECT_EQ(p.first->zone(), "myzone");
+ EXPECT_EQ(p.first->sub_zone(), "mysubzone");
+ EXPECT_EQ(p.second.lb_weight, 1);
+ ASSERT_EQ(p.second.endpoints.size(), 1);
+ const auto& endpoint = p.second.endpoints.front();
+ ASSERT_EQ(endpoint.addresses().size(), 1);
+ auto addr =
+ grpc_sockaddr_to_string(&endpoint.addresses()[0], /*normalize=*/false);
+ ASSERT_TRUE(addr.ok()) << addr.status();
+ EXPECT_EQ(*addr, "127.0.0.1:443");
+ EXPECT_EQ(endpoint.args(), ChannelArgs()
+ .Set(GRPC_ARG_ADDRESS_WEIGHT, 1)
+ .Set(GRPC_ARG_XDS_HEALTH_STATUS,
+ XdsHealthStatus::HealthStatus::kUnknown));
+ ASSERT_NE(resource.drop_config, nullptr);
+ EXPECT_TRUE(resource.drop_config->drop_category_list().empty());
+}
+
TEST_F(XdsEndpointTest, MissingEndpoint) {
ClusterLoadAssignment cla;
cla.set_cluster_name("foo");
diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD
index 2e2f31d..863e1d2 100644
--- a/test/cpp/end2end/xds/BUILD
+++ b/test/cpp/end2end/xds/BUILD
@@ -156,6 +156,7 @@
"//:grpc",
"//:grpc++",
"//test/core/util:grpc_test_util",
+ "//test/core/util:scoped_env_var",
"//test/cpp/end2end:connection_attempt_injector",
],
)
diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc
index 68a1f3e..a658f31 100644
--- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc
+++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc
@@ -26,7 +26,9 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/config/config_vars.h"
+#include "src/core/lib/experiments/experiments.h"
#include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
+#include "test/core/util/scoped_env_var.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
@@ -411,6 +413,43 @@
channel_->GetLoadBalancingPolicyName());
}
+TEST_P(EdsTest, MultipleAddressesPerEndpoint) {
+ if (!grpc_core::IsRoundRobinDelegateToPickFirstEnabled()) return;
+ grpc_core::testing::ScopedExperimentalEnvVar env(
+ "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
+ const size_t kNumRpcsPerAddress = 10;
+ // Create 3 backends, but leave backend 0 unstarted.
+ CreateBackends(3);
+ StartBackend(1);
+ StartBackend(2);
+ // The first endpoint is backends 0 and 1, the second endpoint is backend 2.
+ EdsResourceArgs args({
+ {"locality0",
+ {CreateEndpoint(0, HealthStatus::UNKNOWN, 1, {1}), CreateEndpoint(2)}},
+ });
+ balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
+ // Initially, backend 0 is offline, so the first endpoint should
+ // connect to backend 1 instead. Traffic should round-robin across
+ // backends 1 and 2.
+ WaitForAllBackends(DEBUG_LOCATION, 1); // Wait for backends 1 and 2.
+ CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
+ EXPECT_EQ(kNumRpcsPerAddress,
+ backends_[1]->backend_service()->request_count());
+ EXPECT_EQ(kNumRpcsPerAddress,
+ backends_[2]->backend_service()->request_count());
+ // Now start backend 0 and shutdown backend 1.
+ StartBackend(0);
+ ShutdownBackend(1);
+ // Wait for traffic to go to backend 0.
+ WaitForBackend(DEBUG_LOCATION, 0);
+ // Traffic should now round-robin across backends 0 and 2.
+ CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
+ EXPECT_EQ(kNumRpcsPerAddress,
+ backends_[0]->backend_service()->request_count());
+ EXPECT_EQ(kNumRpcsPerAddress,
+ backends_[2]->backend_service()->request_count());
+}
+
TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
CreateAndStartBackends(2);
const size_t kNumRpcsPerAddress = 100;
@@ -435,32 +474,6 @@
}
}
-TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
- CreateAndStartBackends(1);
- // Initial EDS resource has one locality with no endpoints.
- EdsResourceArgs::Locality empty_locality("locality0", {});
- EdsResourceArgs args({std::move(empty_locality)});
- balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
- // RPCs should fail.
- constexpr char kErrorMessage[] =
- "no children in weighted_target policy: "
- "EDS resource eds_service_name contains empty localities: "
- "\\[\\{region=\"xds_default_locality_region\", "
- "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
- CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
- // Send EDS resource that has an endpoint.
- args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
- balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
- // RPCs should eventually succeed.
- WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
- if (!result.status.ok()) {
- EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
- EXPECT_THAT(result.status.error_message(),
- ::testing::MatchesRegex(kErrorMessage));
- }
- });
-}
-
// This tests the bug described in https://github.com/grpc/grpc/issues/32486.
TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
CreateAndStartBackends(1);
@@ -650,7 +663,7 @@
// Tests that the localities in a locality map are picked according to their
// weights.
-TEST_P(EdsTest, WeightedRoundRobin) {
+TEST_P(EdsTest, LocalityWeights) {
CreateAndStartBackends(2);
const int kLocalityWeight0 = 2;
const int kLocalityWeight1 = 8;
@@ -724,6 +737,32 @@
::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
}
+TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
+ CreateAndStartBackends(1);
+ // Initial EDS resource has one locality with no endpoints.
+ EdsResourceArgs::Locality empty_locality("locality0", {});
+ EdsResourceArgs args({std::move(empty_locality)});
+ balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
+ // RPCs should fail.
+ constexpr char kErrorMessage[] =
+ "no children in weighted_target policy: "
+ "EDS resource eds_service_name contains empty localities: "
+ "\\[\\{region=\"xds_default_locality_region\", "
+ "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
+ CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
+ // Send EDS resource that has an endpoint.
+ args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
+ balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
+ // RPCs should eventually succeed.
+ WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
+ if (!result.status.ok()) {
+ EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
+ EXPECT_THAT(result.status.error_message(),
+ ::testing::MatchesRegex(kErrorMessage));
+ }
+ });
+}
+
// Tests that we correctly handle a locality containing no endpoints.
TEST_P(EdsTest, LocalityContainingNoEndpoints) {
CreateAndStartBackends(2);
diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
index 43eb44c..80e0859 100644
--- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc
+++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc
@@ -641,22 +641,28 @@
endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
endpoints->mutable_locality()->set_sub_zone(locality.sub_zone);
for (size_t i = 0; i < locality.endpoints.size(); ++i) {
- const int& port = locality.endpoints[i].port;
+ const auto& endpoint = locality.endpoints[i];
auto* lb_endpoints = endpoints->add_lb_endpoints();
if (locality.endpoints.size() > i &&
locality.endpoints[i].health_status != HealthStatus::UNKNOWN) {
- lb_endpoints->set_health_status(locality.endpoints[i].health_status);
+ lb_endpoints->set_health_status(endpoint.health_status);
}
- if (locality.endpoints.size() > i &&
- locality.endpoints[i].lb_weight >= 1) {
+ if (locality.endpoints.size() > i && endpoint.lb_weight >= 1) {
lb_endpoints->mutable_load_balancing_weight()->set_value(
- locality.endpoints[i].lb_weight);
+ endpoint.lb_weight);
}
- auto* endpoint = lb_endpoints->mutable_endpoint();
- auto* address = endpoint->mutable_address();
- auto* socket_address = address->mutable_socket_address();
+ auto* endpoint_proto = lb_endpoints->mutable_endpoint();
+ auto* socket_address =
+ endpoint_proto->mutable_address()->mutable_socket_address();
socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
- socket_address->set_port_value(port);
+ socket_address->set_port_value(endpoint.port);
+ for (int port : endpoint.additional_ports) {
+ socket_address = endpoint_proto->add_additional_addresses()
+ ->mutable_address()
+ ->mutable_socket_address();
+ socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
+ socket_address->set_port_value(port);
+ }
}
}
if (!args.drop_categories.empty()) {
diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h
index 0955be9..683b133 100644
--- a/test/cpp/end2end/xds/xds_end2end_test_lib.h
+++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h
@@ -592,12 +592,17 @@
explicit Endpoint(int port,
::envoy::config::core::v3::HealthStatus health_status =
::envoy::config::core::v3::HealthStatus::UNKNOWN,
- int lb_weight = 1)
- : port(port), health_status(health_status), lb_weight(lb_weight) {}
+ int lb_weight = 1,
+ std::vector<int> additional_ports = {})
+ : port(port),
+ health_status(health_status),
+ lb_weight(lb_weight),
+ additional_ports(std::move(additional_ports)) {}
int port;
::envoy::config::core::v3::HealthStatus health_status;
int lb_weight;
+ std::vector<int> additional_ports;
};
// A locality.
@@ -632,9 +637,15 @@
size_t backend_idx,
::envoy::config::core::v3::HealthStatus health_status =
::envoy::config::core::v3::HealthStatus::UNKNOWN,
- int lb_weight = 1) {
+ int lb_weight = 1, std::vector<size_t> additional_backend_indxees = {}) {
+ std::vector<int> additional_ports;
+ additional_ports.reserve(additional_backend_indxees.size());
+ for (size_t idx : additional_backend_indxees) {
+ additional_ports.push_back(backends_[idx]->port());
+ }
return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(),
- health_status, lb_weight);
+ health_status, lb_weight,
+ additional_ports);
}
// Creates a vector of endpoints for a specified range of backends,