Add locality map tests
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 4f119ed..468c9d8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -509,7 +509,7 @@
       OrphanablePtr<LoadBalancingPolicy> child_policy_;
       OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
       RefCountedPtr<PickerWrapper> picker_wrapper_;
-      grpc_connectivity_state connectivity_state_;
+      grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
       uint32_t locality_weight_;
     };
 
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index 5d114c0..dda5bdf 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -98,6 +98,7 @@
 constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone";
 constexpr char kLbDropType[] = "lb";
 constexpr char kThrottleDropType[] = "throttle";
+constexpr int kDefaultLocalityWeight = 3;
 
 template <typename ServiceType>
 class CountedService : public ServiceType {
@@ -317,6 +318,7 @@
 
   static DiscoveryResponse BuildResponse(
       const std::vector<std::vector<int>>& backend_ports,
+      const std::vector<int>& lb_weights = {},
       const std::map<grpc::string, uint32_t>& drop_categories = {},
       const FractionalPercent::DenominatorType denominator =
           FractionalPercent::MILLION) {
@@ -324,7 +326,9 @@
     assignment.set_cluster_name("service name");
     for (size_t i = 0; i < backend_ports.size(); ++i) {
       auto* endpoints = assignment.add_endpoints();
-      endpoints->mutable_load_balancing_weight()->set_value(3);
+      const int lb_weight =
+          lb_weights.empty() ? kDefaultLocalityWeight : lb_weights[i];
+      endpoints->mutable_load_balancing_weight()->set_value(lb_weight);
       endpoints->set_priority(0);
       endpoints->mutable_locality()->set_region(kDefaultLocalityRegion);
       endpoints->mutable_locality()->set_zone(kDefaultLocalityZone);
@@ -620,11 +624,14 @@
     return std::make_tuple(num_ok, num_failure, num_drops);
   }
 
-  void WaitForBackend(size_t backend_idx) {
+  void WaitForBackend(size_t backend_idx, bool reset_counters = true) {
+    gpr_log(GPR_INFO,
+            "========= WAITING FOR BACKEND %lu ==========", backend_idx);
     do {
       (void)SendRpc();
     } while (backends_[backend_idx]->backend_service()->request_count() == 0);
-    ResetBackendCounters();
+    if (reset_counters) ResetBackendCounters();
+    gpr_log(GPR_INFO, "========= BACKEND %lu READY ==========", backend_idx);
   }
 
   grpc_core::ServerAddressList CreateLbAddressesFromPortList(
@@ -1047,6 +1054,75 @@
   EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
 }
 
+TEST_F(SingleBalancerTest, LocalityMapWeightedRoundRobin) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const size_t kNumRpcs = 5000;
+  const int kLocalityWeight0 = 2;
+  const int kLocalityWeight1 = 8;
+  const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
+  const double kLocalityWeightRate0 =
+      static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
+  const double kLocalityWeightRate1 =
+      static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
+  // EDS response contains 2 localities, each of which contains 1 backend.
+  ScheduleResponseForBalancer(
+      0,
+      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(0, 2, 2),
+                                    {kLocalityWeight0, kLocalityWeight1}),
+      0);
+  // Wait for both backends to be ready.
+  WaitForAllBackends(1, 0, 2);
+  // Send kNumRpcs RPCs.
+  CheckRpcSendOk(kNumRpcs);
+  // The locality picking rates should be roughly equal to the expectation.
+  const double locality_picked_rate_0 =
+      static_cast<double>(backends_[0]->backend_service()->request_count()) /
+      kNumRpcs;
+  const double locality_picked_rate_1 =
+      static_cast<double>(backends_[1]->backend_service()->request_count()) /
+      kNumRpcs;
+  const double kErrorTolerance = 0.2;
+  EXPECT_THAT(locality_picked_rate_0,
+              ::testing::AllOf(
+                  ::testing::Ge(kLocalityWeightRate0 * (1 - kErrorTolerance)),
+                  ::testing::Le(kLocalityWeightRate0 * (1 + kErrorTolerance))));
+  EXPECT_THAT(locality_picked_rate_1,
+              ::testing::AllOf(
+                  ::testing::Ge(kLocalityWeightRate1 * (1 - kErrorTolerance)),
+                  ::testing::Le(kLocalityWeightRate1 * (1 + kErrorTolerance))));
+  // The EDS service got a single request, and sent a single response.
+  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
+  EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count());
+}
+
+TEST_F(SingleBalancerTest, LocalityMapStressTest) {
+  SetNextResolution({}, kDefaultServiceConfig_.c_str());
+  SetNextResolutionForLbChannelAllBalancers();
+  const size_t kNumLocalities = 100;
+  // The first EDS response contains kNumLocalities localities, each of which
+  // contains backend 0.
+  const std::vector<std::vector<int>> locality_list_0(kNumLocalities,
+                                                      {backends_[0]->port()});
+  // The second EDS response contains 1 locality, which contains backend 1.
+  const std::vector<std::vector<int>> locality_list_1 =
+      GetBackendPortsInGroups(1, 2);
+  ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_0),
+                              0);
+  ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(locality_list_1),
+                              60 * 1000);
+  // Wait until backend 0 is ready, before which kNumLocalities localities are
+  // received and handled by the xds policy.
+  WaitForBackend(0, /*reset_counters=*/false);
+  EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
+  // Wait until backend 1 is ready, before which kNumLocalities localities are
+  // removed by the xds policy.
+  WaitForBackend(1);
+  // The EDS service got a single request, and sent a single response.
+  EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count());
+  EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count());
+}
+
 TEST_F(SingleBalancerTest, Drop) {
   SetNextResolution({}, kDefaultServiceConfig_.c_str());
   SetNextResolutionForLbChannelAllBalancers();
@@ -1061,7 +1137,7 @@
   ScheduleResponseForBalancer(
       0,
       EdsServiceImpl::BuildResponse(
-          GetBackendPortsInGroups(),
+          GetBackendPortsInGroups(), {},
           {{kLbDropType, kDropPerMillionForLb},
            {kThrottleDropType, kDropPerMillionForThrottle}}),
       0);
@@ -1102,7 +1178,7 @@
   // The EDS response contains one drop category.
   ScheduleResponseForBalancer(
       0,
-      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
                                     {{kLbDropType, kDropPerHundredForLb}},
                                     FractionalPercent::HUNDRED),
       0);
@@ -1142,7 +1218,7 @@
   // The EDS response contains one drop category.
   ScheduleResponseForBalancer(
       0,
-      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
                                     {{kLbDropType, kDropPerTenThousandForLb}},
                                     FractionalPercent::TEN_THOUSAND),
       0);
@@ -1186,14 +1262,14 @@
   // The first EDS response contains one drop category.
   ScheduleResponseForBalancer(
       0,
-      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
                                     {{kLbDropType, kDropPerMillionForLb}}),
       0);
   // The second EDS response contains two drop categories.
   ScheduleResponseForBalancer(
       0,
       EdsServiceImpl::BuildResponse(
-          GetBackendPortsInGroups(),
+          GetBackendPortsInGroups(), {},
           {{kLbDropType, kDropPerMillionForLb},
            {kThrottleDropType, kDropPerMillionForThrottle}}),
       5000);
@@ -1279,7 +1355,7 @@
   ScheduleResponseForBalancer(
       0,
       EdsServiceImpl::BuildResponse(
-          GetBackendPortsInGroups(),
+          GetBackendPortsInGroups(), {},
           {{kLbDropType, kDropPerMillionForLb},
            {kThrottleDropType, kDropPerMillionForThrottle}}),
       0);
@@ -1472,7 +1548,7 @@
   // Return a new balancer that sends a response to drop all calls.
   ScheduleResponseForBalancer(
       0,
-      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(),
+      EdsServiceImpl::BuildResponse(GetBackendPortsInGroups(), {},
                                     {{kLbDropType, 1000000}}),
       0);
   SetNextResolutionForLbChannelAllBalancers();
@@ -1930,7 +2006,7 @@
   ScheduleResponseForBalancer(
       0,
       EdsServiceImpl::BuildResponse(
-          GetBackendPortsInGroups(),
+          GetBackendPortsInGroups(), {},
           {{kLbDropType, kDropPerMillionForLb},
            {kThrottleDropType, kDropPerMillionForThrottle}}),
       0);