Merge pull request #22330 from markdroth/xds_locality_map_update_fix

If an EDS update replaces all localities in a priority, go into CONNECTING
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 2079938..232a5d2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -298,7 +298,7 @@
     ~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); }
 
     void UpdateLocked(
-        const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
+        const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
         bool update_locality_stats);
     void ResetBackoffLocked();
     void UpdateXdsPickerLocked();
@@ -1033,7 +1033,7 @@
 }
 
 void XdsLb::LocalityMap::UpdateLocked(
-    const XdsApi::PriorityListUpdate::LocalityMap& locality_map_update,
+    const XdsApi::PriorityListUpdate::LocalityMap& priority_update,
     bool update_locality_stats) {
   if (xds_policy_->shutting_down_) return;
   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
@@ -1043,11 +1043,11 @@
   // Maybe reactivate the locality map in case all the active locality maps have
   // failed.
   MaybeReactivateLocked();
-  // Remove (later) the localities not in locality_map_update.
+  // Remove (later) the localities not in priority_update.
   for (auto iter = localities_.begin(); iter != localities_.end();) {
     const auto& name = iter->first;
     Locality* locality = iter->second.get();
-    if (locality_map_update.Contains(name)) {
+    if (priority_update.Contains(name)) {
       ++iter;
       continue;
     }
@@ -1058,8 +1058,8 @@
       ++iter;
     }
   }
-  // Add or update the localities in locality_map_update.
-  for (const auto& p : locality_map_update.localities) {
+  // Add or update the localities in priority_update.
+  for (const auto& p : priority_update.localities) {
     const auto& name = p.first;
     const auto& locality_update = p.second;
     OrphanablePtr<Locality>& locality = localities_[name];
@@ -1079,6 +1079,32 @@
     locality->UpdateLocked(locality_update.lb_weight,
                            locality_update.serverlist, update_locality_stats);
   }
+  // If this is the current priority and we removed all of the READY
+  // localities, go into state CONNECTING.
+  // TODO(roth): Ideally, we should model this as a graceful policy
+  // switch: we should keep using the old localities for a short period
+  // of time, long enough to give the new localities a chance to get
+  // connected.  As part of refactoring this policy, we should try to
+  // fix that.
+  if (priority_ == xds_policy()->current_priority_) {
+    bool found_ready = false;
+    for (auto& p : localities_) {
+      const auto& locality_name = p.first;
+      Locality* locality = p.second.get();
+      if (!locality_map_update()->Contains(locality_name)) continue;
+      if (locality->connectivity_state() == GRPC_CHANNEL_READY) {
+        found_ready = true;
+        break;
+      }
+    }
+    if (!found_ready) {
+      xds_policy_->channel_control_helper()->UpdateState(
+          GRPC_CHANNEL_CONNECTING,
+          absl::make_unique<QueuePicker>(
+              xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")));
+      xds_policy_->current_priority_ = UINT32_MAX;
+    }
+  }
 }
 
 void XdsLb::LocalityMap::ResetBackoffLocked() {
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index bf173df..1a89f62 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -1209,11 +1209,16 @@
     return std::make_tuple(num_ok, num_failure, num_drops);
   }
 
-  void WaitForBackend(size_t backend_idx, bool reset_counters = true) {
+  void WaitForBackend(size_t backend_idx, bool reset_counters = true,
+                      bool require_success = false) {
     gpr_log(GPR_INFO, "========= WAITING FOR BACKEND %lu ==========",
             static_cast<unsigned long>(backend_idx));
     do {
-      (void)SendRpc();
+      Status status = SendRpc();
+      if (require_success) {
+        EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                                 << " message=" << status.error_message();
+      }
     } while (backends_[backend_idx]->backend_service()->request_count() == 0);
     if (reset_counters) ResetBackendCounters();
     gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========",
@@ -2304,6 +2309,31 @@
   delayed_resource_setter.join();
 }
 
+// Tests that RPCs keep succeeding while an EDS update replaces every
+// locality in a given priority (here: locality0 is swapped for locality1).
+TEST_P(LocalityMapTest, ReplaceAllLocalitiesInPriority) {
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", GetBackendPorts(0, 1)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      AdsServiceImpl::BuildEdsResource(args), kDefaultResourceName);
+  args = AdsServiceImpl::EdsResourceArgs({
+      {"locality1", GetBackendPorts(1, 2)},
+  });
+  std::thread delayed_resource_setter(std::bind(
+      &BasicTest::SetEdsResourceWithDelay, this, 0,
+      AdsServiceImpl::BuildEdsResource(args), 5000, kDefaultResourceName));
+  // Wait for backend 0 to be ready (expected via the initial locality0 resource).
+  WaitForBackend(0);
+  // Keep sending RPCs until backend 1 sees a request, which tells us
+  // that the delayed update was received.  require_success makes any
+  // RPC failure during this transition fail the test.
+  WaitForBackend(1, /*reset_counters=*/true, /*require_success=*/true);
+  delayed_resource_setter.join();
+}
+
 class FailoverTest : public BasicTest {
  public:
   void SetUp() override {