// blob: 95160d543b4c2c7bf21720b6e297ae0a4413b581 [file] [log] [blame]
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <stdlib.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/endpoint_list.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/resolver/endpoint_addresses.h"
namespace grpc_core {
// Trace flag checked (via GRPC_TRACE_FLAG_ENABLED) before every gpr_log()
// call in this file.  Constructed with `false` and "round_robin" --
// presumably the default enabled state and the tracer name; confirm
// against TraceFlag's constructor.
TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
namespace {
// Name of this LB policy.  Returned by name() on the policy, on its
// config object, and on its factory.
constexpr absl::string_view kRoundRobin = "round_robin";
// Round-robin LB policy.
//
// Keeps a current endpoint list plus, while an address update is being
// rolled in, a pending endpoint list.  Whenever the current list has at
// least one READY endpoint, the policy publishes a Picker that cycles
// through the pickers of the READY endpoints.
class RoundRobin final : public LoadBalancingPolicy {
 public:
  explicit RoundRobin(Args args);

  absl::string_view name() const override { return kRoundRobin; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ResetBackoffLocked() override;

 private:
  // Endpoint list that additionally counts how many of its endpoints
  // are READY / CONNECTING / TRANSIENT_FAILURE and derives the policy's
  // aggregated connectivity state from those counters.
  class RoundRobinEndpointList final : public EndpointList {
   public:
    // Passes a tracer name to the base class only when round_robin
    // tracing is enabled (nullptr otherwise), then initializes the list
    // with a factory that creates one RoundRobinEndpoint per entry.
    RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
                           EndpointAddressesIterator* endpoints,
                           const ChannelArgs& args)
        : EndpointList(std::move(round_robin),
                       GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
                           ? "RoundRobinEndpointList"
                           : nullptr) {
      auto make_endpoint = [this](RefCountedPtr<EndpointList> owning_list,
                                  const EndpointAddresses& endpoint_addresses,
                                  const ChannelArgs& endpoint_args) {
        return MakeOrphanable<RoundRobinEndpoint>(
            std::move(owning_list), endpoint_addresses, endpoint_args,
            policy<RoundRobin>()->work_serializer());
      };
      Init(endpoints, args, make_endpoint);
    }

   private:
    // A single endpoint of the list.  Forwards its connectivity state
    // changes to the owning RoundRobinEndpointList.
    class RoundRobinEndpoint final : public Endpoint {
     public:
      RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,
                         const EndpointAddresses& addresses,
                         const ChannelArgs& args,
                         std::shared_ptr<WorkSerializer> work_serializer)
          : Endpoint(std::move(endpoint_list)) {
        Init(addresses, args, std::move(work_serializer));
      }

     private:
      // Invoked when this endpoint reports a new connectivity state.
      void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state,
                         grpc_connectivity_state new_state,
                         const absl::Status& status) override;
    };

    // Exposes the owning policy's channel control helper.
    LoadBalancingPolicy::ChannelControlHelper* channel_control_helper()
        const override {
      auto* owner = policy<RoundRobin>();
      return owner->channel_control_helper();
    }

    // Moves one endpoint from the counter of old_state (if any) to the
    // counter of new_state.
    void UpdateStateCountersLocked(
        absl::optional<grpc_connectivity_state> old_state,
        grpc_connectivity_state new_state);

    // Promotes this list to be the current one if it is the pending
    // list and is eligible, then (only if it is the current list)
    // reports the aggregated state computed from the counters.
    // status_for_tf is the status used when reporting TRANSIENT_FAILURE.
    void MaybeUpdateRoundRobinConnectivityStateLocked(
        absl::Status status_for_tf);

    // Human-readable dump of the list size and per-state counters.
    std::string CountersString() const {
      std::string summary = absl::StrCat(
          "num_children=", size(), " num_ready=", num_ready_,
          " num_connecting=", num_connecting_,
          " num_transient_failure=", num_transient_failure_);
      return summary;
    }

    // Per-state endpoint counters (IDLE is counted as connecting).
    size_t num_ready_{0};
    size_t num_connecting_{0};
    size_t num_transient_failure_{0};

    // Most recent status reported with TRANSIENT_FAILURE.
    absl::Status last_failure_;
  };

  // Picker published while the policy is READY; delegates each pick to
  // one of the child pickers, advancing an atomic index on every call.
  class Picker final : public SubchannelPicker {
   public:
    Picker(RoundRobin* parent,
           std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>
               pickers);

    PickResult Pick(PickArgs args) override;

   private:
    // No ref is held on the policy.  It is dereferenced only inside the
    // constructor; afterwards only the pointer value is used (for
    // logging), so it must not be dereferenced.
    RoundRobin* parent_;

    std::atomic<size_t> last_picked_index_;
    std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers_;
  };

  ~RoundRobin() override;

  void ShutdownLocked() override;

  // Endpoint list currently used to compute the policy's state.
  OrphanablePtr<RoundRobinEndpointList> endpoint_list_;

  // Endpoint list built from the most recent address update that has
  // not been promoted yet.  UpdateLocked() promotes it immediately when
  // it is empty or when there is no current list; otherwise
  // MaybeUpdateRoundRobinConnectivityStateLocked() decides when to swap
  // it into endpoint_list_.
  OrphanablePtr<RoundRobinEndpointList> latest_pending_endpoint_list_;

  // Set by ShutdownLocked().
  bool shutdown_{false};

  // Random source used to choose each new picker's starting index.
  absl::BitGen bit_gen_;
};
//
// RoundRobin::Picker
//
// Takes ownership of `pickers` and chooses a random starting position
// using the parent's bit generator.  `parent` is dereferenced here
// (bit generator, current endpoint list for logging) but not retained
// with a ref.
RoundRobin::Picker::Picker(
    RoundRobin* parent,
    std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers)
    : parent_(parent), pickers_(std::move(pickers)) {
  // The starting index is randomized; the rationale is discussed in
  // https://github.com/grpc/grpc-go/issues/2580.
  // Note: this has to happen in the body rather than the initializer
  // list, because last_picked_index_ is declared before pickers_.
  const size_t num_pickers = pickers_.size();
  const size_t start_index =
      absl::Uniform<size_t>(parent->bit_gen_, 0, num_pickers);
  last_picked_index_.store(start_index, std::memory_order_relaxed);
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) return;
  gpr_log(GPR_INFO,
          "[RR %p picker %p] created picker from endpoint_list=%p "
          "with %" PRIuPTR " READY children; last_picked_index_=%" PRIuPTR,
          parent_, this, parent_->endpoint_list_.get(), num_pickers,
          start_index);
}
// Atomically advances the shared counter, maps the previous counter
// value onto a child picker index, and delegates the pick to that child.
RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) {
  const size_t ticket =
      last_picked_index_.fetch_add(1, std::memory_order_relaxed);
  const size_t index = ticket % pickers_.size();
  const auto& chosen = pickers_[index];
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
    gpr_log(GPR_INFO,
            "[RR %p picker %p] using picker index %" PRIuPTR ", picker=%p",
            parent_, this, index, chosen.get());
  }
  return chosen->Pick(args);
}
//
// RoundRobin
//
// Forwards the args to the LoadBalancingPolicy base and, when tracing
// is enabled, logs the creation.
RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) return;
  gpr_log(GPR_INFO, "[RR %p] Created", this);
}
// Logs the destruction when tracing is enabled and asserts that both
// endpoint lists have already been released (ShutdownLocked() resets
// them).
RoundRobin::~RoundRobin() {
  const bool tracing = GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace);
  if (tracing) {
    gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
  }
  GPR_ASSERT(endpoint_list_ == nullptr);
  GPR_ASSERT(latest_pending_endpoint_list_ == nullptr);
}
// Marks the policy as shut down and releases the current endpoint list
// followed by the pending one.
void RoundRobin::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
    gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
  }
  shutdown_ = true;
  // Release order is current list first, then pending list.
  endpoint_list_ = nullptr;
  latest_pending_endpoint_list_ = nullptr;
}
// Resets connection backoff on every endpoint list the policy holds.
//
// Both lists are null-checked: endpoint_list_ is null until the first
// UpdateLocked() and again after ShutdownLocked(), so a call in either
// window is treated as a no-op instead of dereferencing a null pointer.
void RoundRobin::ResetBackoffLocked() {
  if (endpoint_list_ != nullptr) {
    endpoint_list_->ResetBackoffLocked();
  }
  if (latest_pending_endpoint_list_ != nullptr) {
    latest_pending_endpoint_list_->ResetBackoffLocked();
  }
}
// Handles a new resolver result.
//
// - If the address list is an error and a current endpoint list already
//   exists, the existing list is kept and the error is returned.
// - Otherwise a new pending endpoint list is built (from a null address
//   iterator when the address list is an error).
// - An empty new list is promoted immediately, TRANSIENT_FAILURE is
//   reported, and the corresponding non-OK status is returned.
// - A non-empty new list is promoted immediately only when there is no
//   current list; OK is returned.
absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
  const bool have_addresses = args.addresses.ok();
  EndpointAddressesIterator* addresses = nullptr;
  if (have_addresses) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
      gpr_log(GPR_INFO, "[RR %p] received update", this);
    }
    addresses = args.addresses->get();
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
      gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
              args.addresses.status().ToString().c_str());
    }
    // Keep serving from the list we already have, but tell the caller
    // that this update was rejected.
    if (endpoint_list_ != nullptr) return args.addresses.status();
  }
  // Build the new pending list; any previous pending list is dropped.
  if (latest_pending_endpoint_list_ != nullptr &&
      GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
    gpr_log(GPR_INFO, "[RR %p] replacing previous pending child list %p", this,
            latest_pending_endpoint_list_.get());
  }
  latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
      RefAsSubclass<RoundRobin>(DEBUG_LOCATION, "RoundRobinEndpointList"),
      addresses, args.args);
  const bool new_list_is_empty = latest_pending_endpoint_list_->size() == 0;
  if (!new_list_is_empty) {
    // On the very first update there is nothing to keep serving from,
    // so the new list becomes the current one right away.
    if (endpoint_list_ == nullptr) {
      endpoint_list_ = std::move(latest_pending_endpoint_list_);
    }
    return absl::OkStatus();
  }
  // Empty list: make it current immediately and fail the channel.
  if (endpoint_list_ != nullptr &&
      GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
    gpr_log(GPR_INFO, "[RR %p] replacing previous child list %p", this,
            endpoint_list_.get());
  }
  endpoint_list_ = std::move(latest_pending_endpoint_list_);
  absl::Status status;
  if (have_addresses) {
    status = absl::UnavailableError(
        absl::StrCat("empty address list: ", args.resolution_note));
  } else {
    status = args.addresses.status();
  }
  channel_control_helper()->UpdateState(
      GRPC_CHANNEL_TRANSIENT_FAILURE, status,
      MakeRefCounted<TransientFailurePicker>(status));
  return status;
}
//
// RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint
//
// Reacts to a connectivity state change of this endpoint:
//  1. an endpoint that reports IDLE is asked to leave idle right away;
//  2. the owning list's per-state counters are adjusted if the state
//     actually changed (or this is the first report);
//  3. the owning list re-evaluates the policy's aggregated state.
void RoundRobin::RoundRobinEndpointList::RoundRobinEndpoint::OnStateUpdate(
    absl::optional<grpc_connectivity_state> old_state,
    grpc_connectivity_state new_state, const absl::Status& status) {
  auto* owning_list = endpoint_list<RoundRobinEndpointList>();
  auto* rr = policy<RoundRobin>();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
    gpr_log(
        GPR_INFO,
        "[RR %p] connectivity changed for child %p, endpoint_list %p "
        "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s (%s)",
        rr, this, owning_list, Index(), owning_list->size(),
        (old_state.has_value() ? ConnectivityStateName(*old_state) : "N/A"),
        ConnectivityStateName(new_state), status.ToString().c_str());
  }
  if (new_state == GRPC_CHANNEL_IDLE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
      gpr_log(GPR_INFO, "[RR %p] child %p reported IDLE; requesting connection",
              rr, this);
    }
    ExitIdleLocked();
  }
  // Counters only move on a first report or a real transition.
  const bool same_state = old_state.has_value() && *old_state == new_state;
  if (!same_state) {
    owning_list->UpdateStateCountersLocked(old_state, new_state);
  }
  // Always give the list a chance to refresh the policy state.
  owning_list->MaybeUpdateRoundRobinConnectivityStateLocked(status);
}
//
// RoundRobin::RoundRobinEndpointList
//
// Moves one endpoint between the per-state counters: decrements the
// counter matching old_state (when present) and increments the counter
// matching new_state.  SHUTDOWN is never expected for either argument.
void RoundRobin::RoundRobinEndpointList::UpdateStateCountersLocked(
    absl::optional<grpc_connectivity_state> old_state,
    grpc_connectivity_state new_state) {
  // IDLE shares a counter with CONNECTING, since an IDLE endpoint will
  // immediately transition into CONNECTING anyway.
  if (old_state.has_value()) {
    GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN);
    switch (*old_state) {
      case GRPC_CHANNEL_READY:
        GPR_ASSERT(num_ready_ > 0);
        --num_ready_;
        break;
      case GRPC_CHANNEL_CONNECTING:
      case GRPC_CHANNEL_IDLE:
        GPR_ASSERT(num_connecting_ > 0);
        --num_connecting_;
        break;
      case GRPC_CHANNEL_TRANSIENT_FAILURE:
        GPR_ASSERT(num_transient_failure_ > 0);
        --num_transient_failure_;
        break;
      default:
        break;
    }
  }
  GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
  switch (new_state) {
    case GRPC_CHANNEL_READY:
      ++num_ready_;
      break;
    case GRPC_CHANNEL_CONNECTING:
    case GRPC_CHANNEL_IDLE:
      ++num_connecting_;
      break;
    case GRPC_CHANNEL_TRANSIENT_FAILURE:
      ++num_transient_failure_;
      break;
    default:
      break;
  }
}
// Re-evaluates the policy's state from this list's counters.
//
// Step 1: if this list is the pending list and is eligible, it replaces
// the current list.  Step 2: if this list is (now) the current list,
// the aggregated connectivity state is reported to the channel.
// status_for_tf, when non-OK, becomes the "last error" embedded in the
// status reported with TRANSIENT_FAILURE.
void RoundRobin::RoundRobinEndpointList::
    MaybeUpdateRoundRobinConnectivityStateLocked(absl::Status status_for_tf) {
  auto* rr = policy<RoundRobin>();
  // The pending list is promoted when any of the following holds:
  //  - the current list has no READY endpoints;
  //  - this list has a READY endpoint and every endpoint has delivered
  //    its initial connectivity state;
  //  - every endpoint of this list is in TRANSIENT_FAILURE (this may
  //    take the channel from READY to TRANSIENT_FAILURE, but that is
  //    what the control plane asked for).
  if (rr->latest_pending_endpoint_list_.get() == this) {
    const bool should_promote =
        rr->endpoint_list_->num_ready_ == 0 ||
        (num_ready_ > 0 && AllEndpointsSeenInitialState()) ||
        num_transient_failure_ == size();
    if (should_promote) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
        std::string previous_counters;
        if (rr->endpoint_list_ != nullptr) {
          previous_counters = rr->endpoint_list_->CountersString();
        }
        gpr_log(GPR_INFO,
                "[RR %p] swapping out child list %p (%s) in favor of %p (%s)",
                rr, rr->endpoint_list_.get(), previous_counters.c_str(), this,
                CountersString().c_str());
      }
      rr->endpoint_list_ = std::move(rr->latest_pending_endpoint_list_);
    }
  }
  // A list that is not the current one never reports state.
  if (rr->endpoint_list_.get() != this) return;
  // Aggregation rules, first match wins:
  //  1) any endpoint READY             => READY
  //  2) any endpoint CONNECTING        => CONNECTING
  //  3) all endpoints TRANSIENT_FAILURE => TRANSIENT_FAILURE
  if (num_ready_ > 0) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
      gpr_log(GPR_INFO, "[RR %p] reporting READY with child list %p", rr,
              this);
    }
    // Collect the pickers of the endpoints that are currently READY.
    std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
    for (const auto& endpoint : endpoints()) {
      auto state = endpoint->connectivity_state();
      if (!state.has_value() || *state != GRPC_CHANNEL_READY) continue;
      pickers.push_back(endpoint->picker());
    }
    GPR_ASSERT(!pickers.empty());
    rr->channel_control_helper()->UpdateState(
        GRPC_CHANNEL_READY, absl::OkStatus(),
        MakeRefCounted<Picker>(rr, std::move(pickers)));
    return;
  }
  if (num_connecting_ > 0) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
      gpr_log(GPR_INFO, "[RR %p] reporting CONNECTING with child list %p", rr,
              this);
    }
    rr->channel_control_helper()->UpdateState(
        GRPC_CHANNEL_CONNECTING, absl::Status(),
        MakeRefCounted<QueuePicker>(nullptr));
    return;
  }
  if (num_transient_failure_ != size()) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
    gpr_log(GPR_INFO,
            "[RR %p] reporting TRANSIENT_FAILURE with child list %p: %s", rr,
            this, status_for_tf.ToString().c_str());
  }
  // An OK status carries no new error, so the previously recorded
  // failure is reported again in that case.
  if (!status_for_tf.ok()) {
    last_failure_ = absl::UnavailableError(
        absl::StrCat("connections to all backends failing; last error: ",
                     status_for_tf.message()));
  }
  rr->channel_control_helper()->UpdateState(
      GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_,
      MakeRefCounted<TransientFailurePicker>(last_failure_));
}
//
// factory
//
// Config object for the round_robin policy.  It carries no settings;
// it only identifies the policy by name.
class RoundRobinConfig final : public LoadBalancingPolicy::Config {
 public:
  absl::string_view name() const override {
    return kRoundRobin;
  }
};
// Factory for the round_robin policy: reports the policy name, creates
// RoundRobin instances, and produces RoundRobinConfig objects.
class RoundRobinFactory final : public LoadBalancingPolicyFactory {
 public:
  absl::string_view name() const override { return kRoundRobin; }

  // Creates a new RoundRobin policy from the given args.
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<RoundRobin>(std::move(args));
  }

  // The JSON argument is not inspected; a fresh RoundRobinConfig is
  // always returned.
  absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
  ParseLoadBalancingConfig(const Json&) const override {
    return MakeRefCounted<RoundRobinConfig>();
  }
};
} // namespace
// Registers a RoundRobinFactory with the LB policy registry of the
// given core configuration builder.
void RegisterRoundRobinLbPolicy(CoreConfiguration::Builder* builder) {
  auto factory = std::make_unique<RoundRobinFactory>();
  builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
      std::move(factory));
}
} // namespace grpc_core