Add locality map tests
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 4f119ed..468c9d8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -509,7 +509,7 @@
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
RefCountedPtr<PickerWrapper> picker_wrapper_;
- grpc_connectivity_state connectivity_state_;
+ grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
uint32_t locality_weight_;
};
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index 5d114c0..dda5bdf 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -98,6 +98,7 @@
constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone";
constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle";
+constexpr int kDefaultLocalityWeight = 3;
template <typename ServiceType>
class CountedService : public ServiceType {
@@ -317,6 +318,7 @@
static DiscoveryResponse BuildResponse(
const std::vector<std::vector<int>>& backend_ports,
+ const std::vector<int>& lb_weights = {},
const std::map<grpc::string, uint32_t>& drop_categories = {},
const FractionalPercent::DenominatorType denominator =
FractionalPercent::MILLION) {
@@ -324,7 +326,9 @@
assignment.set_cluster_name("service name");
for (size_t i = 0; i < backend_ports.size(); ++i) {
auto* endpoints = assignment.add_endpoints();
- endpoints->mutable_load_balancing_weight()->set_value(3);
+ const int lb_weight =
+ lb_weights.empty() ? kDefaultLocalityWeight : lb_weights[i];
+ endpoints->mutable_load_balancing_weight()->set_value(lb_weight);
endpoints->set_priority(0);
endpoints->mutable_locality()->set_region(kDefaultLocalityRegion);
endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
@@ -620,11 +624,14 @@
return std::make_tuple(num_ok, num_failure, num_drops);
}
- void WaitForBackend(size_t backend_idx) {
+ void WaitForBackend(size_t backend_idx, bool reset_counters = true) {
+ gpr_log(GPR_INFO,
+ "========= WAITING FOR BACKEND %lu ==========", backend_idx);
do {
(void)SendRpc();
} while (backends_[backend_idx]->backend_service()->request_count() == 0);
- ResetBackendCounters();
+ if (reset_counters) ResetBackendCounters();
+ gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========", backend_idx);
}
grpc_core::ServerAddressList CreateLbAddressesFromPortList(
@@ -1047,6 +1054,75 @@
EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
}
+TEST_F(SingleBalancerTest, LocalityMapWeightedRoundRobin) {
+ SetNextResolution({}, kDefaultServiceConfig_.c_str());
+ SetNextResolutionForLbChannelAllBalancers();
+ const size_t kNumRpcs = 5000;
+ const int kLocalityWeight0 = 2;
+ const int kLocalityWeight1 = 8;
+ const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
+ const double kLocalityWeightRate0 =
+ static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
+ const double kLocalityWeightRate1 =
+ static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
+ // EDS response contains 2 localities, each of which contains 1 backend.
+ ScheduleResponseForBalancer(
+ 0,
+ EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(0, 2, 2),
+ {kLocalityWeight0, kLocalityWeight1}),
+ 0);
+ // Wait for both backends to be ready.
+ WaitForAllBackends(1, 0, 2);
+ // Send kNumRpcs RPCs.
+ CheckRpcSendOk(kNumRpcs);
+ // The locality picking rates should be roughly equal to the expectation.
+ const double locality_picked_rate_0 =
+ static_cast<double>(backends_[0]->backend_service()->request_count()) /
+ kNumRpcs;
+ const double locality_picked_rate_1 =
+ static_cast<double>(backends_[1]->backend_service()->request_count()) /
+ kNumRpcs;
+ const double kErrorTolerance = 0.2;
+ EXPECT_THAT(locality_picked_rate_0,
+ ::testing::AllOf(
+ ::testing::Ge(kLocalityWeightRate0 * (1 - kErrorTolerance)),
+ ::testing::Le(kLocalityWeightRate0 * (1 + kErrorTolerance))));
+ EXPECT_THAT(locality_picked_rate_1,
+ ::testing::AllOf(
+ ::testing::Ge(kLocalityWeightRate1 * (1 - kErrorTolerance)),
+ ::testing::Le(kLocalityWeightRate1 * (1 + kErrorTolerance))));
+ // The EDS service got a single request, and sent a single response.
+ EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
+ EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
+}
+
+TEST_F(SingleBalancerTest, LocalityMapStressTest) {
+ SetNextResolution({}, kDefaultServiceConfig_.c_str());
+ SetNextResolutionForLbChannelAllBalancers();
+ const size_t kNumLocalities = 100;
+ // The first EDS response contains kNumLocalities localities, each of which
+ // contains backend 0.
+ const std::vector<std::vector<int>> locality_list_0(kNumLocalities,
+ {backends_[0]->port()});
+ // The second EDS response contains 1 locality, which contains backend 1.
+ const std::vector<std::vector<int>> locality_list_1 =
+ GetBackendPortsInGroups(1, 2);
+ ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_0),
+ 0);
+ ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_1),
+ 60 * 1000);
+ // Wait until backend 0 is ready, before which kNumLocalities localities are
+ // received and handled by the xds policy.
+ WaitForBackend(0, /*reset_counters=*/false);
+ EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
+ // Wait until backend 1 is ready, before which kNumLocalities localities are
+ // removed by the xds policy.
+ WaitForBackend(1);
+ // The EDS service got a single request, and sent a single response.
+ EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
+ EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
+}
+
TEST_F(SingleBalancerTest, Drop) {
SetNextResolution({}, kDefaultServiceConfig_.c_str());
SetNextResolutionForLbChannelAllBalancers();
@@ -1061,7 +1137,7 @@
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
- GetBackendPortsInGroups(),
+ GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
0);
@@ -1102,7 +1178,7 @@
// The EDS response contains one drop category.
ScheduleResponseForBalancer(
0,
- EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+ EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerHundredForLb}},
FractionalPercent::HUNDRED),
0);
@@ -1142,7 +1218,7 @@
// The EDS response contains one drop category.
ScheduleResponseForBalancer(
0,
- EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+ EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerTenThousandForLb}},
FractionalPercent::TEN_THOUSAND),
0);
@@ -1186,14 +1262,14 @@
// The first EDS response contains one drop category.
ScheduleResponseForBalancer(
0,
- EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+ EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb}}),
0);
// The second EDS response contains two drop categories.
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
- GetBackendPortsInGroups(),
+ GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
5000);
@@ -1279,7 +1355,7 @@
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
- GetBackendPortsInGroups(),
+ GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
0);
@@ -1472,7 +1548,7 @@
// Return a new balancer that sends a response to drop all calls.
ScheduleResponseForBalancer(
0,
- EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+ EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
{{kLbDropType, 1000000}}),
0);
SetNextResolutionForLbChannelAllBalancers();
@@ -1930,7 +2006,7 @@
ScheduleResponseForBalancer(
0,
EdsServiceImpl::BuildResponse(
- GetBackendPortsInGroups(),
+ GetBackendPortsInGroups(), {},
{{kLbDropType, kDropPerMillionForLb},
{kThrottleDropType, kDropPerMillionForThrottle}}),
0);