blob: f4a211d0bbc12faaa8c11e4982d945471424e87c [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
// Trace flag named "priority_lb"; gates all of the GPR_INFO debug logging
// emitted by this policy (see the GRPC_TRACE_FLAG_ENABLED checks below).
TraceFlag grpc_lb_priority_trace(false, "priority_lb");
namespace {
using ::grpc_event_engine::experimental::EventEngine;

// Policy name; returned by name() on both the policy and its config, and
// used as the factory's registration name.
constexpr absl::string_view kPriority{"priority_experimental"};

// Once a child stops being used -- because it was dropped from the config
// or because a higher-priority child took over -- it is kept around for
// this long before being deleted.
constexpr Duration kChildRetentionInterval{Duration::Minutes(15)};

// How long a newly created child gets to connect before we start trying
// the next priority.  GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS overrides it.
constexpr Duration kDefaultChildFailoverTimeout{Duration::Seconds(10)};
// Config for priority LB policy.
// Config for priority LB policy.
//
// Holds a map of per-child configs keyed by child name, plus the list of
// child names in priority order (index 0 is the highest priority).
// Instances are produced from JSON via LoadFromJson() and are neither
// copyable nor movable.
class PriorityLbConfig : public LoadBalancingPolicy::Config {
 public:
  // Config for one entry of the "children" map.
  struct PriorityLbChild {
    // The child's LB policy config; filled in by JsonPostLoad() from the
    // "config" field.
    RefCountedPtr<LoadBalancingPolicy::Config> config;
    // If true, re-resolution requests from this child are dropped rather
    // than forwarded to the channel.
    bool ignore_reresolution_requests = false;

    static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
    void JsonPostLoad(const Json& json, const JsonArgs&,
                      ValidationErrors* errors);
  };

  PriorityLbConfig() = default;

  PriorityLbConfig(const PriorityLbConfig&) = delete;
  PriorityLbConfig& operator=(const PriorityLbConfig&) = delete;

  PriorityLbConfig(PriorityLbConfig&& other) = delete;
  PriorityLbConfig& operator=(PriorityLbConfig&& other) = delete;

  absl::string_view name() const override { return kPriority; }

  // Per-child configs, keyed by child name.
  const std::map<std::string, PriorityLbChild>& children() const {
    return children_;
  }
  // Child names, highest priority first.
  const std::vector<std::string>& priorities() const { return priorities_; }

  static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
  // Validates that every entry in priorities_ names a key in children_.
  void JsonPostLoad(const Json& json, const JsonArgs&,
                    ValidationErrors* errors);

 private:
  std::map<std::string, PriorityLbChild> children_;
  std::vector<std::string> priorities_;
};
// priority LB policy.
// priority LB policy.
//
// Keeps one ChildPriority (wrapping a child LB policy) per child name and
// delegates to one of them, preferring entries earlier in the config's
// priority list; see ChoosePriorityLocked() for the selection rules.
class PriorityLb : public LoadBalancingPolicy {
 public:
  explicit PriorityLb(Args args);

  absl::string_view name() const override { return kPriority; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Each ChildPriority holds a ref to the PriorityLb.
  class ChildPriority : public InternallyRefCounted<ChildPriority> {
   public:
    ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);

    ~ChildPriority() override {
      priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
    }

    const std::string& name() const { return name_; }

    // Sends a config update to the child policy, creating the child policy
    // on first use.
    absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
                              bool ignore_reresolution_requests);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the deactivation timer unless one already exists.
    void MaybeDeactivateLocked();
    // Drops the deactivation timer, cancelling it if still pending.
    void MaybeReactivateLocked();

    void Orphan() override;

    // Returns the last picker reported by the child, or a QueuePicker if
    // the child has not reported one yet.
    RefCountedPtr<SubchannelPicker> GetPicker();

    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }

    const absl::Status& connectivity_status() const {
      return connectivity_status_;
    }

    // True while the failover timer is running.
    bool FailoverTimerPending() const { return failover_timer_ != nullptr; }

   private:
    // Helper handed to the child policy.  Forwards calls to the parent's
    // helper, dropping most of them once the parent is shutting down.
    class Helper : public ChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ChildPriority> priority)
          : priority_(std::move(priority)) {}

      ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }

      RefCountedPtr<SubchannelInterface> CreateSubchannel(
          ServerAddress address, const ChannelArgs& args) override;
      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       RefCountedPtr<SubchannelPicker> picker) override;
      void RequestReresolution() override;
      absl::string_view GetAuthority() override;
      EventEngine* GetEventEngine() override;
      void AddTraceEvent(TraceSeverity severity,
                         absl::string_view message) override;

     private:
      RefCountedPtr<ChildPriority> priority_;
    };

    // Deletes the child from the parent policy once
    // kChildRetentionInterval elapses, unless orphaned first.
    class DeactivationTimer : public InternallyRefCounted<DeactivationTimer> {
     public:
      explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);

      void Orphan() override;

     private:
      void OnTimerLocked();

      RefCountedPtr<ChildPriority> child_priority_;
      // Set while the timer is pending; cleared when it fires or is
      // cancelled.
      absl::optional<EventEngine::TaskHandle> timer_handle_;
    };

    // Reports TRANSIENT_FAILURE for the child once the parent's
    // child_failover_timeout_ elapses, unless orphaned first.
    class FailoverTimer : public InternallyRefCounted<FailoverTimer> {
     public:
      explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);

      void Orphan() override;

     private:
      void OnTimerLocked();

      RefCountedPtr<ChildPriority> child_priority_;
      // Set while the timer is pending; cleared when it fires or is
      // cancelled.
      absl::optional<EventEngine::TaskHandle> timer_handle_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const ChannelArgs& args);

    void OnConnectivityStateUpdateLocked(
        grpc_connectivity_state state, const absl::Status& status,
        RefCountedPtr<SubchannelPicker> picker);

    RefCountedPtr<PriorityLb> priority_policy_;
    const std::string name_;
    bool ignore_reresolution_requests_ = false;

    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    // Last state/status reported by the child (or synthesized by the
    // failover timer), and the last non-null picker it reported.
    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
    absl::Status connectivity_status_;
    RefCountedPtr<SubchannelPicker> picker_;

    // Cleared on TRANSIENT_FAILURE, set on READY or IDLE.  While cleared, a
    // CONNECTING report does not start a new failover timer.
    bool seen_ready_or_idle_since_transient_failure_ = true;

    // Non-null while the child is deactivated.
    OrphanablePtr<DeactivationTimer> deactivation_timer_;
    // Non-null while the failover timer is pending.
    OrphanablePtr<FailoverTimer> failover_timer_;
  };

  ~PriorityLb() override;

  void ShutdownLocked() override;

  // Returns the priority of the specified child name, or UINT32_MAX if
  // the child is not in the current priority list.
  uint32_t GetChildPriorityLocked(const std::string& child_name) const;

  // Deletes a child.  Called when the child's deactivation timer fires.
  void DeleteChild(ChildPriority* child);

  // Iterates through the list of priorities to choose one:
  // - If the child for a priority doesn't exist, creates it.
  // - If a child's failover timer is pending, selects that priority
  //   while we wait for the child to attempt to connect.
  // - If the child is connected, selects that priority.
  // - Otherwise, continues on to the next child.
  // Delegates to the last child if none of the children are connecting.
  // Reports TRANSIENT_FAILURE if the priority list is empty.
  //
  // This method is idempotent; it should yield the same result every
  // time as a function of the state of the children.
  void ChoosePriorityLocked();

  // Sets the specified priority as the current priority.
  // Optionally deactivates any children at lower priorities.
  // Reports the child's state, status, and picker to the channel.
  // NOTE(review): takes int32_t even though current_priority_ and the
  // loop indices in ChoosePriorityLocked() are uint32_t.
  void SetCurrentPriorityLocked(int32_t priority,
                                bool deactivate_lower_priorities,
                                const char* reason);

  // Failover timeout for children; fixed at construction.
  const Duration child_failover_timeout_;

  // Current channel args and config from the resolver.
  ChannelArgs args_;
  RefCountedPtr<PriorityLbConfig> config_;
  absl::StatusOr<HierarchicalAddressMap> addresses_;
  std::string resolution_note_;

  // Internal state.
  // Set by ShutdownLocked(); once set, callbacks from children are ignored.
  bool shutting_down_ = false;
  // True while UpdateLocked() pushes an update to existing children;
  // child state changes during that window do not re-run priority
  // selection.
  bool update_in_progress_ = false;

  // All children that currently exist.
  // Some of these children may be in deactivated state.
  std::map<std::string, OrphanablePtr<ChildPriority>> children_;
  // The priority that is being used: an index into config_->priorities(),
  // or UINT32_MAX if none is selected.
  uint32_t current_priority_ = UINT32_MAX;
};
//
// PriorityLb
//
PriorityLb::PriorityLb(Args args)
: LoadBalancingPolicy(std::move(args)),
child_failover_timeout_(std::max(
Duration::Zero(),
args.args
.GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS)
.value_or(kDefaultChildFailoverTimeout))) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] created", this);
}
}
// Nothing is released explicitly here; the body only emits a trace line.
PriorityLb::~PriorityLb() {
  if (!GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) return;
  gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
}
// Shuts the policy down by orphaning every child.
void PriorityLb::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
  }
  // Must be set before children are orphaned: the helpers and
  // ChildPriority::UpdateLocked() check it and ignore calls once it is set.
  shutting_down_ = true;
  children_.clear();
}
// Asks the child for the currently selected priority, if any, to exit IDLE.
void PriorityLb::ExitIdleLocked() {
  if (current_priority_ == UINT32_MAX) return;
  const std::vector<std::string>& priorities = config_->priorities();
  // Guard against a selection that is stale relative to config_ (e.g. the
  // priority list has since become empty): indexing would be out of bounds.
  if (current_priority_ >= priorities.size()) return;
  const std::string& child_name = priorities[current_priority_];
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] exiting IDLE for current priority %u child %s",
            this, current_priority_, child_name.c_str());
  }
  // Use find() rather than operator[] so that a missing child never leaves
  // a null entry behind in children_ (which other loops dereference).
  auto it = children_.find(child_name);
  if (it != children_.end() && it->second != nullptr) {
    it->second->ExitIdleLocked();
  }
}
// Resets connection backoff in every existing child, deactivated or not.
void PriorityLb::ResetBackoffLocked() {
  std::for_each(children_.begin(), children_.end(), [](const auto& entry) {
    entry.second->ResetBackoffLocked();
  });
}
// Applies a new update from the parent.
//
// Stores the new config, channel args, addresses, and resolution note.
// Existing children missing from the new config are deactivated, and the
// rest are sent the update. Then a priority is chosen, which creates
// children for new priorities as needed.
//
// Returns UNAVAILABLE listing every per-child update error, or OK.
absl::Status PriorityLb::UpdateLocked(UpdateArgs args) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
  }
  // Update config.
  config_ = std::move(args.config);
  // Update args.
  args_ = std::move(args.args);
  // Update addresses.
  // Each child later looks up its own entry in this map by child name
  // (see ChildPriority::UpdateLocked()).
  addresses_ = MakeHierarchicalAddressMap(args.addresses);
  resolution_note_ = std::move(args.resolution_note);
  // Check all existing children against the new config.
  // While update_in_progress_ is set, state updates reported by children
  // do not trigger ChoosePriorityLocked(). Selection therefore never runs
  // against a partially-updated set of children; it happens once, below.
  update_in_progress_ = true;
  std::vector<std::string> errors;
  for (const auto& p : children_) {
    const std::string& child_name = p.first;
    auto& child = p.second;
    auto config_it = config_->children().find(child_name);
    if (config_it == config_->children().end()) {
      // Existing child not found in new config. Deactivate it.
      child->MaybeDeactivateLocked();
    } else {
      // Existing child found in new config. Update it.
      absl::Status status =
          child->UpdateLocked(config_it->second.config,
                              config_it->second.ignore_reresolution_requests);
      if (!status.ok()) {
        errors.emplace_back(
            absl::StrCat("child ", child_name, ": ", status.ToString()));
      }
    }
  }
  update_in_progress_ = false;
  // Try to get connected.
  ChoosePriorityLocked();
  // Return status.
  if (!errors.empty()) {
    return absl::UnavailableError(absl::StrCat(
        "errors from children: [", absl::StrJoin(errors, "; "), "]"));
  }
  return absl::OkStatus();
}
// Returns the index of the first entry in the current priority list equal
// to child_name, or UINT32_MAX if the name does not appear in it.
uint32_t PriorityLb::GetChildPriorityLocked(
    const std::string& child_name) const {
  const std::vector<std::string>& priorities = config_->priorities();
  const auto match =
      std::find(priorities.begin(), priorities.end(), child_name);
  if (match == priorities.end()) return UINT32_MAX;
  return static_cast<uint32_t>(match - priorities.begin());
}
// Removes the given child's entry from children_.  Invoked by the child's
// DeactivationTimer when it fires.
void PriorityLb::DeleteChild(ChildPriority* child) {
  // Look up first and erase by iterator, so the key passed to the map
  // never aliases state owned by the element being destroyed.
  const auto entry = children_.find(child->name());
  if (entry == children_.end()) return;
  children_.erase(entry);
}
// Chooses which child to delegate to and reports its state to the channel.
//
// Pass 1 walks the priority list, creating children as needed. It selects
// the first child that is READY/IDLE or whose failover timer is pending.
// If none qualifies, pass 2 selects the first CONNECTING child. Failing
// that, it falls back to the last child.  An empty list yields
// TRANSIENT_FAILURE.
void PriorityLb::ChoosePriorityLocked() {
  // Forget the previous selection before anything else. current_priority_
  // must never be left as an index into an older priority list. In
  // particular, if the new list is empty, a leftover index would be out of
  // bounds for it, and ExitIdleLocked() uses this value as an index.
  current_priority_ = UINT32_MAX;
  // If priority list is empty, report TF.
  if (config_->priorities().empty()) {
    absl::Status status =
        absl::UnavailableError("priority policy has empty priority list");
    channel_control_helper()->UpdateState(
        GRPC_CHANNEL_TRANSIENT_FAILURE, status,
        MakeRefCounted<TransientFailurePicker>(status));
    return;
  }
  // Iterate through priorities, searching for one in READY or IDLE,
  // creating new children as needed.
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // If the child for the priority does not exist yet, create it.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    auto& child = children_[child_name];
    // Create child if needed.
    if (child == nullptr) {
      child = MakeOrphanable<ChildPriority>(
          Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
      auto child_config = config_->children().find(child_name);
      GPR_DEBUG_ASSERT(child_config != config_->children().end());
      // TODO(roth): If the child reports a non-OK status with the
      // update, we need to propagate that back to the resolver somehow.
      (void)child->UpdateLocked(
          child_config->second.config,
          child_config->second.ignore_reresolution_requests);
    } else {
      // The child already exists. Reactivate if needed.
      child->MaybeReactivateLocked();
    }
    // Select this child if it is in states READY or IDLE.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SetCurrentPriorityLocked(
          priority, /*deactivate_lower_priorities=*/true,
          ConnectivityStateName(child->connectivity_state()));
      return;
    }
    // Select this child if its failover timer is pending.
    if (child->FailoverTimerPending()) {
      SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
                               "failover timer pending");
      return;
    }
    // Child has been failing for a while. Move on to the next priority.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] skipping priority %u, child %s: state=%s, "
              "failover timer not pending",
              this, priority, child_name.c_str(),
              ConnectivityStateName(child->connectivity_state()));
    }
  }
  // If we didn't find any priority to try, pick the first one in state
  // CONNECTING.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] no priority reachable, checking for CONNECTING "
            "priority to delegate to",
            this);
  }
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // The first pass ran to completion, so it created a child for every
    // priority; each one must exist here.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    auto& child = children_[child_name];
    GPR_ASSERT(child != nullptr);
    if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
      SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false,
                               "CONNECTING (pass 2)");
      return;
    }
  }
  // Did not find any child in CONNECTING, delegate to last child.
  SetCurrentPriorityLocked(config_->priorities().size() - 1,
                           /*deactivate_lower_priorities=*/false,
                           "no usable children");
}
void PriorityLb::SetCurrentPriorityLocked(int32_t priority,
bool deactivate_lower_priorities,
const char* reason) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] selecting priority %u, child %s (%s, "
"deactivate_lower_priorities=%d)",
this, priority, config_->priorities()[priority].c_str(), reason,
deactivate_lower_priorities);
}
current_priority_ = priority;
if (deactivate_lower_priorities) {
for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
const std::string& child_name = config_->priorities()[p];
auto it = children_.find(child_name);
if (it != children_.end()) it->second->MaybeDeactivateLocked();
}
}
auto& child = children_[config_->priorities()[priority]];
GPR_ASSERT(child != nullptr);
channel_control_helper()->UpdateState(child->connectivity_state(),
child->connectivity_status(),
child->GetPicker());
}
//
// PriorityLb::ChildPriority::DeactivationTimer
//
// Starts a timer that, after kChildRetentionInterval, hops into the
// policy's WorkSerializer and runs OnTimerLocked() to delete the child.
// The EventEngine callback holds a ref to this object, so it stays alive
// while the callback is pending, even if orphaned earlier.
PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer(
    RefCountedPtr<PriorityLb::ChildPriority> child_priority)
    : child_priority_(std::move(child_priority)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): deactivating -- will remove in "
            "%" PRId64 "ms",
            child_priority_->priority_policy_.get(),
            child_priority_->name_.c_str(), child_priority_.get(),
            kChildRetentionInterval.millis());
  }
  timer_handle_ =
      child_priority_->priority_policy_->channel_control_helper()
          ->GetEventEngine()
          ->RunAfter(kChildRetentionInterval, [self = Ref(DEBUG_LOCATION,
                                                          "Timer")]() mutable {
            // NOTE(review): presumably needed because this runs on an
            // EventEngine thread outside any existing ExecCtx.
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            // Take a raw pointer first: `self` is moved into the inner
            // lambda within the same call expression that needs it.
            auto self_ptr = self.get();
            self_ptr->child_priority_->priority_policy_->work_serializer()->Run(
                [self = std::move(self)]() { self->OnTimerLocked(); },
                DEBUG_LOCATION);
          });
}
void PriorityLb::ChildPriority::DeactivationTimer::Orphan() {
if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*timer_handle_);
timer_handle_.reset();
}
Unref();
}
void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked() {
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): deactivation timer fired, "
"deleting child",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
child_priority_->priority_policy_->DeleteChild(child_priority_.get());
}
}
//
// PriorityLb::ChildPriority::FailoverTimer
//
// Starts a timer that, after the parent's child_failover_timeout_, hops
// into the policy's WorkSerializer and runs OnTimerLocked(). That marks
// the child as TRANSIENT_FAILURE. The EventEngine callback holds a ref to
// this object, so it stays alive while the callback is pending.
PriorityLb::ChildPriority::FailoverTimer::FailoverTimer(
    RefCountedPtr<PriorityLb::ChildPriority> child_priority)
    : child_priority_(std::move(child_priority)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(
        GPR_INFO,
        "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64
        "ms",
        child_priority_->priority_policy_.get(), child_priority_->name_.c_str(),
        child_priority_.get(),
        child_priority_->priority_policy_->child_failover_timeout_.millis());
  }
  timer_handle_ =
      child_priority_->priority_policy_->channel_control_helper()
          ->GetEventEngine()
          ->RunAfter(
              child_priority_->priority_policy_->child_failover_timeout_,
              [self = Ref(DEBUG_LOCATION, "Timer")]() mutable {
                // NOTE(review): presumably needed because this runs on an
                // EventEngine thread outside any existing ExecCtx.
                ApplicationCallbackExecCtx callback_exec_ctx;
                ExecCtx exec_ctx;
                // Take a raw pointer first: `self` is moved into the inner
                // lambda within the same call expression that needs it.
                auto self_ptr = self.get();
                self_ptr->child_priority_->priority_policy_->work_serializer()
                    ->Run([self = std::move(self)]() { self->OnTimerLocked(); },
                          DEBUG_LOCATION);
              });
}
void PriorityLb::ChildPriority::FailoverTimer::Orphan() {
if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): cancelling failover timer",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
child_priority_->priority_policy_->channel_control_helper()
->GetEventEngine()
->Cancel(*timer_handle_);
timer_handle_.reset();
}
Unref();
}
void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked() {
if (timer_handle_.has_value()) {
timer_handle_.reset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
gpr_log(GPR_INFO,
"[priority_lb %p] child %s (%p): failover timer fired, "
"reporting TRANSIENT_FAILURE",
child_priority_->priority_policy_.get(),
child_priority_->name_.c_str(), child_priority_.get());
}
child_priority_->OnConnectivityStateUpdateLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
nullptr);
}
}
//
// PriorityLb::ChildPriority
//
// A new child starts out CONNECTING with its failover timer running, so
// the parent waits up to child_failover_timeout_ before moving past it.
PriorityLb::ChildPriority::ChildPriority(
    RefCountedPtr<PriorityLb> priority_policy, std::string name)
    : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
  PriorityLb* const parent = priority_policy_.get();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)", parent,
            name_.c_str(), this);
  }
  failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
}
// Tears this child down. It cancels both timers, shuts down the child
// policy, drops the cached picker, and then releases this object's own
// ref.
void PriorityLb::ChildPriority::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
            priority_policy_.get(), name_.c_str(), this);
  }
  failover_timer_.reset();
  deactivation_timer_.reset();
  // child_policy_ is created lazily by UpdateLocked(). UpdateLocked()
  // returns early, without creating it, when the parent is already
  // shutting down, so it may be null here.
  if (child_policy_ != nullptr) {
    // Undo CreateChildPolicyLocked(): remove the parent policy's
    // interested_parties pollset_set from the child policy's.
    grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
                                     priority_policy_->interested_parties());
    child_policy_.reset();
  }
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_.reset();
  Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
}
// Returns the most recent picker reported by the child. Until the child
// has reported one, a fresh QueuePicker is returned so that picks wait.
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
PriorityLb::ChildPriority::GetPicker() {
  if (picker_ != nullptr) return picker_;
  return MakeRefCounted<QueuePicker>(
      priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker"));
}
// Pushes an update to this child's policy, creating the policy on first
// use.  No-op (returns OK) once the parent is shutting down.
absl::Status PriorityLb::ChildPriority::UpdateLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> config,
    bool ignore_reresolution_requests) {
  PriorityLb* const parent = priority_policy_.get();
  if (parent->shutting_down_) return absl::OkStatus();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", parent,
            name_.c_str(), this);
  }
  ignore_reresolution_requests_ = ignore_reresolution_requests;
  if (child_policy_ == nullptr) {
    child_policy_ = CreateChildPolicyLocked(parent->args_);
  }
  // The child sees its own config plus the parent's channel args and
  // resolution note. It also gets its own entry of the address map, or
  // the address-map error if there is one.
  UpdateArgs child_update;
  child_update.config = std::move(config);
  if (!parent->addresses_.ok()) {
    child_update.addresses = parent->addresses_.status();
  } else {
    child_update.addresses = (*parent->addresses_)[name_];
  }
  child_update.resolution_note = parent->resolution_note_;
  child_update.args = parent->args_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): updating child policy handler %p",
            parent, name_.c_str(), this, child_policy_.get());
  }
  return child_policy_->UpdateLocked(std::move(child_update));
}
// Creates the ChildPolicyHandler for this child. It shares the parent's
// WorkSerializer and uses a Helper that holds a ref to this ChildPriority.
OrphanablePtr<LoadBalancingPolicy>
PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) {
  LoadBalancingPolicy::Args handler_args;
  handler_args.work_serializer = priority_policy_->work_serializer();
  handler_args.args = args;
  handler_args.channel_control_helper =
      std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
  OrphanablePtr<LoadBalancingPolicy> handler =
      MakeOrphanable<ChildPolicyHandler>(std::move(handler_args),
                                         &grpc_lb_priority_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): created new child policy "
            "handler %p",
            priority_policy_.get(), name_.c_str(), this, handler.get());
  }
  // Put the parent's interested_parties pollset_set into the child's, so
  // the child makes progress whenever the parent does (which in turn is
  // driven by the application's calls).
  grpc_pollset_set_add_pollset_set(handler->interested_parties(),
                                   priority_policy_->interested_parties());
  return handler;
}
void PriorityLb::ChildPriority::ExitIdleLocked() {
child_policy_->ExitIdleLocked();
}
void PriorityLb::ChildPriority::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
}
// Records a connectivity state update for this child and updates the
// failover timer accordingly. The update either comes from the child
// policy via Helper::UpdateState(), or is synthesized as TRANSIENT_FAILURE
// with a null picker by the failover timer. Unless an update from the
// parent is currently being propagated, priority selection is then
// re-run in the parent.
void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
    grpc_connectivity_state state, const absl::Status& status,
    RefCountedPtr<SubchannelPicker> picker) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
            priority_policy_.get(), name_.c_str(), this,
            ConnectivityStateName(state), status.ToString().c_str(),
            picker.get());
  }
  // Store the state and picker.
  connectivity_state_ = state;
  connectivity_status_ = status;
  // When the failover timer fires, this method will be called with picker
  // set to null, because we want to consider the child to be in
  // TRANSIENT_FAILURE, but we have no new picker to report.  In that case,
  // just keep using the old picker, in case we wind up delegating to this
  // child when all priorities are failing.
  if (picker != nullptr) picker_ = std::move(picker);
  // If we transition to state CONNECTING and we've not seen
  // TRANSIENT_FAILURE more recently than READY or IDLE, start failover
  // timer if not already pending.
  // In any other state, update seen_ready_or_idle_since_transient_failure_
  // and cancel failover timer.
  if (state == GRPC_CHANNEL_CONNECTING) {
    if (seen_ready_or_idle_since_transient_failure_ &&
        failover_timer_ == nullptr) {
      failover_timer_ = MakeOrphanable<FailoverTimer>(Ref());
    }
  } else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) {
    seen_ready_or_idle_since_transient_failure_ = true;
    failover_timer_.reset();
  } else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    seen_ready_or_idle_since_transient_failure_ = false;
    failover_timer_.reset();
  }
  // Call the LB policy's ChoosePriorityLocked() to choose a priority to
  // use based on the updated state of this child.
  //
  // Note that if we're in the process of propagating an update from our
  // parent to our children, we skip this, because we don't want to
  // choose a new priority based on inconsistent state.  Instead, the
  // policy will choose a new priority once the update has been seen by
  // all children.
  if (!priority_policy_->update_in_progress_) {
    priority_policy_->ChoosePriorityLocked();
  }
}
// Schedules this child for deletion after kChildRetentionInterval.  If it
// is already deactivated, the existing timer keeps running unchanged.
void PriorityLb::ChildPriority::MaybeDeactivateLocked() {
  if (deactivation_timer_ != nullptr) return;
  deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref());
}
// Releasing the deactivation timer orphans it, which cancels it if still
// pending.  A no-op when the child is not deactivated.
void PriorityLb::ChildPriority::MaybeReactivateLocked() {
  deactivation_timer_ = nullptr;
}
//
// PriorityLb::ChildPriority::Helper
//
// Delegates subchannel creation to the parent's helper; returns null once
// the parent is shutting down.
RefCountedPtr<SubchannelInterface>
PriorityLb::ChildPriority::Helper::CreateSubchannel(ServerAddress address,
                                                    const ChannelArgs& args) {
  PriorityLb* const parent = priority_->priority_policy_.get();
  if (parent->shutting_down_) return nullptr;
  ChannelControlHelper* const parent_helper = parent->channel_control_helper();
  return parent_helper->CreateSubchannel(std::move(address), args);
}
// Passes the child policy's state update on to its ChildPriority, unless
// the parent is shutting down.
void PriorityLb::ChildPriority::Helper::UpdateState(
    grpc_connectivity_state state, const absl::Status& status,
    RefCountedPtr<SubchannelPicker> picker) {
  if (!priority_->priority_policy_->shutting_down_) {
    priority_->OnConnectivityStateUpdateLocked(state, status,
                                               std::move(picker));
  }
}
// Forwards a re-resolution request to the parent's helper. The request is
// dropped during shutdown, or if this child is configured to ignore such
// requests.
void PriorityLb::ChildPriority::Helper::RequestReresolution() {
  const bool drop = priority_->priority_policy_->shutting_down_ ||
                    priority_->ignore_reresolution_requests_;
  if (drop) return;
  priority_->priority_policy_->channel_control_helper()->RequestReresolution();
}
// Delegated unconditionally to the parent's helper.
absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() {
  ChannelControlHelper* const parent_helper =
      priority_->priority_policy_->channel_control_helper();
  return parent_helper->GetAuthority();
}
// Delegated unconditionally to the parent's helper.
EventEngine* PriorityLb::ChildPriority::Helper::GetEventEngine() {
  ChannelControlHelper* const parent_helper =
      priority_->priority_policy_->channel_control_helper();
  return parent_helper->GetEventEngine();
}
// Forwards a channel trace event to the parent's helper unless the parent
// is shutting down.
void PriorityLb::ChildPriority::Helper::AddTraceEvent(
    TraceSeverity severity, absl::string_view message) {
  PriorityLb* const parent = priority_->priority_policy_.get();
  if (!parent->shutting_down_) {
    parent->channel_control_helper()->AddTraceEvent(severity, message);
  }
}
//
// factory
//
// JSON loader for one entry of the "children" map.  Only the optional
// "ignore_reresolution_requests" field is loaded here (the member defaults
// to false when absent).  The loader is built once and reused.
const JsonLoaderInterface* PriorityLbConfig::PriorityLbChild::JsonLoader(
    const JsonArgs&) {
  static const auto* loader =
      JsonObjectLoader<PriorityLbChild>()
          // Note: The "config" field requires custom parsing, so it's
          // handled in JsonPostLoad() instead of here.
          .OptionalField("ignore_reresolution_requests",
                         &PriorityLbChild::ignore_reresolution_requests)
          .Finish();
  return loader;
}
// Parses the required "config" field through the LB policy registry
// (rather than the JsonLoader), recording any error under ".config".
void PriorityLbConfig::PriorityLbChild::JsonPostLoad(const Json& json,
                                                     const JsonArgs&,
                                                     ValidationErrors* errors) {
  ValidationErrors::ScopedField field(errors, ".config");
  const auto& fields = json.object();
  const auto config_json = fields.find("config");
  if (config_json == fields.end()) {
    errors->AddError("field not present");
    return;
  }
  auto parsed =
      CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
          config_json->second);
  if (!parsed.ok()) {
    errors->AddError(parsed.status().message());
    return;
  }
  config = std::move(*parsed);
}
// JSON loader for the top-level config. "children" and "priorities" are
// both loaded with Field() rather than OptionalField(). Cross-field
// validation happens in JsonPostLoad(). The loader is built once and
// reused.
const JsonLoaderInterface* PriorityLbConfig::JsonLoader(const JsonArgs&) {
  static const auto* loader =
      JsonObjectLoader<PriorityLbConfig>()
          .Field("children", &PriorityLbConfig::children_)
          .Field("priorities", &PriorityLbConfig::priorities_)
          .Finish();
  return loader;
}
void PriorityLbConfig::JsonPostLoad(const Json& /*json*/, const JsonArgs&,
ValidationErrors* errors) {
std::set<std::string> unknown_priorities;
for (const std::string& priority : priorities_) {
if (children_.find(priority) == children_.end()) {
unknown_priorities.insert(priority);
}
}
if (!unknown_priorities.empty()) {
errors->AddError(absl::StrCat("unknown priorit(ies): [",
absl::StrJoin(unknown_priorities, ", "),
"]"));
}
}
// Factory for the priority policy, registered under kPriority.
class PriorityLbFactory : public LoadBalancingPolicyFactory {
 public:
  absl::string_view name() const override { return kPriority; }

  // Parses and validates a priority policy config from JSON.
  absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
  ParseLoadBalancingConfig(const Json& json) const override {
    return LoadFromJson<RefCountedPtr<PriorityLbConfig>>(
        json, JsonArgs(), "errors validating priority LB policy config");
  }

  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
      LoadBalancingPolicy::Args args) const override {
    return MakeOrphanable<PriorityLb>(std::move(args));
  }
};
} // namespace
// Adds the priority policy's factory to the builder's LB policy registry.
void RegisterPriorityLbPolicy(CoreConfiguration::Builder* builder) {
  auto* registry = builder->lb_policy_registry();
  registry->RegisterLoadBalancingPolicyFactory(
      std::make_unique<PriorityLbFactory>());
}
} // namespace grpc_core