ring_hash LB policy implementation (#26285)

* ring_hash LB policy (#25697)

* Ring Hash Policy implementation

* Code review comment fixing

* Fixing code review comments.

* Code review comment fixing

* Fixing reconnect logic

* adding helper method for pick

* Holding on to ref to parent

* first attempt at calling AttemptToConnect

* Fixing state change

* Fixing code review comments

* Fixing the reconnect from channel watcher code

* Fixing the BUILD to include new policy

* Fixing major code review suggestion

* Fixing code review comments

* Fixing code review suggestions

* Initial 2 tests.

* Adding channel id case.

* Fixing code review comments.

* Small change to get the spread of backends

* Add header hashing tests

* Added more tests and debugging

* Fixing Header hash

* Added more tests

* cleanup

* removing debugs

* Fixing code review comments.

* code review fixing

* combining code and match design

* fixing code review comments.

* Fixed IDLE case

* Moving tests

* Fixing code review comments

* Adding more tests according to code review comments.

* Added tests with differetn types of weights

* Adding terminal policy case

* Remove hash_func as there is only 1

* Added nack invalid hash function

* Added NACK cases

* fixing build error

* fixing build

* small warning

* adding regex test

* Adding policy tests

* fixing warning

* fixing warning

* fixing code reivew comments.

* fixing IDLE case

* Code review comments.

* fixing code review comments

* Making a helper function

* fixing reattempt case

* Added afew more tests.

* Adding more tests

* Added backward compatible test

* FIxing the reattempt test

* Clean up

* fixing clang error

* fixing clang error

* Fix logic discovered during code review

* code review comments

* code review comments

* code review comment

* clean up tests

* fixing code review comments

* clean up tests

* Separated test

* Fixing test

* fixing test

* fixing clang error

* Addressing code review suggestions

* Fixing last bit of code review comments

* Fixing flaky tests

* Fixing last bit of code review comments

* clean debugs

* Remove a verbose log

* Relaxing deadline exceeded for 1st RPC until ring is optimized.
Making Hash more efficient for random case.
diff --git a/BUILD b/BUILD
index 34226c8..af81334 100644
--- a/BUILD
+++ b/BUILD
@@ -1105,6 +1105,7 @@
         "grpc_client_authority_filter",
         "grpc_lb_policy_pick_first",
         "grpc_lb_policy_priority",
+        "grpc_lb_policy_ring_hash",
         "grpc_lb_policy_round_robin",
         "grpc_lb_policy_weighted_target",
         "grpc_client_idle_filter",
@@ -1544,6 +1545,7 @@
         "grpc_base",
         "grpc_client_channel",
         "grpc_lb_address_filtering",
+        "grpc_lb_policy_ring_hash",
         "grpc_lb_xds_channel_args",
         "grpc_lb_xds_common",
         "grpc_resolver_fake",
@@ -1636,6 +1638,10 @@
     hdrs = [
         "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h",
     ],
+    external_deps = [
+        "absl/strings",
+        "xxhash",
+    ],
     language = "c++",
     deps = [
         "grpc_base",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5887f75..2e10b03 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2326,6 +2326,7 @@
   src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
   src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
   src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
+  src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
   src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
   src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
   src/core/ext/filters/client_channel/lb_policy_registry.cc
diff --git a/Makefile b/Makefile
index 5abbc89..c91e53a 100644
--- a/Makefile
+++ b/Makefile
@@ -1735,6 +1735,7 @@
     src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \
     src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
     src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
+    src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
     src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
     src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
     src/core/ext/filters/client_channel/lb_policy_registry.cc \
@@ -2669,7 +2670,6 @@
 # installing headers to their final destination on the drive. We need this
 # otherwise parallel compilation will fail if a source is compiled first.
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP)
-src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc: $(OPENSSL_DEP)
 src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
 src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP)
 src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 31c9d54..7514911 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -1618,6 +1618,7 @@
   - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
   - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
   - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+  - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
   - src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
   - src/core/ext/filters/client_channel/lb_policy_factory.h
   - src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -1828,6 +1829,7 @@
   - src/core/lib/transport/transport.h
   - src/core/lib/transport/transport_impl.h
   - src/core/lib/uri/uri_parser.h
+  - third_party/xxhash/xxhash.h
   src:
   - src/core/ext/filters/census/grpc_context.cc
   - src/core/ext/filters/client_channel/backend_metric.cc
@@ -1854,6 +1856,7 @@
   - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
   - src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
   - src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
+  - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
   - src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
   - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
   - src/core/ext/filters/client_channel/lb_policy_registry.cc
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index bdb54a0..1aa0719 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -75,6 +75,7 @@
     in DEBUG)
   - priority_lb - traces priority LB policy
   - resource_quota - trace resource quota objects internals
+  - ring_hash_lb - traces the ring hash load balancing policy
   - round_robin - traces the round_robin load balancing policy
   - queue_pluck
   - server_channel - lightweight trace of significant server channel events
diff --git a/grpc.gyp b/grpc.gyp
index 2418290..b717f76 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -1122,6 +1122,7 @@
         'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc',
         'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
         'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
+        'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
         'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
         'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
         'src/core/ext/filters/client_channel/lb_policy_registry.cc',
diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
index 921bd2e..ad001d8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
@@ -16,8 +16,767 @@
 
 #include <grpc/support/port_platform.h>
 
+#include <stdlib.h>
+#include <string.h>
+
+#include "absl/strings/numbers.h"
+#include "absl/strings/str_cat.h"
+#define XXH_INLINE_ALL
+#include "xxhash.h"
+
+#include <grpc/support/alloc.h>
+#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/address_utils/sockaddr_utils.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/static_metadata.h"
+
 namespace grpc_core {
 
 const char* kRequestRingHashAttribute = "request_ring_hash";
+TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb");
+
+// Helper Parser method
+void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
+                           size_t* max_ring_size,
+                           std::vector<grpc_error_handle>* error_list) {
+  *min_ring_size = 1024;
+  *max_ring_size = 8388608;
+  if (json.type() != Json::Type::OBJECT) {
+    error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "ring_hash_experimental should be of type object"));
+    return;
+  }
+  const Json::Object& ring_hash = json.object_value();
+  auto ring_hash_it = ring_hash.find("min_ring_size");
+  if (ring_hash_it != ring_hash.end()) {
+    if (ring_hash_it->second.type() != Json::Type::NUMBER) {
+      error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:min_ring_size error: should be of type number"));
+    } else {
+      *min_ring_size = gpr_parse_nonnegative_int(
+          ring_hash_it->second.string_value().c_str());
+    }
+  }
+  ring_hash_it = ring_hash.find("max_ring_size");
+  if (ring_hash_it != ring_hash.end()) {
+    if (ring_hash_it->second.type() != Json::Type::NUMBER) {
+      error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:max_ring_size error: should be of type number"));
+    } else {
+      *max_ring_size = gpr_parse_nonnegative_int(
+          ring_hash_it->second.string_value().c_str());
+    }
+  }
+  if (*min_ring_size == 0 || *min_ring_size > 8388608 || *max_ring_size == 0 ||
+      *max_ring_size > 8388608 || *min_ring_size > *max_ring_size) {
+    error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:max_ring_size and or min_ring_size error: "
+        "values need to be in the range of 1 to 8388608 "
+        "and max_ring_size cannot be smaller than "
+        "min_ring_size"));
+  }
+}
+
+namespace {
+
+constexpr char kRingHash[] = "ring_hash_experimental";
+
+class RingHashLbConfig : public LoadBalancingPolicy::Config {
+ public:
+  RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
+      : min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}
+  const char* name() const override { return kRingHash; }
+  size_t min_ring_size() const { return min_ring_size_; }
+  size_t max_ring_size() const { return max_ring_size_; }
+
+ private:
+  size_t min_ring_size_;
+  size_t max_ring_size_;
+};
+
+//
+// ring_hash LB policy
+//
+class RingHash : public LoadBalancingPolicy {
+ public:
+  explicit RingHash(Args args);
+
+  const char* name() const override { return kRingHash; }
+
+  void UpdateLocked(UpdateArgs args) override;
+  void ResetBackoffLocked() override;
+
+ private:
+  ~RingHash() override;
+
+  // Forward declaration.
+  class RingHashSubchannelList;
+
+  // Data for a particular subchannel in a subchannel list.
+  // This subclass adds the following functionality:
+  // - Tracks the previous connectivity state of the subchannel, so that
+  //   we know how many subchannels are in each state.
+  class RingHashSubchannelData
+      : public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
+   public:
+    RingHashSubchannelData(
+        SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
+            subchannel_list,
+        const ServerAddress& address,
+        RefCountedPtr<SubchannelInterface> subchannel)
+        : SubchannelData(subchannel_list, address, std::move(subchannel)),
+          address_(address) {}
+
+    grpc_connectivity_state connectivity_state() const {
+      return last_connectivity_state_;
+    }
+    const ServerAddress& address() const { return address_; }
+
+    bool seen_failure_since_ready() const { return seen_failure_since_ready_; }
+
+    // Performs connectivity state updates that need to be done both when we
+    // first start watching and when a watcher notification is received.
+    void UpdateConnectivityStateLocked(
+        grpc_connectivity_state connectivity_state);
+
+   private:
+    // Performs connectivity state updates that need to be done only
+    // after we have started watching.
+    void ProcessConnectivityChangeLocked(
+        grpc_connectivity_state connectivity_state) override;
+
+    ServerAddress address_;
+    grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
+    bool seen_failure_since_ready_ = false;
+  };
+
+  // A list of subchannels.
+  class RingHashSubchannelList
+      : public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
+   public:
+    RingHashSubchannelList(RingHash* policy, TraceFlag* tracer,
+                           ServerAddressList addresses,
+                           const grpc_channel_args& args)
+        : SubchannelList(policy, tracer, std::move(addresses),
+                         policy->channel_control_helper(), args) {
+      // Need to maintain a ref to the LB policy as long as we maintain
+      // any references to subchannels, since the subchannels'
+      // pollset_sets will include the LB policy's pollset_set.
+      policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+    }
+
+    ~RingHashSubchannelList() override {
+      RingHash* p = static_cast<RingHash*>(policy());
+      p->Unref(DEBUG_LOCATION, "subchannel_list");
+    }
+
+    // Starts watching the subchannels in this list.
+    void StartWatchingLocked();
+
+    // Updates the counters of subchannels in each state when a
+    // subchannel transitions from old_state to new_state.
+    void UpdateStateCountersLocked(grpc_connectivity_state old_state,
+                                   grpc_connectivity_state new_state);
+
+    // Updates the RH policy's connectivity state based on the
+    // subchannel list's state counters, creating new picker and new ring.
+    // Furthermore, return a bool indicating whether the aggregated state is
+    // Transient Failure.
+    bool UpdateRingHashConnectivityStateLocked();
+
+   private:
+    size_t num_idle_ = 0;
+    size_t num_ready_ = 0;
+    size_t num_connecting_ = 0;
+    size_t num_transient_failure_ = 0;
+  };
+
+  class Picker : public SubchannelPicker {
+   public:
+    Picker(RefCountedPtr<RingHash> parent,
+           RingHashSubchannelList* subchannel_list);
+
+    PickResult Pick(PickArgs args) override;
+
+   private:
+    struct RingEntry {
+      uint64_t hash;
+      RefCountedPtr<SubchannelInterface> subchannel;
+      grpc_connectivity_state connectivity_state;
+    };
+
+    // A fire-and-forget class that schedules subchannel connection attempts
+    // on the control plane WorkSerializer.
+    class SubchannelConnectionAttempter : public Orphanable {
+     public:
+      explicit SubchannelConnectionAttempter(
+          RefCountedPtr<RingHash> ring_hash_lb)
+          : ring_hash_lb_(std::move(ring_hash_lb)) {
+        GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
+      }
+
+      void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
+        subchannels_.push_back(std::move(subchannel));
+      }
+
+      void Orphan() override {
+        // Hop into ExecCtx, so that we're not holding the data plane mutex
+        // while we run control-plane code.
+        ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
+      }
+
+     private:
+      static void RunInExecCtx(void* arg, grpc_error* /*error*/) {
+        auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
+        self->ring_hash_lb_->work_serializer()->Run(
+            [self]() {
+              if (!self->ring_hash_lb_->shutdown_) {
+                for (auto& subchannel : self->subchannels_) {
+                  subchannel->AttemptToConnect();
+                }
+              }
+              delete self;
+            },
+            DEBUG_LOCATION);
+      }
+
+      RefCountedPtr<RingHash> ring_hash_lb_;
+      grpc_closure closure_;
+      absl::InlinedVector<RefCountedPtr<SubchannelInterface>, 10> subchannels_;
+    };
+
+    RefCountedPtr<RingHash> parent_;
+
+    // A ring of subchannels.
+    std::vector<RingEntry> ring_;
+  };
+
+  void ShutdownLocked() override;
+
+  // Current config from resolver.
+  RefCountedPtr<RingHashLbConfig> config_;
+
+  // list of subchannels.
+  OrphanablePtr<RingHashSubchannelList> subchannel_list_;
+  // indicating if we are shutting down.
+  bool shutdown_ = false;
+};
+
+//
+// RingHash::Picker
+//
+
+RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
+                         RingHashSubchannelList* subchannel_list)
+    : parent_(std::move(parent)) {
+  size_t num_subchannels = subchannel_list->num_subchannels();
+  // Store the weights while finding the sum.
+  struct AddressWeight {
+    std::string address;
+    // Default weight is 1 for the cases where a weight is not provided,
+    // each occurrence of the address will be counted a weight value of 1.
+    uint32_t weight = 1;
+    double normalized_weight;
+  };
+  std::vector<AddressWeight> address_weights;
+  size_t sum = 0;
+  address_weights.reserve(num_subchannels);
+  for (size_t i = 0; i < num_subchannels; ++i) {
+    RingHashSubchannelData* sd = subchannel_list->subchannel(i);
+    const ServerAddressWeightAttribute* weight_attribute = static_cast<
+        const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
+        ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
+    AddressWeight address_weight;
+    address_weight.address =
+        grpc_sockaddr_to_string(&sd->address().address(), false);
+    if (weight_attribute != nullptr) {
+      GPR_ASSERT(weight_attribute->weight() != 0);
+      address_weight.weight = weight_attribute->weight();
+    }
+    sum += address_weight.weight;
+    address_weights.push_back(std::move(address_weight));
+  }
+  // Calculating normalized weights and find min and max.
+  double min_normalized_weight = 1.0;
+  double max_normalized_weight = 0.0;
+  for (auto& address : address_weights) {
+    address.normalized_weight = static_cast<double>(address.weight) / sum;
+    min_normalized_weight =
+        std::min(address.normalized_weight, min_normalized_weight);
+    max_normalized_weight =
+        std::max(address.normalized_weight, max_normalized_weight);
+  }
+  // Scale up the number of hashes per host such that the least-weighted host
+  // gets a whole number of hashes on the ring. Other hosts might not end up
+  // with whole numbers, and that's fine (the ring-building algorithm below can
+  // handle this). This preserves the original implementation's behavior: when
+  // weights aren't provided, all hosts should get an equal number of hashes. In
+  // the case where this number exceeds the max_ring_size, it's scaled back down
+  // to fit.
+  const size_t min_ring_size = parent_->config_->min_ring_size();
+  const size_t max_ring_size = parent_->config_->max_ring_size();
+  const double scale = std::min(
+      std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
+      static_cast<double>(max_ring_size));
+  // Reserve memory for the entire ring up front.
+  const uint64_t ring_size = std::ceil(scale);
+  ring_.reserve(ring_size);
+  // Populate the hash ring by walking through the (host, weight) pairs in
+  // normalized_host_weights, and generating (scale * weight) hashes for each
+  // host. Since these aren't necessarily whole numbers, we maintain running
+  // sums -- current_hashes and target_hashes -- which allows us to populate the
+  // ring in a mostly stable way.
+  absl::InlinedVector<char, 196> hash_key_buffer;
+  double current_hashes = 0.0;
+  double target_hashes = 0.0;
+  uint64_t min_hashes_per_host = ring_size;
+  uint64_t max_hashes_per_host = 0;
+  for (size_t i = 0; i < num_subchannels; ++i) {
+    const std::string& address_string = address_weights[i].address;
+    hash_key_buffer.assign(address_string.begin(), address_string.end());
+    hash_key_buffer.emplace_back('_');
+    auto offset_start = hash_key_buffer.end();
+    target_hashes += scale * address_weights[i].normalized_weight;
+    size_t count = 0;
+    auto current_state =
+        subchannel_list->subchannel(i)->subchannel()->CheckConnectivityState();
+    while (current_hashes < target_hashes) {
+      const std::string count_str = absl::StrCat(count);
+      hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
+      absl::string_view hash_key(hash_key_buffer.data(),
+                                 hash_key_buffer.size());
+      const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
+      ring_.push_back({hash,
+                       subchannel_list->subchannel(i)->subchannel()->Ref(),
+                       current_state});
+      ++count;
+      ++current_hashes;
+      hash_key_buffer.erase(offset_start, hash_key_buffer.end());
+    }
+    min_hashes_per_host =
+        std::min(static_cast<uint64_t>(i), min_hashes_per_host);
+    max_hashes_per_host =
+        std::max(static_cast<uint64_t>(i), max_hashes_per_host);
+  }
+  std::sort(ring_.begin(), ring_.end(),
+            [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
+              return lhs.hash < rhs.hash;
+            });
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO,
+            "[RH %p picker %p] created picker from subchannel_list=%p "
+            "with %" PRIuPTR " ring entries",
+            parent_.get(), this, subchannel_list, ring_.size());
+    // for (const auto& r : ring_) {
+    // gpr_log(GPR_INFO, "donn ring hash: %" PRIx64 " subchannel: %p state: %d",
+    //         r.hash, r.subchannel.get(), r.connectivity_state);
+    //}
+  }
+}
+
+RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
+  PickResult result;
+  // Initialize to PICK_FAILED.
+  result.type = PickResult::PICK_FAILED;
+  auto hash =
+      args.call_state->ExperimentalGetCallAttribute(kRequestRingHashAttribute);
+  uint64_t h;
+  if (!absl::SimpleAtoi(hash, &h)) {
+    result.error = grpc_error_set_int(
+        GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+            absl::StrCat("xds ring hash value is not a number").c_str()),
+        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
+    return result;
+  }
+  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
+  // (ketama_get_server) NOTE: The algorithm depends on using signed integers
+  // for lowp, highp, and first_index. Do not change them!
+  int64_t lowp = 0;
+  int64_t highp = ring_.size();
+  int64_t first_index = 0;
+  while (true) {
+    first_index = (lowp + highp) / 2;
+    if (first_index == static_cast<int64_t>(ring_.size())) {
+      first_index = 0;
+      break;
+    }
+    uint64_t midval = ring_[first_index].hash;
+    uint64_t midval1 = first_index == 0 ? 0 : ring_[first_index - 1].hash;
+    if (h <= midval && h > midval1) {
+      break;
+    }
+    if (midval < h) {
+      lowp = first_index + 1;
+    } else {
+      highp = first_index - 1;
+    }
+    if (lowp > highp) {
+      first_index = 0;
+      break;
+    }
+  }
+  OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
+  auto ScheduleSubchannelConnectionAttempt =
+      [&](RefCountedPtr<SubchannelInterface> subchannel) {
+        if (subchannel_connection_attempter == nullptr) {
+          subchannel_connection_attempter =
+              MakeOrphanable<SubchannelConnectionAttempter>(parent_);
+        }
+        subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
+      };
+  switch (ring_[first_index].connectivity_state) {
+    case GRPC_CHANNEL_READY:
+      result.type = PickResult::PICK_COMPLETE;
+      result.subchannel = ring_[first_index].subchannel;
+      return result;
+    case GRPC_CHANNEL_IDLE:
+      ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
+      // fallthrough
+    case GRPC_CHANNEL_CONNECTING:
+      result.type = PickResult::PICK_QUEUE;
+      return result;
+    default:  // GRPC_CHANNEL_TRANSIENT_FAILURE
+      break;
+  }
+  ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
+  // Loop through remaining subchannels to find one in READY.
+  // On the way, we make sure the right set of connection attempts
+  // will happen.
+  bool found_second_subchannel = false;
+  bool found_first_non_failed = false;
+  for (size_t i = 1; i < ring_.size(); ++i) {
+    const RingEntry& entry = ring_[(first_index + i) % ring_.size()];
+    if (entry.subchannel == ring_[first_index].subchannel) {
+      continue;
+    }
+    if (entry.connectivity_state == GRPC_CHANNEL_READY) {
+      result.type = PickResult::PICK_COMPLETE;
+      result.subchannel = entry.subchannel;
+      return result;
+    }
+    if (!found_second_subchannel) {
+      switch (entry.connectivity_state) {
+        case GRPC_CHANNEL_IDLE:
+          ScheduleSubchannelConnectionAttempt(entry.subchannel);
+          // fallthrough
+        case GRPC_CHANNEL_CONNECTING:
+          result.type = PickResult::PICK_QUEUE;
+          return result;
+        default:
+          break;
+      }
+      found_second_subchannel = true;
+    }
+    if (!found_first_non_failed) {
+      if (entry.connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+        ScheduleSubchannelConnectionAttempt(entry.subchannel);
+      } else {
+        if (entry.connectivity_state == GRPC_CHANNEL_IDLE) {
+          ScheduleSubchannelConnectionAttempt(entry.subchannel);
+        }
+        found_first_non_failed = true;
+      }
+    }
+  }
+  result.error =
+      grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+                             absl::StrCat("xds ring hash found a subchannel "
+                                          "that is in TRANSIENT_FAILURE state")
+                                 .c_str()),
+                         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
+  return result;
+}
+
+//
+// RingHash::RingHashSubchannelList
+//
+
+void RingHash::RingHashSubchannelList::StartWatchingLocked() {
+  if (num_subchannels() == 0) return;
+  // Check current state of each subchannel synchronously.
+  for (size_t i = 0; i < num_subchannels(); ++i) {
+    grpc_connectivity_state state =
+        subchannel(i)->CheckConnectivityStateLocked();
+    subchannel(i)->UpdateConnectivityStateLocked(state);
+  }
+  // Start connectivity watch for each subchannel.
+  for (size_t i = 0; i < num_subchannels(); i++) {
+    if (subchannel(i)->subchannel() != nullptr) {
+      subchannel(i)->StartConnectivityWatchLocked();
+    }
+  }
+  RingHash* p = static_cast<RingHash*>(policy());
+  // Sending up the initial picker while all subchannels are in IDLE state.
+  p->channel_control_helper()->UpdateState(
+      GRPC_CHANNEL_READY, absl::Status(),
+      absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
+                                this));
+}
+
+void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
+    grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
+  GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
+  if (old_state == GRPC_CHANNEL_IDLE) {
+    GPR_ASSERT(num_idle_ > 0);
+    --num_idle_;
+  } else if (old_state == GRPC_CHANNEL_READY) {
+    GPR_ASSERT(num_ready_ > 0);
+    --num_ready_;
+  } else if (old_state == GRPC_CHANNEL_CONNECTING) {
+    GPR_ASSERT(num_connecting_ > 0);
+    --num_connecting_;
+  } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    GPR_ASSERT(num_transient_failure_ > 0);
+    --num_transient_failure_;
+  }
+  if (new_state == GRPC_CHANNEL_IDLE) {
+    ++num_idle_;
+  } else if (new_state == GRPC_CHANNEL_READY) {
+    ++num_ready_;
+  } else if (new_state == GRPC_CHANNEL_CONNECTING) {
+    ++num_connecting_;
+  } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    ++num_transient_failure_;
+  }
+}
+
+// Sets the RH policy's connectivity state and generates a new picker based
+// on the current subchannel list or requests an re-attempt by returning true..
+bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
+  RingHash* p = static_cast<RingHash*>(policy());
+  // Only set connectivity state if this is the current subchannel list.
+  if (p->subchannel_list_.get() != this) return false;
+  // The overall aggregation rules here are:
+  // 1. If there is at least one subchannel in READY state, report READY.
+  // 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
+  // TRANSIENT_FAILURE.
+  // 3. If there is at least one subchannel in CONNECTING state, report
+  // CONNECTING.
+  // 4. If there is at least one subchannel in IDLE state, report IDLE.
+  // 5. Otherwise, report TRANSIENT_FAILURE.
+  if (num_ready_ > 0) {
+    /* READY */
+    p->channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_READY, absl::Status(),
+        absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
+                                  this));
+    return false;
+  }
+  if (num_connecting_ > 0 && num_transient_failure_ < 2) {
+    p->channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_CONNECTING, absl::Status(),
+        absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
+    return false;
+  }
+  if (num_idle_ > 0 && num_transient_failure_ < 2) {
+    p->channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_IDLE, absl::Status(),
+        absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
+                                  this));
+    return false;
+  }
+  grpc_error* error =
+      grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                             "connections to backend failing or idle"),
+                         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+  p->channel_control_helper()->UpdateState(
+      GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
+      absl::make_unique<TransientFailurePicker>(error));
+  return true;
+}
+
+//
+// RingHash::RingHashSubchannelData
+//
+
+void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
+    grpc_connectivity_state connectivity_state) {
+  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(
+        GPR_INFO,
+        "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
+        "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
+        p, subchannel(), subchannel_list(), Index(),
+        subchannel_list()->num_subchannels(),
+        ConnectivityStateName(last_connectivity_state_),
+        ConnectivityStateName(connectivity_state));
+  }
+  // Decide what state to report for aggregation purposes.
+  // If we haven't seen a failure since the last time we were in state
+  // READY, then we report the state change as-is.  However, once we do see
+  // a failure, we report TRANSIENT_FAILURE and do not report any subsequent
+  // state changes until we go back into state READY.
+  if (!seen_failure_since_ready_) {
+    if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+      seen_failure_since_ready_ = true;
+    }
+    subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
+                                                 connectivity_state);
+  } else {
+    if (connectivity_state == GRPC_CHANNEL_READY) {
+      seen_failure_since_ready_ = false;
+      subchannel_list()->UpdateStateCountersLocked(
+          GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state);
+    }
+  }
+  // Record last seen connectivity state.
+  last_connectivity_state_ = connectivity_state;
+}
+
+void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
+    grpc_connectivity_state connectivity_state) {
+  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
+  GPR_ASSERT(subchannel() != nullptr);
+  // If the new state is TRANSIENT_FAILURE, re-resolve.
+  // Only do this if we've started watching, not at startup time.
+  // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
+  // when the subchannel list was created, we'd wind up in a constant
+  // loop of re-resolution.
+  // Also attempt to reconnect.
+  if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+      gpr_log(GPR_INFO,
+              "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
+              "Requesting re-resolution",
+              p, subchannel());
+    }
+    p->channel_control_helper()->RequestReresolution();
+  }
+  // Update state counters.
+  UpdateConnectivityStateLocked(connectivity_state);
+  // Update the RH policy's connectivity state, creating new picker and new
+  // ring.
+  bool transient_failure =
+      subchannel_list()->UpdateRingHashConnectivityStateLocked();
+  // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
+  // not be getting any pick requests from the priority policy.
+  // However, because the ring_hash policy does not attempt to
+  // reconnect to subchannels unless it is getting pick requests,
+  // it will need special handling to ensure that it will eventually
+  // recover from TRANSIENT_FAILURE state once the problem is resolved.
+  // Specifically, it will make sure that it is attempting to connect to
+  // at least one subchannel at any given time.  After a given subchannel
+  // fails a connection attempt, it will move on to the next subchannel
+  // in the ring.  It will keep doing this until one of the subchannels
+  // successfully connects, at which point it will report READY and stop
+  // proactively trying to connect.  The policy will remain in
+  // TRANSIENT_FAILURE until at least one subchannel becomes connected,
+  // even if subchannels are in state CONNECTING during that time.
+  if (transient_failure &&
+      connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
+    RingHashSubchannelData* next_sd = subchannel_list()->subchannel(next_index);
+    next_sd->subchannel()->AttemptToConnect();
+  }
+}
+
+//
+// RingHash
+//
+
+RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Created", this);
+  }
+}
+
+RingHash::~RingHash() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
+  }
+  GPR_ASSERT(subchannel_list_ == nullptr);
+}
+
+void RingHash::ShutdownLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
+  }
+  shutdown_ = true;
+  subchannel_list_.reset();
+}
+
+void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); }
+
+void RingHash::UpdateLocked(UpdateArgs args) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
+            this, args.addresses.size());
+  }
+  config_ = std::move(args.config);
+  // Filter out any address with weight 0.
+  ServerAddressList addresses;
+  addresses.reserve(args.addresses.size());
+  for (ServerAddress& address : args.addresses) {
+    const ServerAddressWeightAttribute* weight_attribute =
+        static_cast<const ServerAddressWeightAttribute*>(address.GetAttribute(
+            ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
+    if (weight_attribute == nullptr || weight_attribute->weight() > 0) {
+      addresses.push_back(std::move(address));
+    }
+  }
+  subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
+      this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args);
+  if (subchannel_list_->num_subchannels() == 0) {
+    // If the new list is empty, immediately transition to TRANSIENT_FAILURE.
+    grpc_error* error =
+        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+    channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
+        absl::make_unique<TransientFailurePicker>(error));
+  } else {
+    // Start watching the new list.
+    subchannel_list_->StartWatchingLocked();
+  }
+}
+
+//
+// factory
+//
+
+class RingHashFactory : public LoadBalancingPolicyFactory {
+ public:
+  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      LoadBalancingPolicy::Args args) const override {
+    return MakeOrphanable<RingHash>(std::move(args));
+  }
+
+  const char* name() const override { return kRingHash; }
+
+  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+      const Json& json, grpc_error** error) const override {
+    size_t min_ring_size;
+    size_t max_ring_size;
+    std::vector<grpc_error_handle> error_list;
+    ParseRingHashLbConfig(json, &min_ring_size, &max_ring_size, &error_list);
+    if (error_list.empty()) {
+      return MakeRefCounted<RingHashLbConfig>(min_ring_size, max_ring_size);
+    } else {
+      *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+          "ring_hash_experimental LB policy config", &error_list);
+      return nullptr;
+    }
+  }
+};
+
+}  // namespace
+
+void GrpcLbPolicyRingHashInit() {
+  grpc_core::LoadBalancingPolicyRegistry::Builder::
+      RegisterLoadBalancingPolicyFactory(
+          absl::make_unique<grpc_core::RingHashFactory>());
+}
+
+void GrpcLbPolicyRingHashShutdown() {}
 
 }  // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
index dc176c2..f0f86b2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
+++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
@@ -19,9 +19,19 @@
 
 #include <grpc/support/port_platform.h>
 
+#include <stdlib.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/json/json.h"
+
 namespace grpc_core {
 extern const char* kRequestRingHashAttribute;
 
+// Helper Parsing method to parse ring hash policy configs; for example, ring
+// hash size validity.
+void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
+                           size_t* max_ring_size,
+                           std::vector<grpc_error_handle>* error_list);
 }  // namespace grpc_core
 
 #endif  // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_RING_HASH_RING_HASH_H
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
index 49bff27..3531ca5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -452,22 +452,9 @@
     // Construct config for child policy.
     Json::Object xds_lb_policy;
     if (cluster_data.lb_policy == "RING_HASH") {
-      std::string hash_function;
-      switch (cluster_data.hash_function) {
-        case XdsApi::CdsUpdate::HashFunction::XX_HASH:
-          hash_function = "XX_HASH";
-          break;
-        case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
-          hash_function = "MURMUR_HASH_2";
-          break;
-        default:
-          GPR_ASSERT(0);
-          break;
-      }
       xds_lb_policy["RING_HASH"] = Json::Object{
           {"min_ring_size", cluster_data.min_ring_size},
           {"max_ring_size", cluster_data.max_ring_size},
-          {"hash_function", hash_function},
       };
     } else {
       xds_lb_policy["ROUND_ROBIN"] = Json::Object();
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
index bcb8194..62851e8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
@@ -28,6 +28,7 @@
 #include "src/core/ext/filters/client_channel/lb_policy.h"
 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
+#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
@@ -834,6 +835,13 @@
       std::vector<std::string> hierarchical_path = {
           priority_child_name, locality_name->AsHumanReadableString()};
       for (const auto& endpoint : locality.endpoints) {
+        const ServerAddressWeightAttribute* weight_attribute = static_cast<
+            const ServerAddressWeightAttribute*>(endpoint.GetAttribute(
+            ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
+        uint32_t weight = locality.lb_weight;
+        if (weight_attribute != nullptr) {
+          weight = locality.lb_weight * weight_attribute->weight();
+        }
         addresses.emplace_back(
             endpoint
                 .WithAttribute(kHierarchicalPathAttributeKey,
@@ -841,10 +849,10 @@
                 .WithAttribute(kXdsLocalityNameAttributeKey,
                                absl::make_unique<XdsLocalityAttribute>(
                                    locality_name->Ref()))
-                .WithAttribute(ServerAddressWeightAttribute::
-                                   kServerAddressWeightAttributeKey,
-                               absl::make_unique<ServerAddressWeightAttribute>(
-                                   locality.lb_weight)));
+                .WithAttribute(
+                    ServerAddressWeightAttribute::
+                        kServerAddressWeightAttributeKey,
+                    absl::make_unique<ServerAddressWeightAttribute>(weight)));
       }
     }
   }
@@ -1201,65 +1209,11 @@
           }
           policy_it = policy.find("RING_HASH");
           if (policy_it != policy.end()) {
-            if (policy_it->second.type() != Json::Type::OBJECT) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:RING_HASH error:type should be object"));
-              continue;
-            }
-            // TODO(donnadionne): Move this to a method in
-            // ring_hash_experimental and call it here.
-            const Json::Object& ring_hash = policy_it->second.object_value();
             xds_lb_policy = array[i];
-            size_t min_ring_size = 1024;
-            size_t max_ring_size = 8388608;
-            auto ring_hash_it = ring_hash.find("min_ring_size");
-            if (ring_hash_it == ring_hash.end()) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:min_ring_size missing"));
-            } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:min_ring_size error: should be of "
-                  "number"));
-            } else {
-              min_ring_size = gpr_parse_nonnegative_int(
-                  ring_hash_it->second.string_value().c_str());
-            }
-            ring_hash_it = ring_hash.find("max_ring_size");
-            if (ring_hash_it == ring_hash.end()) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:max_ring_size missing"));
-            } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:max_ring_size error: should be of "
-                  "number"));
-            } else {
-              max_ring_size = gpr_parse_nonnegative_int(
-                  ring_hash_it->second.string_value().c_str());
-            }
-            if (min_ring_size <= 0 || min_ring_size > 8388608 ||
-                max_ring_size <= 0 || max_ring_size > 8388608 ||
-                min_ring_size > max_ring_size) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:max_ring_size and or min_ring_size error: "
-                  "values need to be in the range of 1 to 8388608 "
-                  "and max_ring_size cannot be smaller than "
-                  "min_ring_size"));
-            }
-            ring_hash_it = ring_hash.find("hash_function");
-            if (ring_hash_it == ring_hash.end()) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:hash_function missing"));
-            } else if (ring_hash_it->second.type() != Json::Type::STRING) {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:hash_function error: should be a "
-                  "string"));
-            } else if (ring_hash_it->second.string_value() != "XX_HASH" &&
-                       ring_hash_it->second.string_value() != "MURMUR_HASH_2") {
-              error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-                  "field:hash_function error: unsupported "
-                  "hash_function"));
-            }
-            break;
+            size_t min_ring_size;
+            size_t max_ring_size;
+            ParseRingHashLbConfig(policy_it->second, &min_ring_size,
+                                  &max_ring_size, &error_list);
           }
         }
       }
diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
index 3a10a42..871d756 100644
--- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
@@ -568,6 +568,9 @@
   std::string value_buffer;
   absl::optional<absl::string_view> header_value =
       GetHeaderValue(initial_metadata, policy.header_name, &value_buffer);
+  if (!header_value.has_value()) {
+    return absl::nullopt;
+  }
   if (policy.regex != nullptr) {
     // If GetHeaderValue() did not already store the value in
     // value_buffer, copy it there now, so we can modify it.
@@ -649,10 +652,13 @@
         case XdsApi::Route::HashPolicy::HEADER:
           new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
           break;
-        case XdsApi::Route::HashPolicy::CHANNEL_ID:
-          new_hash =
-              static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver));
+        case XdsApi::Route::HashPolicy::CHANNEL_ID: {
+          std::string address_str = absl::StrFormat(
+              "%" PRIu64,
+              static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver)));
+          new_hash = XXH64(address_str.c_str(), address_str.length(), 0);
           break;
+        }
         default:
           GPR_ASSERT(0);
       }
@@ -671,7 +677,12 @@
     }
     if (!hash.has_value()) {
       // If there is no hash, we just choose a random value as a default.
-      hash = rand();
+      // We cannot directly use the result of rand() as the hash value,
+      // since it is a 32-bit number and not a 64-bit number and will
+      // therefore not be evenly distributed.
+      uint32_t upper = rand();
+      uint32_t lower = rand();
+      hash = (static_cast<uint64_t>(upper) << 32) | lower;
     }
     CallConfig call_config;
     if (method_config != nullptr) {
@@ -680,8 +691,12 @@
       call_config.service_config = std::move(method_config);
     }
     call_config.call_attributes[kXdsClusterAttribute] = it->first;
-    call_config.call_attributes[kRequestRingHashAttribute] =
-        absl::StrFormat("%" PRIu64, hash.value());
+    std::string hash_string = absl::StrCat(hash.value());
+    char* hash_value =
+        static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
+    memcpy(hash_value, hash_string.c_str(), hash_string.size());
+    hash_value[hash_string.size()] = '\0';
+    call_config.call_attributes[kRequestRingHashAttribute] = hash_value;
     call_config.on_call_committed = [resolver, cluster_state]() {
       cluster_state->Unref();
       ExecCtx::Run(
diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc
index e51bc07..610d4e2 100644
--- a/src/core/ext/xds/xds_api.cc
+++ b/src/core/ext/xds/xds_api.cc
@@ -1605,40 +1605,35 @@
             regex_rewrite =
                 envoy_config_route_v3_RouteAction_HashPolicy_Header_regex_rewrite(
                     header);
-        if (regex_rewrite == nullptr) {
-          gpr_log(
-              GPR_DEBUG,
-              "RouteAction HashPolicy contains policy specifier Header with "
-              "RegexMatchAndSubstitution but Regex is missing");
-          continue;
+        if (regex_rewrite != nullptr) {
+          const envoy_type_matcher_v3_RegexMatcher* regex_matcher =
+              envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern(
+                  regex_rewrite);
+          if (regex_matcher == nullptr) {
+            gpr_log(
+                GPR_DEBUG,
+                "RouteAction HashPolicy contains policy specifier Header with "
+                "RegexMatchAndSubstitution but RegexMatcher pattern is "
+                "missing");
+            continue;
+          }
+          RE2::Options options;
+          policy.regex = absl::make_unique<RE2>(
+              UpbStringToStdString(
+                  envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)),
+              options);
+          if (!policy.regex->ok()) {
+            gpr_log(
+                GPR_DEBUG,
+                "RouteAction HashPolicy contains policy specifier Header with "
+                "RegexMatchAndSubstitution but RegexMatcher pattern does not "
+                "compile");
+            continue;
+          }
+          policy.regex_substitution = UpbStringToStdString(
+              envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution(
+                  regex_rewrite));
         }
-        const envoy_type_matcher_v3_RegexMatcher* regex_matcher =
-            envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern(
-                regex_rewrite);
-        if (regex_matcher == nullptr) {
-          gpr_log(
-              GPR_DEBUG,
-              "RouteAction HashPolicy contains policy specifier Header with "
-              "RegexMatchAndSubstitution but RegexMatcher pattern is "
-              "missing");
-          continue;
-        }
-        RE2::Options options;
-        policy.regex = absl::make_unique<RE2>(
-            UpbStringToStdString(
-                envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)),
-            options);
-        if (!policy.regex->ok()) {
-          gpr_log(
-              GPR_DEBUG,
-              "RouteAction HashPolicy contains policy specifier Header with "
-              "RegexMatchAndSubstitution but RegexMatcher pattern does not "
-              "compile");
-          continue;
-        }
-        policy.regex_substitution = UpbStringToStdString(
-            envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution(
-                regex_rewrite));
       } else if ((filter_state =
                       envoy_config_route_v3_RouteAction_HashPolicy_filter_state(
                           hash_policy)) != nullptr) {
@@ -2815,75 +2810,61 @@
       // Record ring hash lb config
       auto* ring_hash_config =
           envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
-      if (ring_hash_config == nullptr) {
-        errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
-            absl::StrCat(cluster_name,
-                         ": ring hash lb config required but not present.")
-                .c_str()));
-        resource_names_failed->insert(cluster_name);
-        continue;
-      }
-      const google_protobuf_UInt64Value* max_ring_size =
-          envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
-              ring_hash_config);
-      if (max_ring_size != nullptr) {
-        cds_update.max_ring_size =
-            google_protobuf_UInt64Value_value(max_ring_size);
-        if (cds_update.max_ring_size > 8388608 ||
-            cds_update.max_ring_size == 0) {
+      if (ring_hash_config != nullptr) {
+        const google_protobuf_UInt64Value* max_ring_size =
+            envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
+                ring_hash_config);
+        if (max_ring_size != nullptr) {
+          cds_update.max_ring_size =
+              google_protobuf_UInt64Value_value(max_ring_size);
+          if (cds_update.max_ring_size > 8388608 ||
+              cds_update.max_ring_size == 0) {
+            errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+                absl::StrCat(
+                    cluster_name,
+                    ": max_ring_size is not in the range of 1 to 8388608.")
+                    .c_str()));
+            resource_names_failed->insert(cluster_name);
+            continue;
+          }
+        }
+        const google_protobuf_UInt64Value* min_ring_size =
+            envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
+                ring_hash_config);
+        if (min_ring_size != nullptr) {
+          cds_update.min_ring_size =
+              google_protobuf_UInt64Value_value(min_ring_size);
+          if (cds_update.min_ring_size > 8388608 ||
+              cds_update.min_ring_size == 0) {
+            errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+                absl::StrCat(
+                    cluster_name,
+                    ": min_ring_size is not in the range of 1 to 8388608.")
+                    .c_str()));
+            resource_names_failed->insert(cluster_name);
+            continue;
+          }
+          if (cds_update.min_ring_size > cds_update.max_ring_size) {
+            errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+                absl::StrCat(
+                    cluster_name,
+                    ": min_ring_size cannot be greater than max_ring_size.")
+                    .c_str()));
+            resource_names_failed->insert(cluster_name);
+            continue;
+          }
+        }
+        if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
+                ring_hash_config) !=
+            envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
           errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
-              absl::StrCat(
-                  cluster_name,
-                  ": max_ring_size is not in the range of 1 to 8388608.")
+              absl::StrCat(cluster_name,
+                           ": ring hash lb config has invalid hash function.")
                   .c_str()));
           resource_names_failed->insert(cluster_name);
           continue;
         }
       }
-      const google_protobuf_UInt64Value* min_ring_size =
-          envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
-              ring_hash_config);
-      if (min_ring_size != nullptr) {
-        cds_update.min_ring_size =
-            google_protobuf_UInt64Value_value(min_ring_size);
-        if (cds_update.min_ring_size > 8388608 ||
-            cds_update.min_ring_size == 0) {
-          errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
-              absl::StrCat(
-                  cluster_name,
-                  ": min_ring_size is not in the range of 1 to 8388608.")
-                  .c_str()));
-          resource_names_failed->insert(cluster_name);
-          continue;
-        }
-        if (cds_update.min_ring_size > cds_update.max_ring_size) {
-          errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
-              absl::StrCat(
-                  cluster_name,
-                  ": min_ring_size cannot be greater than max_ring_size.")
-                  .c_str()));
-          resource_names_failed->insert(cluster_name);
-          continue;
-        }
-      }
-      if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
-              ring_hash_config) ==
-          envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
-        cds_update.hash_function = XdsApi::CdsUpdate::HashFunction::XX_HASH;
-      } else if (
-          envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
-              ring_hash_config) ==
-          envoy_config_cluster_v3_Cluster_RingHashLbConfig_MURMUR_HASH_2) {
-        cds_update.hash_function =
-            XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2;
-      } else {
-        errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
-            absl::StrCat(cluster_name,
-                         ": ring hash lb config has invalid hash function.")
-                .c_str()));
-        resource_names_failed->insert(cluster_name);
-        continue;
-      }
     } else {
       errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
           absl::StrCat(cluster_name, ": LB policy is not supported.").c_str()));
@@ -3014,13 +2995,28 @@
   if (GPR_UNLIKELY(port >> 16) != 0) {
     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
   }
+  // Find load_balancing_weight for the endpoint.
+  const google_protobuf_UInt32Value* load_balancing_weight =
+      envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
+  const int32_t weight =
+      load_balancing_weight != nullptr
+          ? google_protobuf_UInt32Value_value(load_balancing_weight)
+          : 500;
+  if (weight == 0) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Invalid endpoint weight of 0.");
+  }
   // Populate grpc_resolved_address.
   grpc_resolved_address addr;
   grpc_error_handle error =
       grpc_string_to_sockaddr(&addr, address_str.c_str(), port);
   if (error != GRPC_ERROR_NONE) return error;
   // Append the address to the list.
-  list->emplace_back(addr, nullptr);
+  std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
+      attributes;
+  attributes[ServerAddressWeightAttribute::kServerAddressWeightAttributeKey] =
+      absl::make_unique<ServerAddressWeightAttribute>(weight);
+  list->emplace_back(addr, nullptr, std::move(attributes));
   return GRPC_ERROR_NONE;
 }
 
diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h
index e7bf1cd..c148349 100644
--- a/src/core/ext/xds/xds_api.h
+++ b/src/core/ext/xds/xds_api.h
@@ -411,8 +411,6 @@
     // Used for RING_HASH LB policy only.
     uint64_t min_ring_size = 1024;
     uint64_t max_ring_size = 8388608;
-    enum HashFunction { XX_HASH, MURMUR_HASH_2 };
-    HashFunction hash_function;
     // Maximum number of outstanding requests can be made to the upstream
     // cluster.
     uint32_t max_concurrent_requests = 1024;
diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc
index d3def27..271b010 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_plugin_registry.cc
@@ -63,6 +63,8 @@
 namespace grpc_core {
 void FaultInjectionFilterInit(void);
 void FaultInjectionFilterShutdown(void);
+void GrpcLbPolicyRingHashInit(void);
+void GrpcLbPolicyRingHashShutdown(void);
 }  // namespace grpc_core
 
 #ifndef GRPC_NO_XDS
@@ -115,6 +117,8 @@
                        grpc_lb_policy_pick_first_shutdown);
   grpc_register_plugin(grpc_lb_policy_round_robin_init,
                        grpc_lb_policy_round_robin_shutdown);
+  grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit,
+                       grpc_core::GrpcLbPolicyRingHashShutdown);
   grpc_register_plugin(grpc_resolver_dns_ares_init,
                        grpc_resolver_dns_ares_shutdown);
   grpc_register_plugin(grpc_resolver_dns_native_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
index 5e74529..b58ab32 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
@@ -57,6 +57,8 @@
 namespace grpc_core {
 void FaultInjectionFilterInit(void);
 void FaultInjectionFilterShutdown(void);
+void GrpcLbPolicyRingHashInit(void);
+void GrpcLbPolicyRingHashShutdown(void);
 }  // namespace grpc_core
 void grpc_service_config_channel_arg_filter_init(void);
 void grpc_service_config_channel_arg_filter_shutdown(void);
@@ -94,6 +96,8 @@
                        grpc_lb_policy_pick_first_shutdown);
   grpc_register_plugin(grpc_lb_policy_round_robin_init,
                        grpc_lb_policy_round_robin_shutdown);
+  grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit,
+                       grpc_core::GrpcLbPolicyRingHashShutdown);
   grpc_register_plugin(grpc_client_idle_filter_init,
                        grpc_client_idle_filter_shutdown);
   grpc_register_plugin(grpc_max_age_filter_init,
diff --git a/src/proto/grpc/testing/xds/v3/cluster.proto b/src/proto/grpc/testing/xds/v3/cluster.proto
index c04fe20..2c26c9b 100644
--- a/src/proto/grpc/testing/xds/v3/cluster.proto
+++ b/src/proto/grpc/testing/xds/v3/cluster.proto
@@ -153,12 +153,50 @@
   // Configuration to use for EDS updates for the Cluster.
   EdsClusterConfig eds_cluster_config = 3;
 
+  // Specific configuration for the :ref:`RingHash<arch_overview_load_balancing_types_ring_hash>`
+  // load balancing policy.
+  message RingHashLbConfig {
+    // The hash function used to hash hosts onto the ketama ring.
+    enum HashFunction {
+      // Use `xxHash <https://github.com/Cyan4973/xxHash>`_, this is the default hash function.
+      XX_HASH = 0;
+      MURMUR_HASH_2 = 1;
+    }
+
+    reserved 2;
+
+    // Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each
+    // provided host) the better the request distribution will reflect the desired weights. Defaults
+    // to 1024 entries, and limited to 8M entries. See also
+    // :ref:`maximum_ring_size<envoy_api_field_config.cluster.v3.Cluster.RingHashLbConfig.maximum_ring_size>`.
+    google.protobuf.UInt64Value minimum_ring_size = 1;
+
+    // The hash function used to hash hosts onto the ketama ring. The value defaults to
+    // :ref:`XX_HASH<envoy_api_enum_value_config.cluster.v3.Cluster.RingHashLbConfig.HashFunction.XX_HASH>`.
+    HashFunction hash_function = 3;
+
+    // Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered
+    // to further constrain resource use. See also
+    // :ref:`minimum_ring_size<envoy_api_field_config.cluster.v3.Cluster.RingHashLbConfig.minimum_ring_size>`.
+    google.protobuf.UInt64Value maximum_ring_size = 4;
+  }
+
   // The :ref:`load balancer type <arch_overview_load_balancing_types>` to use
   // when picking a host in the cluster.
   LbPolicy lb_policy = 6;
 
   CircuitBreakers circuit_breakers = 10;
 
+  // Optional configuration for the load balancing algorithm selected by
+  // LbPolicy. Currently only
+  // :ref:`RING_HASH<envoy_api_enum_value_config.cluster.v3.Cluster.LbPolicy.RING_HASH>`,
+  // Specifying ring_hash_lb_config without setting the corresponding
+  // LbPolicy will generate an error at runtime.
+  oneof lb_config {
+    // Optional configuration for the Ring Hash load balancing policy.
+    RingHashLbConfig ring_hash_lb_config = 23;
+  }
+
   // Optional custom transport socket implementation to use for upstream connections.
   // To setup TLS, set a transport socket with name `tls` and
   // :ref:`UpstreamTlsContexts <envoy_api_msg_extensions.transport_sockets.tls.v3.UpstreamTlsContext>` in the `typed_config`.
diff --git a/src/proto/grpc/testing/xds/v3/endpoint.proto b/src/proto/grpc/testing/xds/v3/endpoint.proto
index 7cc1d40..7cbea7f 100644
--- a/src/proto/grpc/testing/xds/v3/endpoint.proto
+++ b/src/proto/grpc/testing/xds/v3/endpoint.proto
@@ -76,6 +76,17 @@
 
   // Optional health status when known and supplied by EDS server.
   HealthStatus health_status = 2;
+
+  // The optional load balancing weight of the upstream host; at least 1.
+  // Envoy uses the load balancing weight in some of the built in load
+  // balancers. The load balancing weight for an endpoint is divided by the sum
+  // of the weights of all endpoints in the endpoint's locality to produce a
+  // percentage of traffic for the endpoint. This percentage is then further
+  // weighted by the endpoint's locality's load balancing weight from
+  // LocalityLbEndpoints. If unspecified, each host is presumed to have equal
+  // weight in a locality. The sum of the weights of all endpoints in the
+  // endpoint's locality must not exceed uint32_t maximal value (4294967295).
+  google.protobuf.UInt32Value load_balancing_weight = 4;
 }
 
 // A group of endpoints belonging to a Locality.
diff --git a/src/proto/grpc/testing/xds/v3/regex.proto b/src/proto/grpc/testing/xds/v3/regex.proto
index af90457..9039ed4 100644
--- a/src/proto/grpc/testing/xds/v3/regex.proto
+++ b/src/proto/grpc/testing/xds/v3/regex.proto
@@ -36,3 +36,8 @@
   // The regex match string. The string must be supported by the configured engine.
   string regex = 2;
 }
+
+message RegexMatchAndSubstitute {
+  RegexMatcher pattern = 1;
+  string substitution = 2;
+}
diff --git a/src/proto/grpc/testing/xds/v3/route.proto b/src/proto/grpc/testing/xds/v3/route.proto
index baeaaf6..89260f6 100644
--- a/src/proto/grpc/testing/xds/v3/route.proto
+++ b/src/proto/grpc/testing/xds/v3/route.proto
@@ -246,6 +246,78 @@
     // for additional documentation.
     WeightedCluster weighted_clusters = 3;
   }
+
+  message HashPolicy {
+    message Header {
+      // The name of the request header that will be used to obtain the hash
+      // key. If the request header is not present, no hash will be produced.
+      string header_name = 1;
+
+      // If specified, the request header value will be rewritten and used
+      // to produce the hash key.
+      type.matcher.v3.RegexMatchAndSubstitute regex_rewrite = 2;
+    }
+    
+    message Cookie {
+      string name = 1;
+    }
+
+    message ConnectionProperties {
+      bool source_ip = 1;
+    }
+
+    message QueryParameter {
+      string name = 1;
+    }
+
+    message FilterState {
+      // The name of the Object in the per-request filterState, which is an
+      // Envoy::Http::Hashable object. If there is no data associated with the key,
+      // or the stored object is not Envoy::Http::Hashable, no hash will be produced.
+      string key = 1;
+    }
+
+    oneof policy_specifier {
+      // Header hash policy.
+      Header header = 1;
+
+      // Cookie hash policy.
+      Cookie cookie = 2;
+
+      // Connection properties hash policy.
+      ConnectionProperties connection_properties = 3;
+
+      // Query parameter hash policy.
+      QueryParameter query_parameter = 5;
+
+      // Filter state hash policy.
+      FilterState filter_state = 6;
+    }
+
+    // The flag that short-circuits the hash computing. This field provides a
+    // 'fallback' style of configuration: "if a terminal policy doesn't work,
+    // fallback to rest of the policy list", it saves time when the terminal
+    // policy works.
+    //
+    // If true, and there is already a hash computed, ignore rest of the
+    // list of hash polices.
+    // For example, if the following hash methods are configured:
+    //
+    //  ========= ========
+    //  specifier terminal
+    //  ========= ========
+    //  Header A  true
+    //  Header B  false
+    //  Header C  false
+    //  ========= ========
+    //
+    // The generateHash process ends if policy "header A" generates a hash, as
+    // it's a terminal policy.
+    bool terminal = 4;
+  }
+
+  repeated HashPolicy hash_policy = 15;
+
   // Specifies the maximum stream duration for this route.
   MaxStreamDuration max_stream_duration = 36;
 }
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index 2a80c54..0257778 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -32,6 +32,7 @@
 #include "absl/functional/bind_front.h"
 #include "absl/memory/memory.h"
 #include "absl/strings/str_cat.h"
+#include "absl/strings/str_format.h"
 #include "absl/strings/str_join.h"
 #include "absl/types/optional.h"
 
@@ -2095,6 +2096,14 @@
     return addresses;
   }
 
+  std::string CreateMetadataValueThatHashesToBackendPort(int port) {
+    return absl::StrCat(ipv6_only_ ? "::1" : "127.0.0.1", ":", port, "_0");
+  }
+
+  std::string CreateMetadataValueThatHashesToBackend(int index) {
+    return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
+  }
+
   void SetNextResolution(
       const std::vector<int>& ports,
       grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
@@ -2236,6 +2245,13 @@
     return listener;
   }
 
+  AdsServiceImpl::EdsResourceArgs::Endpoint CreateEndpoint(
+      size_t backend_idx, HealthStatus health_status = HealthStatus::UNKNOWN,
+      int lb_weight = 1) {
+    return AdsServiceImpl::EdsResourceArgs::Endpoint(
+        backends_[backend_idx]->port(), health_status, lb_weight);
+  }
+
   std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint>
   CreateEndpointsForBackends(size_t start_index = 0, size_t stop_index = 0,
                              HealthStatus health_status = HealthStatus::UNKNOWN,
@@ -2243,7 +2259,7 @@
     if (stop_index == 0) stop_index = backends_.size();
     std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
     for (size_t i = start_index; i < stop_index; ++i) {
-      endpoints.emplace_back(backends_[i]->port(), health_status, lb_weight);
+      endpoints.emplace_back(CreateEndpoint(i, health_status, lb_weight));
     }
     return endpoints;
   }
@@ -2272,6 +2288,11 @@
             locality.endpoints[i].health_status != HealthStatus::UNKNOWN) {
           lb_endpoints->set_health_status(locality.endpoints[i].health_status);
         }
+        if (locality.endpoints.size() > i &&
+            locality.endpoints[i].lb_weight >= 1) {
+          lb_endpoints->mutable_load_balancing_weight()->set_value(
+              locality.endpoints[i].lb_weight);
+        }
         auto* endpoint = lb_endpoints->mutable_endpoint();
         auto* address = endpoint->mutable_address();
         auto* socket_address = address->mutable_socket_address();
@@ -2766,8 +2787,8 @@
   }
 }
 
-// Tests that subchannel sharing works when the same backend is listed multiple
-// times.
+// Tests that subchannel sharing works when the same backend is listed
+// multiple times.
 TEST_P(BasicTest, SameBackendListedMultipleTimes) {
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
@@ -2852,8 +2873,8 @@
   EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
 }
 
-// Tests that RPCs fail when the backends are down, and will succeed again after
-// the backends are restarted.
+// Tests that RPCs fail when the backends are down, and will succeed again
+// after the backends are restarted.
 TEST_P(BasicTest, BackendsRestart) {
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
@@ -2867,13 +2888,13 @@
   ShutdownAllBackends();
   // Sending multiple failed requests instead of just one to ensure that the
   // client notices that all backends are down before we restart them. If we
-  // didn't do this, then a single RPC could fail here due to the race condition
-  // between the LB pick and the GOAWAY from the chosen backend being shut down,
-  // which would not actually prove that the client noticed that all of the
-  // backends are down. Then, when we send another request below (which we
-  // expect to succeed), if the callbacks happen in the wrong order, the same
-  // race condition could happen again due to the client not yet having noticed
-  // that the backends were all down.
+  // didn't do this, then a single RPC could fail here due to the race
+  // condition between the LB pick and the GOAWAY from the chosen backend
+  // being shut down, which would not actually prove that the client noticed
+  // that all of the backends are down. Then, when we send another request
+  // below (which we expect to succeed), if the callbacks happen in the wrong
+  // order, the same race condition could happen again due to the client not
+  // yet having noticed that the backends were all down.
   CheckRpcSendFailure(num_backends_);
   // Restart all backends.  RPCs should start succeeding again.
   StartAllBackends();
@@ -3501,7 +3522,8 @@
 }
 
 // Tests that LDS client should send a NACK if the rds message in the
-// http_connection_manager has a config_source field that does not specify ADS.
+// http_connection_manager has a config_source field that does not specify
+// ADS.
 TEST_P(LdsTest, RdsConfigSourceDoesNotSpecifyAds) {
   auto listener = default_listener_;
   HttpConnectionManager http_connection_manager;
@@ -3519,10 +3541,9 @@
   const auto response_state =
       balancers_[0]->ads_service()->lds_response_state();
   EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
-  EXPECT_THAT(
-      response_state.error_message,
-      ::testing::HasSubstr(
-          "HttpConnectionManager ConfigSource for RDS does not specify ADS."));
+  EXPECT_THAT(response_state.error_message,
+              ::testing::HasSubstr("HttpConnectionManager ConfigSource for "
+                                   "RDS does not specify ADS."));
 }
 
 // Tests that we ignore filters after the router filter.
@@ -3893,8 +3914,8 @@
   EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED);
 }
 
-// Tests that LDS client should choose the virtual host with matching domain if
-// multiple virtual hosts exist in the LDS response.
+// Tests that LDS client should choose the virtual host with matching domain
+// if multiple virtual hosts exist in the LDS response.
 TEST_P(LdsRdsTest, ChooseMatchedDomain) {
   RouteConfiguration route_config = default_route_config_;
   *(route_config.add_virtual_hosts()) = route_config.virtual_hosts(0);
@@ -4233,10 +4254,9 @@
   CheckRpcSendFailure();
   const auto response_state = RouteConfigurationResponseState(0);
   EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
-  EXPECT_THAT(
-      response_state.error_message,
-      ::testing::HasSubstr(
-          "RouteAction weighted_cluster cluster contains empty cluster name."));
+  EXPECT_THAT(response_state.error_message,
+              ::testing::HasSubstr("RouteAction weighted_cluster cluster "
+                                   "contains empty cluster name."));
 }
 
 TEST_P(LdsRdsTest, RouteActionWeightedTargetClusterHasNoWeight) {
@@ -4992,8 +5012,8 @@
   // Change Route Configurations: same clusters different weights.
   weighted_cluster1->mutable_weight()->set_value(kWeight50);
   weighted_cluster2->mutable_weight()->set_value(kWeight50);
-  // Change default route to a new cluster to help to identify when new polices
-  // are seen by the client.
+  // Change default route to a new cluster to help to identify when new
+  // polices are seen by the client.
   default_route->mutable_route()->set_cluster(kNewCluster3Name);
   SetRouteConfiguration(0, new_route_config);
   ResetBackendCounters();
@@ -5258,8 +5278,8 @@
       new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
   default_route->mutable_route()->set_cluster(kNewClusterName);
   SetRouteConfiguration(0, new_route_config);
-  // Wait for RPCs to go to the new backend: 1, this ensures that the client has
-  // processed the update.
+  // Wait for RPCs to go to the new backend: 1, this ensures that the client
+  // has processed the update.
   WaitForBackend(
       1, WaitForBackendOptions().set_reset_counters(false).set_allow_failures(
              true));
@@ -6648,8 +6668,8 @@
               ::testing::HasSubstr("DiscoveryType is not valid."));
 }
 
-// Tests that CDS client should send a NACK if the cluster type in CDS response
-// is unsupported.
+// Tests that CDS client should send a NACK if the cluster type in CDS
+// response is unsupported.
 TEST_P(CdsTest, UnsupportedClusterType) {
   auto cluster = default_cluster_;
   cluster.set_type(Cluster::STATIC);
@@ -6695,8 +6715,8 @@
                       kClusterName2, ": DiscoveryType is not valid."))));
 }
 
-// Tests that CDS client should send a NACK if the eds_config in CDS response is
-// other than ADS.
+// Tests that CDS client should send a NACK if the eds_config in CDS response
+// is other than ADS.
 TEST_P(CdsTest, WrongEdsConfig) {
   auto cluster = default_cluster_;
   cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
@@ -6711,8 +6731,8 @@
               ::testing::HasSubstr("EDS ConfigSource is not ADS."));
 }
 
-// Tests that CDS client should send a NACK if the lb_policy in CDS response is
-// other than ROUND_ROBIN.
+// Tests that CDS client should send a NACK if the lb_policy in CDS response
+// is other than ROUND_ROBIN.
 TEST_P(CdsTest, WrongLbPolicy) {
   auto cluster = default_cluster_;
   cluster.set_lb_policy(Cluster::LEAST_REQUEST);
@@ -6727,8 +6747,8 @@
               ::testing::HasSubstr("LB policy is not supported."));
 }
 
-// Tests that CDS client should send a NACK if the lrs_server in CDS response is
-// other than SELF.
+// Tests that CDS client should send a NACK if the lrs_server in CDS response
+// is other than SELF.
 TEST_P(CdsTest, WrongLrsServer) {
   auto cluster = default_cluster_;
   cluster.mutable_lrs_server()->mutable_ads();
@@ -6743,6 +6763,801 @@
               ::testing::HasSubstr("LRS ConfigSource is not self."));
 }
 
+// Tests that ring hash policy that hashes using channel id ensures all RPCs
+// to go 1 particular backend.
+TEST_P(CdsTest, RingHashChannelIdHashing) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendOk(100);
+  bool found = false;
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->backend_service()->request_count() > 0) {
+      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
+          << "backend " << i;
+      EXPECT_FALSE(found) << "backend " << i;
+      found = true;
+    }
+  }
+  EXPECT_TRUE(found);
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that ring hash policy that hashes using a header value can spread
+// RPCs across all the backends.
+TEST_P(CdsTest, RingHashHeaderHashing) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("address_hash");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  // Note each type of RPC will contains a header value that will always be
+  // hashed to a specific backend as the header value matches the value used
+  // to create the entry in the ring.
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+  std::vector<std::pair<std::string, std::string>> metadata1 = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
+  std::vector<std::pair<std::string, std::string>> metadata2 = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
+  std::vector<std::pair<std::string, std::string>> metadata3 = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
+  const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
+  const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
+  WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+  WaitForBackend(1, WaitForBackendOptions(), rpc_options1);
+  WaitForBackend(2, WaitForBackendOptions(), rpc_options2);
+  WaitForBackend(3, WaitForBackendOptions(), rpc_options3);
+  CheckRpcSendOk(100, rpc_options);
+  CheckRpcSendOk(100, rpc_options1);
+  CheckRpcSendOk(100, rpc_options2);
+  CheckRpcSendOk(100, rpc_options3);
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
+  }
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that ring hash policy that hashes using a header value and regex
+// rewrite to aggregate RPCs to 1 backend.
+TEST_P(CdsTest, RingHashHeaderHashingWithRegexRewrite) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("address_hash");
+  hash_policy->mutable_header()
+      ->mutable_regex_rewrite()
+      ->mutable_pattern()
+      ->set_regex("[0-9]+");
+  hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
+      "foo");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+  std::vector<std::pair<std::string, std::string>> metadata1 = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
+  std::vector<std::pair<std::string, std::string>> metadata2 = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
+  std::vector<std::pair<std::string, std::string>> metadata3 = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
+  const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
+  const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
+  CheckRpcSendOk(100, rpc_options);
+  CheckRpcSendOk(100, rpc_options1);
+  CheckRpcSendOk(100, rpc_options2);
+  CheckRpcSendOk(100, rpc_options3);
+  bool found = false;
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->backend_service()->request_count() > 0) {
+      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
+          << "backend " << i;
+      EXPECT_FALSE(found) << "backend " << i;
+      found = true;
+    }
+  }
+  EXPECT_TRUE(found);
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that ring hash policy that hashes using a random value.
+TEST_P(CdsTest, RingHashNoHashPolicy) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  const double kDistribution50Percent = 0.5;
+  const double kErrorTolerance = 0.05;
+  const uint32_t kRpcTimeoutMs = 10000;
+  const size_t kNumRpcs =
+      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
+  auto cluster = default_cluster_;
+  // Increasing min ring size for random distribution.
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      100000);
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  AdsServiceImpl::EdsResourceArgs args(
+      {{"locality0", CreateEndpointsForBackends(0, 2)}});
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  // TODO(donnadionne): remove extended timeout after ring creation
+  // optimization.
+  WaitForAllBackends(0, 2, WaitForBackendOptions(),
+                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+  CheckRpcSendOk(kNumRpcs);
+  const int request_count_1 = backends_[0]->backend_service()->request_count();
+  const int request_count_2 = backends_[1]->backend_service()->request_count();
+  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
+              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
+              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that ring hash policy evaluation will continue past the terminal
+// policy if no results are produced yet.
+TEST_P(CdsTest, RingHashContinuesPastTerminalPolicyThatDoesNotProduceResult) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("header_not_present");
+  hash_policy->set_terminal(true);
+  auto* hash_policy2 = route->mutable_route()->add_hash_policy();
+  hash_policy2->mutable_header()->set_header_name("address_hash");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args(
+      {{"locality0", CreateEndpointsForBackends(0, 2)}});
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  CheckRpcSendOk(100, rpc_options);
+  EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
+  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test random hash is used when header hashing specified a header field that
+// the RPC did not have.
+TEST_P(CdsTest, RingHashOnHeaderThatIsNotPresent) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  const double kDistribution50Percent = 0.5;
+  const double kErrorTolerance = 0.05;
+  const uint32_t kRpcTimeoutMs = 10000;
+  const size_t kNumRpcs =
+      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
+  auto cluster = default_cluster_;
+  // Increasing min ring size for random distribution.
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      100000);
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("header_not_present");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args(
+      {{"locality0", CreateEndpointsForBackends(0, 2)}});
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"unmatched_header", absl::StrFormat("%" PRIu32, rand())},
+  };
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  // TODO(donnadionne): remove extended timeout after ring creation
+  // optimization.
+  WaitForAllBackends(0, 2, WaitForBackendOptions(),
+                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+  CheckRpcSendOk(kNumRpcs, rpc_options);
+  const int request_count_1 = backends_[0]->backend_service()->request_count();
+  const int request_count_2 = backends_[1]->backend_service()->request_count();
+  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
+              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
+              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test random hash is used when only unsupported hash policies are
+// configured.
+TEST_P(CdsTest, RingHashUnsupportedHashPolicyDefaultToRandomHashing) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  const double kDistribution50Percent = 0.5;
+  const double kErrorTolerance = 0.05;
+  const uint32_t kRpcTimeoutMs = 10000;
+  const size_t kNumRpcs =
+      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
+  auto cluster = default_cluster_;
+  // Increasing min ring size for random distribution.
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      100000);
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
+  hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
+  auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
+  hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
+      true);
+  auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
+  hash_policy_unsupported_3->mutable_query_parameter()->set_name(
+      "query_parameter");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args(
+      {{"locality0", CreateEndpointsForBackends(0, 2)}});
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  // TODO(donnadionne): remove extended timeout after ring creation
+  // optimization.
+  WaitForAllBackends(0, 2, WaitForBackendOptions(),
+                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+  CheckRpcSendOk(kNumRpcs);
+  const int request_count_1 = backends_[0]->backend_service()->request_count();
+  const int request_count_2 = backends_[1]->backend_service()->request_count();
+  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
+              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+  EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
+              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that ring hash policy that hashes using a random value can spread
+// RPCs across all the backends according to locality weight.
+TEST_P(CdsTest, RingHashRandomHashingDistributionAccordingToEndpointWeight) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  const size_t kWeight1 = 1;
+  const size_t kWeight2 = 2;
+  const size_t kWeightTotal = kWeight1 + kWeight2;
+  const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
+  const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
+  const double kErrorTolerance = 0.05;
+  const uint32_t kRpcTimeoutMs = 10000;
+  const size_t kNumRpcs =
+      ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
+  auto cluster = default_cluster_;
+  // Increasing min ring size for random distribution.
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      100000);
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  AdsServiceImpl::EdsResourceArgs args(
+      {{"locality0",
+        {CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
+         CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  // TODO(donnadionne): remove extended timeout after ring creation
+  // optimization.
+  WaitForAllBackends(0, 2, WaitForBackendOptions(),
+                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+  CheckRpcSendOk(kNumRpcs);
+  const int weight_33_request_count =
+      backends_[0]->backend_service()->request_count();
+  const int weight_66_request_count =
+      backends_[1]->backend_service()->request_count();
+  EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
+              ::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
+  EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
+              ::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that ring hash policy that hashes using a random value can spread
+// RPCs across all the backends according to locality weight.
+TEST_P(CdsTest,
+       RingHashRandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  const size_t kWeight1 = 1 * 1;
+  const size_t kWeight2 = 2 * 2;
+  const size_t kWeightTotal = kWeight1 + kWeight2;
+  const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
+  const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
+  const double kErrorTolerance = 0.05;
+  const uint32_t kRpcTimeoutMs = 10000;
+  const size_t kNumRpcs =
+      ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
+  auto cluster = default_cluster_;
+  // Increasing min ring size for random distribution.
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      100000);
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  AdsServiceImpl::EdsResourceArgs args(
+      {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
+       {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  // TODO(donnadionne): remove extended timeout after ring creation
+  // optimization.
+  WaitForAllBackends(0, 2, WaitForBackendOptions(),
+                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+  CheckRpcSendOk(kNumRpcs);
+  const int weight_20_request_count =
+      backends_[0]->backend_service()->request_count();
+  const int weight_80_request_count =
+      backends_[1]->backend_service()->request_count();
+  EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
+              ::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
+  EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
+              ::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests round robin is not implacted by the endpoint weight, and that the
+// localities in a locality map are picked according to their weights.
+TEST_P(CdsTest, RingHashEndpointWeightDoesNotImpactWeightedRoundRobin) {
+  SetNextResolution({});
+  SetNextResolutionForLbChannelAllBalancers();
+  const int kLocalityWeight0 = 2;
+  const int kLocalityWeight1 = 8;
+  const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
+  const double kLocalityWeightRate0 =
+      static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
+  const double kLocalityWeightRate1 =
+      static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
+  const double kErrorTolerance = 0.05;
+  const size_t kNumRpcs =
+      ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
+  // ADS response contains 2 localities, each of which contains 1 backend.
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0",
+       {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
+       kLocalityWeight0},
+      {"locality1",
+       {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
+       kLocalityWeight1},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  // Wait for both backends to be ready.
+  WaitForAllBackends(0, 2);
+  // Send kNumRpcs RPCs.
+  CheckRpcSendOk(kNumRpcs);
+  // The locality picking rates should be roughly equal to the expectation.
+  const double locality_picked_rate_0 =
+      static_cast<double>(backends_[0]->backend_service()->request_count()) /
+      kNumRpcs;
+  const double locality_picked_rate_1 =
+      static_cast<double>(backends_[1]->backend_service()->request_count()) /
+      kNumRpcs;
+  EXPECT_THAT(locality_picked_rate_0,
+              ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
+  EXPECT_THAT(locality_picked_rate_1,
+              ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
+}
+
+// Tests that ring hash policy that hashes using a fixed string ensures all
+// RPCs to go 1 particular backend; and that subsequent hashing policies are
+// ignored due to the setting of terminal.
+TEST_P(CdsTest, RingHashFixedHashingTerminalPolicy) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("fixed_string");
+  hash_policy->set_terminal(true);
+  auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
+  hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"fixed_string", "fixed_value"},
+      {"random_string", absl::StrFormat("%" PRIu32, rand())},
+  };
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  CheckRpcSendOk(100, rpc_options);
+  bool found = false;
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->backend_service()->request_count() > 0) {
+      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
+          << "backend " << i;
+      EXPECT_FALSE(found) << "backend " << i;
+      found = true;
+    }
+  }
+  EXPECT_TRUE(found);
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that the channel will go from idle to ready via connecting;
+// (tho it is not possible to catch the connecting state before moving to
+// ready)
+TEST_P(CdsTest, RingHashIdleToReady) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
+  CheckRpcSendOk();
+  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when the first pick is down leading to a transient failure, we
+// will move on to the next ring hash entry.
+TEST_P(CdsTest, RingHashTransientFailureCheckNextOne) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("address_hash");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
+  const int unused_port = grpc_pick_unused_port_or_die();
+  endpoints.emplace_back(unused_port);
+  endpoints.emplace_back(backends_[1]->port());
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", std::move(endpoints)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"address_hash",
+       CreateMetadataValueThatHashesToBackendPort(unused_port)}};
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  WaitForBackend(1, WaitForBackendOptions(), rpc_options);
+  CheckRpcSendOk(100, rpc_options);
+  EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
+  EXPECT_EQ(100, backends_[1]->backend_service()->request_count());
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when a backend goes down, we will move on to the next subchannel
+// (with a lower priority).  When the backend comes back up, traffic will move
+// back.
+TEST_P(CdsTest, RingHashSwitchToLowerPrioirtyAndThenBack) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("address_hash");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
+       0},
+      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
+       1},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+  ShutdownBackend(0);
+  WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true),
+                 rpc_options);
+  StartBackend(0);
+  WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+  CheckRpcSendOk(100, rpc_options);
+  EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
+  EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when all backends are down, we will keep reattempting.
+TEST_P(CdsTest, RingHashAllFailReattempt) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  const uint32_t kConnectionTimeoutMilliseconds = 5000;
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("address_hash");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
+  endpoints.emplace_back(grpc_pick_unused_port_or_die());
+  endpoints.emplace_back(backends_[1]->port());
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", std::move(endpoints)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
+  ShutdownBackend(1);
+  CheckRpcSendFailure(1, rpc_options);
+  StartBackend(1);
+  // Ensure we are actively connecting without any traffic.
+  EXPECT_TRUE(channel_->WaitForConnected(
+      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when all backends are down and then up, we may pick a TF backend
+// and we will then jump to ready backend.
+TEST_P(CdsTest, RingHashTransientFailureSkipToAvailableReady) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  const uint32_t kConnectionTimeoutMilliseconds = 5000;
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("address_hash");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
+  // Make sure we include some unused ports to fill the ring.
+  endpoints.emplace_back(backends_[0]->port());
+  endpoints.emplace_back(backends_[1]->port());
+  endpoints.emplace_back(grpc_pick_unused_port_or_die());
+  endpoints.emplace_back(grpc_pick_unused_port_or_die());
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", std::move(endpoints)},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  std::vector<std::pair<std::string, std::string>> metadata = {
+      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
+  ShutdownBackend(0);
+  ShutdownBackend(1);
+  CheckRpcSendFailure(1, rpc_options);
+  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
+  // Bring up 0, should be picked as the RPC is hashed to it.
+  StartBackend(0);
+  EXPECT_TRUE(channel_->WaitForConnected(
+      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
+  WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+  // Bring down 0 and bring up 1.
+  // Note the RPC contains a header value that will always be hashed to
+  // backend 0. So by purposely bring down backend 0 and bring up another
+  // backend, this will ensure Picker's first choice of backend 0 will fail
+  // and it will
+  // 1. reattempt backend 0 and
+  // 2. go through the remaining subchannels to find one in READY.
+  // Since the the entries in the ring is pretty distributed and we have
+  // unused ports to fill the ring, it is almost guaranteed that the Picker
+  // will go through some non-READY entries and skip them as per design.
+  ShutdownBackend(0);
+  CheckRpcSendFailure(1, rpc_options);
+  StartBackend(1);
+  EXPECT_TRUE(channel_->WaitForConnected(
+      grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
+  WaitForBackend(1, WaitForBackendOptions(), rpc_options);
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test unspported hash policy types are all ignored before a supported
+// policy.
+TEST_P(CdsTest, RingHashUnsupportedHashPolicyUntilChannelIdHashing) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
+  hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
+  auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
+  hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
+      true);
+  auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
+  hash_policy_unsupported_3->mutable_query_parameter()->set_name(
+      "query_parameter");
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendOk(100);
+  bool found = false;
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->backend_service()->request_count() > 0) {
+      EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
+          << "backend " << i;
+      EXPECT_FALSE(found) << "backend " << i;
+      found = true;
+    }
+  }
+  EXPECT_TRUE(found);
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test we nack when ring hash policy has invalid hash function (something
+// other than XX_HASH.
+TEST_P(CdsTest, RingHashPolicyHasInvalidHashFunction) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  cluster.mutable_ring_hash_lb_config()->set_hash_function(
+      Cluster::RingHashLbConfig::MURMUR_HASH_2);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+  const auto response_state =
+      balancers_[0]->ads_service()->cds_response_state();
+  EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+  EXPECT_THAT(
+      response_state.error_message,
+      ::testing::HasSubstr("ring hash lb config has invalid hash function."));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test we nack when ring hash policy has invalid ring size.
+TEST_P(CdsTest, RingHashPolicyHasInvalidMinimumRingSize) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      0);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+  const auto response_state =
+      balancers_[0]->ads_service()->cds_response_state();
+  EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+  EXPECT_THAT(response_state.error_message,
+              ::testing::HasSubstr(
+                  "min_ring_size is not in the range of 1 to 8388608."));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test we nack when ring hash policy has invalid ring size.
+TEST_P(CdsTest, RingHashPolicyHasInvalidMaxmumRingSize) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
+      8388609);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+  const auto response_state =
+      balancers_[0]->ads_service()->cds_response_state();
+  EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+  EXPECT_THAT(response_state.error_message,
+              ::testing::HasSubstr(
+                  "max_ring_size is not in the range of 1 to 8388608."));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test we nack when ring hash policy has invalid ring size.
+TEST_P(CdsTest, RingHashPolicyHasInvalidRingSizeMinGreaterThanMax) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
+      5000);
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      5001);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  CheckRpcSendFailure();
+  const auto response_state =
+      balancers_[0]->ads_service()->cds_response_state();
+  EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+  EXPECT_THAT(response_state.error_message,
+              ::testing::HasSubstr(
+                  "min_ring_size cannot be greater than max_ring_size."));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
 class XdsSecurityTest : public BasicTest {
  protected:
   static void SetUpTestCase() {
@@ -6760,8 +7575,8 @@
     root_cert_ = ReadFile(kCaCertPath);
     bad_root_cert_ = ReadFile(kBadClientCertPath);
     identity_pair_ = ReadTlsIdentityPair(kClientKeyPath, kClientCertPath);
-    // TODO(yashykt): Use different client certs here instead of reusing server
-    // certs after https://github.com/grpc/grpc/pull/24876 is merged
+    // TODO(yashykt): Use different client certs here instead of reusing
+    // server certs after https://github.com/grpc/grpc/pull/24876 is merged
     fallback_identity_pair_ =
         ReadTlsIdentityPair(kServerKeyPath, kServerCertPath);
     bad_identity_pair_ =
@@ -7562,7 +8377,8 @@
   EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED);
 }
 
-// Verify that a mismatch of listening address results in "not serving" status.
+// Verify that a mismatch of listening address results in "not serving"
+// status.
 TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) {
   Listener listener;
   listener.set_name(
@@ -8421,8 +9237,8 @@
       HttpConnectionManager());
   filter_chain->mutable_filter_chain_match()->add_application_protocols("h2");
   balancers_[0]->ads_service()->SetLdsResource(listener);
-  // A successful RPC proves that filter chains that mention "raw_buffer" as the
-  // transport protocol are chosen as the best match in the round.
+  // A successful RPC proves that filter chains that mention "raw_buffer" as
+  // the transport protocol are chosen as the best match in the round.
   SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
 }
 
@@ -8449,8 +9265,8 @@
   prefix_range->set_address_prefix(ipv6_only_ ? "::1" : "127.0.0.1");
   prefix_range->mutable_prefix_len()->set_value(16);
   filter_chain->mutable_filter_chain_match()->add_server_names("server_name");
-  // Add filter chain with two prefix ranges (length 8 and 24). Since 24 is the
-  // highest match, it should be chosen.
+  // Add filter chain with two prefix ranges (length 8 and 24). Since 24 is
+  // the highest match, it should be chosen.
   filter_chain = listener.add_filter_chains();
   filter_chain->add_filters()->mutable_typed_config()->PackFrom(
       HttpConnectionManager());
@@ -8462,7 +9278,8 @@
       filter_chain->mutable_filter_chain_match()->add_prefix_ranges();
   prefix_range->set_address_prefix(ipv6_only_ ? "::1" : "127.0.0.1");
   prefix_range->mutable_prefix_len()->set_value(24);
-  // Add another filter chain with a non-matching prefix range (with length 30)
+  // Add another filter chain with a non-matching prefix range (with length
+  // 30)
   filter_chain = listener.add_filter_chains();
   filter_chain->add_filters()->mutable_typed_config()->PackFrom(
       HttpConnectionManager());
@@ -8528,10 +9345,10 @@
   auto* socket_address = listener.mutable_address()->mutable_socket_address();
   socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
   socket_address->set_port_value(backends_[0]->port());
-  // Add filter chain with source prefix range (length 16) but with a bad source
-  // port mentioned. (Prefix range is matched first.)
-  // Note that backends_[0]->port() will never be a match for the source port
-  // because it is already being used by a backend.
+  // Add filter chain with source prefix range (length 16) but with a bad
+  // source port mentioned. (Prefix range is matched first.) Note that
+  // backends_[0]->port() will never be a match for the source port because it
+  // is already being used by a backend.
   auto* filter_chain = listener.add_filter_chains();
   filter_chain->add_filters()->mutable_typed_config()->PackFrom(
       HttpConnectionManager());
@@ -8545,8 +9362,8 @@
   source_prefix_range->mutable_prefix_len()->set_value(16);
   filter_chain->mutable_filter_chain_match()->add_source_ports(
       backends_[0]->port());
-  // Add filter chain with two source prefix ranges (length 8 and 24). Since 24
-  // is the highest match, it should be chosen.
+  // Add filter chain with two source prefix ranges (length 8 and 24). Since
+  // 24 is the highest match, it should be chosen.
   filter_chain = listener.add_filter_chains();
   filter_chain->add_filters()->mutable_typed_config()->PackFrom(
       HttpConnectionManager());
@@ -8716,8 +9533,8 @@
       HttpConnectionManager());
   filter_chain->mutable_filter_chain_match()->set_transport_protocol(
       "raw_buffer");
-  // Add a duplicate filter chain with the same "raw_buffer" transport protocol
-  // entry
+  // Add a duplicate filter chain with the same "raw_buffer" transport
+  // protocol entry
   filter_chain = listener.add_filter_chains();
   filter_chain->add_filters()->mutable_typed_config()->PackFrom(
       HttpConnectionManager());
@@ -9083,8 +9900,8 @@
   delayed_resource_setter.join();
 }
 
-// Tests that the localities in a locality map are picked correctly after update
-// (addition, modification, deletion).
+// Tests that the localities in a locality map are picked correctly after
+// update (addition, modification, deletion).
 TEST_P(LocalityMapTest, UpdateMap) {
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
@@ -9148,8 +9965,8 @@
       BuildEdsResource(args, DefaultEdsServiceName()));
   // Backend 3 hasn't received any request.
   EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
-  // Wait until the locality update has been processed, as signaled by backend 3
-  // receiving a request.
+  // Wait until the locality update has been processed, as signaled by backend
+  // 3 receiving a request.
   WaitForAllBackends(3, 4);
   gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
   // Send kNumRpcs RPCs.
@@ -9268,8 +10085,8 @@
   EXPECT_EQ(0, std::get<1>(counts));
 }
 
-// If the higher priority localities are not reachable, failover to the highest
-// priority among the rest.
+// If the higher priority localities are not reachable, failover to the
+// highest priority among the rest.
 TEST_P(FailoverTest, Failover) {
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
@@ -9372,8 +10189,8 @@
   delayed_resource_setter.join();
 }
 
-// Tests that after the localities' priorities are updated, we still choose the
-// highest READY priority with the updated localities.
+// Tests that after the localities' priorities are updated, we still choose
+// the highest READY priority with the updated localities.
 TEST_P(FailoverTest, UpdatePriority) {
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
@@ -9624,8 +10441,8 @@
                           {kThrottleDropType, kDropPerMillionForThrottle}};
   balancers_[0]->ads_service()->SetEdsResource(
       BuildEdsResource(args, DefaultEdsServiceName()));
-  // Wait until the drop rate increases to the middle of the two configs, which
-  // implies that the update has been in effect.
+  // Wait until the drop rate increases to the middle of the two configs,
+  // which implies that the update has been in effect.
   const double kDropRateThreshold =
       (kDropRateForLb + kDropRateForLbAndThrottle) / 2;
   size_t num_rpcs = kNumRpcsBoth;
@@ -9693,8 +10510,8 @@
   BalancerUpdateTest() : XdsEnd2endTest(4, 3) {}
 };
 
-// Tests that the old LB call is still used after the balancer address update as
-// long as that call is still alive.
+// Tests that the old LB call is still used after the balancer address update
+// as long as that call is still alive.
 TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
@@ -9752,10 +10569,10 @@
 }
 
 // Tests that the old LB call is still used after multiple balancer address
-// updates as long as that call is still alive. Send an update with the same set
-// of LBs as the one in SetUp() in order to verify that the LB channel inside
-// xds keeps the initial connection (which by definition is also present in the
-// update).
+// updates as long as that call is still alive. Send an update with the same
+// set of LBs as the one in SetUp() in order to verify that the LB channel
+// inside xds keeps the initial connection (which by definition is also
+// present in the update).
 TEST_P(BalancerUpdateTest, Repeated) {
   SetNextResolution({});
   SetNextResolutionForLbChannelAllBalancers();
@@ -10187,9 +11004,9 @@
  public:
   FaultInjectionTest() : XdsEnd2endTest(1, 1) {}
 
-  // Builds a Listener with Fault Injection filter config. If the http_fault is
-  // nullptr, then assign an empty filter config. This filter config is required
-  // to enable the fault injection features.
+  // Builds a Listener with Fault Injection filter config. If the http_fault
+  // is nullptr, then assign an empty filter config. This filter config is
+  // required to enable the fault injection features.
   static Listener BuildListenerWithFaultInjection(
       const HTTPFault& http_fault = HTTPFault()) {
     HttpConnectionManager http_connection_manager;
@@ -10501,9 +11318,9 @@
               ::testing::DoubleNear(kAbortRate, kErrorTolerance));
 }
 
-// This test and the above test apply different denominators to delay and abort.
-// This ensures that we are using the right denominator for each injected fault
-// in our code.
+// This test and the above test apply different denominators to delay and
+// abort. This ensures that we are using the right denominator for each
+// injected fault in our code.
 TEST_P(FaultInjectionTest,
        XdsFaultInjectionAlwaysDelayPercentageAbortSwitchDenominator) {
   const uint32_t kAbortPercentagePerMillion = 500000;