blob: ad001d87b1200280d5be91684f13aceed8e47846 [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include <string.h>
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#define XXH_INLINE_ALL
#include "xxhash.h"
#include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/static_metadata.h"
namespace grpc_core {
// Name of the call attribute that carries the per-request hash. The ring_hash
// picker reads it in RingHash::Picker::Pick() and parses it as a uint64.
const char* kRequestRingHashAttribute = "request_ring_hash";
// Trace flag named "ring_hash_lb"; gates all GRPC_TRACE_FLAG_ENABLED logging
// in this file.
TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb");
// Helper Parser method
void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
size_t* max_ring_size,
std::vector<grpc_error_handle>* error_list) {
*min_ring_size = 1024;
*max_ring_size = 8388608;
if (json.type() != Json::Type::OBJECT) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"ring_hash_experimental should be of type object"));
return;
}
const Json::Object& ring_hash = json.object_value();
auto ring_hash_it = ring_hash.find("min_ring_size");
if (ring_hash_it != ring_hash.end()) {
if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:min_ring_size error: should be of type number"));
} else {
*min_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
}
ring_hash_it = ring_hash.find("max_ring_size");
if (ring_hash_it != ring_hash.end()) {
if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size error: should be of type number"));
} else {
*max_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
}
if (*min_ring_size == 0 || *min_ring_size > 8388608 || *max_ring_size == 0 ||
*max_ring_size > 8388608 || *min_ring_size > *max_ring_size) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size and or min_ring_size error: "
"values need to be in the range of 1 to 8388608 "
"and max_ring_size cannot be smaller than "
"min_ring_size"));
}
}
namespace {
// Policy name used both for the LB config and for the policy itself.
constexpr char kRingHash[] = "ring_hash_experimental";

// Parsed ring_hash LB policy config: the bounds the picker uses when sizing
// its hash ring.
class RingHashLbConfig : public LoadBalancingPolicy::Config {
 public:
  RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
      : min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}

  const char* name() const override { return kRingHash; }

  // Lower bound used when scaling the number of ring entries.
  size_t min_ring_size() const { return min_ring_size_; }
  // Upper bound on the scaled ring size.
  size_t max_ring_size() const { return max_ring_size_; }

 private:
  size_t min_ring_size_;
  size_t max_ring_size_;
};
//
// ring_hash LB policy
//
// The ring_hash LB policy. Each UpdateLocked() builds a RingHashSubchannelList
// from the resolved addresses; pickers built from that list map the per-call
// hash (read from the kRequestRingHashAttribute call attribute) onto a sorted
// ring of subchannel entries.
class RingHash : public LoadBalancingPolicy {
 public:
  explicit RingHash(Args args);

  const char* name() const override { return kRingHash; }

  void UpdateLocked(UpdateArgs args) override;
  void ResetBackoffLocked() override;

 private:
  ~RingHash() override;

  // Forward declaration.
  class RingHashSubchannelList;

  // Data for a particular subchannel in a subchannel list.
  // This subclass adds the following functionality:
  // - Tracks the previous connectivity state of the subchannel, so that
  //   we know how many subchannels are in each state.
  class RingHashSubchannelData
      : public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
   public:
    RingHashSubchannelData(
        SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
            subchannel_list,
        const ServerAddress& address,
        RefCountedPtr<SubchannelInterface> subchannel)
        : SubchannelData(subchannel_list, address, std::move(subchannel)),
          address_(address) {}

    // Last state passed to UpdateConnectivityStateLocked(); SHUTDOWN until
    // the first such call.
    grpc_connectivity_state connectivity_state() const {
      return last_connectivity_state_;
    }

    // Copy of the address this subchannel was created for. The picker reads
    // its socket address and weight attribute.
    const ServerAddress& address() const { return address_; }

    // True once TRANSIENT_FAILURE has been seen and READY has not been seen
    // since then.
    bool seen_failure_since_ready() const { return seen_failure_since_ready_; }

    // Performs connectivity state updates that need to be done both when we
    // first start watching and when a watcher notification is received.
    void UpdateConnectivityStateLocked(
        grpc_connectivity_state connectivity_state);

   private:
    // Performs connectivity state updates that need to be done only
    // after we have started watching.
    void ProcessConnectivityChangeLocked(
        grpc_connectivity_state connectivity_state) override;

    ServerAddress address_;
    grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
    bool seen_failure_since_ready_ = false;
  };

  // A list of subchannels, plus per-state counters used to compute the
  // policy's aggregated connectivity state.
  class RingHashSubchannelList
      : public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
   public:
    RingHashSubchannelList(RingHash* policy, TraceFlag* tracer,
                           ServerAddressList addresses,
                           const grpc_channel_args& args)
        : SubchannelList(policy, tracer, std::move(addresses),
                         policy->channel_control_helper(), args) {
      // Need to maintain a ref to the LB policy as long as we maintain
      // any references to subchannels, since the subchannels'
      // pollset_sets will include the LB policy's pollset_set.
      // The ref is dropped in the destructor below.
      policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
    }

    ~RingHashSubchannelList() override {
      RingHash* p = static_cast<RingHash*>(policy());
      p->Unref(DEBUG_LOCATION, "subchannel_list");
    }

    // Starts watching the subchannels in this list.
    void StartWatchingLocked();

    // Updates the counters of subchannels in each state when a
    // subchannel transitions from old_state to new_state.
    void UpdateStateCountersLocked(grpc_connectivity_state old_state,
                                   grpc_connectivity_state new_state);

    // Updates the RH policy's connectivity state based on the
    // subchannel list's state counters, creating new picker and new ring.
    // Returns true if the aggregated state reported is TRANSIENT_FAILURE.
    bool UpdateRingHashConnectivityStateLocked();

   private:
    size_t num_idle_ = 0;
    size_t num_ready_ = 0;
    size_t num_connecting_ = 0;
    size_t num_transient_failure_ = 0;
  };

  // Picker holding a hash ring. ring_ is filled and sorted in the
  // constructor; Pick() only reads it.
  class Picker : public SubchannelPicker {
   public:
    Picker(RefCountedPtr<RingHash> parent,
           RingHashSubchannelList* subchannel_list);

    PickResult Pick(PickArgs args) override;

   private:
    // One point on the ring. connectivity_state is a snapshot of the
    // subchannel's state taken when the picker was constructed.
    struct RingEntry {
      uint64_t hash;
      RefCountedPtr<SubchannelInterface> subchannel;
      grpc_connectivity_state connectivity_state;
    };

    // A fire-and-forget class that schedules subchannel connection attempts
    // on the control plane WorkSerializer.
    // Subchannels are collected with AddSubchannel() during a pick. Orphan()
    // hops through ExecCtx, then into the policy's WorkSerializer, where the
    // attempts are made (skipped if the policy has shut down) and the object
    // deletes itself.
    class SubchannelConnectionAttempter : public Orphanable {
     public:
      explicit SubchannelConnectionAttempter(
          RefCountedPtr<RingHash> ring_hash_lb)
          : ring_hash_lb_(std::move(ring_hash_lb)) {
        GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
      }

      void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
        subchannels_.push_back(std::move(subchannel));
      }

      void Orphan() override {
        // Hop into ExecCtx, so that we're not holding the data plane mutex
        // while we run control-plane code.
        ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
      }

     private:
      static void RunInExecCtx(void* arg, grpc_error* /*error*/) {
        auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
        self->ring_hash_lb_->work_serializer()->Run(
            [self]() {
              if (!self->ring_hash_lb_->shutdown_) {
                for (auto& subchannel : self->subchannels_) {
                  subchannel->AttemptToConnect();
                }
              }
              // Last use of this object; it owns itself after Orphan().
              delete self;
            },
            DEBUG_LOCATION);
      }

      // Keeps the policy alive until the attempts have run.
      RefCountedPtr<RingHash> ring_hash_lb_;
      grpc_closure closure_;
      absl::InlinedVector<RefCountedPtr<SubchannelInterface>, 10> subchannels_;
    };

    RefCountedPtr<RingHash> parent_;

    // A ring of subchannels, sorted by hash.
    std::vector<RingEntry> ring_;
  };

  void ShutdownLocked() override;

  // Current config from resolver.
  RefCountedPtr<RingHashLbConfig> config_;

  // Current subchannel list; replaced by UpdateLocked() and reset by
  // ShutdownLocked().
  OrphanablePtr<RingHashSubchannelList> subchannel_list_;

  // Set by ShutdownLocked(); checked before running deferred connection
  // attempts.
  bool shutdown_ = false;
};
//
// RingHash::Picker
//
// Builds the hash ring from the current contents of `subchannel_list`.
//
// Each address contributes ring entries in proportion to its normalized
// weight (weight 1 when no ServerAddressWeightAttribute is present). The
// total is scaled so that the lightest-weighted address gets about
// ceil(min_normalized_weight * min_ring_size) entries, with the scale capped
// at max_ring_size. Entry n of an address hashes XXH64("<address>_<n>").
// Each entry records the subchannel's connectivity state at construction
// time, and the finished ring is sorted by hash for Pick().
RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
                         RingHashSubchannelList* subchannel_list)
    : parent_(std::move(parent)) {
  const size_t num_subchannels = subchannel_list->num_subchannels();
  // Store the weights while finding the sum.
  struct AddressWeight {
    std::string address;
    // Default weight is 1 for the cases where a weight is not provided,
    // each occurrence of the address will be counted a weight value of 1.
    uint32_t weight = 1;
    double normalized_weight;
  };
  std::vector<AddressWeight> address_weights;
  size_t sum = 0;
  address_weights.reserve(num_subchannels);
  for (size_t i = 0; i < num_subchannels; ++i) {
    RingHashSubchannelData* sd = subchannel_list->subchannel(i);
    const ServerAddressWeightAttribute* weight_attribute = static_cast<
        const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
        ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
    AddressWeight address_weight;
    address_weight.address =
        grpc_sockaddr_to_string(&sd->address().address(), false);
    if (weight_attribute != nullptr) {
      // Zero-weight addresses are filtered out by UpdateLocked().
      GPR_ASSERT(weight_attribute->weight() != 0);
      address_weight.weight = weight_attribute->weight();
    }
    sum += address_weight.weight;
    address_weights.push_back(std::move(address_weight));
  }
  // Calculating normalized weights and find min and max.
  double min_normalized_weight = 1.0;
  double max_normalized_weight = 0.0;
  for (auto& address : address_weights) {
    address.normalized_weight = static_cast<double>(address.weight) / sum;
    min_normalized_weight =
        std::min(address.normalized_weight, min_normalized_weight);
    max_normalized_weight =
        std::max(address.normalized_weight, max_normalized_weight);
  }
  // Scale up the number of hashes per host such that the least-weighted host
  // gets a whole number of hashes on the ring. Other hosts might not end up
  // with whole numbers, and that's fine (the ring-building algorithm below can
  // handle this). This preserves the original implementation's behavior: when
  // weights aren't provided, all hosts should get an equal number of hashes. In
  // the case where this number exceeds the max_ring_size, it's scaled back down
  // to fit.
  const size_t min_ring_size = parent_->config_->min_ring_size();
  const size_t max_ring_size = parent_->config_->max_ring_size();
  const double scale = std::min(
      std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
      static_cast<double>(max_ring_size));
  // Reserve memory for the entire ring up front.
  const uint64_t ring_size = static_cast<uint64_t>(std::ceil(scale));
  ring_.reserve(ring_size);
  // Populate the hash ring by walking through the (host, weight) pairs in
  // normalized_host_weights, and generating (scale * weight) hashes for each
  // host. Since these aren't necessarily whole numbers, we maintain running
  // sums -- current_hashes and target_hashes -- which allows us to populate the
  // ring in a mostly stable way.
  absl::InlinedVector<char, 196> hash_key_buffer;
  double current_hashes = 0.0;
  double target_hashes = 0.0;
  uint64_t min_hashes_per_host = ring_size;
  uint64_t max_hashes_per_host = 0;
  for (size_t i = 0; i < num_subchannels; ++i) {
    RingHashSubchannelData* sd = subchannel_list->subchannel(i);
    const std::string& address_string = address_weights[i].address;
    hash_key_buffer.assign(address_string.begin(), address_string.end());
    hash_key_buffer.emplace_back('_');
    // Remember where the "<address>_" prefix ends as an index, not an
    // iterator: appending the count may reallocate the buffer, which would
    // invalidate any iterator taken before the insert.
    const size_t key_prefix_length = hash_key_buffer.size();
    target_hashes += scale * address_weights[i].normalized_weight;
    size_t count = 0;
    auto current_state = sd->subchannel()->CheckConnectivityState();
    while (current_hashes < target_hashes) {
      const std::string count_str = absl::StrCat(count);
      hash_key_buffer.insert(hash_key_buffer.end(), count_str.begin(),
                             count_str.end());
      const uint64_t hash =
          XXH64(hash_key_buffer.data(), hash_key_buffer.size(), 0);
      ring_.push_back({hash, sd->subchannel()->Ref(), current_state});
      ++count;
      ++current_hashes;
      // Drop the count suffix so the next key starts from "<address>_" again.
      hash_key_buffer.resize(key_prefix_length);
    }
    // Track how many ring entries this host received (used for tracing).
    min_hashes_per_host =
        std::min(static_cast<uint64_t>(count), min_hashes_per_host);
    max_hashes_per_host =
        std::max(static_cast<uint64_t>(count), max_hashes_per_host);
  }
  std::sort(ring_.begin(), ring_.end(),
            [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
              return lhs.hash < rhs.hash;
            });
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
    gpr_log(GPR_INFO,
            "[RH %p picker %p] created picker from subchannel_list=%p "
            "with %" PRIuPTR " ring entries (min %" PRIu64 " / max %" PRIu64
            " hashes per host)",
            parent_.get(), this, subchannel_list, ring_.size(),
            min_hashes_per_host, max_hashes_per_host);
  }
}
// Picks a subchannel for a call by hashing into the ring.
//
// The call's hash is read from the kRequestRingHashAttribute call attribute
// and must parse as a uint64, otherwise the pick fails. The chosen entry is
// the first one whose hash is >= the request hash (wrapping to entry 0):
//  - READY: complete the pick with that subchannel.
//  - IDLE: schedule a connection attempt and queue the pick.
//  - CONNECTING: queue the pick.
//  - otherwise (TRANSIENT_FAILURE): schedule a connection attempt for it and
//    walk the rest of the ring for a fallback (see the loop below).
// Connection attempts are collected in a SubchannelConnectionAttempter, which
// is orphaned when this function returns and runs them on the control plane.
RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
  PickResult result;
  // Initialize to PICK_FAILED.
  result.type = PickResult::PICK_FAILED;
  // Defensive: the constructor always adds at least one entry per subchannel,
  // but never index into an empty ring.
  if (ring_.empty()) {
    result.error = grpc_error_set_int(
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds ring hash ring is empty"),
        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
    return result;
  }
  auto hash =
      args.call_state->ExperimentalGetCallAttribute(kRequestRingHashAttribute);
  uint64_t h;
  if (!absl::SimpleAtoi(hash, &h)) {
    result.error = grpc_error_set_int(
        GRPC_ERROR_CREATE_FROM_STATIC_STRING(
            "xds ring hash value is not a number"),
        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
    return result;
  }
  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
  // (ketama_get_server) NOTE: The algorithm depends on using signed integers
  // for lowp, highp, and first_index. Do not change them!
  int64_t lowp = 0;
  int64_t highp = ring_.size();
  int64_t first_index = 0;
  while (true) {
    first_index = (lowp + highp) / 2;
    if (first_index == static_cast<int64_t>(ring_.size())) {
      first_index = 0;
      break;
    }
    uint64_t midval = ring_[first_index].hash;
    uint64_t midval1 = first_index == 0 ? 0 : ring_[first_index - 1].hash;
    if (h <= midval && h > midval1) {
      break;
    }
    if (midval < h) {
      lowp = first_index + 1;
    } else {
      highp = first_index - 1;
    }
    if (lowp > highp) {
      first_index = 0;
      break;
    }
  }
  OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
  auto ScheduleSubchannelConnectionAttempt =
      [&](RefCountedPtr<SubchannelInterface> subchannel) {
        if (subchannel_connection_attempter == nullptr) {
          subchannel_connection_attempter =
              MakeOrphanable<SubchannelConnectionAttempter>(parent_);
        }
        subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
      };
  const RingEntry& first_entry = ring_[first_index];
  switch (first_entry.connectivity_state) {
    case GRPC_CHANNEL_READY:
      result.type = PickResult::PICK_COMPLETE;
      result.subchannel = first_entry.subchannel;
      return result;
    case GRPC_CHANNEL_IDLE:
      ScheduleSubchannelConnectionAttempt(first_entry.subchannel);
      // fallthrough
    case GRPC_CHANNEL_CONNECTING:
      result.type = PickResult::PICK_QUEUE;
      return result;
    default:  // GRPC_CHANNEL_TRANSIENT_FAILURE
      break;
  }
  ScheduleSubchannelConnectionAttempt(first_entry.subchannel);
  // Loop through remaining subchannels to find one in READY.
  // On the way, we make sure the right set of connection attempts
  // will happen:
  //  - the first distinct subchannel after the failed one queues the pick if
  //    it is IDLE (after scheduling an attempt) or CONNECTING;
  //  - TRANSIENT_FAILURE subchannels get connection attempts until the first
  //    non-failed subchannel is reached (which also gets one if IDLE).
  bool found_second_subchannel = false;
  bool found_first_non_failed = false;
  for (size_t i = 1; i < ring_.size(); ++i) {
    const RingEntry& entry = ring_[(first_index + i) % ring_.size()];
    if (entry.subchannel == first_entry.subchannel) {
      continue;
    }
    if (entry.connectivity_state == GRPC_CHANNEL_READY) {
      result.type = PickResult::PICK_COMPLETE;
      result.subchannel = entry.subchannel;
      return result;
    }
    if (!found_second_subchannel) {
      switch (entry.connectivity_state) {
        case GRPC_CHANNEL_IDLE:
          ScheduleSubchannelConnectionAttempt(entry.subchannel);
          // fallthrough
        case GRPC_CHANNEL_CONNECTING:
          result.type = PickResult::PICK_QUEUE;
          return result;
        default:
          break;
      }
      found_second_subchannel = true;
    }
    if (!found_first_non_failed) {
      if (entry.connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
        ScheduleSubchannelConnectionAttempt(entry.subchannel);
      } else {
        if (entry.connectivity_state == GRPC_CHANNEL_IDLE) {
          ScheduleSubchannelConnectionAttempt(entry.subchannel);
        }
        found_first_non_failed = true;
      }
    }
  }
  result.error = grpc_error_set_int(
      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "xds ring hash found a subchannel "
          "that is in TRANSIENT_FAILURE state"),
      GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
  return result;
}
//
// RingHash::RingHashSubchannelList
//
void RingHash::RingHashSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously.
for (size_t i = 0; i < num_subchannels(); ++i) {
grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked();
subchannel(i)->UpdateConnectivityStateLocked(state);
}
// Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked();
}
}
RingHash* p = static_cast<RingHash*>(policy());
// Sending up the initial picker while all subchannels are in IDLE state.
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
}
// Moves one subchannel from the counter for `old_state` to the counter for
// `new_state`. SHUTDOWN (the initial recorded state) has no counter, so it is
// never decremented; transitioning *to* SHUTDOWN is a programming error.
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
    grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
  GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
  switch (old_state) {
    case GRPC_CHANNEL_IDLE:
      GPR_ASSERT(num_idle_ > 0);
      --num_idle_;
      break;
    case GRPC_CHANNEL_READY:
      GPR_ASSERT(num_ready_ > 0);
      --num_ready_;
      break;
    case GRPC_CHANNEL_CONNECTING:
      GPR_ASSERT(num_connecting_ > 0);
      --num_connecting_;
      break;
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      GPR_ASSERT(num_transient_failure_ > 0);
      --num_transient_failure_;
      break;
    default:
      break;
  }
  switch (new_state) {
    case GRPC_CHANNEL_IDLE:
      ++num_idle_;
      break;
    case GRPC_CHANNEL_READY:
      ++num_ready_;
      break;
    case GRPC_CHANNEL_CONNECTING:
      ++num_connecting_;
      break;
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      ++num_transient_failure_;
      break;
    default:
      break;
  }
}
// Sets the RH policy's connectivity state and generates a new picker based
// on the current subchannel list or requests an re-attempt by returning true..
bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
RingHash* p = static_cast<RingHash*>(policy());
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return false;
// The overall aggregation rules here are:
// 1. If there is at least one subchannel in READY state, report READY.
// 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
// TRANSIENT_FAILURE.
// 3. If there is at least one subchannel in CONNECTING state, report
// CONNECTING.
// 4. If there is at least one subchannel in IDLE state, report IDLE.
// 5. Otherwise, report TRANSIENT_FAILURE.
if (num_ready_ > 0) {
/* READY */
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
return false;
}
if (num_connecting_ > 0 && num_transient_failure_ < 2) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
return false;
}
if (num_idle_ > 0 && num_transient_failure_ < 2) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
return false;
}
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connections to backend failing or idle"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
return true;
}
//
// RingHash::RingHashSubchannelData
//
// Records `connectivity_state` as this subchannel's latest state and updates
// the owning list's per-state counters.
//
// Once TRANSIENT_FAILURE has been seen, the subchannel stays counted as
// TRANSIENT_FAILURE until it reports READY; intermediate states are still
// recorded in last_connectivity_state_ but do not move the counters.
void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
    grpc_connectivity_state connectivity_state) {
  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
    // Use this policy's "[RH" log prefix (was a copy-pasted "[RR").
    gpr_log(
        GPR_INFO,
        "[RH %p] connectivity changed for subchannel %p, subchannel_list %p "
        "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
        p, subchannel(), subchannel_list(), Index(),
        subchannel_list()->num_subchannels(),
        ConnectivityStateName(last_connectivity_state_),
        ConnectivityStateName(connectivity_state));
  }
  // Decide what state to report for aggregation purposes.
  // If we haven't seen a failure since the last time we were in state
  // READY, then we report the state change as-is. However, once we do see
  // a failure, we report TRANSIENT_FAILURE and do not report any subsequent
  // state changes until we go back into state READY.
  if (!seen_failure_since_ready_) {
    if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      seen_failure_since_ready_ = true;
    }
    subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
                                                 connectivity_state);
  } else {
    if (connectivity_state == GRPC_CHANNEL_READY) {
      seen_failure_since_ready_ = false;
      subchannel_list()->UpdateStateCountersLocked(
          GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state);
    }
  }
  // Record last seen connectivity state.
  last_connectivity_state_ = connectivity_state;
}
// Watcher callback for a connectivity state change on this subchannel.
// Requests re-resolution on TRANSIENT_FAILURE, updates the counters,
// re-publishes the policy's state, and, while the policy is in
// TRANSIENT_FAILURE, pushes a connection attempt to the next subchannel in
// the list.
void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
    grpc_connectivity_state connectivity_state) {
  RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
  GPR_ASSERT(subchannel() != nullptr);
  // If the new state is TRANSIENT_FAILURE, re-resolve.
  // Only do this if we've started watching, not at startup time.
  // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
  // when the subchannel list was created, we'd wind up in a constant
  // loop of re-resolution.
  // Also attempt to reconnect.
  if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
      // Use this policy's "[RH" log prefix (was a copy-pasted "[RR").
      gpr_log(GPR_INFO,
              "[RH %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
              "Requesting re-resolution",
              p, subchannel());
    }
    p->channel_control_helper()->RequestReresolution();
  }
  // Update state counters.
  UpdateConnectivityStateLocked(connectivity_state);
  // Update the RH policy's connectivity state, creating new picker and new
  // ring.
  bool transient_failure =
      subchannel_list()->UpdateRingHashConnectivityStateLocked();
  // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
  // not be getting any pick requests from the priority policy.
  // However, because the ring_hash policy does not attempt to
  // reconnect to subchannels unless it is getting pick requests,
  // it will need special handling to ensure that it will eventually
  // recover from TRANSIENT_FAILURE state once the problem is resolved.
  // Specifically, it will make sure that it is attempting to connect to
  // at least one subchannel at any given time. After a given subchannel
  // fails a connection attempt, it will move on to the next subchannel
  // in the ring. It will keep doing this until one of the subchannels
  // successfully connects, at which point it will report READY and stop
  // proactively trying to connect. The policy will remain in
  // TRANSIENT_FAILURE until at least one subchannel becomes connected,
  // even if subchannels are in state CONNECTING during that time.
  if (transient_failure &&
      connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
    RingHashSubchannelData* next_sd = subchannel_list()->subchannel(next_index);
    next_sd->subchannel()->AttemptToConnect();
  }
}
//
// RingHash
//
// Constructs the policy; all real setup happens in UpdateLocked().
RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) return;
  gpr_log(GPR_INFO, "[RH %p] Created", this);
}
// The subchannel list must already have been released by ShutdownLocked().
RingHash::~RingHash() {
  const bool tracing = GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace);
  if (tracing) gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
  GPR_ASSERT(subchannel_list_ == nullptr);
}
// Marks the policy as shut down (so deferred connection attempts are
// skipped) and releases the current subchannel list.
void RingHash::ShutdownLocked() {
  const bool tracing = GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace);
  if (tracing) gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
  shutdown_ = true;
  subchannel_list_ = nullptr;
}
// Resets connection backoff on every subchannel in the current list.
// subchannel_list_ is null before the first UpdateLocked() and after
// ShutdownLocked(); in either case there is nothing to reset, so don't
// dereference it.
void RingHash::ResetBackoffLocked() {
  if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
}
// Applies a resolver update: stores the new config, drops zero-weight
// addresses, replaces the subchannel list, and either reports
// TRANSIENT_FAILURE (empty list) or starts watching the new list.
void RingHash::UpdateLocked(UpdateArgs args) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
    // Use this policy's "[RH" log prefix (was a copy-pasted "[RR").
    gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
            this, args.addresses.size());
  }
  config_ = std::move(args.config);
  // Filter out any address with weight 0.
  ServerAddressList addresses;
  addresses.reserve(args.addresses.size());
  for (ServerAddress& address : args.addresses) {
    const ServerAddressWeightAttribute* weight_attribute =
        static_cast<const ServerAddressWeightAttribute*>(address.GetAttribute(
            ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
    if (weight_attribute == nullptr || weight_attribute->weight() > 0) {
      addresses.push_back(std::move(address));
    }
  }
  subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
      this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args);
  if (subchannel_list_->num_subchannels() == 0) {
    // If the new list is empty, immediately transition to TRANSIENT_FAILURE.
    grpc_error* error =
        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
    channel_control_helper()->UpdateState(
        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
        absl::make_unique<TransientFailurePicker>(error));
  } else {
    // Start watching the new list.
    subchannel_list_->StartWatchingLocked();
  }
}
//
// factory
//
// Factory registered with the LB policy registry under kRingHash.
class RingHashFactory : public LoadBalancingPolicyFactory {
 public:
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<RingHash>(std::move(args));
  }

  const char* name() const override { return kRingHash; }

  // Parses `json` with ParseRingHashLbConfig(). On failure, *error receives a
  // combined error and nullptr is returned; on success *error is untouched.
  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
      const Json& json, grpc_error** error) const override {
    size_t min_ring_size = 0;
    size_t max_ring_size = 0;
    std::vector<grpc_error_handle> error_list;
    ParseRingHashLbConfig(json, &min_ring_size, &max_ring_size, &error_list);
    if (!error_list.empty()) {
      *error = GRPC_ERROR_CREATE_FROM_VECTOR(
          "ring_hash_experimental LB policy config", &error_list);
      return nullptr;
    }
    return MakeRefCounted<RingHashLbConfig>(min_ring_size, max_ring_size);
  }
};
} // namespace
void GrpcLbPolicyRingHashInit() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::RingHashFactory>());
}
void GrpcLbPolicyRingHashShutdown() {}
} // namespace grpc_core