Merge pull request #22330 from markdroth/xds_locality_map_update_fix
If an EDS update replaces all localities in a priority, go into CONNECTING
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 2079938..232a5d2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -298,7 +298,7 @@
~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); }
void UpdateLocked(
- const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
+ const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
bool update_locality_stats);
void ResetBackoffLocked();
void UpdateXdsPickerLocked();
@@ -1033,7 +1033,7 @@
}
void XdsLb::LocalityMap::UpdateLocked(
- const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
+ const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
bool update_locality_stats) {
if (xds_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
@@ -1043,11 +1043,11 @@
// Maybe reactivate the locality map in case all the active locality maps have
// failed.
MaybeReactivateLocked();
- // Remove (later) the localities not in locality_map_update.
+ // Remove (later) the localities not in priority_update.
for (auto iter = localities_.begin(); iter != localities_.end();) {
const auto& name = iter->first;
Locality* locality = iter->second.get();
- if (locality_map_update.Contains(name)) {
+ if (priority_update.Contains(name)) {
++iter;
continue;
}
@@ -1058,8 +1058,8 @@
++iter;
}
}
- // Add or update the localities in locality_map_update.
- for (const auto& p : locality_map_update.localities) {
+ // Add or update the localities in priority_update.
+ for (const auto& p : priority_update.localities) {
const auto& name = p.first;
const auto& locality_update = p.second;
OrphanablePtr<Locality>& locality = localities_[name];
@@ -1079,6 +1079,32 @@
locality->UpdateLocked(locality_update.lb_weight,
locality_update.serverlist, update_locality_stats);
}
+ // If this is the current priority and we removed all of the READY
+ // localities, go into state CONNECTING.
+ // TODO(roth): Ideally, we should model this as a graceful policy
+ // switch: we should keep using the old localities for a short period
+ // of time, long enough to give the new localities a chance to get
+ // connected. As part of refactoring this policy, we should try to
+ // fix that.
+ if (priority_ == xds_policy()->current_priority_) {
+ bool found_ready = false;
+ for (auto& p : localities_) {
+ const auto& locality_name = p.first;
+ Locality* locality = p.second.get();
+ if (!locality_map_update()->Contains(locality_name)) continue;
+ if (locality->connectivity_state() == GRPC_CHANNEL_READY) {
+ found_ready = true;
+ break;
+ }
+ }
+ if (!found_ready) {
+ xds_policy_->channel_control_helper()->UpdateState(
+ GRPC_CHANNEL_CONNECTING,
+ absl::make_unique<QueuePicker>(
+ xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")));
+ xds_policy_->current_priority_ = UINT32_MAX;
+ }
+ }
}
void XdsLb::LocalityMap::ResetBackoffLocked() {
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index bf173df..1a89f62 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -1209,11 +1209,16 @@
return std::make_tuple(num_ok, num_failure, num_drops);
}
- void WaitForBackend(size_t backend_idx, bool reset_counters = true) {
+ void WaitForBackend(size_t backend_idx, bool reset_counters = true,
+ bool require_success = false) {
gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========",
static_cast<unsigned long>(backend_idx));
do {
- (void)SendRpc();
+ Status status = SendRpc();
+ if (require_success) {
+ EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+ << " message=" << status.error_message();
+ }
} while (backends_[backend_idx]->backend_service()->request_count() == 0);
if (reset_counters) ResetBackendCounters();
gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========",
@@ -2304,6 +2309,31 @@
delayed_resource_setter.join();
}
+// Tests that we don't fail RPCs when replacing all of the localities in
+// a given priority.
+TEST_P(LocalityMapTest, ReplaceAllLocalitiesInPriority) {
+ SetNextResolution({});
+ SetNextResolutionForLbChannelAllBalancers();
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", GetBackendPorts(0, 1)},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
+ args = AdsServiceImpl::EdsResourceArgs({
+ {"locality1", GetBackendPorts(1, 2)},
+ });
+ std::thread delayed_resource_setter(std::bind(
+ &BasicTest::SetEdsResourceWithDelay, this, 0,
+ AdsServiceImpl::BuildEdsResource(args), 5000, kDefaultResourceName));
+ // Wait for the first backend to be ready.
+ WaitForBackend(0);
+ // Keep sending RPCs until we switch over to backend 1, which tells us
+ // that we received the update. No RPCs should fail during this
+ // transition.
+ WaitForBackend(1, /*reset_counters=*/true, /*require_success=*/true);
+ delayed_resource_setter.join();
+}
+
class FailoverTest : public BasicTest {
public:
void SetUp() override {