blob: 9d16ae1df9c987e3ac98a32bf21f0233b6799089 [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <stddef.h>
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
// Trace flag for this policy (disabled by default); enables the
// GPR_INFO logging throughout this file.
TraceFlag grpc_lb_priority_trace{false, "priority_lb"};
namespace {
// Policy name, returned by both the config and the policy's name().
constexpr absl::string_view kPriority{"priority_experimental"};

// Retention period for a child that is no longer in use -- either because
// it was dropped from the config or because a higher-priority child was
// selected -- before it is deleted.
constexpr Duration kChildRetentionInterval{Duration::Minutes(15)};

// Default time a newly created child gets to connect before the next
// priority is tried. Can be overridden via channel arg.
constexpr Duration kDefaultChildFailoverTimeout{Duration::Seconds(10)};
// Parsed config for the priority LB policy: a set of named children and
// the order in which they are tried (index 0 is tried first).
class PriorityLbConfig : public LoadBalancingPolicy::Config {
 public:
  // Per-child settings.
  struct PriorityLbChild {
    // Parsed config handed to the child policy.
    RefCountedPtr<LoadBalancingPolicy::Config> config;
    // When true, re-resolution requests from this child are dropped.
    bool ignore_reresolution_requests = false;
  };

  PriorityLbConfig(std::map<std::string, PriorityLbChild> children,
                   std::vector<std::string> priorities)
      : children_(std::move(children)), priorities_(std::move(priorities)) {}

  absl::string_view name() const override { return kPriority; }

  // Children keyed by name.
  const std::map<std::string, PriorityLbChild>& children() const {
    return children_;
  }

  // Child names in priority order, highest priority first.
  const std::vector<std::string>& priorities() const { return priorities_; }

 private:
  const std::map<std::string, PriorityLbChild> children_;
  const std::vector<std::string> priorities_;
};
// priority LB policy.
class PriorityLb : public LoadBalancingPolicy {
public:
explicit PriorityLb(Args args);
absl::string_view name() const override { return kPriority; }
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
// Each ChildPriority holds a ref to the PriorityLb.
class ChildPriority : public InternallyRefCounted<ChildPriority> {
public:
ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
~ChildPriority() override {
priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
}
const std::string& name() const { return name_; }
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
bool ignore_reresolution_requests);
void ExitIdleLocked();
void ResetBackoffLocked();
void MaybeDeactivateLocked();
void MaybeReactivateLocked();
void Orphan() override;
std::unique_ptr<SubchannelPicker> GetPicker();
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
const absl::Status& connectivity_status() const {
return connectivity_status_;
}
bool FailoverTimerPending() const { return failover_timer_ != nullptr; }
private:
// A simple wrapper for ref-counting a picker from the child policy.
class RefCountedPicker : public RefCounted<RefCountedPicker> {
public:
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A non-ref-counted wrapper for RefCountedPicker.
class RefCountedPickerWrapper : public SubchannelPicker {
public:
explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) override { return picker_->Pick(args); }
private:
RefCountedPtr<RefCountedPicker> picker_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<ChildPriority> priority)
: priority_(std::move(priority)) {}
~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<ChildPriority> priority_;
};
class DeactivationTimer : public InternallyRefCounted<DeactivationTimer> {
public:
explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);
void Orphan() override;
private:
static void OnTimer(void* arg, grpc_error_handle error);
void OnTimerLocked(grpc_error_handle);
RefCountedPtr<ChildPriority> child_priority_;
grpc_timer timer_;
grpc_closure on_timer_;
bool timer_pending_ = true;
};
class FailoverTimer : public InternallyRefCounted<FailoverTimer> {
public:
explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);
void Orphan() override;
private:
static void OnTimer(void* arg, grpc_error_handle error);
void OnTimerLocked(grpc_error_handle);
RefCountedPtr<ChildPriority> child_priority_;
grpc_timer timer_;
grpc_closure on_timer_;
bool timer_pending_ = true;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
void OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker);
RefCountedPtr<PriorityLb> priority_policy_;
const std::string name_;
bool ignore_reresolution_requests_ = false;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
absl::Status connectivity_status_;
RefCountedPtr<RefCountedPicker> picker_wrapper_;
bool seen_ready_or_idle_since_transient_failure_ = true;
OrphanablePtr<DeactivationTimer> deactivation_timer_;
OrphanablePtr<FailoverTimer> failover_timer_;
};
~PriorityLb() override;
void ShutdownLocked() override;
// Returns the priority of the specified child name, or UINT32_MAX if
// the child is not in the current priority list.
uint32_t GetChildPriorityLocked(const std::string& child_name) const;
// Called when a child's connectivity state has changed.
// May propagate the update to the channel or trigger choosing a new
// priority.
void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
// Deletes a child. Called when the child's deactivation timer fires.
void DeleteChild(ChildPriority* child);
// Iterates through the list of priorities to choose one:
// - If the child for a priority doesn't exist, creates it.
// - If a child's failover timer is pending, selects that priority
// while we wait for the child to attempt to connect.
// - If the child is connected, selects that priority.
// - Otherwise, continues on to the next child.
// Delegates to the last child if none of the children are connecting.
// Reports TRANSIENT_FAILURE if the priority list is empty.
//
// This method is idempotent; it should yield the same result every
// time as a function of the state of the children.
void ChoosePriorityLocked();
// Sets the specified priority as the current priority.
// Deactivates any children at lower priorities.
// Returns the child's picker to the channel.
void SetCurrentPriorityLocked(uint32_t priority);
const Duration child_failover_timeout_;
// Current channel args and config from the resolver.
ChannelArgs args_;
RefCountedPtr<PriorityLbConfig> config_;
absl::StatusOr<HierarchicalAddressMap> addresses_;
std::string resolution_note_;
// Internal state.
bool shutting_down_ = false;
bool update_in_progress_ = false;
// All children that currently exist.
// Some of these children may be in deactivated state.
std::map<std::string, OrphanablePtr<ChildPriority>> children_;
// The priority that is being used.
uint32_t current_priority_ = UINT32_MAX;
// Points to the current child from before the most recent update.
// We will continue to use this child until we decide which of the new
// children to use.
ChildPriority* current_child_from_before_update_ = nullptr;
};
//
// PriorityLb
//
PriorityLb::PriorityLb(Args args)
: LoadBalancingPolicy(std::move(args)),
child_failover_timeout_(std::max(
Duration::Zero(),
args.args
.GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS)
.value_or(kDefaultChildFailoverTimeout))) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] created", this);
}
}
// Destructor: only emits a trace line; owned state is released by the
// members' own destructors.
PriorityLb::~PriorityLb() {
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) return;
  gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
}
// Marks the policy as shutting down and destroys every child.
void PriorityLb::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
  }
  // The flag is raised before the children are dropped so that the child
  // helpers, which check it, ignore anything triggered by the teardown.
  this->shutting_down_ = true;
  this->children_.clear();
}
// Asks the child at the current priority, if one is selected, to exit IDLE.
void PriorityLb::ExitIdleLocked() {
  // current_priority_ is UINT32_MAX when nothing is selected. The bounds
  // check also guards against an index left over from an older, longer
  // priority list.
  if (current_priority_ >= config_->priorities().size()) return;
  const std::string& child_name = config_->priorities()[current_priority_];
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    // %u: current_priority_ is uint32_t (matches the other logs here).
    gpr_log(GPR_INFO,
            "[priority_lb %p] exiting IDLE for current priority %u child %s",
            this, current_priority_, child_name.c_str());
  }
  // find() instead of operator[] so a missing child is neither inserted as
  // a null entry nor dereferenced.
  auto it = children_.find(child_name);
  if (it != children_.end() && it->second != nullptr) {
    it->second->ExitIdleLocked();
  }
}
// Resets connection backoff in every existing child, including children
// that are currently deactivated.
void PriorityLb::ResetBackoffLocked() {
  for (auto it = children_.begin(); it != children_.end(); ++it) {
    it->second->ResetBackoffLocked();
  }
}
// Applies a new resolver update:
// 1. Remembers the current child if it is READY.
// 2. Stores the new config, args, addresses and resolution note.
// 3. Updates or deactivates every existing child.
// 4. Re-chooses a priority.
void PriorityLb::UpdateLocked(UpdateArgs args) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
  }
  // Keep using a READY current child until ChoosePriorityLocked() has
  // settled on something under the new config.
  if (current_priority_ != UINT32_MAX) {
    ChildPriority* current_child =
        children_[config_->priorities()[current_priority_]].get();
    GPR_ASSERT(current_child != nullptr);
    if (current_child->connectivity_state() == GRPC_CHANNEL_READY) {
      current_child_from_before_update_ = current_child;
    }
  }
  // Take ownership of the new update contents.
  config_ = std::move(args.config);
  args_ = std::move(args.args);
  addresses_ = MakeHierarchicalAddressMap(args.addresses);
  resolution_note_ = std::move(args.resolution_note);
  // Push the update to every existing child. While this is in progress,
  // state notifications from children are ignored.
  update_in_progress_ = true;
  for (auto it = children_.begin(); it != children_.end(); ++it) {
    auto config_it = config_->children().find(it->first);
    if (config_it != config_->children().end()) {
      // Still configured: forward the new child config.
      it->second->UpdateLocked(config_it->second.config,
                               config_it->second.ignore_reresolution_requests);
    } else {
      // No longer configured: start its retention countdown.
      it->second->MaybeDeactivateLocked();
    }
  }
  update_in_progress_ = false;
  // Now that every child is consistent, pick a priority.
  ChoosePriorityLocked();
}
// Returns the index of `child_name` in the current priority list (the
// first occurrence), or UINT32_MAX if the name is not listed.
uint32_t PriorityLb::GetChildPriorityLocked(
    const std::string& child_name) const {
  const std::vector<std::string>& priorities = config_->priorities();
  const auto found =
      std::find(priorities.begin(), priorities.end(), child_name);
  if (found == priorities.end()) return UINT32_MAX;
  return static_cast<uint32_t>(found - priorities.begin());
}
// Reacts to a state change reported by `child`.
// - Ignored while an update from our parent is being propagated to the
//   children; a priority is chosen once that propagation finishes.
// - The child that was current before the last update keeps feeding its
//   picker to our parent while it stays READY or IDLE.
// - Every other case re-runs ChoosePriorityLocked(), which decides from the
//   state of all children.
void PriorityLb::HandleChildConnectivityStateChangeLocked(
    ChildPriority* child) {
  if (update_in_progress_) return;
  if (child != current_child_from_before_update_) {
    // Regular case: look up the child's priority (for logging) and
    // re-evaluate.
    uint32_t child_priority = GetChildPriorityLocked(child->name());
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] state update for priority %u, child %s, current "
              "priority %u",
              this, child_priority, child->name().c_str(), current_priority_);
    }
    ChoosePriorityLocked();
    return;
  }
  // Special case: the child that was current before the last update.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] state update for current child from before "
            "config update",
            this);
  }
  const grpc_connectivity_state state = child->connectivity_state();
  if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
    // Still usable: keep passing its picker up to our parent.
    channel_control_helper()->UpdateState(state, child->connectivity_status(),
                                          child->GetPicker());
    return;
  }
  // No longer usable: stop using it. Other priorities are already being
  // tried; ChoosePriorityLocked() picks CONNECTING vs TRANSIENT_FAILURE.
  current_child_from_before_update_ = nullptr;
  ChoosePriorityLocked();
}
// Removes `child` from children_ (invoked when its deactivation timer
// fires). If it was still being used as the pre-update current child, we
// stop using it first and re-choose a priority, so that the state reported
// to our parent is correctly CONNECTING or TRANSIENT_FAILURE.
void PriorityLb::DeleteChild(ChildPriority* child) {
  const bool was_current_before_update =
      (current_child_from_before_update_ == child);
  if (was_current_before_update) {
    current_child_from_before_update_ = nullptr;
    ChoosePriorityLocked();
  }
  children_.erase(child->name());
}
// Chooses which priority to use, based on the state of all children, and
// reports the resulting state/picker to our parent. See the declaration for
// the selection rules.
void PriorityLb::ChoosePriorityLocked() {
  // Clear the selection first, on EVERY path. Previously this happened only
  // after the empty-list check, so an empty priority list left a stale
  // current_priority_ behind. ExitIdleLocked() and UpdateLocked() would then
  // index config_->priorities() out of bounds with it.
  current_priority_ = UINT32_MAX;
  // If priority list is empty, report TF.
  if (config_->priorities().empty()) {
    current_child_from_before_update_ = nullptr;
    absl::Status status =
        absl::UnavailableError("priority policy has empty priority list");
    channel_control_helper()->UpdateState(
        GRPC_CHANNEL_TRANSIENT_FAILURE, status,
        absl::make_unique<TransientFailurePicker>(status));
    return;
  }
  // Iterate through priorities, searching for one in READY or IDLE,
  // creating new children as needed.
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // If the child for the priority does not exist yet, create it.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    auto& child = children_[child_name];
    if (child == nullptr) {
      // If we're not still using an old child from before the last
      // update, report CONNECTING here.
      // This is probably not strictly necessary, since the child should
      // immediately report CONNECTING and cause us to report that state
      // anyway, but we do this just in case the child fails to report
      // state before UpdateLocked() returns.
      if (current_child_from_before_update_ == nullptr) {
        channel_control_helper()->UpdateState(
            GRPC_CHANNEL_CONNECTING, absl::Status(),
            absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
      }
      current_priority_ = priority;
      child = MakeOrphanable<ChildPriority>(
          Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
      auto child_config = config_->children().find(child_name);
      GPR_DEBUG_ASSERT(child_config != config_->children().end());
      child->UpdateLocked(child_config->second.config,
                          child_config->second.ignore_reresolution_requests);
      return;
    }
    // The child already exists.
    child->MaybeReactivateLocked();
    // If the child is in state READY or IDLE, switch to it.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SetCurrentPriorityLocked(priority);
      return;
    }
    // Child is not READY or IDLE.
    // If its failover timer is still pending, give it time to fire.
    if (child->FailoverTimerPending()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
        gpr_log(GPR_INFO,
                "[priority_lb %p] priority %u, child %s: child still "
                "attempting to connect, will wait",
                this, priority, child_name.c_str());
      }
      current_priority_ = priority;
      // If we're not still using an old child from before the last
      // update, report this child's current state (normally CONNECTING).
      if (current_child_from_before_update_ == nullptr) {
        channel_control_helper()->UpdateState(child->connectivity_state(),
                                              child->connectivity_status(),
                                              child->GetPicker());
      }
      return;
    }
    // Child has been failing for a while. Move on to the next priority.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] skipping priority %u, child %s: state=%s, "
              "failover timer not pending",
              this, priority, child_name.c_str(),
              ConnectivityStateName(child->connectivity_state()));
    }
  }
  // If we didn't find any priority to try, pick the first one in state
  // CONNECTING.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] no priority reachable, checking for CONNECTING "
            "priority to delegate to",
            this);
  }
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // Every child exists at this point: the loop above returns as soon as
    // it has to create one.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    auto& child = children_[child_name];
    GPR_ASSERT(child != nullptr);
    if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
      channel_control_helper()->UpdateState(child->connectivity_state(),
                                            child->connectivity_status(),
                                            child->GetPicker());
      return;
    }
  }
  // Did not find any child in CONNECTING, delegate to last child.
  const std::string& child_name = config_->priorities().back();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] no priority in CONNECTING, delegating to "
            "lowest priority child %s",
            this, child_name.c_str());
  }
  auto& child = children_[child_name];
  GPR_ASSERT(child != nullptr);
  channel_control_helper()->UpdateState(child->connectivity_state(),
                                        child->connectivity_status(),
                                        child->GetPicker());
}
void PriorityLb::SetCurrentPriorityLocked(uint32_t priority) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this,
priority, config_->priorities()[priority].c_str());
}
current_priority_ = priority;
current_child_from_before_update_ = nullptr;
// Deactivate lower priorities.
for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
const std::string& child_name = config_->priorities()[p];
auto it = children_.find(child_name);
if (it != children_.end()) it->second->MaybeDeactivateLocked();
}
// Update picker.
auto& child = children_[config_->priorities()[priority]];
channel_control_helper()->UpdateState(child->connectivity_state(),
child->connectivity_status(),
child->GetPicker());
}
//
// PriorityLb::ChildPriority::DeactivationTimer
//
// Arms a one-shot timer that deletes the child after
// kChildRetentionInterval. The pending timer holds its own ref ("Timer"),
// which OnTimerLocked() releases.
PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
    RefCountedPtr<PriorityLb::ChildPriority> child_priority)
    : child_priority_(std::move(child_priority)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): deactivating -- will remove in "
            "%" PRId64 "ms",
            child_priority_->priority_policy_.get(),
            child_priority_->name_.c_str(), child_priority_.get(),
            kChildRetentionInterval.millis());
  }
  GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
  Ref(DEBUG_LOCATION, "Timer").release();
  const auto deadline = ExecCtx::Get()->Now() + kChildRetentionInterval;
  grpc_timer_init(&timer_, deadline, &on_timer_);
}
// Cancels the pending deletion (i.e. the child is being reactivated or
// destroyed) and drops the owner's ref.
void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
  if (!timer_pending_) {
    // Timer already fired or was already cancelled; nothing to cancel.
    Unref();
    return;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
            child_priority_->priority_policy_.get(),
            child_priority_->name_.c_str(), child_priority_.get());
  }
  timer_pending_ = false;
  grpc_timer_cancel(&timer_);
  Unref();
}
void PriorityLb::ChildPriority::DeactivationTimer::OnTimer(
void* arg, grpc_error_handle error) {
auto* self = static_cast<DeactivationTimer*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->child_priority_->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
// Runs in the WorkSerializer. If the timer genuinely fired (no error and
// not cancelled by Orphan()), deletes the child from the parent policy.
// Always releases the "Timer" ref and the error ref taken in OnTimer().
void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked(
    grpc_error_handle error) {
  const bool fired = GRPC_ERROR_IS_NONE(error) && timer_pending_;
  if (fired) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] child %s (%p): deactivation timer fired, "
              "deleting child",
              child_priority_->priority_policy_.get(),
              child_priority_->name_.c_str(), child_priority_.get());
    }
    timer_pending_ = false;
    ChildPriority* child = child_priority_.get();
    child->priority_policy_->DeleteChild(child);
  }
  Unref(DEBUG_LOCATION, "Timer");
  GRPC_ERROR_UNREF(error);
}
//
// PriorityLb::ChildPriority::FailoverTimer
//
// Arms a one-shot timer for the parent policy's child_failover_timeout_.
// The pending timer holds its own ref ("Timer"), released in
// OnTimerLocked().
PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
    RefCountedPtr<PriorityLb::ChildPriority> child_priority)
    : child_priority_(std::move(child_priority)) {
  // child_failover_timeout_ is a const member of the parent policy.
  const Duration timeout =
      child_priority_->priority_policy_->child_failover_timeout_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(
        GPR_INFO,
        "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64
        "ms",
        child_priority_->priority_policy_.get(), child_priority_->name_.c_str(),
        child_priority_.get(), timeout.millis());
  }
  GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr);
  Ref(DEBUG_LOCATION, "Timer").release();
  grpc_timer_init(&timer_, ExecCtx::Get()->Now() + timeout, &on_timer_);
}
// Cancels the failover timer if still pending and drops the owner's ref.
void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
  if (!timer_pending_) {
    // Already fired or already cancelled.
    Unref();
    return;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): cancelling failover timer",
            child_priority_->priority_policy_.get(),
            child_priority_->name_.c_str(), child_priority_.get());
  }
  timer_pending_ = false;
  grpc_timer_cancel(&timer_);
  Unref();
}
void PriorityLb::ChildPriority::FailoverTimer::OnTimer(
void* arg, grpc_error_handle error) {
auto* self = static_cast<FailoverTimer*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
self->child_priority_->priority_policy_->work_serializer()->Run(
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
}
// Runs in the WorkSerializer. If the timer genuinely fired, reports a
// synthetic TRANSIENT_FAILURE for the child. The picker is null, so the
// child keeps its previous picker. Always releases the "Timer" ref and
// the error ref taken in OnTimer().
void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked(
    grpc_error_handle error) {
  const bool fired = GRPC_ERROR_IS_NONE(error) && timer_pending_;
  if (fired) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] child %s (%p): failover timer fired, "
              "reporting TRANSIENT_FAILURE",
              child_priority_->priority_policy_.get(),
              child_priority_->name_.c_str(), child_priority_.get());
    }
    timer_pending_ = false;
    child_priority_->OnConnectivityStateUpdateLocked(
        GRPC_CHANNEL_TRANSIENT_FAILURE,
        absl::UnavailableError("failover timer fired"), nullptr);
  }
  Unref(DEBUG_LOCATION, "Timer");
  GRPC_ERROR_UNREF(error);
}
//
// PriorityLb::ChildPriority
//
// Creates a child entry and immediately starts its failover timer, so a
// brand-new child gets child_failover_timeout_ to connect before the next
// priority is tried.
PriorityLb::ChildPriority::ChildPriority(
    RefCountedPtr<PriorityLb> priority_policy, std::string name)
    : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
            priority_policy_.get(), name_.c_str(), this);
  }
  RefCountedPtr<ChildPriority> self_ref = Ref();
  failover_timer_ = MakeOrphanable<FailoverTimer>(std::move(self_ref));
}
// Tears the child down:
// 1. Cancels both timers.
// 2. Detaches and destroys the child policy.
// 3. Drops the picker.
// 4. Releases the owner's ref.
void PriorityLb::ChildPriority::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
            priority_policy_.get(), name_.c_str(), this);
  }
  failover_timer_.reset();
  deactivation_timer_.reset();
  // child_policy_ is only created in UpdateLocked(), which returns early
  // while the parent is shutting down. Guard against it being null instead
  // of dereferencing it unconditionally.
  if (child_policy_ != nullptr) {
    // Remove the parent priority policy's interested_parties pollset_set
    // from the child policy's pollset_set (added in
    // CreateChildPolicyLocked()).
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     priority_policy_->interested_parties());
    child_policy_.reset();
  }
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_wrapper_.reset();
  Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
}
// Returns a picker that shares the child's most recent picker. Before the
// child has reported any picker, returns a QueuePicker holding a ref to
// the parent policy.
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker>
PriorityLb::ChildPriority::GetPicker() {
  if (picker_wrapper_ != nullptr) {
    return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
  }
  return absl::make_unique<QueuePicker>(
      priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
}
// Forwards `config` to the child policy, together with the parent's current
// channel args, resolution note, and this child's slice of the
// hierarchical address map (or the address error). Creates the child
// policy on first use. No-op once the parent is shutting down.
void PriorityLb::ChildPriority::UpdateLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> config,
    bool ignore_reresolution_requests) {
  PriorityLb* const policy = priority_policy_.get();
  if (policy->shutting_down_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", policy,
            name_.c_str(), this);
  }
  ignore_reresolution_requests_ = ignore_reresolution_requests;
  if (child_policy_ == nullptr) {
    child_policy_ = CreateChildPolicyLocked(policy->args_);
  }
  // Assemble the update for the child policy.
  UpdateArgs update_args;
  update_args.config = std::move(config);
  if (!policy->addresses_.ok()) {
    update_args.addresses = policy->addresses_.status();
  } else {
    update_args.addresses = (*policy->addresses_)[name_];
  }
  update_args.resolution_note = policy->resolution_note_;
  update_args.args = policy->args_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): updating child policy handler %p",
            policy, name_.c_str(), this, child_policy_.get());
  }
  child_policy_->UpdateLocked(std::move(update_args));
}
// Builds a ChildPolicyHandler wired to a Helper that refs this child, and
// links its pollset_set to the parent's interested_parties.
OrphanablePtr<LoadBalancingPolicy>
PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) {
  LoadBalancingPolicy::Args handler_args;
  handler_args.work_serializer = priority_policy_->work_serializer();
  handler_args.args = args;
  handler_args.channel_control_helper =
      absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
  OrphanablePtr<LoadBalancingPolicy> handler =
      MakeOrphanable<ChildPolicyHandler>(std::move(handler_args),
                                         &grpc_lb_priority_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): created new child policy "
            "handler %p",
            priority_policy_.get(), name_.c_str(), this, handler.get());
  }
  // Add the parent's interested_parties to the child policy's
  // pollset_set. Activity on the parent LB (and hence on the
  // application's calls) then drives progress in the child policy.
  grpc_pollset_set_add_pollset_set(handler->interested_parties(),
                                   priority_policy_->interested_parties());
  return handler;
}
void PriorityLb::ChildPriority::ExitIdleLocked() {
child_policy_->ExitIdleLocked();
}
void PriorityLb::ChildPriority::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
}
// Records a state update for this child and notifies the parent policy.
// Called either by the child policy (via Helper::UpdateState) or by the
// failover timer, which passes TRANSIENT_FAILURE with a null `picker`.
// Besides storing state/status/picker, this maintains the failover timer
// and seen_ready_or_idle_since_transient_failure_.
void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
priority_policy_.get(), name_.c_str(), this,
ConnectivityStateName(state), status.ToString().c_str(),
picker.get());
}
// Store the state and picker.
connectivity_state_ = state;
connectivity_status_ = status;
// When the failover timer fires, this method will be called with picker
// set to null, because we want to consider the child to be in
// TRANSIENT_FAILURE, but we have no new picker to report. In that case,
// just keep using the old picker, in case we wind up delegating to this
// child when all priorities are failing.
if (picker != nullptr) {
picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
}
// If we transition to state CONNECTING and we've not seen
// TRANSIENT_FAILURE more recently than READY or IDLE, start failover
// timer if not already pending. (A child that has failed since it was
// last READY/IDLE does not get a fresh failover window on reconnect.)
// On READY or IDLE, set seen_ready_or_idle_since_transient_failure_ and
// cancel the failover timer; on TRANSIENT_FAILURE, clear it and cancel
// the timer. Any other state (e.g. SHUTDOWN) leaves both untouched.
if (state == GRPC_CHANNEL_CONNECTING) {
if (seen_ready_or_idle_since_transient_failure_ &&
failover_timer_ == nullptr) {
failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
}
} else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
seen_ready_or_idle_since_transient_failure_ = true;
failover_timer_.reset();
} else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
seen_ready_or_idle_since_transient_failure_ = false;
failover_timer_.reset();
}
// Notify the parent policy.
priority_policy_->HandleChildConnectivityStateChangeLocked(this);
}
// Starts the retention countdown after which the child is deleted. If a
// countdown is already running it is left alone (not restarted).
void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
  if (deactivation_timer_ != nullptr) return;
  deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
}
// Cancels a pending deletion, if any; orphaning the timer cancels it.
void PriorityLb::ChildPriority::MaybeReactivateLocked() {
  deactivation_timer_ = nullptr;
}
//
// PriorityLb::ChildPriority::Helper
//
// Creates subchannels through the parent's helper; returns null once the
// parent policy is shutting down.
RefCountedPtr<SubchannelInterface>
PriorityLb::ChildPriority::Helper::CreateSubchannel(ServerAddress address,
                                                    const ChannelArgs& args) {
  PriorityLb* policy = priority_->priority_policy_.get();
  if (policy->shutting_down_) return nullptr;
  return policy->channel_control_helper()->CreateSubchannel(std::move(address),
                                                            args);
}
// Routes a child-policy state update to the owning ChildPriority unless the
// parent policy is shutting down.
void PriorityLb::ChildPriority::Helper::UpdateState(
    grpc_connectivity_state state, const absl::Status& status,
    std::unique_ptr<SubchannelPicker> picker) {
  const bool shutting_down = priority_->priority_policy_->shutting_down_;
  if (!shutting_down) {
    priority_->OnConnectivityStateUpdateLocked(state, status,
                                               std::move(picker));
  }
}
void PriorityLb::ChildPriority::Helper::RequestReresolution() {
if (priority_->priority_policy_->shutting_down_) return;
if (priority_->ignore_reresolution_requests_) {
return;
}
priority_->priority_policy_->channel_control_helper()->RequestReresolution();
}
// Returns the authority reported by the parent's helper.
absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() {
  PriorityLb* policy = priority_->priority_policy_.get();
  return policy->channel_control_helper()->GetAuthority();
}
// Forwards a channel trace event to the parent's helper unless the parent
// is shutting down.
void PriorityLb::ChildPriority::Helper::AddTraceEvent(
    TraceSeverity severity, absl::string_view message) {
  PriorityLb* policy = priority_->priority_policy_.get();
  if (policy->shutting_down_) return;
  policy->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// factory
//
// Factory for the priority LB policy; also parses its JSON config.
class PriorityLbFactory : public LoadBalancingPolicyFactory {
 public:
  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<PriorityLb>(std::move(args));
  }

  absl::string_view name() const override { return kPriority; }

  // Parses a config object with two required fields:
  // - "children": an object mapping child name to an object. Each value has
  //   a required "config" (parsed by the LB policy registry) and an optional
  //   boolean "ignore_reresolution_requests".
  // - "priorities": an array of child names. Each entry must name a known
  //   child and appear at most once, and the array must have as many
  //   entries as there are children.
  // All problems found are reported together in one InvalidArgumentError.
  absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
  ParseLoadBalancingConfig(const Json& json) const override {
    if (json.type() == Json::Type::JSON_NULL) {
      // priority was mentioned as a policy in the deprecated
      // loadBalancingPolicy field or in the client API.
      return absl::InvalidArgumentError(
          "field:loadBalancingPolicy error:priority policy requires "
          "configuration. Please use loadBalancingConfig field of service "
          "config instead.");
    }
    std::vector<std::string> errors;
    // Children.
    std::map<std::string, PriorityLbConfig::PriorityLbChild> children;
    auto it = json.object_value().find("children");
    if (it == json.object_value().end()) {
      errors.emplace_back("field:children error:required field missing");
    } else if (it->second.type() != Json::Type::OBJECT) {
      errors.emplace_back("field:children error:type should be object");
    } else {
      const Json::Object& object = it->second.object_value();
      for (const auto& p : object) {
        const std::string& child_name = p.first;
        const Json& element = p.second;
        if (element.type() != Json::Type::OBJECT) {
          errors.emplace_back(absl::StrCat("field:children key:", child_name,
                                           " error:should be type object"));
          continue;
        }
        auto it2 = element.object_value().find("config");
        if (it2 == element.object_value().end()) {
          errors.emplace_back(absl::StrCat("field:children key:", child_name,
                                           " error:missing 'config' field"));
          continue;
        }
        bool ignore_reresolution_requests = false;
        // If present, ignore_reresolution_requests must be of type
        // boolean.
        auto it3 = element.object_value().find("ignore_reresolution_requests");
        if (it3 != element.object_value().end()) {
          if (it3->second.type() == Json::Type::JSON_TRUE) {
            ignore_reresolution_requests = true;
          } else if (it3->second.type() != Json::Type::JSON_FALSE) {
            errors.emplace_back(
                absl::StrCat("field:children key:", child_name,
                             " field:ignore_reresolution_requests:should "
                             "be type boolean"));
          }
        }
        auto config = CoreConfiguration::Get()
                          .lb_policy_registry()
                          .ParseLoadBalancingConfig(it2->second);
        if (!config.ok()) {
          errors.emplace_back(absl::StrCat("field:children key:", child_name,
                                           ": ", config.status().message()));
        } else {
          // Single map lookup for both fields.
          PriorityLbConfig::PriorityLbChild& child = children[child_name];
          child.config = std::move(*config);
          child.ignore_reresolution_requests = ignore_reresolution_requests;
        }
      }
    }
    // Priorities.
    std::vector<std::string> priorities;
    it = json.object_value().find("priorities");
    if (it == json.object_value().end()) {
      errors.emplace_back("field:priorities error:required field missing");
    } else if (it->second.type() != Json::Type::ARRAY) {
      errors.emplace_back("field:priorities error:type should be array");
    } else {
      const Json::Array& array = it->second.array_value();
      for (size_t i = 0; i < array.size(); ++i) {
        const Json& element = array[i];
        if (element.type() != Json::Type::STRING) {
          errors.emplace_back(absl::StrCat("field:priorities element:", i,
                                           " error:should be type string"));
        } else if (children.find(element.string_value()) == children.end()) {
          errors.emplace_back(absl::StrCat("field:priorities element:", i,
                                           " error:unknown child '",
                                           element.string_value(), "'"));
        } else if (std::find(priorities.begin(), priorities.end(),
                             element.string_value()) != priorities.end()) {
          // A repeated name would let SetCurrentPriorityLocked() deactivate
          // (and eventually delete) the in-use child through its
          // lower-priority duplicate, so reject it here. The size check
          // below alone does not catch this when the counts still match.
          errors.emplace_back(absl::StrCat("field:priorities element:", i,
                                           " error:duplicate child '",
                                           element.string_value(), "'"));
        } else {
          priorities.emplace_back(element.string_value());
        }
      }
      if (priorities.size() != children.size()) {
        errors.emplace_back(absl::StrCat(
            "field:priorities error:priorities size (", priorities.size(),
            ") != children size (", children.size(), ")"));
      }
    }
    if (!errors.empty()) {
      return absl::InvalidArgumentError(
          absl::StrCat("priority_experimental LB policy config: [",
                       absl::StrJoin(errors, "; "), "]"));
    }
    return MakeRefCounted<PriorityLbConfig>(std::move(children),
                                            std::move(priorities));
  }
};
} // namespace
// Registers the priority LB policy factory with the core configuration.
void RegisterPriorityLbPolicy(CoreConfiguration::Builder* builder) {
  auto factory = absl::make_unique<PriorityLbFactory>();
  builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
      std::move(factory));
}
} // namespace grpc_core