ring_hash LB policy implementation (#26285)
* ring_hash LB policy (#25697)
* Ring Hash Policy implementation
* Code review comment fixing
* Fixing code review comments.
* Code review comment fixing
* Fixing reconnect logic
* adding helper method for pick
* Holding on to ref to parent
* first attempt at calling AttemptToConnect
* Fixing state change
* Fixing code review comments
* Fixing the reconnect from channel watcher code
* Fixing the BUILD to include new policy
* Fixing major code review suggestion
* Fixing code review comments
* Fixing code review suggestions
* Initial 2 tests.
* Adding channel id case.
* Fixing code review comments.
* Small change to get the spread of backends
* Add header hashing tests
* Added more tests and debugging
* Fixing Header hash
* Added more tests
* cleanup
* removing debugs
* Fixing code review comments.
* code review fixing
* combining code and match design
* fixing code review comments.
* Fixed IDLE case
* Moving tests
* Fixing code review comments
* Adding more tests according to code review comments.
* Added tests with different types of weights
* Adding terminal policy case
* Remove hash_func as there is only 1
* Added nack invalid hash function
* Added NACK cases
* fixing build error
* fixing build
* small warning
* adding regex test
* Adding policy tests
* fixing warning
* fixing warning
* fixing code review comments.
* fixing IDLE case
* Code review comments.
* fixing code review comments
* Making a helper function
* fixing reattempt case
* Added a few more tests.
* Adding more tests
* Added backward compatible test
* Fixing the reattempt test
* Clean up
* fixing clang error
* fixing clang error
* Fix logic discovered during code review
* code review comments
* code review comments
* code review comment
* clean up tests
* fixing code review comments
* clean up tests
* Separated test
* Fixing test
* fixing test
* fixing clang error
* Addressing code review suggestions
* Fixing last bit of code review comments
* Fixing flaky tests
* Fixing last bit of code review comments
* clean debugs
* Remove a verbose log
* Relaxing deadline exceeded for 1st RPC until ring is optimized.
Making Hash more efficient for random case.
diff --git a/BUILD b/BUILD
index 34226c8..af81334 100644
--- a/BUILD
+++ b/BUILD
@@ -1105,6 +1105,7 @@
"grpc_client_authority_filter",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_priority",
+ "grpc_lb_policy_ring_hash",
"grpc_lb_policy_round_robin",
"grpc_lb_policy_weighted_target",
"grpc_client_idle_filter",
@@ -1544,6 +1545,7 @@
"grpc_base",
"grpc_client_channel",
"grpc_lb_address_filtering",
+ "grpc_lb_policy_ring_hash",
"grpc_lb_xds_channel_args",
"grpc_lb_xds_common",
"grpc_resolver_fake",
@@ -1636,6 +1638,10 @@
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h",
],
+ external_deps = [
+ "absl/strings",
+ "xxhash",
+ ],
language = "c++",
deps = [
"grpc_base",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5887f75..2e10b03 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2326,6 +2326,7 @@
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
+ src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc
diff --git a/Makefile b/Makefile
index 5abbc89..c91e53a 100644
--- a/Makefile
+++ b/Makefile
@@ -1735,6 +1735,7 @@
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
+ src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
@@ -2669,7 +2670,6 @@
# installing headers to their final destination on the drive. We need this
# otherwise parallel compilation will fail if a source is compiled first.
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP)
-src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 31c9d54..7514911 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -1618,6 +1618,7 @@
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+ - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
- src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
- src/core/ext/filters/client_channel/lb_policy_factory.h
- src/core/ext/filters/client_channel/lb_policy_registry.h
@@ -1828,6 +1829,7 @@
- src/core/lib/transport/transport.h
- src/core/lib/transport/transport_impl.h
- src/core/lib/uri/uri_parser.h
+ - third_party/xxhash/xxhash.h
src:
- src/core/ext/filters/census/grpc_context.cc
- src/core/ext/filters/client_channel/backend_metric.cc
@@ -1854,6 +1856,7 @@
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
- src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
+ - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
- src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
- src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc
diff --git a/doc/environment_variables.md b/doc/environment_variables.md
index bdb54a0..1aa0719 100644
--- a/doc/environment_variables.md
+++ b/doc/environment_variables.md
@@ -75,6 +75,7 @@
in DEBUG)
- priority_lb - traces priority LB policy
- resource_quota - trace resource quota objects internals
+ - ring_hash_lb - traces the ring hash load balancing policy
- round_robin - traces the round_robin load balancing policy
- queue_pluck
- server_channel - lightweight trace of significant server channel events
diff --git a/grpc.gyp b/grpc.gyp
index 2418290..b717f76 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -1122,6 +1122,7 @@
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
+ 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',
diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
index 921bd2e..ad001d8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
@@ -16,8 +16,767 @@
#include <grpc/support/port_platform.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "absl/strings/numbers.h"
+#include "absl/strings/str_cat.h"
+#define XXH_INLINE_ALL
+#include "xxhash.h"
+
+#include <grpc/support/alloc.h>
+#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
+#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
+#include "src/core/ext/filters/client_channel/subchannel.h"
+#include "src/core/lib/address_utils/sockaddr_utils.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/static_metadata.h"
+
namespace grpc_core {
const char* kRequestRingHashAttribute = "request_ring_hash";
+TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb");
+
+// Parses and validates a ring_hash_experimental LB policy config.
+// On success, *min_ring_size and *max_ring_size hold the (possibly
+// defaulted) ring bounds; on failure, one or more errors are appended to
+// *error_list and the out-params are left at their defaults.
+// Exposed (non-static) so that the xds code can reuse the same validation.
+void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
+                           size_t* max_ring_size,
+                           std::vector<grpc_error_handle>* error_list) {
+  // Defaults per the xDS ring hash spec: min 1024, max 8M (2^23) entries.
+  *min_ring_size = 1024;
+  *max_ring_size = 8388608;
+  if (json.type() != Json::Type::OBJECT) {
+    error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "ring_hash_experimental should be of type object"));
+    return;
+  }
+  const Json::Object& ring_hash = json.object_value();
+  auto ring_hash_it = ring_hash.find("min_ring_size");
+  if (ring_hash_it != ring_hash.end()) {
+    if (ring_hash_it->second.type() != Json::Type::NUMBER) {
+      error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:min_ring_size error: should be of type number"));
+    } else {
+      // Json stores numbers as strings; a parse failure yields -1, which
+      // wraps to a huge size_t and is rejected by the range check below.
+      *min_ring_size = gpr_parse_nonnegative_int(
+          ring_hash_it->second.string_value().c_str());
+    }
+  }
+  ring_hash_it = ring_hash.find("max_ring_size");
+  if (ring_hash_it != ring_hash.end()) {
+    if (ring_hash_it->second.type() != Json::Type::NUMBER) {
+      error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "field:max_ring_size error: should be of type number"));
+    } else {
+      *max_ring_size = gpr_parse_nonnegative_int(
+          ring_hash_it->second.string_value().c_str());
+    }
+  }
+  // Range validation: both sizes must be in [1, 8388608] and min <= max.
+  if (*min_ring_size == 0 || *min_ring_size > 8388608 || *max_ring_size == 0 ||
+      *max_ring_size > 8388608 || *min_ring_size > *max_ring_size) {
+    error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "field:max_ring_size and or min_ring_size error: "
+        "values need to be in the range of 1 to 8388608 "
+        "and max_ring_size cannot be smaller than "
+        "min_ring_size"));
+  }
+}
+
+namespace {
+
+constexpr char kRingHash[] = "ring_hash_experimental";
+
+// Parsed, validated configuration for the ring_hash policy: the inclusive
+// bounds on the number of entries in the hash ring.
+class RingHashLbConfig : public LoadBalancingPolicy::Config {
+ public:
+  RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
+      : min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}
+  const char* name() const override { return kRingHash; }
+  size_t min_ring_size() const { return min_ring_size_; }
+  size_t max_ring_size() const { return max_ring_size_; }
+
+ private:
+  size_t min_ring_size_;
+  size_t max_ring_size_;
+};
+
+//
+// ring_hash LB policy
+//
+// LB policy that distributes picks over a ketama-style consistent-hash
+// ring built from the resolved addresses (optionally weighted).
+class RingHash : public LoadBalancingPolicy {
+ public:
+  explicit RingHash(Args args);
+
+  const char* name() const override { return kRingHash; }
+
+  void UpdateLocked(UpdateArgs args) override;
+  void ResetBackoffLocked() override;
+
+ private:
+  ~RingHash() override;
+
+  // Forward declaration.
+  class RingHashSubchannelList;
+
+  // Data for a particular subchannel in a subchannel list.
+  // This subclass adds the following functionality:
+  // - Tracks the previous connectivity state of the subchannel, so that
+  //   we know how many subchannels are in each state.
+  class RingHashSubchannelData
+      : public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
+   public:
+    RingHashSubchannelData(
+        SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
+            subchannel_list,
+        const ServerAddress& address,
+        RefCountedPtr<SubchannelInterface> subchannel)
+        : SubchannelData(subchannel_list, address, std::move(subchannel)),
+          address_(address) {}
+
+    // Last connectivity state reported for aggregation purposes; see
+    // UpdateConnectivityStateLocked() for the TRANSIENT_FAILURE latching.
+    grpc_connectivity_state connectivity_state() const {
+      return last_connectivity_state_;
+    }
+    const ServerAddress& address() const { return address_; }
+
+    bool seen_failure_since_ready() const { return seen_failure_since_ready_; }
+
+    // Performs connectivity state updates that need to be done both when we
+    // first start watching and when a watcher notification is received.
+    void UpdateConnectivityStateLocked(
+        grpc_connectivity_state connectivity_state);
+
+   private:
+    // Performs connectivity state updates that need to be done only
+    // after we have started watching.
+    void ProcessConnectivityChangeLocked(
+        grpc_connectivity_state connectivity_state) override;
+
+    ServerAddress address_;
+    grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
+    bool seen_failure_since_ready_ = false;
+  };
+
+  // A list of subchannels.
+  class RingHashSubchannelList
+      : public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
+   public:
+    RingHashSubchannelList(RingHash* policy, TraceFlag* tracer,
+                           ServerAddressList addresses,
+                           const grpc_channel_args& args)
+        : SubchannelList(policy, tracer, std::move(addresses),
+                         policy->channel_control_helper(), args) {
+      // Need to maintain a ref to the LB policy as long as we maintain
+      // any references to subchannels, since the subchannels'
+      // pollset_sets will include the LB policy's pollset_set.
+      policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
+    }
+
+    ~RingHashSubchannelList() override {
+      RingHash* p = static_cast<RingHash*>(policy());
+      p->Unref(DEBUG_LOCATION, "subchannel_list");
+    }
+
+    // Starts watching the subchannels in this list.
+    void StartWatchingLocked();
+
+    // Updates the counters of subchannels in each state when a
+    // subchannel transitions from old_state to new_state.
+    void UpdateStateCountersLocked(grpc_connectivity_state old_state,
+                                   grpc_connectivity_state new_state);
+
+    // Updates the RH policy's connectivity state based on the
+    // subchannel list's state counters, creating new picker and new ring.
+    // Furthermore, return a bool indicating whether the aggregated state is
+    // Transient Failure.
+    bool UpdateRingHashConnectivityStateLocked();
+
+   private:
+    // Counters of subchannels in each connectivity state, used to compute
+    // the policy's aggregated state.
+    size_t num_idle_ = 0;
+    size_t num_ready_ = 0;
+    size_t num_connecting_ = 0;
+    size_t num_transient_failure_ = 0;
+  };
+
+  // Picker that selects a subchannel by binary-searching the hash ring for
+  // the request's hash (set via the kRequestRingHashAttribute call
+  // attribute), falling back around the ring on non-READY entries.
+  class Picker : public SubchannelPicker {
+   public:
+    Picker(RefCountedPtr<RingHash> parent,
+           RingHashSubchannelList* subchannel_list);
+
+    PickResult Pick(PickArgs args) override;
+
+   private:
+    // One ring position: the hash value, the subchannel mapped there, and
+    // the subchannel's connectivity state snapshotted at picker creation.
+    struct RingEntry {
+      uint64_t hash;
+      RefCountedPtr<SubchannelInterface> subchannel;
+      grpc_connectivity_state connectivity_state;
+    };
+
+    // A fire-and-forget class that schedules subchannel connection attempts
+    // on the control plane WorkSerializer.
+    class SubchannelConnectionAttempter : public Orphanable {
+     public:
+      explicit SubchannelConnectionAttempter(
+          RefCountedPtr<RingHash> ring_hash_lb)
+          : ring_hash_lb_(std::move(ring_hash_lb)) {
+        GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
+      }
+
+      void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
+        subchannels_.push_back(std::move(subchannel));
+      }
+
+      void Orphan() override {
+        // Hop into ExecCtx, so that we're not holding the data plane mutex
+        // while we run control-plane code.
+        ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
+      }
+
+     private:
+      static void RunInExecCtx(void* arg, grpc_error* /*error*/) {
+        auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
+        self->ring_hash_lb_->work_serializer()->Run(
+            [self]() {
+              // Skip the connection attempts if the policy was shut down
+              // between scheduling and execution; always free self.
+              if (!self->ring_hash_lb_->shutdown_) {
+                for (auto& subchannel : self->subchannels_) {
+                  subchannel->AttemptToConnect();
+                }
+              }
+              delete self;
+            },
+            DEBUG_LOCATION);
+      }
+
+      RefCountedPtr<RingHash> ring_hash_lb_;
+      grpc_closure closure_;
+      absl::InlinedVector<RefCountedPtr<SubchannelInterface>, 10> subchannels_;
+    };
+
+    RefCountedPtr<RingHash> parent_;
+
+    // A ring of subchannels, sorted by hash value.
+    std::vector<RingEntry> ring_;
+  };
+
+  void ShutdownLocked() override;
+
+  // Current config from resolver.
+  RefCountedPtr<RingHashLbConfig> config_;
+
+  // list of subchannels.
+  OrphanablePtr<RingHashSubchannelList> subchannel_list_;
+  // indicating if we are shutting down.
+  bool shutdown_ = false;
+};
+
+//
+// RingHash::Picker
+//
+
+// Builds the consistent-hash ring from the current subchannel list.
+// Each host gets a number of ring entries proportional to its weight,
+// scaled so the least-weighted host gets a whole number of entries.
+RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
+                         RingHashSubchannelList* subchannel_list)
+    : parent_(std::move(parent)) {
+  size_t num_subchannels = subchannel_list->num_subchannels();
+  // Store the weights while finding the sum.
+  struct AddressWeight {
+    std::string address;
+    // Default weight is 1 for the cases where a weight is not provided,
+    // each occurrence of the address will be counted a weight value of 1.
+    uint32_t weight = 1;
+    double normalized_weight;
+  };
+  std::vector<AddressWeight> address_weights;
+  size_t sum = 0;
+  address_weights.reserve(num_subchannels);
+  for (size_t i = 0; i < num_subchannels; ++i) {
+    RingHashSubchannelData* sd = subchannel_list->subchannel(i);
+    const ServerAddressWeightAttribute* weight_attribute = static_cast<
+        const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
+        ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
+    AddressWeight address_weight;
+    address_weight.address =
+        grpc_sockaddr_to_string(&sd->address().address(), false);
+    if (weight_attribute != nullptr) {
+      GPR_ASSERT(weight_attribute->weight() != 0);
+      address_weight.weight = weight_attribute->weight();
+    }
+    sum += address_weight.weight;
+    address_weights.push_back(std::move(address_weight));
+  }
+  // Calculating normalized weights and find min and max.
+  double min_normalized_weight = 1.0;
+  double max_normalized_weight = 0.0;
+  for (auto& address : address_weights) {
+    address.normalized_weight = static_cast<double>(address.weight) / sum;
+    min_normalized_weight =
+        std::min(address.normalized_weight, min_normalized_weight);
+    max_normalized_weight =
+        std::max(address.normalized_weight, max_normalized_weight);
+  }
+  // Scale up the number of hashes per host such that the least-weighted host
+  // gets a whole number of hashes on the ring. Other hosts might not end up
+  // with whole numbers, and that's fine (the ring-building algorithm below can
+  // handle this). This preserves the original implementation's behavior: when
+  // weights aren't provided, all hosts should get an equal number of hashes. In
+  // the case where this number exceeds the max_ring_size, it's scaled back down
+  // to fit.
+  const size_t min_ring_size = parent_->config_->min_ring_size();
+  const size_t max_ring_size = parent_->config_->max_ring_size();
+  const double scale = std::min(
+      std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
+      static_cast<double>(max_ring_size));
+  // Reserve memory for the entire ring up front.
+  const uint64_t ring_size = std::ceil(scale);
+  ring_.reserve(ring_size);
+  // Populate the hash ring by walking through the (host, weight) pairs in
+  // normalized_host_weights, and generating (scale * weight) hashes for each
+  // host. Since these aren't necessarily whole numbers, we maintain running
+  // sums -- current_hashes and target_hashes -- which allows us to populate the
+  // ring in a mostly stable way.
+  absl::InlinedVector<char, 196> hash_key_buffer;
+  double current_hashes = 0.0;
+  double target_hashes = 0.0;
+  uint64_t min_hashes_per_host = ring_size;
+  uint64_t max_hashes_per_host = 0;
+  for (size_t i = 0; i < num_subchannels; ++i) {
+    const std::string& address_string = address_weights[i].address;
+    // Hash keys are "<address>_<per-host counter>".
+    hash_key_buffer.assign(address_string.begin(), address_string.end());
+    hash_key_buffer.emplace_back('_');
+    auto offset_start = hash_key_buffer.end();
+    target_hashes += scale * address_weights[i].normalized_weight;
+    size_t count = 0;
+    auto current_state =
+        subchannel_list->subchannel(i)->subchannel()->CheckConnectivityState();
+    while (current_hashes < target_hashes) {
+      const std::string count_str = absl::StrCat(count);
+      hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
+      absl::string_view hash_key(hash_key_buffer.data(),
+                                 hash_key_buffer.size());
+      const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
+      ring_.push_back({hash,
+                       subchannel_list->subchannel(i)->subchannel()->Ref(),
+                       current_state});
+      ++count;
+      ++current_hashes;
+      hash_key_buffer.erase(offset_start, hash_key_buffer.end());
+    }
+    // Track the number of hashes generated for this host (count), not the
+    // host's index, so the min/max stats reflect actual ring occupancy.
+    min_hashes_per_host =
+        std::min(static_cast<uint64_t>(count), min_hashes_per_host);
+    max_hashes_per_host =
+        std::max(static_cast<uint64_t>(count), max_hashes_per_host);
+  }
+  // Sort the ring by hash so Pick() can binary-search it.
+  std::sort(ring_.begin(), ring_.end(),
+            [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
+              return lhs.hash < rhs.hash;
+            });
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO,
+            "[RH %p picker %p] created picker from subchannel_list=%p "
+            "with %" PRIuPTR " ring entries",
+            parent_.get(), this, subchannel_list, ring_.size());
+  }
+}
+
+// Picks a subchannel for the request: binary-search the ring for the
+// request's hash, then walk the ring looking for a usable entry, kicking
+// off connection attempts along the way per the xds ring hash semantics.
+RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
+  PickResult result;
+  // Initialize to PICK_FAILED.
+  result.type = PickResult::PICK_FAILED;
+  auto hash =
+      args.call_state->ExperimentalGetCallAttribute(kRequestRingHashAttribute);
+  uint64_t h;
+  if (!absl::SimpleAtoi(hash, &h)) {
+    // Static message: no need to copy a heap-built string here.
+    result.error = grpc_error_set_int(
+        GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "xds ring hash value is not a number"),
+        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
+    return result;
+  }
+  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
+  // (ketama_get_server) NOTE: The algorithm depends on using signed integers
+  // for lowp, highp, and first_index. Do not change them!
+  int64_t lowp = 0;
+  int64_t highp = ring_.size();
+  int64_t first_index = 0;
+  while (true) {
+    first_index = (lowp + highp) / 2;
+    if (first_index == static_cast<int64_t>(ring_.size())) {
+      // Hash is past the last entry; wrap to the start of the ring.
+      first_index = 0;
+      break;
+    }
+    uint64_t midval = ring_[first_index].hash;
+    uint64_t midval1 = first_index == 0 ? 0 : ring_[first_index - 1].hash;
+    if (h <= midval && h > midval1) {
+      break;
+    }
+    if (midval < h) {
+      lowp = first_index + 1;
+    } else {
+      highp = first_index - 1;
+    }
+    if (lowp > highp) {
+      first_index = 0;
+      break;
+    }
+  }
+  // Lazily create a single connection attempter for all subchannels we
+  // decide to poke during this pick.
+  OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
+  auto ScheduleSubchannelConnectionAttempt =
+      [&](RefCountedPtr<SubchannelInterface> subchannel) {
+        if (subchannel_connection_attempter == nullptr) {
+          subchannel_connection_attempter =
+              MakeOrphanable<SubchannelConnectionAttempter>(parent_);
+        }
+        subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
+      };
+  switch (ring_[first_index].connectivity_state) {
+    case GRPC_CHANNEL_READY:
+      result.type = PickResult::PICK_COMPLETE;
+      result.subchannel = ring_[first_index].subchannel;
+      return result;
+    case GRPC_CHANNEL_IDLE:
+      ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
+      // fallthrough
+    case GRPC_CHANNEL_CONNECTING:
+      result.type = PickResult::PICK_QUEUE;
+      return result;
+    default:  // GRPC_CHANNEL_TRANSIENT_FAILURE
+      break;
+  }
+  ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
+  // Loop through remaining subchannels to find one in READY.
+  // On the way, we make sure the right set of connection attempts
+  // will happen.
+  bool found_second_subchannel = false;
+  bool found_first_non_failed = false;
+  for (size_t i = 1; i < ring_.size(); ++i) {
+    const RingEntry& entry = ring_[(first_index + i) % ring_.size()];
+    if (entry.subchannel == ring_[first_index].subchannel) {
+      continue;
+    }
+    if (entry.connectivity_state == GRPC_CHANNEL_READY) {
+      result.type = PickResult::PICK_COMPLETE;
+      result.subchannel = entry.subchannel;
+      return result;
+    }
+    if (!found_second_subchannel) {
+      switch (entry.connectivity_state) {
+        case GRPC_CHANNEL_IDLE:
+          ScheduleSubchannelConnectionAttempt(entry.subchannel);
+          // fallthrough
+        case GRPC_CHANNEL_CONNECTING:
+          result.type = PickResult::PICK_QUEUE;
+          return result;
+        default:
+          break;
+      }
+      found_second_subchannel = true;
+    }
+    if (!found_first_non_failed) {
+      if (entry.connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+        ScheduleSubchannelConnectionAttempt(entry.subchannel);
+      } else {
+        if (entry.connectivity_state == GRPC_CHANNEL_IDLE) {
+          ScheduleSubchannelConnectionAttempt(entry.subchannel);
+        }
+        found_first_non_failed = true;
+      }
+    }
+  }
+  // Static message: no need to copy a heap-built string here.
+  result.error = grpc_error_set_int(
+      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "xds ring hash found a subchannel "
+          "that is in TRANSIENT_FAILURE state"),
+      GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
+  return result;
+}
+
+//
+// RingHash::RingHashSubchannelList
+//
+
+// Seeds per-subchannel state from the current connectivity states, starts
+// connectivity watches, and publishes the initial picker.
+void RingHash::RingHashSubchannelList::StartWatchingLocked() {
+  if (num_subchannels() == 0) return;
+  // Check current state of each subchannel synchronously.
+  for (size_t i = 0; i < num_subchannels(); ++i) {
+    grpc_connectivity_state state =
+        subchannel(i)->CheckConnectivityStateLocked();
+    subchannel(i)->UpdateConnectivityStateLocked(state);
+  }
+  // Start connectivity watch for each subchannel.
+  for (size_t i = 0; i < num_subchannels(); i++) {
+    if (subchannel(i)->subchannel() != nullptr) {
+      subchannel(i)->StartConnectivityWatchLocked();
+    }
+  }
+  RingHash* p = static_cast<RingHash*>(policy());
+  // Sending up the initial picker while all subchannels are in IDLE state.
+  // NOTE(review): state READY is reported even though subchannels may all be
+  // IDLE, so that picks start flowing (they trigger connection attempts) --
+  // confirm this is the intended bootstrap behavior.
+  p->channel_control_helper()->UpdateState(
+      GRPC_CHANNEL_READY, absl::Status(),
+      absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
+                                this));
+}
+
+// Moves one subchannel's contribution from the old_state counter to the
+// new_state counter. SHUTDOWN is never counted (and never a valid new
+// state), so it falls through both switches.
+void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
+    grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
+  GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
+  // Decrement the counter for the state being left.
+  switch (old_state) {
+    case GRPC_CHANNEL_IDLE:
+      GPR_ASSERT(num_idle_ > 0);
+      --num_idle_;
+      break;
+    case GRPC_CHANNEL_READY:
+      GPR_ASSERT(num_ready_ > 0);
+      --num_ready_;
+      break;
+    case GRPC_CHANNEL_CONNECTING:
+      GPR_ASSERT(num_connecting_ > 0);
+      --num_connecting_;
+      break;
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
+      GPR_ASSERT(num_transient_failure_ > 0);
+      --num_transient_failure_;
+      break;
+    default:
+      break;
+  }
+  // Increment the counter for the state being entered.
+  switch (new_state) {
+    case GRPC_CHANNEL_IDLE:
+      ++num_idle_;
+      break;
+    case GRPC_CHANNEL_READY:
+      ++num_ready_;
+      break;
+    case GRPC_CHANNEL_CONNECTING:
+      ++num_connecting_;
+      break;
+    case GRPC_CHANNEL_TRANSIENT_FAILURE:
+      ++num_transient_failure_;
+      break;
+    default:
+      break;
+  }
+}
+
+// Sets the RH policy's connectivity state and generates a new picker based
+// on the current subchannel list, or requests a re-attempt by returning true.
+// Returns true only when the aggregated state is TRANSIENT_FAILURE, which
+// tells the caller to proactively connect to the next subchannel.
+bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
+  RingHash* p = static_cast<RingHash*>(policy());
+  // Only set connectivity state if this is the current subchannel list.
+  if (p->subchannel_list_.get() != this) return false;
+  // The overall aggregation rules here are:
+  // 1. If there is at least one subchannel in READY state, report READY.
+  // 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
+  //    TRANSIENT_FAILURE.
+  // 3. If there is at least one subchannel in CONNECTING state, report
+  //    CONNECTING.
+  // 4. If there is at least one subchannel in IDLE state, report IDLE.
+  // 5. Otherwise, report TRANSIENT_FAILURE.
+  if (num_ready_ > 0) {
+    /* READY */
+    p->channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_READY, absl::Status(),
+        absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
+                                  this));
+    return false;
+  }
+  if (num_connecting_ > 0 && num_transient_failure_ < 2) {
+    p->channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_CONNECTING, absl::Status(),
+        absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
+    return false;
+  }
+  if (num_idle_ > 0 && num_transient_failure_ < 2) {
+    // IDLE still gets a real Picker (not a queue picker) so that picks can
+    // trigger connection attempts.
+    p->channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_IDLE, absl::Status(),
+        absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
+                                  this));
+    return false;
+  }
+  grpc_error* error =
+      grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                             "connections to backend failing or idle"),
+                         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+  p->channel_control_helper()->UpdateState(
+      GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
+      absl::make_unique<TransientFailurePicker>(error));
+  return true;
+}
+
+//
+// RingHash::RingHashSubchannelData
+//
+
+// Updates this subchannel's recorded state and the list's state counters.
+// Once a failure is seen, the subchannel keeps counting as
+// TRANSIENT_FAILURE until it reaches READY again (failure latching).
+void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
+    grpc_connectivity_state connectivity_state) {
+  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    // Log tag fixed: this is the ring_hash (RH) policy, not round_robin (RR).
+    gpr_log(
+        GPR_INFO,
+        "[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
+        "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
+        p, subchannel(), subchannel_list(), Index(),
+        subchannel_list()->num_subchannels(),
+        ConnectivityStateName(last_connectivity_state_),
+        ConnectivityStateName(connectivity_state));
+  }
+  // Decide what state to report for aggregation purposes.
+  // If we haven't seen a failure since the last time we were in state
+  // READY, then we report the state change as-is. However, once we do see
+  // a failure, we report TRANSIENT_FAILURE and do not report any subsequent
+  // state changes until we go back into state READY.
+  if (!seen_failure_since_ready_) {
+    if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+      seen_failure_since_ready_ = true;
+    }
+    subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
+                                                 connectivity_state);
+  } else {
+    if (connectivity_state == GRPC_CHANNEL_READY) {
+      seen_failure_since_ready_ = false;
+      subchannel_list()->UpdateStateCountersLocked(
+          GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state);
+    }
+  }
+  // Record last seen connectivity state.
+  last_connectivity_state_ = connectivity_state;
+}
+
+// Handles a watcher notification: requests re-resolution on failure,
+// updates counters/state, and keeps at least one connection attempt going
+// while the policy is in TRANSIENT_FAILURE.
+void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
+    grpc_connectivity_state connectivity_state) {
+  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
+  GPR_ASSERT(subchannel() != nullptr);
+  // If the new state is TRANSIENT_FAILURE, re-resolve.
+  // Only do this if we've started watching, not at startup time.
+  // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
+  // when the subchannel list was created, we'd wind up in a constant
+  // loop of re-resolution.
+  // Also attempt to reconnect.
+  if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+      // Log tag fixed: this is the ring_hash (RH) policy, not round_robin.
+      gpr_log(GPR_INFO,
+              "[RH %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
+              "Requesting re-resolution",
+              p, subchannel());
+    }
+    p->channel_control_helper()->RequestReresolution();
+  }
+  // Update state counters.
+  UpdateConnectivityStateLocked(connectivity_state);
+  // Update the RH policy's connectivity state, creating new picker and new
+  // ring.
+  bool transient_failure =
+      subchannel_list()->UpdateRingHashConnectivityStateLocked();
+  // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
+  // not be getting any pick requests from the priority policy.
+  // However, because the ring_hash policy does not attempt to
+  // reconnect to subchannels unless it is getting pick requests,
+  // it will need special handling to ensure that it will eventually
+  // recover from TRANSIENT_FAILURE state once the problem is resolved.
+  // Specifically, it will make sure that it is attempting to connect to
+  // at least one subchannel at any given time. After a given subchannel
+  // fails a connection attempt, it will move on to the next subchannel
+  // in the ring. It will keep doing this until one of the subchannels
+  // successfully connects, at which point it will report READY and stop
+  // proactively trying to connect. The policy will remain in
+  // TRANSIENT_FAILURE until at least one subchannel becomes connected,
+  // even if subchannels are in state CONNECTING during that time.
+  if (transient_failure &&
+      connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
+    RingHashSubchannelData* next_sd = subchannel_list()->subchannel(next_index);
+    next_sd->subchannel()->AttemptToConnect();
+  }
+}
+
+//
+// RingHash
+//
+
+// Constructs the policy; subchannels are created on the first UpdateLocked().
+RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Created", this);
+  }
+}
+
+// Destructor; ShutdownLocked() must have released the subchannel list first.
+RingHash::~RingHash() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
+  }
+  GPR_ASSERT(subchannel_list_ == nullptr);
+}
+
+// Marks the policy as shut down (checked by pending connection attempters)
+// and drops the subchannel list.
+void RingHash::ShutdownLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
+  }
+  shutdown_ = true;
+  subchannel_list_.reset();
+}
+
+// Resets connection backoff on all subchannels in the current list.
+// NOTE(review): assumes subchannel_list_ is non-null, i.e. that at least one
+// update was received before this is called -- confirm with callers.
+void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); }
+
+// Handles a new resolver update: filters zero-weight addresses, builds a
+// fresh subchannel list, and either starts watching it or reports
+// TRANSIENT_FAILURE if the update is empty.
+void RingHash::UpdateLocked(UpdateArgs args) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    // Log tag fixed: this is the ring_hash (RH) policy, not round_robin.
+    gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
+            this, args.addresses.size());
+  }
+  config_ = std::move(args.config);
+  // Filter out any address with weight 0.
+  ServerAddressList addresses;
+  addresses.reserve(args.addresses.size());
+  for (ServerAddress& address : args.addresses) {
+    const ServerAddressWeightAttribute* weight_attribute =
+        static_cast<const ServerAddressWeightAttribute*>(address.GetAttribute(
+            ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
+    if (weight_attribute == nullptr || weight_attribute->weight() > 0) {
+      addresses.push_back(std::move(address));
+    }
+  }
+  subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
+      this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args);
+  if (subchannel_list_->num_subchannels() == 0) {
+    // If the new list is empty, immediately transition to TRANSIENT_FAILURE.
+    grpc_error* error =
+        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+    channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
+        absl::make_unique<TransientFailurePicker>(error));
+  } else {
+    // Start watching the new list.
+    subchannel_list_->StartWatchingLocked();
+  }
+}
+
+//
+// factory
+//
+
+// Factory registered with the LB policy registry: creates RingHash
+// instances and parses/validates their JSON config.
+class RingHashFactory : public LoadBalancingPolicyFactory {
+ public:
+  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      LoadBalancingPolicy::Args args) const override {
+    return MakeOrphanable<RingHash>(std::move(args));
+  }
+
+  const char* name() const override { return kRingHash; }
+
+  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+      const Json& json, grpc_error** error) const override {
+    size_t min_size;
+    size_t max_size;
+    std::vector<grpc_error_handle> parse_errors;
+    ParseRingHashLbConfig(json, &min_size, &max_size, &parse_errors);
+    // Surface all accumulated parse errors as a single combined error.
+    if (!parse_errors.empty()) {
+      *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+          "ring_hash_experimental LB policy config", &parse_errors);
+      return nullptr;
+    }
+    return MakeRefCounted<RingHashLbConfig>(min_size, max_size);
+  }
+};
+
+} // namespace
+
+// Plugin init hook: registers the ring_hash_experimental factory with the
+// global LB policy registry.
+void GrpcLbPolicyRingHashInit() {
+  grpc_core::LoadBalancingPolicyRegistry::Builder::
+      RegisterLoadBalancingPolicyFactory(
+          absl::make_unique<grpc_core::RingHashFactory>());
+}
+
+// Plugin shutdown hook: the registry owns the factory, so nothing to do.
+void GrpcLbPolicyRingHashShutdown() {}
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
index dc176c2..f0f86b2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
+++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
@@ -19,9 +19,19 @@
#include <grpc/support/port_platform.h>
+#include <stdlib.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/json/json.h"
+
namespace grpc_core {
extern const char* kRequestRingHashAttribute;
+// Helper method for parsing a ring_hash LB policy config; validates, for
+// example, that the configured ring sizes are within the allowed range.
+void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
+ size_t* max_ring_size,
+ std::vector<grpc_error_handle>* error_list);
} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_RING_HASH_RING_HASH_H
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
index 49bff27..3531ca5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -452,22 +452,9 @@
// Construct config for child policy.
Json::Object xds_lb_policy;
if (cluster_data.lb_policy == "RING_HASH") {
- std::string hash_function;
- switch (cluster_data.hash_function) {
- case XdsApi::CdsUpdate::HashFunction::XX_HASH:
- hash_function = "XX_HASH";
- break;
- case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
- hash_function = "MURMUR_HASH_2";
- break;
- default:
- GPR_ASSERT(0);
- break;
- }
xds_lb_policy["RING_HASH"] = Json::Object{
{"min_ring_size", cluster_data.min_ring_size},
{"max_ring_size", cluster_data.max_ring_size},
- {"hash_function", hash_function},
};
} else {
xds_lb_policy["ROUND_ROBIN"] = Json::Object();
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
index bcb8194..62851e8 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
@@ -28,6 +28,7 @@
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
+#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
@@ -834,6 +835,13 @@
std::vector<std::string> hierarchical_path = {
priority_child_name, locality_name->AsHumanReadableString()};
for (const auto& endpoint : locality.endpoints) {
+ const ServerAddressWeightAttribute* weight_attribute = static_cast<
+ const ServerAddressWeightAttribute*>(endpoint.GetAttribute(
+ ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
+ uint32_t weight = locality.lb_weight;
+ if (weight_attribute != nullptr) {
+ weight = locality.lb_weight * weight_attribute->weight();
+ }
addresses.emplace_back(
endpoint
.WithAttribute(kHierarchicalPathAttributeKey,
@@ -841,10 +849,10 @@
.WithAttribute(kXdsLocalityNameAttributeKey,
absl::make_unique<XdsLocalityAttribute>(
locality_name->Ref()))
- .WithAttribute(ServerAddressWeightAttribute::
- kServerAddressWeightAttributeKey,
- absl::make_unique<ServerAddressWeightAttribute>(
- locality.lb_weight)));
+ .WithAttribute(
+ ServerAddressWeightAttribute::
+ kServerAddressWeightAttributeKey,
+ absl::make_unique<ServerAddressWeightAttribute>(weight)));
}
}
}
@@ -1201,65 +1209,11 @@
}
policy_it = policy.find("RING_HASH");
if (policy_it != policy.end()) {
- if (policy_it->second.type() != Json::Type::OBJECT) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:RING_HASH error:type should be object"));
- continue;
- }
- // TODO(donnadionne): Move this to a method in
- // ring_hash_experimental and call it here.
- const Json::Object& ring_hash = policy_it->second.object_value();
xds_lb_policy = array[i];
- size_t min_ring_size = 1024;
- size_t max_ring_size = 8388608;
- auto ring_hash_it = ring_hash.find("min_ring_size");
- if (ring_hash_it == ring_hash.end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:min_ring_size missing"));
- } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:min_ring_size error: should be of "
- "number"));
- } else {
- min_ring_size = gpr_parse_nonnegative_int(
- ring_hash_it->second.string_value().c_str());
- }
- ring_hash_it = ring_hash.find("max_ring_size");
- if (ring_hash_it == ring_hash.end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:max_ring_size missing"));
- } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:max_ring_size error: should be of "
- "number"));
- } else {
- max_ring_size = gpr_parse_nonnegative_int(
- ring_hash_it->second.string_value().c_str());
- }
- if (min_ring_size <= 0 || min_ring_size > 8388608 ||
- max_ring_size <= 0 || max_ring_size > 8388608 ||
- min_ring_size > max_ring_size) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:max_ring_size and or min_ring_size error: "
- "values need to be in the range of 1 to 8388608 "
- "and max_ring_size cannot be smaller than "
- "min_ring_size"));
- }
- ring_hash_it = ring_hash.find("hash_function");
- if (ring_hash_it == ring_hash.end()) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:hash_function missing"));
- } else if (ring_hash_it->second.type() != Json::Type::STRING) {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:hash_function error: should be a "
- "string"));
- } else if (ring_hash_it->second.string_value() != "XX_HASH" &&
- ring_hash_it->second.string_value() != "MURMUR_HASH_2") {
- error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "field:hash_function error: unsupported "
- "hash_function"));
- }
- break;
+ size_t min_ring_size;
+ size_t max_ring_size;
+ ParseRingHashLbConfig(policy_it->second, &min_ring_size,
+ &max_ring_size, &error_list);
}
}
}
diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
index 3a10a42..871d756 100644
--- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
@@ -568,6 +568,9 @@
std::string value_buffer;
absl::optional<absl::string_view> header_value =
GetHeaderValue(initial_metadata, policy.header_name, &value_buffer);
+ if (!header_value.has_value()) {
+ return absl::nullopt;
+ }
if (policy.regex != nullptr) {
// If GetHeaderValue() did not already store the value in
// value_buffer, copy it there now, so we can modify it.
@@ -649,10 +652,13 @@
case XdsApi::Route::HashPolicy::HEADER:
new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
break;
- case XdsApi::Route::HashPolicy::CHANNEL_ID:
- new_hash =
- static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver));
+ case XdsApi::Route::HashPolicy::CHANNEL_ID: {
+ std::string address_str = absl::StrFormat(
+ "%" PRIu64,
+ static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver)));
+ new_hash = XXH64(address_str.c_str(), address_str.length(), 0);
break;
+ }
default:
GPR_ASSERT(0);
}
@@ -671,7 +677,12 @@
}
if (!hash.has_value()) {
// If there is no hash, we just choose a random value as a default.
- hash = rand();
+ // We cannot directly use the result of rand() as the hash value,
+ // since it is a 32-bit number and not a 64-bit number and will
+ // therefore not be evenly distributed.
+ uint32_t upper = rand();
+ uint32_t lower = rand();
+ hash = (static_cast<uint64_t>(upper) << 32) | lower;
}
CallConfig call_config;
if (method_config != nullptr) {
@@ -680,8 +691,12 @@
call_config.service_config = std::move(method_config);
}
call_config.call_attributes[kXdsClusterAttribute] = it->first;
- call_config.call_attributes[kRequestRingHashAttribute] =
- absl::StrFormat("%" PRIu64, hash.value());
+ std::string hash_string = absl::StrCat(hash.value());
+ char* hash_value =
+ static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
+ memcpy(hash_value, hash_string.c_str(), hash_string.size());
+ hash_value[hash_string.size()] = '\0';
+ call_config.call_attributes[kRequestRingHashAttribute] = hash_value;
call_config.on_call_committed = [resolver, cluster_state]() {
cluster_state->Unref();
ExecCtx::Run(
diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc
index e51bc07..610d4e2 100644
--- a/src/core/ext/xds/xds_api.cc
+++ b/src/core/ext/xds/xds_api.cc
@@ -1605,40 +1605,35 @@
regex_rewrite =
envoy_config_route_v3_RouteAction_HashPolicy_Header_regex_rewrite(
header);
- if (regex_rewrite == nullptr) {
- gpr_log(
- GPR_DEBUG,
- "RouteAction HashPolicy contains policy specifier Header with "
- "RegexMatchAndSubstitution but Regex is missing");
- continue;
+ if (regex_rewrite != nullptr) {
+ const envoy_type_matcher_v3_RegexMatcher* regex_matcher =
+ envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern(
+ regex_rewrite);
+ if (regex_matcher == nullptr) {
+ gpr_log(
+ GPR_DEBUG,
+ "RouteAction HashPolicy contains policy specifier Header with "
+ "RegexMatchAndSubstitution but RegexMatcher pattern is "
+ "missing");
+ continue;
+ }
+ RE2::Options options;
+ policy.regex = absl::make_unique<RE2>(
+ UpbStringToStdString(
+ envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)),
+ options);
+ if (!policy.regex->ok()) {
+ gpr_log(
+ GPR_DEBUG,
+ "RouteAction HashPolicy contains policy specifier Header with "
+ "RegexMatchAndSubstitution but RegexMatcher pattern does not "
+ "compile");
+ continue;
+ }
+ policy.regex_substitution = UpbStringToStdString(
+ envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution(
+ regex_rewrite));
}
- const envoy_type_matcher_v3_RegexMatcher* regex_matcher =
- envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern(
- regex_rewrite);
- if (regex_matcher == nullptr) {
- gpr_log(
- GPR_DEBUG,
- "RouteAction HashPolicy contains policy specifier Header with "
- "RegexMatchAndSubstitution but RegexMatcher pattern is "
- "missing");
- continue;
- }
- RE2::Options options;
- policy.regex = absl::make_unique<RE2>(
- UpbStringToStdString(
- envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)),
- options);
- if (!policy.regex->ok()) {
- gpr_log(
- GPR_DEBUG,
- "RouteAction HashPolicy contains policy specifier Header with "
- "RegexMatchAndSubstitution but RegexMatcher pattern does not "
- "compile");
- continue;
- }
- policy.regex_substitution = UpbStringToStdString(
- envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution(
- regex_rewrite));
} else if ((filter_state =
envoy_config_route_v3_RouteAction_HashPolicy_filter_state(
hash_policy)) != nullptr) {
@@ -2815,75 +2810,61 @@
// Record ring hash lb config
auto* ring_hash_config =
envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
- if (ring_hash_config == nullptr) {
- errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat(cluster_name,
- ": ring hash lb config required but not present.")
- .c_str()));
- resource_names_failed->insert(cluster_name);
- continue;
- }
- const google_protobuf_UInt64Value* max_ring_size =
- envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
- ring_hash_config);
- if (max_ring_size != nullptr) {
- cds_update.max_ring_size =
- google_protobuf_UInt64Value_value(max_ring_size);
- if (cds_update.max_ring_size > 8388608 ||
- cds_update.max_ring_size == 0) {
+ if (ring_hash_config != nullptr) {
+ const google_protobuf_UInt64Value* max_ring_size =
+ envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
+ ring_hash_config);
+ if (max_ring_size != nullptr) {
+ cds_update.max_ring_size =
+ google_protobuf_UInt64Value_value(max_ring_size);
+ if (cds_update.max_ring_size > 8388608 ||
+ cds_update.max_ring_size == 0) {
+ errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ absl::StrCat(
+ cluster_name,
+ ": max_ring_size is not in the range of 1 to 8388608.")
+ .c_str()));
+ resource_names_failed->insert(cluster_name);
+ continue;
+ }
+ }
+ const google_protobuf_UInt64Value* min_ring_size =
+ envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
+ ring_hash_config);
+ if (min_ring_size != nullptr) {
+ cds_update.min_ring_size =
+ google_protobuf_UInt64Value_value(min_ring_size);
+ if (cds_update.min_ring_size > 8388608 ||
+ cds_update.min_ring_size == 0) {
+ errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ absl::StrCat(
+ cluster_name,
+ ": min_ring_size is not in the range of 1 to 8388608.")
+ .c_str()));
+ resource_names_failed->insert(cluster_name);
+ continue;
+ }
+ if (cds_update.min_ring_size > cds_update.max_ring_size) {
+ errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
+ absl::StrCat(
+ cluster_name,
+ ": min_ring_size cannot be greater than max_ring_size.")
+ .c_str()));
+ resource_names_failed->insert(cluster_name);
+ continue;
+ }
+ }
+ if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
+ ring_hash_config) !=
+ envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat(
- cluster_name,
- ": max_ring_size is not in the range of 1 to 8388608.")
+ absl::StrCat(cluster_name,
+ ": ring hash lb config has invalid hash function.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
}
- const google_protobuf_UInt64Value* min_ring_size =
- envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
- ring_hash_config);
- if (min_ring_size != nullptr) {
- cds_update.min_ring_size =
- google_protobuf_UInt64Value_value(min_ring_size);
- if (cds_update.min_ring_size > 8388608 ||
- cds_update.min_ring_size == 0) {
- errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat(
- cluster_name,
- ": min_ring_size is not in the range of 1 to 8388608.")
- .c_str()));
- resource_names_failed->insert(cluster_name);
- continue;
- }
- if (cds_update.min_ring_size > cds_update.max_ring_size) {
- errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat(
- cluster_name,
- ": min_ring_size cannot be greater than max_ring_size.")
- .c_str()));
- resource_names_failed->insert(cluster_name);
- continue;
- }
- }
- if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
- ring_hash_config) ==
- envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
- cds_update.hash_function = XdsApi::CdsUpdate::HashFunction::XX_HASH;
- } else if (
- envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
- ring_hash_config) ==
- envoy_config_cluster_v3_Cluster_RingHashLbConfig_MURMUR_HASH_2) {
- cds_update.hash_function =
- XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2;
- } else {
- errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
- absl::StrCat(cluster_name,
- ": ring hash lb config has invalid hash function.")
- .c_str()));
- resource_names_failed->insert(cluster_name);
- continue;
- }
} else {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(cluster_name, ": LB policy is not supported.").c_str()));
@@ -3014,13 +2995,28 @@
if (GPR_UNLIKELY(port >> 16) != 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
}
+ // Find load_balancing_weight for the endpoint.
+ const google_protobuf_UInt32Value* load_balancing_weight =
+ envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
+ const int32_t weight =
+ load_balancing_weight != nullptr
+ ? google_protobuf_UInt32Value_value(load_balancing_weight)
+ : 500;
+ if (weight == 0) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Invalid endpoint weight of 0.");
+ }
// Populate grpc_resolved_address.
grpc_resolved_address addr;
grpc_error_handle error =
grpc_string_to_sockaddr(&addr, address_str.c_str(), port);
if (error != GRPC_ERROR_NONE) return error;
// Append the address to the list.
- list->emplace_back(addr, nullptr);
+ std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
+ attributes;
+ attributes[ServerAddressWeightAttribute::kServerAddressWeightAttributeKey] =
+ absl::make_unique<ServerAddressWeightAttribute>(weight);
+ list->emplace_back(addr, nullptr, std::move(attributes));
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h
index e7bf1cd..c148349 100644
--- a/src/core/ext/xds/xds_api.h
+++ b/src/core/ext/xds/xds_api.h
@@ -411,8 +411,6 @@
// Used for RING_HASH LB policy only.
uint64_t min_ring_size = 1024;
uint64_t max_ring_size = 8388608;
- enum HashFunction { XX_HASH, MURMUR_HASH_2 };
- HashFunction hash_function;
// Maximum number of outstanding requests can be made to the upstream
// cluster.
uint32_t max_concurrent_requests = 1024;
diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc
index d3def27..271b010 100644
--- a/src/core/plugin_registry/grpc_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_plugin_registry.cc
@@ -63,6 +63,8 @@
namespace grpc_core {
void FaultInjectionFilterInit(void);
void FaultInjectionFilterShutdown(void);
+void GrpcLbPolicyRingHashInit(void);
+void GrpcLbPolicyRingHashShutdown(void);
} // namespace grpc_core
#ifndef GRPC_NO_XDS
@@ -115,6 +117,8 @@
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
grpc_lb_policy_round_robin_shutdown);
+ grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit,
+ grpc_core::GrpcLbPolicyRingHashShutdown);
grpc_register_plugin(grpc_resolver_dns_ares_init,
grpc_resolver_dns_ares_shutdown);
grpc_register_plugin(grpc_resolver_dns_native_init,
diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
index 5e74529..b58ab32 100644
--- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
+++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
@@ -57,6 +57,8 @@
namespace grpc_core {
void FaultInjectionFilterInit(void);
void FaultInjectionFilterShutdown(void);
+void GrpcLbPolicyRingHashInit(void);
+void GrpcLbPolicyRingHashShutdown(void);
} // namespace grpc_core
void grpc_service_config_channel_arg_filter_init(void);
void grpc_service_config_channel_arg_filter_shutdown(void);
@@ -94,6 +96,8 @@
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
grpc_lb_policy_round_robin_shutdown);
+ grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit,
+ grpc_core::GrpcLbPolicyRingHashShutdown);
grpc_register_plugin(grpc_client_idle_filter_init,
grpc_client_idle_filter_shutdown);
grpc_register_plugin(grpc_max_age_filter_init,
diff --git a/src/proto/grpc/testing/xds/v3/cluster.proto b/src/proto/grpc/testing/xds/v3/cluster.proto
index c04fe20..2c26c9b 100644
--- a/src/proto/grpc/testing/xds/v3/cluster.proto
+++ b/src/proto/grpc/testing/xds/v3/cluster.proto
@@ -153,12 +153,50 @@
// Configuration to use for EDS updates for the Cluster.
EdsClusterConfig eds_cluster_config = 3;
+ // Specific configuration for the :ref:`RingHash<arch_overview_load_balancing_types_ring_hash>`
+ // load balancing policy.
+ message RingHashLbConfig {
+ // The hash function used to hash hosts onto the ketama ring.
+ enum HashFunction {
+ // Use `xxHash <https://github.com/Cyan4973/xxHash>`_, this is the default hash function.
+ XX_HASH = 0;
+ MURMUR_HASH_2 = 1;
+ }
+
+ reserved 2;
+
+ // Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each
+ // provided host) the better the request distribution will reflect the desired weights. Defaults
+ // to 1024 entries, and limited to 8M entries. See also
+ // :ref:`maximum_ring_size<envoy_api_field_config.cluster.v3.Cluster.RingHashLbConfig.maximum_ring_size>`.
+ google.protobuf.UInt64Value minimum_ring_size = 1;
+
+ // The hash function used to hash hosts onto the ketama ring. The value defaults to
+ // :ref:`XX_HASH<envoy_api_enum_value_config.cluster.v3.Cluster.RingHashLbConfig.HashFunction.XX_HASH>`.
+ HashFunction hash_function = 3;
+
+ // Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered
+ // to further constrain resource use. See also
+ // :ref:`minimum_ring_size<envoy_api_field_config.cluster.v3.Cluster.RingHashLbConfig.minimum_ring_size>`.
+ google.protobuf.UInt64Value maximum_ring_size = 4;
+ }
+
// The :ref:`load balancer type <arch_overview_load_balancing_types>` to use
// when picking a host in the cluster.
LbPolicy lb_policy = 6;
CircuitBreakers circuit_breakers = 10;
+  // Optional configuration for the load balancing algorithm selected by
+  // LbPolicy. Currently only
+  // :ref:`RING_HASH<envoy_api_enum_value_config.cluster.v3.Cluster.LbPolicy.RING_HASH>`
+  // is supported. Specifying ring_hash_lb_config without setting the
+  // corresponding LbPolicy will generate an error at runtime.
+ oneof lb_config {
+ // Optional configuration for the Ring Hash load balancing policy.
+ RingHashLbConfig ring_hash_lb_config = 23;
+ }
+
// Optional custom transport socket implementation to use for upstream connections.
// To setup TLS, set a transport socket with name `tls` and
// :ref:`UpstreamTlsContexts <envoy_api_msg_extensions.transport_sockets.tls.v3.UpstreamTlsContext>` in the `typed_config`.
diff --git a/src/proto/grpc/testing/xds/v3/endpoint.proto b/src/proto/grpc/testing/xds/v3/endpoint.proto
index 7cc1d40..7cbea7f 100644
--- a/src/proto/grpc/testing/xds/v3/endpoint.proto
+++ b/src/proto/grpc/testing/xds/v3/endpoint.proto
@@ -76,6 +76,17 @@
// Optional health status when known and supplied by EDS server.
HealthStatus health_status = 2;
+
+ // The optional load balancing weight of the upstream host; at least 1.
+ // Envoy uses the load balancing weight in some of the built in load
+ // balancers. The load balancing weight for an endpoint is divided by the sum
+ // of the weights of all endpoints in the endpoint's locality to produce a
+ // percentage of traffic for the endpoint. This percentage is then further
+ // weighted by the endpoint's locality's load balancing weight from
+ // LocalityLbEndpoints. If unspecified, each host is presumed to have equal
+ // weight in a locality. The sum of the weights of all endpoints in the
+ // endpoint's locality must not exceed uint32_t maximal value (4294967295).
+ google.protobuf.UInt32Value load_balancing_weight = 4;
}
// A group of endpoints belonging to a Locality.
diff --git a/src/proto/grpc/testing/xds/v3/regex.proto b/src/proto/grpc/testing/xds/v3/regex.proto
index af90457..9039ed4 100644
--- a/src/proto/grpc/testing/xds/v3/regex.proto
+++ b/src/proto/grpc/testing/xds/v3/regex.proto
@@ -36,3 +36,8 @@
// The regex match string. The string must be supported by the configured engine.
string regex = 2;
}
+
+message RegexMatchAndSubstitute {
+ RegexMatcher pattern = 1;
+ string substitution = 2;
+}
diff --git a/src/proto/grpc/testing/xds/v3/route.proto b/src/proto/grpc/testing/xds/v3/route.proto
index baeaaf6..89260f6 100644
--- a/src/proto/grpc/testing/xds/v3/route.proto
+++ b/src/proto/grpc/testing/xds/v3/route.proto
@@ -246,6 +246,78 @@
// for additional documentation.
WeightedCluster weighted_clusters = 3;
}
+
+ message HashPolicy {
+ message Header {
+ // The name of the request header that will be used to obtain the hash
+ // key. If the request header is not present, no hash will be produced.
+ string header_name = 1;
+
+ // If specified, the request header value will be rewritten and used
+ // to produce the hash key.
+ type.matcher.v3.RegexMatchAndSubstitute regex_rewrite = 2;
+ }
+
+ message Cookie {
+ string name = 1;
+ }
+
+ message ConnectionProperties {
+ bool source_ip = 1;
+ }
+
+ message QueryParameter {
+ string name = 1;
+ }
+
+ message FilterState {
+ // The name of the Object in the per-request filterState, which is an
+ // Envoy::Http::Hashable object. If there is no data associated with the key,
+ // or the stored object is not Envoy::Http::Hashable, no hash will be produced.
+ string key = 1;
+ }
+
+ oneof policy_specifier {
+ // Header hash policy.
+ Header header = 1;
+
+ // Cookie hash policy.
+ Cookie cookie = 2;
+
+ // Connection properties hash policy.
+ ConnectionProperties connection_properties = 3;
+
+ // Query parameter hash policy.
+ QueryParameter query_parameter = 5;
+
+ // Filter state hash policy.
+ FilterState filter_state = 6;
+ }
+
+ // The flag that short-circuits the hash computing. This field provides a
+ // 'fallback' style of configuration: "if a terminal policy doesn't work,
+ // fallback to rest of the policy list", it saves time when the terminal
+ // policy works.
+ //
+ // If true, and there is already a hash computed, ignore rest of the
+    // list of hash policies.
+ // For example, if the following hash methods are configured:
+ //
+ // ========= ========
+ // specifier terminal
+ // ========= ========
+ // Header A true
+ // Header B false
+ // Header C false
+ // ========= ========
+ //
+ // The generateHash process ends if policy "header A" generates a hash, as
+ // it's a terminal policy.
+ bool terminal = 4;
+ }
+
+ repeated HashPolicy hash_policy = 15;
+
// Specifies the maximum stream duration for this route.
MaxStreamDuration max_stream_duration = 36;
}
diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc
index 2a80c54..0257778 100644
--- a/test/cpp/end2end/xds_end2end_test.cc
+++ b/test/cpp/end2end/xds_end2end_test.cc
@@ -32,6 +32,7 @@
#include "absl/functional/bind_front.h"
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
+#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/types/optional.h"
@@ -2095,6 +2096,14 @@
return addresses;
}
+ std::string CreateMetadataValueThatHashesToBackendPort(int port) {
+ return absl::StrCat(ipv6_only_ ? "::1" : "127.0.0.1", ":", port, "_0");
+ }
+
+ std::string CreateMetadataValueThatHashesToBackend(int index) {
+ return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
+ }
+
void SetNextResolution(
const std::vector<int>& ports,
grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) {
@@ -2236,6 +2245,13 @@
return listener;
}
+ AdsServiceImpl::EdsResourceArgs::Endpoint CreateEndpoint(
+ size_t backend_idx, HealthStatus health_status = HealthStatus::UNKNOWN,
+ int lb_weight = 1) {
+ return AdsServiceImpl::EdsResourceArgs::Endpoint(
+ backends_[backend_idx]->port(), health_status, lb_weight);
+ }
+
std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint>
CreateEndpointsForBackends(size_t start_index = 0, size_t stop_index = 0,
HealthStatus health_status = HealthStatus::UNKNOWN,
@@ -2243,7 +2259,7 @@
if (stop_index == 0) stop_index = backends_.size();
std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
for (size_t i = start_index; i < stop_index; ++i) {
- endpoints.emplace_back(backends_[i]->port(), health_status, lb_weight);
+ endpoints.emplace_back(CreateEndpoint(i, health_status, lb_weight));
}
return endpoints;
}
@@ -2272,6 +2288,11 @@
locality.endpoints[i].health_status != HealthStatus::UNKNOWN) {
lb_endpoints->set_health_status(locality.endpoints[i].health_status);
}
+ if (locality.endpoints.size() > i &&
+ locality.endpoints[i].lb_weight >= 1) {
+ lb_endpoints->mutable_load_balancing_weight()->set_value(
+ locality.endpoints[i].lb_weight);
+ }
auto* endpoint = lb_endpoints->mutable_endpoint();
auto* address = endpoint->mutable_address();
auto* socket_address = address->mutable_socket_address();
@@ -2766,8 +2787,8 @@
}
}
-// Tests that subchannel sharing works when the same backend is listed multiple
-// times.
+// Tests that subchannel sharing works when the same backend is listed
+// multiple times.
TEST_P(BasicTest, SameBackendListedMultipleTimes) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -2852,8 +2873,8 @@
EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
}
-// Tests that RPCs fail when the backends are down, and will succeed again after
-// the backends are restarted.
+// Tests that RPCs fail when the backends are down, and will succeed again
+// after the backends are restarted.
TEST_P(BasicTest, BackendsRestart) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -2867,13 +2888,13 @@
ShutdownAllBackends();
// Sending multiple failed requests instead of just one to ensure that the
// client notices that all backends are down before we restart them. If we
- // didn't do this, then a single RPC could fail here due to the race condition
- // between the LB pick and the GOAWAY from the chosen backend being shut down,
- // which would not actually prove that the client noticed that all of the
- // backends are down. Then, when we send another request below (which we
- // expect to succeed), if the callbacks happen in the wrong order, the same
- // race condition could happen again due to the client not yet having noticed
- // that the backends were all down.
+ // didn't do this, then a single RPC could fail here due to the race
+ // condition between the LB pick and the GOAWAY from the chosen backend
+ // being shut down, which would not actually prove that the client noticed
+ // that all of the backends are down. Then, when we send another request
+ // below (which we expect to succeed), if the callbacks happen in the wrong
+ // order, the same race condition could happen again due to the client not
+ // yet having noticed that the backends were all down.
CheckRpcSendFailure(num_backends_);
// Restart all backends. RPCs should start succeeding again.
StartAllBackends();
@@ -3501,7 +3522,8 @@
}
// Tests that LDS client should send a NACK if the rds message in the
-// http_connection_manager has a config_source field that does not specify ADS.
+// http_connection_manager has a config_source field that does not specify
+// ADS.
TEST_P(LdsTest, RdsConfigSourceDoesNotSpecifyAds) {
auto listener = default_listener_;
HttpConnectionManager http_connection_manager;
@@ -3519,10 +3541,9 @@
const auto response_state =
balancers_[0]->ads_service()->lds_response_state();
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
- EXPECT_THAT(
- response_state.error_message,
- ::testing::HasSubstr(
- "HttpConnectionManager ConfigSource for RDS does not specify ADS."));
+ EXPECT_THAT(response_state.error_message,
+ ::testing::HasSubstr("HttpConnectionManager ConfigSource for "
+ "RDS does not specify ADS."));
}
// Tests that we ignore filters after the router filter.
@@ -3893,8 +3914,8 @@
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED);
}
-// Tests that LDS client should choose the virtual host with matching domain if
-// multiple virtual hosts exist in the LDS response.
+// Tests that LDS client should choose the virtual host with matching domain
+// if multiple virtual hosts exist in the LDS response.
TEST_P(LdsRdsTest, ChooseMatchedDomain) {
RouteConfiguration route_config = default_route_config_;
*(route_config.add_virtual_hosts()) = route_config.virtual_hosts(0);
@@ -4233,10 +4254,9 @@
CheckRpcSendFailure();
const auto response_state = RouteConfigurationResponseState(0);
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
- EXPECT_THAT(
- response_state.error_message,
- ::testing::HasSubstr(
- "RouteAction weighted_cluster cluster contains empty cluster name."));
+ EXPECT_THAT(response_state.error_message,
+ ::testing::HasSubstr("RouteAction weighted_cluster cluster "
+ "contains empty cluster name."));
}
TEST_P(LdsRdsTest, RouteActionWeightedTargetClusterHasNoWeight) {
@@ -4992,8 +5012,8 @@
// Change Route Configurations: same clusters different weights.
weighted_cluster1->mutable_weight()->set_value(kWeight50);
weighted_cluster2->mutable_weight()->set_value(kWeight50);
- // Change default route to a new cluster to help to identify when new polices
- // are seen by the client.
+ // Change default route to a new cluster to help to identify when new
+  // policies are seen by the client.
default_route->mutable_route()->set_cluster(kNewCluster3Name);
SetRouteConfiguration(0, new_route_config);
ResetBackendCounters();
@@ -5258,8 +5278,8 @@
new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
default_route->mutable_route()->set_cluster(kNewClusterName);
SetRouteConfiguration(0, new_route_config);
- // Wait for RPCs to go to the new backend: 1, this ensures that the client has
- // processed the update.
+ // Wait for RPCs to go to the new backend: 1, this ensures that the client
+ // has processed the update.
WaitForBackend(
1, WaitForBackendOptions().set_reset_counters(false).set_allow_failures(
true));
@@ -6648,8 +6668,8 @@
::testing::HasSubstr("DiscoveryType is not valid."));
}
-// Tests that CDS client should send a NACK if the cluster type in CDS response
-// is unsupported.
+// Tests that CDS client should send a NACK if the cluster type in CDS
+// response is unsupported.
TEST_P(CdsTest, UnsupportedClusterType) {
auto cluster = default_cluster_;
cluster.set_type(Cluster::STATIC);
@@ -6695,8 +6715,8 @@
kClusterName2, ": DiscoveryType is not valid."))));
}
-// Tests that CDS client should send a NACK if the eds_config in CDS response is
-// other than ADS.
+// Tests that CDS client should send a NACK if the eds_config in CDS response
+// is other than ADS.
TEST_P(CdsTest, WrongEdsConfig) {
auto cluster = default_cluster_;
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
@@ -6711,8 +6731,8 @@
::testing::HasSubstr("EDS ConfigSource is not ADS."));
}
-// Tests that CDS client should send a NACK if the lb_policy in CDS response is
-// other than ROUND_ROBIN.
+// Tests that CDS client should send a NACK if the lb_policy in CDS response
+// is other than ROUND_ROBIN.
TEST_P(CdsTest, WrongLbPolicy) {
auto cluster = default_cluster_;
cluster.set_lb_policy(Cluster::LEAST_REQUEST);
@@ -6727,8 +6747,8 @@
::testing::HasSubstr("LB policy is not supported."));
}
-// Tests that CDS client should send a NACK if the lrs_server in CDS response is
-// other than SELF.
+// Tests that CDS client should send a NACK if the lrs_server in CDS response
+// is other than SELF.
TEST_P(CdsTest, WrongLrsServer) {
auto cluster = default_cluster_;
cluster.mutable_lrs_server()->mutable_ads();
@@ -6743,6 +6763,801 @@
::testing::HasSubstr("LRS ConfigSource is not self."));
}
+// Tests that a ring hash policy that hashes using the channel id sends all
+// RPCs to one particular backend.
+TEST_P(CdsTest, RingHashChannelIdHashing) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ CheckRpcSendOk(100);
+ bool found = false;
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ if (backends_[i]->backend_service()->request_count() > 0) {
+ EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
+ << "backend " << i;
+ EXPECT_FALSE(found) << "backend " << i;
+ found = true;
+ }
+ }
+ EXPECT_TRUE(found);
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that ring hash policy that hashes using a header value can spread
+// RPCs across all the backends.
+TEST_P(CdsTest, RingHashHeaderHashing) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("address_hash");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+  // Note each type of RPC will contain a header value that will always be
+ // hashed to a specific backend as the header value matches the value used
+ // to create the entry in the ring.
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+ std::vector<std::pair<std::string, std::string>> metadata1 = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
+ std::vector<std::pair<std::string, std::string>> metadata2 = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
+ std::vector<std::pair<std::string, std::string>> metadata3 = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
+ const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
+ const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
+ WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+ WaitForBackend(1, WaitForBackendOptions(), rpc_options1);
+ WaitForBackend(2, WaitForBackendOptions(), rpc_options2);
+ WaitForBackend(3, WaitForBackendOptions(), rpc_options3);
+ CheckRpcSendOk(100, rpc_options);
+ CheckRpcSendOk(100, rpc_options1);
+ CheckRpcSendOk(100, rpc_options2);
+ CheckRpcSendOk(100, rpc_options3);
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
+ }
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that ring hash policy that hashes using a header value and regex
+// rewrite to aggregate RPCs to 1 backend.
+TEST_P(CdsTest, RingHashHeaderHashingWithRegexRewrite) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("address_hash");
+ hash_policy->mutable_header()
+ ->mutable_regex_rewrite()
+ ->mutable_pattern()
+ ->set_regex("[0-9]+");
+ hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
+ "foo");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+ std::vector<std::pair<std::string, std::string>> metadata1 = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
+ std::vector<std::pair<std::string, std::string>> metadata2 = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
+ std::vector<std::pair<std::string, std::string>> metadata3 = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1));
+ const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2));
+ const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3));
+ CheckRpcSendOk(100, rpc_options);
+ CheckRpcSendOk(100, rpc_options1);
+ CheckRpcSendOk(100, rpc_options2);
+ CheckRpcSendOk(100, rpc_options3);
+ bool found = false;
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ if (backends_[i]->backend_service()->request_count() > 0) {
+ EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
+ << "backend " << i;
+ EXPECT_FALSE(found) << "backend " << i;
+ found = true;
+ }
+ }
+ EXPECT_TRUE(found);
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that the ring hash policy uses a random hash when no policy is set.
+TEST_P(CdsTest, RingHashNoHashPolicy) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ const double kDistribution50Percent = 0.5;
+ const double kErrorTolerance = 0.05;
+ const uint32_t kRpcTimeoutMs = 10000;
+ const size_t kNumRpcs =
+ ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
+ auto cluster = default_cluster_;
+ // Increasing min ring size for random distribution.
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+ 100000);
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ AdsServiceImpl::EdsResourceArgs args(
+ {{"locality0", CreateEndpointsForBackends(0, 2)}});
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ // TODO(donnadionne): remove extended timeout after ring creation
+ // optimization.
+ WaitForAllBackends(0, 2, WaitForBackendOptions(),
+ RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+ CheckRpcSendOk(kNumRpcs);
+ const int request_count_1 = backends_[0]->backend_service()->request_count();
+ const int request_count_2 = backends_[1]->backend_service()->request_count();
+ EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
+ ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+ EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
+ ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that ring hash policy evaluation will continue past the terminal
+// policy if no results are produced yet.
+TEST_P(CdsTest, RingHashContinuesPastTerminalPolicyThatDoesNotProduceResult) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("header_not_present");
+ hash_policy->set_terminal(true);
+ auto* hash_policy2 = route->mutable_route()->add_hash_policy();
+ hash_policy2->mutable_header()->set_header_name("address_hash");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args(
+ {{"locality0", CreateEndpointsForBackends(0, 2)}});
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ CheckRpcSendOk(100, rpc_options);
+ EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
+ EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that a random hash is used when header hashing specifies a header
+// field that the RPC does not have.
+TEST_P(CdsTest, RingHashOnHeaderThatIsNotPresent) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ const double kDistribution50Percent = 0.5;
+ const double kErrorTolerance = 0.05;
+ const uint32_t kRpcTimeoutMs = 10000;
+ const size_t kNumRpcs =
+ ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
+ auto cluster = default_cluster_;
+ // Increasing min ring size for random distribution.
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+ 100000);
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("header_not_present");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args(
+ {{"locality0", CreateEndpointsForBackends(0, 2)}});
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"unmatched_header", absl::StrFormat("%" PRIu32, rand())},
+ };
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ // TODO(donnadionne): remove extended timeout after ring creation
+ // optimization.
+ WaitForAllBackends(0, 2, WaitForBackendOptions(),
+ RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+ CheckRpcSendOk(kNumRpcs, rpc_options);
+ const int request_count_1 = backends_[0]->backend_service()->request_count();
+ const int request_count_2 = backends_[1]->backend_service()->request_count();
+ EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
+ ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+ EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
+ ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that a random hash is used when only unsupported hash policies are
+// configured.
+TEST_P(CdsTest, RingHashUnsupportedHashPolicyDefaultToRandomHashing) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ const double kDistribution50Percent = 0.5;
+ const double kErrorTolerance = 0.05;
+ const uint32_t kRpcTimeoutMs = 10000;
+ const size_t kNumRpcs =
+ ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
+ auto cluster = default_cluster_;
+ // Increasing min ring size for random distribution.
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+ 100000);
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
+ hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
+ auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
+ hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
+ true);
+ auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
+ hash_policy_unsupported_3->mutable_query_parameter()->set_name(
+ "query_parameter");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args(
+ {{"locality0", CreateEndpointsForBackends(0, 2)}});
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ // TODO(donnadionne): remove extended timeout after ring creation
+ // optimization.
+ WaitForAllBackends(0, 2, WaitForBackendOptions(),
+ RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+ CheckRpcSendOk(kNumRpcs);
+ const int request_count_1 = backends_[0]->backend_service()->request_count();
+ const int request_count_2 = backends_[1]->backend_service()->request_count();
+ EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
+ ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+ EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
+ ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that a ring hash policy hashing on a random value can spread RPCs
+// across all the backends according to endpoint weight.
+TEST_P(CdsTest, RingHashRandomHashingDistributionAccordingToEndpointWeight) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ const size_t kWeight1 = 1;
+ const size_t kWeight2 = 2;
+ const size_t kWeightTotal = kWeight1 + kWeight2;
+ const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
+ const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
+ const double kErrorTolerance = 0.05;
+ const uint32_t kRpcTimeoutMs = 10000;
+ const size_t kNumRpcs =
+ ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
+ auto cluster = default_cluster_;
+ // Increasing min ring size for random distribution.
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+ 100000);
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ AdsServiceImpl::EdsResourceArgs args(
+ {{"locality0",
+ {CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
+ CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ // TODO(donnadionne): remove extended timeout after ring creation
+ // optimization.
+ WaitForAllBackends(0, 2, WaitForBackendOptions(),
+ RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+ CheckRpcSendOk(kNumRpcs);
+ const int weight_33_request_count =
+ backends_[0]->backend_service()->request_count();
+ const int weight_66_request_count =
+ backends_[1]->backend_service()->request_count();
+ EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
+ ::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
+ EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
+ ::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that a ring hash policy hashing on a random value can spread RPCs
+// across all the backends according to locality and endpoint weight.
+TEST_P(CdsTest,
+ RingHashRandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ const size_t kWeight1 = 1 * 1;
+ const size_t kWeight2 = 2 * 2;
+ const size_t kWeightTotal = kWeight1 + kWeight2;
+ const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
+ const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
+ const double kErrorTolerance = 0.05;
+ const uint32_t kRpcTimeoutMs = 10000;
+ const size_t kNumRpcs =
+ ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
+ auto cluster = default_cluster_;
+ // Increasing min ring size for random distribution.
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+ 100000);
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ AdsServiceImpl::EdsResourceArgs args(
+ {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
+ {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ // TODO(donnadionne): remove extended timeout after ring creation
+ // optimization.
+ WaitForAllBackends(0, 2, WaitForBackendOptions(),
+ RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+ CheckRpcSendOk(kNumRpcs);
+ const int weight_20_request_count =
+ backends_[0]->backend_service()->request_count();
+ const int weight_80_request_count =
+ backends_[1]->backend_service()->request_count();
+ EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
+ ::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
+ EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
+ ::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests that round robin is not impacted by the endpoint weight, and that
+// the localities in a locality map are picked according to their weights.
+TEST_P(CdsTest, RingHashEndpointWeightDoesNotImpactWeightedRoundRobin) {
+ SetNextResolution({});
+ SetNextResolutionForLbChannelAllBalancers();
+ const int kLocalityWeight0 = 2;
+ const int kLocalityWeight1 = 8;
+ const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
+ const double kLocalityWeightRate0 =
+ static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
+ const double kLocalityWeightRate1 =
+ static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
+ const double kErrorTolerance = 0.05;
+ const size_t kNumRpcs =
+ ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
+ // ADS response contains 2 localities, each of which contains 1 backend.
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0",
+ {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
+ kLocalityWeight0},
+ {"locality1",
+ {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
+ kLocalityWeight1},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ // Wait for both backends to be ready.
+ WaitForAllBackends(0, 2);
+ // Send kNumRpcs RPCs.
+ CheckRpcSendOk(kNumRpcs);
+ // The locality picking rates should be roughly equal to the expectation.
+ const double locality_picked_rate_0 =
+ static_cast<double>(backends_[0]->backend_service()->request_count()) /
+ kNumRpcs;
+ const double locality_picked_rate_1 =
+ static_cast<double>(backends_[1]->backend_service()->request_count()) /
+ kNumRpcs;
+ EXPECT_THAT(locality_picked_rate_0,
+ ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
+ EXPECT_THAT(locality_picked_rate_1,
+ ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
+}
+
+// Tests that a ring hash policy hashing on a fixed string sends all RPCs
+// to one particular backend, and that subsequent hash policies are ignored
+// because the policy is marked terminal.
+TEST_P(CdsTest, RingHashFixedHashingTerminalPolicy) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("fixed_string");
+ hash_policy->set_terminal(true);
+ auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
+ hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"fixed_string", "fixed_value"},
+ {"random_string", absl::StrFormat("%" PRIu32, rand())},
+ };
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ CheckRpcSendOk(100, rpc_options);
+ bool found = false;
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ if (backends_[i]->backend_service()->request_count() > 0) {
+ EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
+ << "backend " << i;
+ EXPECT_FALSE(found) << "backend " << i;
+ found = true;
+ }
+ }
+ EXPECT_TRUE(found);
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that the channel will go from idle to ready via connecting
+// (though it is not possible to catch the connecting state before moving to
+// ready).
+TEST_P(CdsTest, RingHashIdleToReady) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
+ CheckRpcSendOk();
+ EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when the first pick is down leading to a transient failure, we
+// will move on to the next ring hash entry.
+TEST_P(CdsTest, RingHashTransientFailureCheckNextOne) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("address_hash");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
+ const int unused_port = grpc_pick_unused_port_or_die();
+ endpoints.emplace_back(unused_port);
+ endpoints.emplace_back(backends_[1]->port());
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", std::move(endpoints)},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"address_hash",
+ CreateMetadataValueThatHashesToBackendPort(unused_port)}};
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ WaitForBackend(1, WaitForBackendOptions(), rpc_options);
+ CheckRpcSendOk(100, rpc_options);
+ EXPECT_EQ(0, backends_[0]->backend_service()->request_count());
+ EXPECT_EQ(100, backends_[1]->backend_service()->request_count());
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when a backend goes down, we will move on to the next subchannel
+// (with a lower priority). When the backend comes back up, traffic will move
+// back.
+TEST_P(CdsTest, RingHashSwitchToLowerPrioirtyAndThenBack) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("address_hash");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
+ 0},
+ {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
+ 1},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+ ShutdownBackend(0);
+ WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true),
+ rpc_options);
+ StartBackend(0);
+ WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+ CheckRpcSendOk(100, rpc_options);
+ EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
+ EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when all backends are down, we will keep reattempting.
+TEST_P(CdsTest, RingHashAllFailReattempt) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ const uint32_t kConnectionTimeoutMilliseconds = 5000;
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("address_hash");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
+ endpoints.emplace_back(grpc_pick_unused_port_or_die());
+ endpoints.emplace_back(backends_[1]->port());
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", std::move(endpoints)},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
+ ShutdownBackend(1);
+ CheckRpcSendFailure(1, rpc_options);
+ StartBackend(1);
+ // Ensure we are actively connecting without any traffic.
+ EXPECT_TRUE(channel_->WaitForConnected(
+ grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that when all backends are down and then come back up, we may first
+// pick a TRANSIENT_FAILURE backend and will then jump to a READY backend.
+TEST_P(CdsTest, RingHashTransientFailureSkipToAvailableReady) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ const uint32_t kConnectionTimeoutMilliseconds = 5000;
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto new_route_config = default_route_config_;
+ auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+ auto* hash_policy = route->mutable_route()->add_hash_policy();
+ hash_policy->mutable_header()->set_header_name("address_hash");
+ SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+ std::vector<AdsServiceImpl::EdsResourceArgs::Endpoint> endpoints;
+ // Make sure we include some unused ports to fill the ring.
+ endpoints.emplace_back(backends_[0]->port());
+ endpoints.emplace_back(backends_[1]->port());
+ endpoints.emplace_back(grpc_pick_unused_port_or_die());
+ endpoints.emplace_back(grpc_pick_unused_port_or_die());
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", std::move(endpoints)},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ std::vector<std::pair<std::string, std::string>> metadata = {
+ {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
+ const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
+ EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
+ ShutdownBackend(0);
+ ShutdownBackend(1);
+ CheckRpcSendFailure(1, rpc_options);
+ EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
+ // Bring up 0, should be picked as the RPC is hashed to it.
+ StartBackend(0);
+ EXPECT_TRUE(channel_->WaitForConnected(
+ grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
+ WaitForBackend(0, WaitForBackendOptions(), rpc_options);
+ // Bring down 0 and bring up 1.
+ // Note the RPC contains a header value that will always be hashed to
+ // backend 0. So by purposely bring down backend 0 and bring up another
+ // backend, this will ensure Picker's first choice of backend 0 will fail
+ // and it will
+ // 1. reattempt backend 0 and
+ // 2. go through the remaining subchannels to find one in READY.
+ // Since the the entries in the ring is pretty distributed and we have
+ // unused ports to fill the ring, it is almost guaranteed that the Picker
+ // will go through some non-READY entries and skip them as per design.
+ ShutdownBackend(0);
+ CheckRpcSendFailure(1, rpc_options);
+ StartBackend(1);
+ EXPECT_TRUE(channel_->WaitForConnected(
+ grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
+ WaitForBackend(1, WaitForBackendOptions(), rpc_options);
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that all unsupported hash policy types appearing before a supported
+// policy are ignored, and the supported channel-id policy is applied: every
+// RPC should land on exactly one backend.
+TEST_P(CdsTest, RingHashUnsupportedHashPolicyUntilChannelIdHashing) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ auto modified_route_config = default_route_config_;
+ auto* route_action = modified_route_config.mutable_virtual_hosts(0)
+ ->mutable_routes(0)
+ ->mutable_route();
+ // Three hash policy types gRPC does not support...
+ route_action->add_hash_policy()->mutable_cookie()->set_name("cookie");
+ route_action->add_hash_policy()
+ ->mutable_connection_properties()
+ ->set_source_ip(true);
+ route_action->add_hash_policy()->mutable_query_parameter()->set_name(
+ "query_parameter");
+ // ...followed by a supported channel-id filter_state policy.
+ route_action->add_hash_policy()->mutable_filter_state()->set_key(
+ "io.grpc.channel_id");
+ SetListenerAndRouteConfiguration(0, default_listener_,
+ modified_route_config);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ CheckRpcSendOk(100);
+ // Exactly one backend must have received traffic, and it must have
+ // received all 100 RPCs.
+ size_t backends_with_traffic = 0;
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ const auto count = backends_[i]->backend_service()->request_count();
+ if (count == 0) continue;
+ EXPECT_EQ(count, 100) << "backend " << i;
+ EXPECT_EQ(backends_with_traffic, 0u) << "backend " << i;
+ ++backends_with_traffic;
+ }
+ EXPECT_GT(backends_with_traffic, 0u);
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that a ring hash policy whose hash function is anything other than
+// XX_HASH (here MURMUR_HASH_2) is NACKed.
+TEST_P(CdsTest, RingHashPolicyHasInvalidHashFunction) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ // Route config hashing on channel id.
+ auto modified_route_config = default_route_config_;
+ modified_route_config.mutable_virtual_hosts(0)
+ ->mutable_routes(0)
+ ->mutable_route()
+ ->add_hash_policy()
+ ->mutable_filter_state()
+ ->set_key("io.grpc.channel_id");
+ SetListenerAndRouteConfiguration(0, default_listener_,
+ modified_route_config);
+ // Cluster using an unsupported hash function.
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ cluster.mutable_ring_hash_lb_config()->set_hash_function(
+ Cluster::RingHashLbConfig::MURMUR_HASH_2);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ CheckRpcSendFailure();
+ // The CDS resource must have been NACKed with the expected error.
+ const auto response_state =
+ balancers_[0]->ads_service()->cds_response_state();
+ EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+ EXPECT_THAT(
+ response_state.error_message,
+ ::testing::HasSubstr("ring hash lb config has invalid hash function."));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that a ring hash policy with minimum ring size 0 (outside the valid
+// range of 1 to 8388608) is NACKed.
+TEST_P(CdsTest, RingHashPolicyHasInvalidMinimumRingSize) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ // Route config hashing on channel id.
+ auto modified_route_config = default_route_config_;
+ modified_route_config.mutable_virtual_hosts(0)
+ ->mutable_routes(0)
+ ->mutable_route()
+ ->add_hash_policy()
+ ->mutable_filter_state()
+ ->set_key("io.grpc.channel_id");
+ SetListenerAndRouteConfiguration(0, default_listener_,
+ modified_route_config);
+ // Cluster whose minimum ring size is below the allowed lower bound of 1.
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+ 0);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ CheckRpcSendFailure();
+ // The CDS resource must have been NACKed with the expected error.
+ const auto response_state =
+ balancers_[0]->ads_service()->cds_response_state();
+ EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+ EXPECT_THAT(response_state.error_message,
+ ::testing::HasSubstr(
+ "min_ring_size is not in the range of 1 to 8388608."));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that a ring hash policy with maximum ring size 8388609 (above the
+// valid upper bound of 8388608) is NACKed.
+TEST_P(CdsTest, RingHashPolicyHasInvalidMaxmumRingSize) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ // Route config hashing on channel id.
+ auto modified_route_config = default_route_config_;
+ modified_route_config.mutable_virtual_hosts(0)
+ ->mutable_routes(0)
+ ->mutable_route()
+ ->add_hash_policy()
+ ->mutable_filter_state()
+ ->set_key("io.grpc.channel_id");
+ SetListenerAndRouteConfiguration(0, default_listener_,
+ modified_route_config);
+ // Cluster whose maximum ring size exceeds the allowed upper bound.
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
+ 8388609);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ CheckRpcSendFailure();
+ // The CDS resource must have been NACKed with the expected error.
+ const auto response_state =
+ balancers_[0]->ads_service()->cds_response_state();
+ EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+ EXPECT_THAT(response_state.error_message,
+ ::testing::HasSubstr(
+ "max_ring_size is not in the range of 1 to 8388608."));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Test that a ring hash policy whose minimum ring size exceeds its maximum
+// ring size is NACKed.
+TEST_P(CdsTest, RingHashPolicyHasInvalidRingSizeMinGreaterThanMax) {
+ gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+ // Route config hashing on channel id.
+ auto modified_route_config = default_route_config_;
+ modified_route_config.mutable_virtual_hosts(0)
+ ->mutable_routes(0)
+ ->mutable_route()
+ ->add_hash_policy()
+ ->mutable_filter_state()
+ ->set_key("io.grpc.channel_id");
+ SetListenerAndRouteConfiguration(0, default_listener_,
+ modified_route_config);
+ // Cluster with min ring size (5001) greater than max ring size (5000).
+ auto cluster = default_cluster_;
+ cluster.set_lb_policy(Cluster::RING_HASH);
+ cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value(
+ 5000);
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+ 5001);
+ balancers_[0]->ads_service()->SetCdsResource(cluster);
+ AdsServiceImpl::EdsResourceArgs args({
+ {"locality0", CreateEndpointsForBackends()},
+ });
+ balancers_[0]->ads_service()->SetEdsResource(
+ BuildEdsResource(args, DefaultEdsServiceName()));
+ SetNextResolutionForLbChannelAllBalancers();
+ CheckRpcSendFailure();
+ // The CDS resource must have been NACKed with the expected error.
+ const auto response_state =
+ balancers_[0]->ads_service()->cds_response_state();
+ EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED);
+ EXPECT_THAT(response_state.error_message,
+ ::testing::HasSubstr(
+ "min_ring_size cannot be greater than max_ring_size."));
+ gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
class XdsSecurityTest : public BasicTest {
protected:
static void SetUpTestCase() {
@@ -6760,8 +7575,8 @@
root_cert_ = ReadFile(kCaCertPath);
bad_root_cert_ = ReadFile(kBadClientCertPath);
identity_pair_ = ReadTlsIdentityPair(kClientKeyPath, kClientCertPath);
- // TODO(yashykt): Use different client certs here instead of reusing server
- // certs after https://github.com/grpc/grpc/pull/24876 is merged
+ // TODO(yashykt): Use different client certs here instead of reusing
+ // server certs after https://github.com/grpc/grpc/pull/24876 is merged
fallback_identity_pair_ =
ReadTlsIdentityPair(kServerKeyPath, kServerCertPath);
bad_identity_pair_ =
@@ -7562,7 +8377,8 @@
EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED);
}
-// Verify that a mismatch of listening address results in "not serving" status.
+// Verify that a mismatch of listening address results in "not serving"
+// status.
TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) {
Listener listener;
listener.set_name(
@@ -8421,8 +9237,8 @@
HttpConnectionManager());
filter_chain->mutable_filter_chain_match()->add_application_protocols("h2");
balancers_[0]->ads_service()->SetLdsResource(listener);
- // A successful RPC proves that filter chains that mention "raw_buffer" as the
- // transport protocol are chosen as the best match in the round.
+ // A successful RPC proves that filter chains that mention "raw_buffer" as
+ // the transport protocol are chosen as the best match in the round.
SendRpc([this]() { return CreateInsecureChannel(); }, {}, {});
}
@@ -8449,8 +9265,8 @@
prefix_range->set_address_prefix(ipv6_only_ ? "::1" : "127.0.0.1");
prefix_range->mutable_prefix_len()->set_value(16);
filter_chain->mutable_filter_chain_match()->add_server_names("server_name");
- // Add filter chain with two prefix ranges (length 8 and 24). Since 24 is the
- // highest match, it should be chosen.
+ // Add filter chain with two prefix ranges (length 8 and 24). Since 24 is
+ // the highest match, it should be chosen.
filter_chain = listener.add_filter_chains();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
HttpConnectionManager());
@@ -8462,7 +9278,8 @@
filter_chain->mutable_filter_chain_match()->add_prefix_ranges();
prefix_range->set_address_prefix(ipv6_only_ ? "::1" : "127.0.0.1");
prefix_range->mutable_prefix_len()->set_value(24);
- // Add another filter chain with a non-matching prefix range (with length 30)
+ // Add another filter chain with a non-matching prefix range (with length
+ // 30)
filter_chain = listener.add_filter_chains();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
HttpConnectionManager());
@@ -8528,10 +9345,10 @@
auto* socket_address = listener.mutable_address()->mutable_socket_address();
socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1");
socket_address->set_port_value(backends_[0]->port());
- // Add filter chain with source prefix range (length 16) but with a bad source
- // port mentioned. (Prefix range is matched first.)
- // Note that backends_[0]->port() will never be a match for the source port
- // because it is already being used by a backend.
+ // Add filter chain with source prefix range (length 16) but with a bad
+ // source port mentioned. (Prefix range is matched first.) Note that
+ // backends_[0]->port() will never be a match for the source port because it
+ // is already being used by a backend.
auto* filter_chain = listener.add_filter_chains();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
HttpConnectionManager());
@@ -8545,8 +9362,8 @@
source_prefix_range->mutable_prefix_len()->set_value(16);
filter_chain->mutable_filter_chain_match()->add_source_ports(
backends_[0]->port());
- // Add filter chain with two source prefix ranges (length 8 and 24). Since 24
- // is the highest match, it should be chosen.
+ // Add filter chain with two source prefix ranges (length 8 and 24). Since
+ // 24 is the highest match, it should be chosen.
filter_chain = listener.add_filter_chains();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
HttpConnectionManager());
@@ -8716,8 +9533,8 @@
HttpConnectionManager());
filter_chain->mutable_filter_chain_match()->set_transport_protocol(
"raw_buffer");
- // Add a duplicate filter chain with the same "raw_buffer" transport protocol
- // entry
+ // Add a duplicate filter chain with the same "raw_buffer" transport
+ // protocol entry
filter_chain = listener.add_filter_chains();
filter_chain->add_filters()->mutable_typed_config()->PackFrom(
HttpConnectionManager());
@@ -9083,8 +9900,8 @@
delayed_resource_setter.join();
}
-// Tests that the localities in a locality map are picked correctly after update
-// (addition, modification, deletion).
+// Tests that the localities in a locality map are picked correctly after
+// update (addition, modification, deletion).
TEST_P(LocalityMapTest, UpdateMap) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -9148,8 +9965,8 @@
BuildEdsResource(args, DefaultEdsServiceName()));
// Backend 3 hasn't received any request.
EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
- // Wait until the locality update has been processed, as signaled by backend 3
- // receiving a request.
+ // Wait until the locality update has been processed, as signaled by backend
+ // 3 receiving a request.
WaitForAllBackends(3, 4);
gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH ==========");
// Send kNumRpcs RPCs.
@@ -9268,8 +10085,8 @@
EXPECT_EQ(0, std::get<1>(counts));
}
-// If the higher priority localities are not reachable, failover to the highest
-// priority among the rest.
+// If the higher priority localities are not reachable, failover to the
+// highest priority among the rest.
TEST_P(FailoverTest, Failover) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -9372,8 +10189,8 @@
delayed_resource_setter.join();
}
-// Tests that after the localities' priorities are updated, we still choose the
-// highest READY priority with the updated localities.
+// Tests that after the localities' priorities are updated, we still choose
+// the highest READY priority with the updated localities.
TEST_P(FailoverTest, UpdatePriority) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -9624,8 +10441,8 @@
{kThrottleDropType, kDropPerMillionForThrottle}};
balancers_[0]->ads_service()->SetEdsResource(
BuildEdsResource(args, DefaultEdsServiceName()));
- // Wait until the drop rate increases to the middle of the two configs, which
- // implies that the update has been in effect.
+ // Wait until the drop rate increases to the middle of the two configs,
+ // which implies that the update has been in effect.
const double kDropRateThreshold =
(kDropRateForLb + kDropRateForLbAndThrottle) / 2;
size_t num_rpcs = kNumRpcsBoth;
@@ -9693,8 +10510,8 @@
BalancerUpdateTest() : XdsEnd2endTest(4, 3) {}
};
-// Tests that the old LB call is still used after the balancer address update as
-// long as that call is still alive.
+// Tests that the old LB call is still used after the balancer address update
+// as long as that call is still alive.
TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -9752,10 +10569,10 @@
}
// Tests that the old LB call is still used after multiple balancer address
-// updates as long as that call is still alive. Send an update with the same set
-// of LBs as the one in SetUp() in order to verify that the LB channel inside
-// xds keeps the initial connection (which by definition is also present in the
-// update).
+// updates as long as that call is still alive. Send an update with the same
+// set of LBs as the one in SetUp() in order to verify that the LB channel
+// inside xds keeps the initial connection (which by definition is also
+// present in the update).
TEST_P(BalancerUpdateTest, Repeated) {
SetNextResolution({});
SetNextResolutionForLbChannelAllBalancers();
@@ -10187,9 +11004,9 @@
public:
FaultInjectionTest() : XdsEnd2endTest(1, 1) {}
- // Builds a Listener with Fault Injection filter config. If the http_fault is
- // nullptr, then assign an empty filter config. This filter config is required
- // to enable the fault injection features.
+ // Builds a Listener with Fault Injection filter config. If the http_fault
+ // is nullptr, then assign an empty filter config. This filter config is
+ // required to enable the fault injection features.
static Listener BuildListenerWithFaultInjection(
const HTTPFault& http_fault = HTTPFault()) {
HttpConnectionManager http_connection_manager;
@@ -10501,9 +11318,9 @@
::testing::DoubleNear(kAbortRate, kErrorTolerance));
}
-// This test and the above test apply different denominators to delay and abort.
-// This ensures that we are using the right denominator for each injected fault
-// in our code.
+// This test and the above test apply different denominators to delay and
+// abort. This ensures that we are using the right denominator for each
+// injected fault in our code.
TEST_P(FaultInjectionTest,
XdsFaultInjectionAlwaysDelayPercentageAbortSwitchDenominator) {
const uint32_t kAbortPercentagePerMillion = 500000;