| // |
| // Copyright 2018 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <inttypes.h> |
| |
| #include <algorithm> |
| #include <map> |
| #include <memory> |
| #include <set> |
| #include <string> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/status/status.h" |
| #include "absl/status/statusor.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/str_join.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/optional.h" |
| |
| #include <grpc/event_engine/event_engine.h> |
| #include <grpc/grpc.h> |
| #include <grpc/impl/connectivity_state.h> |
| #include <grpc/support/log.h> |
| |
| #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/config/core_configuration.h" |
| #include "src/core/lib/debug/trace.h" |
| #include "src/core/lib/gprpp/debug_location.h" |
| #include "src/core/lib/gprpp/orphanable.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/time.h" |
| #include "src/core/lib/gprpp/validation_errors.h" |
| #include "src/core/lib/gprpp/work_serializer.h" |
| #include "src/core/lib/iomgr/exec_ctx.h" |
| #include "src/core/lib/iomgr/pollset_set.h" |
| #include "src/core/lib/json/json.h" |
| #include "src/core/lib/json/json_args.h" |
| #include "src/core/lib/json/json_object_loader.h" |
| #include "src/core/lib/load_balancing/lb_policy.h" |
| #include "src/core/lib/load_balancing/lb_policy_factory.h" |
| #include "src/core/lib/load_balancing/lb_policy_registry.h" |
| #include "src/core/lib/load_balancing/subchannel_interface.h" |
| #include "src/core/lib/resolver/server_address.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| |
| namespace grpc_core { |
| |
// Trace flag that gates the "[priority_lb ...]" log statements in this file.
// It is constructed with `false` and the name "priority_lb".
TraceFlag grpc_lb_priority_trace(false, "priority_lb");
| |
| namespace { |
| |
// Shorthand for the EventEngine type whose RunAfter()/Cancel() drive the
// deactivation and failover timers below.
using ::grpc_event_engine::experimental::EventEngine;

// Name reported by the policy, its config, and its factory.
constexpr absl::string_view kPriority = "priority_experimental";

// How long we keep a child around for after it is no longer being used
// (either because it has been removed from the config or because we
// have switched to a higher-priority child).
constexpr Duration kChildRetentionInterval = Duration::Minutes(15);

// Default for how long we wait for a newly created child to get connected
// before starting to attempt the next priority. Overridable via the
// GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS channel arg.
constexpr Duration kDefaultChildFailoverTimeout = Duration::Seconds(10);
| |
// Config for priority LB policy.
//
// Loaded from JSON with two required fields: "children" (a map from child
// name to per-child settings) and "priorities" (an ordered list of child
// names). JsonPostLoad() reports an error for any name in "priorities" that
// has no entry in "children".
class PriorityLbConfig : public LoadBalancingPolicy::Config {
 public:
  // Settings for a single child of the priority policy.
  struct PriorityLbChild {
    // Parsed LB policy config for this child. Filled in by JsonPostLoad()
    // from the "config" JSON field, which must be present.
    RefCountedPtr<LoadBalancingPolicy::Config> config;
    // Loaded from the optional "ignore_reresolution_requests" JSON field.
    // When true, re-resolution requests from this child are dropped instead
    // of being forwarded to the parent's helper.
    bool ignore_reresolution_requests = false;

    static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
    void JsonPostLoad(const Json& json, const JsonArgs&,
                      ValidationErrors* errors);
  };

  PriorityLbConfig() = default;

  // Neither copyable nor movable.
  PriorityLbConfig(const PriorityLbConfig&) = delete;
  PriorityLbConfig& operator=(const PriorityLbConfig&) = delete;

  PriorityLbConfig(PriorityLbConfig&& other) = delete;
  PriorityLbConfig& operator=(PriorityLbConfig&& other) = delete;

  absl::string_view name() const override { return kPriority; }

  // Per-child settings, keyed by child name.
  const std::map<std::string, PriorityLbChild>& children() const {
    return children_;
  }
  // Child names in priority order; index 0 is tried first.
  const std::vector<std::string>& priorities() const { return priorities_; }

  static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
  void JsonPostLoad(const Json& json, const JsonArgs&,
                    ValidationErrors* errors);

 private:
  std::map<std::string, PriorityLbChild> children_;
  std::vector<std::string> priorities_;
};
| |
// priority LB policy.
//
// Owns one ChildPriority per child name and delegates to exactly one of them
// at a time (the "current priority"), chosen by ChoosePriorityLocked().
// Methods with the "Locked" suffix are invoked via the policy's
// work_serializer() in this file (see the timer callbacks).
class PriorityLb : public LoadBalancingPolicy {
 public:
  explicit PriorityLb(Args args);

  absl::string_view name() const override { return kPriority; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Each ChildPriority holds a ref to the PriorityLb.
  //
  // Wraps the child LB policy for one child name, together with the last
  // connectivity state/picker it reported and its two timers.
  class ChildPriority : public InternallyRefCounted<ChildPriority> {
   public:
    ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);

    ~ChildPriority() override {
      priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
    }

    const std::string& name() const { return name_; }

    // Creates the child policy if it does not exist yet and pushes the
    // parent's current addresses, channel args and resolution note to it
    // along with `config`. Returns the child policy's update status.
    absl::Status UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
                              bool ignore_reresolution_requests);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the deactivation timer unless one is already pending.
    void MaybeDeactivateLocked();
    // Cancels a pending deactivation timer, if any.
    void MaybeReactivateLocked();

    void Orphan() override;

    // Returns the last picker reported by the child, or a QueuePicker if the
    // child has not reported one yet.
    RefCountedPtr<SubchannelPicker> GetPicker();

    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }

    const absl::Status& connectivity_status() const {
      return connectivity_status_;
    }

    bool FailoverTimerPending() const { return failover_timer_ != nullptr; }

   private:
    // ChannelControlHelper handed to the child policy. Forwards calls to the
    // parent policy's helper; state updates are routed through
    // ChildPriority::OnConnectivityStateUpdateLocked().
    class Helper : public ChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ChildPriority> priority)
          : priority_(std::move(priority)) {}

      ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }

      RefCountedPtr<SubchannelInterface> CreateSubchannel(
          ServerAddress address, const ChannelArgs& args) override;
      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       RefCountedPtr<SubchannelPicker> picker) override;
      void RequestReresolution() override;
      absl::string_view GetAuthority() override;
      EventEngine* GetEventEngine() override;
      void AddTraceEvent(TraceSeverity severity,
                         absl::string_view message) override;

     private:
      RefCountedPtr<ChildPriority> priority_;
    };

    // Timer started when a child stops being used. When it fires after
    // kChildRetentionInterval, the child is deleted from the policy.
    class DeactivationTimer : public InternallyRefCounted<DeactivationTimer> {
     public:
      explicit DeactivationTimer(RefCountedPtr<ChildPriority> child_priority);

      // Cancels the timer if it is still pending.
      void Orphan() override;

     private:
      void OnTimerLocked();

      RefCountedPtr<ChildPriority> child_priority_;
      // Set while the timer is pending; cleared on cancel or when it fires.
      absl::optional<EventEngine::TaskHandle> timer_handle_;
    };

    // Timer bounding how long a child may stay unconnected before it is
    // treated as TRANSIENT_FAILURE so that the next priority is tried.
    class FailoverTimer : public InternallyRefCounted<FailoverTimer> {
     public:
      explicit FailoverTimer(RefCountedPtr<ChildPriority> child_priority);

      // Cancels the timer if it is still pending.
      void Orphan() override;

     private:
      void OnTimerLocked();

      RefCountedPtr<ChildPriority> child_priority_;
      // Set while the timer is pending; cleared on cancel or when it fires.
      absl::optional<EventEngine::TaskHandle> timer_handle_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const ChannelArgs& args);

    // Records the child's new state and, unless a parent update is in
    // progress, asks the policy to re-run ChoosePriorityLocked(). A null
    // `picker` leaves the previously stored picker in place.
    void OnConnectivityStateUpdateLocked(
        grpc_connectivity_state state, const absl::Status& status,
        RefCountedPtr<SubchannelPicker> picker);

    RefCountedPtr<PriorityLb> priority_policy_;
    const std::string name_;
    bool ignore_reresolution_requests_ = false;

    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    // Last state, status and picker reported for this child.
    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
    absl::Status connectivity_status_;
    RefCountedPtr<SubchannelPicker> picker_;

    // True if READY or IDLE has been seen more recently than
    // TRANSIENT_FAILURE; gates (re)starting the failover timer on CONNECTING.
    bool seen_ready_or_idle_since_transient_failure_ = true;

    OrphanablePtr<DeactivationTimer> deactivation_timer_;
    OrphanablePtr<FailoverTimer> failover_timer_;
  };

  ~PriorityLb() override;

  void ShutdownLocked() override;

  // Returns the priority of the specified child name, or UINT32_MAX if
  // the child is not in the current priority list.
  uint32_t GetChildPriorityLocked(const std::string& child_name) const;

  // Deletes a child. Called when the child's deactivation timer fires.
  void DeleteChild(ChildPriority* child);

  // Iterates through the list of priorities to choose one:
  // - If the child for a priority doesn't exist, creates it.
  // - If a child's failover timer is pending, selects that priority
  // while we wait for the child to attempt to connect.
  // - If the child is connected, selects that priority.
  // - Otherwise, continues on to the next child.
  // Delegates to the last child if none of the children are connecting.
  // Reports TRANSIENT_FAILURE if the priority list is empty.
  //
  // This method is idempotent; it should yield the same result every
  // time as a function of the state of the children.
  void ChoosePriorityLocked();

  // Sets the specified priority as the current priority.
  // Optionally deactivates any children at lower priorities.
  // Returns the child's picker to the channel.
  // NOTE(review): `priority` is int32_t here while current_priority_ and all
  // callers use uint32_t indices -- consider unifying the type.
  void SetCurrentPriorityLocked(int32_t priority,
                                bool deactivate_lower_priorities,
                                const char* reason);

  // How long a child may stay unconnected before the next priority is tried.
  const Duration child_failover_timeout_;

  // Current channel args and config from the resolver.
  ChannelArgs args_;
  RefCountedPtr<PriorityLbConfig> config_;
  absl::StatusOr<HierarchicalAddressMap> addresses_;
  std::string resolution_note_;

  // Internal state.
  bool shutting_down_ = false;

  // True while UpdateLocked() is pushing an update to the children; child
  // state changes do not trigger ChoosePriorityLocked() during that window.
  bool update_in_progress_ = false;

  // All children that currently exist.
  // Some of these children may be in deactivated state.
  std::map<std::string, OrphanablePtr<ChildPriority>> children_;
  // The priority that is being used.
  // UINT32_MAX means no priority is currently selected.
  uint32_t current_priority_ = UINT32_MAX;
};
| |
| // |
| // PriorityLb |
| // |
| |
| PriorityLb::PriorityLb(Args args) |
| : LoadBalancingPolicy(std::move(args)), |
| child_failover_timeout_(std::max( |
| Duration::Zero(), |
| args.args |
| .GetDurationFromIntMillis(GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS) |
| .value_or(kDefaultChildFailoverTimeout))) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] created", this); |
| } |
| } |
| |
| PriorityLb::~PriorityLb() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this); |
| } |
| } |
| |
| void PriorityLb::ShutdownLocked() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this); |
| } |
| shutting_down_ = true; |
| children_.clear(); |
| } |
| |
| void PriorityLb::ExitIdleLocked() { |
| if (current_priority_ != UINT32_MAX) { |
| const std::string& child_name = config_->priorities()[current_priority_]; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] exiting IDLE for current priority %d child %s", |
| this, current_priority_, child_name.c_str()); |
| } |
| children_[child_name]->ExitIdleLocked(); |
| } |
| } |
| |
| void PriorityLb::ResetBackoffLocked() { |
| for (const auto& p : children_) p.second->ResetBackoffLocked(); |
| } |
| |
| absl::Status PriorityLb::UpdateLocked(UpdateArgs args) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] received update", this); |
| } |
| // Update config. |
| config_ = std::move(args.config); |
| // Update args. |
| args_ = std::move(args.args); |
| // Update addresses. |
| addresses_ = MakeHierarchicalAddressMap(args.addresses); |
| resolution_note_ = std::move(args.resolution_note); |
| // Check all existing children against the new config. |
| update_in_progress_ = true; |
| std::vector<std::string> errors; |
| for (const auto& p : children_) { |
| const std::string& child_name = p.first; |
| auto& child = p.second; |
| auto config_it = config_->children().find(child_name); |
| if (config_it == config_->children().end()) { |
| // Existing child not found in new config. Deactivate it. |
| child->MaybeDeactivateLocked(); |
| } else { |
| // Existing child found in new config. Update it. |
| absl::Status status = |
| child->UpdateLocked(config_it->second.config, |
| config_it->second.ignore_reresolution_requests); |
| if (!status.ok()) { |
| errors.emplace_back( |
| absl::StrCat("child ", child_name, ": ", status.ToString())); |
| } |
| } |
| } |
| update_in_progress_ = false; |
| // Try to get connected. |
| ChoosePriorityLocked(); |
| // Return status. |
| if (!errors.empty()) { |
| return absl::UnavailableError(absl::StrCat( |
| "errors from children: [", absl::StrJoin(errors, "; "), "]")); |
| } |
| return absl::OkStatus(); |
| } |
| |
| uint32_t PriorityLb::GetChildPriorityLocked( |
| const std::string& child_name) const { |
| for (uint32_t priority = 0; priority < config_->priorities().size(); |
| ++priority) { |
| if (config_->priorities()[priority] == child_name) return priority; |
| } |
| return UINT32_MAX; |
| } |
| |
| void PriorityLb::DeleteChild(ChildPriority* child) { |
| children_.erase(child->name()); |
| } |
| |
| void PriorityLb::ChoosePriorityLocked() { |
| // If priority list is empty, report TF. |
| if (config_->priorities().empty()) { |
| absl::Status status = |
| absl::UnavailableError("priority policy has empty priority list"); |
| channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, status, |
| MakeRefCounted<TransientFailurePicker>(status)); |
| return; |
| } |
| // Iterate through priorities, searching for one in READY or IDLE, |
| // creating new children as needed. |
| current_priority_ = UINT32_MAX; |
| for (uint32_t priority = 0; priority < config_->priorities().size(); |
| ++priority) { |
| // If the child for the priority does not exist yet, create it. |
| const std::string& child_name = config_->priorities()[priority]; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this, |
| priority, child_name.c_str()); |
| } |
| auto& child = children_[child_name]; |
| // Create child if needed. |
| if (child == nullptr) { |
| child = MakeOrphanable<ChildPriority>( |
| Ref(DEBUG_LOCATION, "ChildPriority"), child_name); |
| auto child_config = config_->children().find(child_name); |
| GPR_DEBUG_ASSERT(child_config != config_->children().end()); |
| // TODO(roth): If the child reports a non-OK status with the |
| // update, we need to propagate that back to the resolver somehow. |
| (void)child->UpdateLocked( |
| child_config->second.config, |
| child_config->second.ignore_reresolution_requests); |
| } else { |
| // The child already exists. Reactivate if needed. |
| child->MaybeReactivateLocked(); |
| } |
| // Select this child if it is in states READY or IDLE. |
| if (child->connectivity_state() == GRPC_CHANNEL_READY || |
| child->connectivity_state() == GRPC_CHANNEL_IDLE) { |
| SetCurrentPriorityLocked( |
| priority, /*deactivate_lower_priorities=*/true, |
| ConnectivityStateName(child->connectivity_state())); |
| return; |
| } |
| // Select this child if its failover timer is pending. |
| if (child->FailoverTimerPending()) { |
| SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false, |
| "failover timer pending"); |
| return; |
| } |
| // Child has been failing for a while. Move on to the next priority. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] skipping priority %u, child %s: state=%s, " |
| "failover timer not pending", |
| this, priority, child_name.c_str(), |
| ConnectivityStateName(child->connectivity_state())); |
| } |
| } |
| // If we didn't find any priority to try, pick the first one in state |
| // CONNECTING. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] no priority reachable, checking for CONNECTING " |
| "priority to delegate to", |
| this); |
| } |
| for (uint32_t priority = 0; priority < config_->priorities().size(); |
| ++priority) { |
| // If the child for the priority does not exist yet, create it. |
| const std::string& child_name = config_->priorities()[priority]; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this, |
| priority, child_name.c_str()); |
| } |
| auto& child = children_[child_name]; |
| GPR_ASSERT(child != nullptr); |
| if (child->connectivity_state() == GRPC_CHANNEL_CONNECTING) { |
| SetCurrentPriorityLocked(priority, /*deactivate_lower_priorities=*/false, |
| "CONNECTING (pass 2)"); |
| return; |
| } |
| } |
| // Did not find any child in CONNECTING, delegate to last child. |
| SetCurrentPriorityLocked(config_->priorities().size() - 1, |
| /*deactivate_lower_priorities=*/false, |
| "no usable children"); |
| } |
| |
| void PriorityLb::SetCurrentPriorityLocked(int32_t priority, |
| bool deactivate_lower_priorities, |
| const char* reason) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] selecting priority %u, child %s (%s, " |
| "deactivate_lower_priorities=%d)", |
| this, priority, config_->priorities()[priority].c_str(), reason, |
| deactivate_lower_priorities); |
| } |
| current_priority_ = priority; |
| if (deactivate_lower_priorities) { |
| for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) { |
| const std::string& child_name = config_->priorities()[p]; |
| auto it = children_.find(child_name); |
| if (it != children_.end()) it->second->MaybeDeactivateLocked(); |
| } |
| } |
| auto& child = children_[config_->priorities()[priority]]; |
| GPR_ASSERT(child != nullptr); |
| channel_control_helper()->UpdateState(child->connectivity_state(), |
| child->connectivity_status(), |
| child->GetPicker()); |
| } |
| |
| // |
| // PriorityLb::ChildPriority::DeactivationTimer |
| // |
| |
| PriorityLb::ChildPriority::DeactivationTimer::DeactivationTimer( |
| RefCountedPtr<PriorityLb::ChildPriority> child_priority) |
| : child_priority_(std::move(child_priority)) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] child %s (%p): deactivating -- will remove in " |
| "%" PRId64 "ms", |
| child_priority_->priority_policy_.get(), |
| child_priority_->name_.c_str(), child_priority_.get(), |
| kChildRetentionInterval.millis()); |
| } |
| timer_handle_ = |
| child_priority_->priority_policy_->channel_control_helper() |
| ->GetEventEngine() |
| ->RunAfter(kChildRetentionInterval, [self = Ref(DEBUG_LOCATION, |
| "Timer")]() mutable { |
| ApplicationCallbackExecCtx callback_exec_ctx; |
| ExecCtx exec_ctx; |
| auto self_ptr = self.get(); |
| self_ptr->child_priority_->priority_policy_->work_serializer()->Run( |
| [self = std::move(self)]() { self->OnTimerLocked(); }, |
| DEBUG_LOCATION); |
| }); |
| } |
| |
| void PriorityLb::ChildPriority::DeactivationTimer::Orphan() { |
| if (timer_handle_.has_value()) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating", |
| child_priority_->priority_policy_.get(), |
| child_priority_->name_.c_str(), child_priority_.get()); |
| } |
| child_priority_->priority_policy_->channel_control_helper() |
| ->GetEventEngine() |
| ->Cancel(*timer_handle_); |
| timer_handle_.reset(); |
| } |
| Unref(); |
| } |
| |
| void PriorityLb::ChildPriority::DeactivationTimer::OnTimerLocked() { |
| if (timer_handle_.has_value()) { |
| timer_handle_.reset(); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] child %s (%p): deactivation timer fired, " |
| "deleting child", |
| child_priority_->priority_policy_.get(), |
| child_priority_->name_.c_str(), child_priority_.get()); |
| } |
| child_priority_->priority_policy_->DeleteChild(child_priority_.get()); |
| } |
| } |
| |
| // |
| // PriorityLb::ChildPriority::FailoverTimer |
| // |
| |
| PriorityLb::ChildPriority::FailoverTimer::FailoverTimer( |
| RefCountedPtr<PriorityLb::ChildPriority> child_priority) |
| : child_priority_(std::move(child_priority)) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log( |
| GPR_INFO, |
| "[priority_lb %p] child %s (%p): starting failover timer for %" PRId64 |
| "ms", |
| child_priority_->priority_policy_.get(), child_priority_->name_.c_str(), |
| child_priority_.get(), |
| child_priority_->priority_policy_->child_failover_timeout_.millis()); |
| } |
| timer_handle_ = |
| child_priority_->priority_policy_->channel_control_helper() |
| ->GetEventEngine() |
| ->RunAfter( |
| child_priority_->priority_policy_->child_failover_timeout_, |
| [self = Ref(DEBUG_LOCATION, "Timer")]() mutable { |
| ApplicationCallbackExecCtx callback_exec_ctx; |
| ExecCtx exec_ctx; |
| auto self_ptr = self.get(); |
| self_ptr->child_priority_->priority_policy_->work_serializer() |
| ->Run([self = std::move(self)]() { self->OnTimerLocked(); }, |
| DEBUG_LOCATION); |
| }); |
| } |
| |
| void PriorityLb::ChildPriority::FailoverTimer::Orphan() { |
| if (timer_handle_.has_value()) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] child %s (%p): cancelling failover timer", |
| child_priority_->priority_policy_.get(), |
| child_priority_->name_.c_str(), child_priority_.get()); |
| } |
| child_priority_->priority_policy_->channel_control_helper() |
| ->GetEventEngine() |
| ->Cancel(*timer_handle_); |
| timer_handle_.reset(); |
| } |
| Unref(); |
| } |
| |
| void PriorityLb::ChildPriority::FailoverTimer::OnTimerLocked() { |
| if (timer_handle_.has_value()) { |
| timer_handle_.reset(); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] child %s (%p): failover timer fired, " |
| "reporting TRANSIENT_FAILURE", |
| child_priority_->priority_policy_.get(), |
| child_priority_->name_.c_str(), child_priority_.get()); |
| } |
| child_priority_->OnConnectivityStateUpdateLocked( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, |
| absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"), |
| nullptr); |
| } |
| } |
| |
| // |
| // PriorityLb::ChildPriority |
| // |
| |
| PriorityLb::ChildPriority::ChildPriority( |
| RefCountedPtr<PriorityLb> priority_policy, std::string name) |
| : priority_policy_(std::move(priority_policy)), name_(std::move(name)) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)", |
| priority_policy_.get(), name_.c_str(), this); |
| } |
| // Start the failover timer. |
| failover_timer_ = MakeOrphanable<FailoverTimer>(Ref()); |
| } |
| |
| void PriorityLb::ChildPriority::Orphan() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned", |
| priority_policy_.get(), name_.c_str(), this); |
| } |
| failover_timer_.reset(); |
| deactivation_timer_.reset(); |
| // Remove the child policy's interested_parties pollset_set from the |
| // xDS policy. |
| grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
| priority_policy_->interested_parties()); |
| child_policy_.reset(); |
| // Drop our ref to the child's picker, in case it's holding a ref to |
| // the child. |
| picker_.reset(); |
| Unref(DEBUG_LOCATION, "ChildPriority+Orphan"); |
| } |
| |
| RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> |
| PriorityLb::ChildPriority::GetPicker() { |
| if (picker_ == nullptr) { |
| return MakeRefCounted<QueuePicker>( |
| priority_policy_->Ref(DEBUG_LOCATION, "QueuePicker")); |
| } |
| return picker_; |
| } |
| |
| absl::Status PriorityLb::ChildPriority::UpdateLocked( |
| RefCountedPtr<LoadBalancingPolicy::Config> config, |
| bool ignore_reresolution_requests) { |
| if (priority_policy_->shutting_down_) return absl::OkStatus(); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", |
| priority_policy_.get(), name_.c_str(), this); |
| } |
| ignore_reresolution_requests_ = ignore_reresolution_requests; |
| // Create policy if needed. |
| if (child_policy_ == nullptr) { |
| child_policy_ = CreateChildPolicyLocked(priority_policy_->args_); |
| } |
| // Construct update args. |
| UpdateArgs update_args; |
| update_args.config = std::move(config); |
| if (priority_policy_->addresses_.ok()) { |
| update_args.addresses = (*priority_policy_->addresses_)[name_]; |
| } else { |
| update_args.addresses = priority_policy_->addresses_.status(); |
| } |
| update_args.resolution_note = priority_policy_->resolution_note_; |
| update_args.args = priority_policy_->args_; |
| // Update the policy. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] child %s (%p): updating child policy handler %p", |
| priority_policy_.get(), name_.c_str(), this, child_policy_.get()); |
| } |
| return child_policy_->UpdateLocked(std::move(update_args)); |
| } |
| |
| OrphanablePtr<LoadBalancingPolicy> |
| PriorityLb::ChildPriority::CreateChildPolicyLocked(const ChannelArgs& args) { |
| LoadBalancingPolicy::Args lb_policy_args; |
| lb_policy_args.work_serializer = priority_policy_->work_serializer(); |
| lb_policy_args.args = args; |
| lb_policy_args.channel_control_helper = |
| std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
| OrphanablePtr<LoadBalancingPolicy> lb_policy = |
| MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
| &grpc_lb_priority_trace); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] child %s (%p): created new child policy " |
| "handler %p", |
| priority_policy_.get(), name_.c_str(), this, lb_policy.get()); |
| } |
| // Add the parent's interested_parties pollset_set to that of the newly |
| // created child policy. This will make the child policy progress upon |
| // activity on the parent LB, which in turn is tied to the application's call. |
| grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
| priority_policy_->interested_parties()); |
| return lb_policy; |
| } |
| |
| void PriorityLb::ChildPriority::ExitIdleLocked() { |
| child_policy_->ExitIdleLocked(); |
| } |
| |
| void PriorityLb::ChildPriority::ResetBackoffLocked() { |
| child_policy_->ResetBackoffLocked(); |
| } |
| |
| void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked( |
| grpc_connectivity_state state, const absl::Status& status, |
| RefCountedPtr<SubchannelPicker> picker) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { |
| gpr_log(GPR_INFO, |
| "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p", |
| priority_policy_.get(), name_.c_str(), this, |
| ConnectivityStateName(state), status.ToString().c_str(), |
| picker.get()); |
| } |
| // Store the state and picker. |
| connectivity_state_ = state; |
| connectivity_status_ = status; |
| // When the failover timer fires, this method will be called with picker |
| // set to null, because we want to consider the child to be in |
| // TRANSIENT_FAILURE, but we have no new picker to report. In that case, |
| // just keep using the old picker, in case we wind up delegating to this |
| // child when all priorities are failing. |
| if (picker != nullptr) picker_ = std::move(picker); |
| // If we transition to state CONNECTING and we've not seen |
| // TRANSIENT_FAILURE more recently than READY or IDLE, start failover |
| // timer if not already pending. |
| // In any other state, update seen_ready_or_idle_since_transient_failure_ |
| // and cancel failover timer. |
| if (state == GRPC_CHANNEL_CONNECTING) { |
| if (seen_ready_or_idle_since_transient_failure_ && |
| failover_timer_ == nullptr) { |
| failover_timer_ = MakeOrphanable<FailoverTimer>(Ref()); |
| } |
| } else if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE) { |
| seen_ready_or_idle_since_transient_failure_ = true; |
| failover_timer_.reset(); |
| } else if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| seen_ready_or_idle_since_transient_failure_ = false; |
| failover_timer_.reset(); |
| } |
| // Call the LB policy's ChoosePriorityLocked() to choose a priority to |
| // use based on the updated state of this child. |
| // |
| // Note that if we're in the process of propagating an update from our |
| // parent to our children, we skip this, because we don't want to |
| // choose a new priority based on inconsistent state. Instead, the |
| // policy will choose a new priority once the update has been seen by |
| // all children. |
| if (!priority_policy_->update_in_progress_) { |
| priority_policy_->ChoosePriorityLocked(); |
| } |
| } |
| |
| void PriorityLb::ChildPriority::MaybeDeactivateLocked() { |
| if (deactivation_timer_ == nullptr) { |
| deactivation_timer_ = MakeOrphanable<DeactivationTimer>(Ref()); |
| } |
| } |
| |
| void PriorityLb::ChildPriority::MaybeReactivateLocked() { |
| deactivation_timer_.reset(); |
| } |
| |
| // |
| // PriorityLb::ChildPriority::Helper |
| // |
| |
| RefCountedPtr<SubchannelInterface> |
| PriorityLb::ChildPriority::Helper::CreateSubchannel(ServerAddress address, |
| const ChannelArgs& args) { |
| if (priority_->priority_policy_->shutting_down_) return nullptr; |
| return priority_->priority_policy_->channel_control_helper() |
| ->CreateSubchannel(std::move(address), args); |
| } |
| |
| void PriorityLb::ChildPriority::Helper::UpdateState( |
| grpc_connectivity_state state, const absl::Status& status, |
| RefCountedPtr<SubchannelPicker> picker) { |
| if (priority_->priority_policy_->shutting_down_) return; |
| // Notify the priority. |
| priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker)); |
| } |
| |
| void PriorityLb::ChildPriority::Helper::RequestReresolution() { |
| if (priority_->priority_policy_->shutting_down_) return; |
| if (priority_->ignore_reresolution_requests_) { |
| return; |
| } |
| priority_->priority_policy_->channel_control_helper()->RequestReresolution(); |
| } |
| |
| absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() { |
| return priority_->priority_policy_->channel_control_helper()->GetAuthority(); |
| } |
| |
| EventEngine* PriorityLb::ChildPriority::Helper::GetEventEngine() { |
| return priority_->priority_policy_->channel_control_helper() |
| ->GetEventEngine(); |
| } |
| |
| void PriorityLb::ChildPriority::Helper::AddTraceEvent( |
| TraceSeverity severity, absl::string_view message) { |
| if (priority_->priority_policy_->shutting_down_) return; |
| priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity, |
| message); |
| } |
| |
| // |
| // factory |
| // |
| |
| const JsonLoaderInterface* PriorityLbConfig::PriorityLbChild::JsonLoader( |
| const JsonArgs&) { |
| static const auto* loader = |
| JsonObjectLoader<PriorityLbChild>() |
| // Note: The "config" field requires custom parsing, so it's |
| // handled in JsonPostLoad() instead of here. |
| .OptionalField("ignore_reresolution_requests", |
| &PriorityLbChild::ignore_reresolution_requests) |
| .Finish(); |
| return loader; |
| } |
| |
| void PriorityLbConfig::PriorityLbChild::JsonPostLoad(const Json& json, |
| const JsonArgs&, |
| ValidationErrors* errors) { |
| ValidationErrors::ScopedField field(errors, ".config"); |
| auto it = json.object().find("config"); |
| if (it == json.object().end()) { |
| errors->AddError("field not present"); |
| return; |
| } |
| auto lb_config = |
| CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( |
| it->second); |
| if (!lb_config.ok()) { |
| errors->AddError(lb_config.status().message()); |
| return; |
| } |
| config = std::move(*lb_config); |
| } |
| |
| const JsonLoaderInterface* PriorityLbConfig::JsonLoader(const JsonArgs&) { |
| static const auto* loader = |
| JsonObjectLoader<PriorityLbConfig>() |
| .Field("children", &PriorityLbConfig::children_) |
| .Field("priorities", &PriorityLbConfig::priorities_) |
| .Finish(); |
| return loader; |
| } |
| |
| void PriorityLbConfig::JsonPostLoad(const Json& /*json*/, const JsonArgs&, |
| ValidationErrors* errors) { |
| std::set<std::string> unknown_priorities; |
| for (const std::string& priority : priorities_) { |
| if (children_.find(priority) == children_.end()) { |
| unknown_priorities.insert(priority); |
| } |
| } |
| if (!unknown_priorities.empty()) { |
| errors->AddError(absl::StrCat("unknown priorit(ies): [", |
| absl::StrJoin(unknown_priorities, ", "), |
| "]")); |
| } |
| } |
| |
| class PriorityLbFactory : public LoadBalancingPolicyFactory { |
| public: |
| OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
| LoadBalancingPolicy::Args args) const override { |
| return MakeOrphanable<PriorityLb>(std::move(args)); |
| } |
| |
| absl::string_view name() const override { return kPriority; } |
| |
| absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>> |
| ParseLoadBalancingConfig(const Json& json) const override { |
| return LoadFromJson<RefCountedPtr<PriorityLbConfig>>( |
| json, JsonArgs(), "errors validating priority LB policy config"); |
| } |
| }; |
| |
| } // namespace |
| |
| void RegisterPriorityLbPolicy(CoreConfiguration::Builder* builder) { |
| builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( |
| std::make_unique<PriorityLbFactory>()); |
| } |
| |
| } // namespace grpc_core |