blob: a8cf390873c2704c6c94d7599187e5e7923b950f [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <string.h>
#include <algorithm>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
// IWYU pragma: no_include <type_traits>
namespace grpc_core {
TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb");
namespace {
using ::grpc_event_engine::experimental::EventEngine;
constexpr absl::string_view kWeightedTarget = "weighted_target_experimental";
// How long we keep a child around for after it has been removed from
// the config.
constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
// Config for weighted_target LB policy.
class WeightedTargetLbConfig : public LoadBalancingPolicy::Config {
public:
struct ChildConfig {
uint32_t weight;
RefCountedPtr<LoadBalancingPolicy::Config> config;
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors);
};
using TargetMap = std::map<std::string, ChildConfig>;
WeightedTargetLbConfig() = default;
WeightedTargetLbConfig(const WeightedTargetLbConfig&) = delete;
WeightedTargetLbConfig& operator=(const WeightedTargetLbConfig&) = delete;
WeightedTargetLbConfig(WeightedTargetLbConfig&& other) = delete;
WeightedTargetLbConfig& operator=(WeightedTargetLbConfig&& other) = delete;
absl::string_view name() const override { return kWeightedTarget; }
const TargetMap& target_map() const { return target_map_; }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
private:
TargetMap target_map_;
};
// weighted_target LB policy.
class WeightedTargetLb : public LoadBalancingPolicy {
public:
explicit WeightedTargetLb(Args args);
absl::string_view name() const override { return kWeightedTarget; }
absl::Status UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
private:
// A simple wrapper for ref-counting a picker from the child policy.
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
public:
explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// Picks a child using stateless WRR and then delegates to that
// child's picker.
class WeightedPicker : public SubchannelPicker {
public:
// Maintains a weighted list of pickers from each child that is in
// ready state. The first element in the pair represents the end of a
// range proportional to the child's weight. The start of the range
// is the previous value in the vector and is 0 for the first element.
using PickerList =
std::vector<std::pair<uint64_t, RefCountedPtr<ChildPickerWrapper>>>;
explicit WeightedPicker(PickerList pickers)
: pickers_(std::move(pickers)) {}
PickResult Pick(PickArgs args) override;
private:
PickerList pickers_;
absl::BitGen bit_gen_;
};
// Each WeightedChild holds a ref to its parent WeightedTargetLb.
class WeightedChild : public InternallyRefCounted<WeightedChild> {
public:
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
const std::string& name);
~WeightedChild() override;
void Orphan() override;
absl::Status UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<ServerAddressList> addresses,
const std::string& resolution_note,
const ChannelArgs& args);
void ResetBackoffLocked();
void DeactivateLocked();
uint32_t weight() const { return weight_; }
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
private:
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
: weighted_child_(std::move(weighted_child)) {}
~Helper() override { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<WeightedChild> weighted_child_;
};
class DelayedRemovalTimer
: public InternallyRefCounted<DelayedRemovalTimer> {
public:
explicit DelayedRemovalTimer(RefCountedPtr<WeightedChild> weighted_child);
void Orphan() override;
private:
void OnTimerLocked();
RefCountedPtr<WeightedChild> weighted_child_;
absl::optional<EventEngine::TaskHandle> timer_handle_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
void OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker);
// The owning LB policy.
RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
const std::string name_;
uint32_t weight_ = 0;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
};
~WeightedTargetLb() override;
void ShutdownLocked() override;
void UpdateStateLocked();
// Current config from the resolver.
RefCountedPtr<WeightedTargetLbConfig> config_;
// Internal state.
bool shutting_down_ = false;
bool update_in_progress_ = false;
// Children.
std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
};
//
// WeightedTargetLb::WeightedPicker
//
WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
PickArgs args) {
// Generate a random number in [0, total weight).
const uint64_t key =
absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
// Find the index in pickers_ corresponding to key.
size_t mid = 0;
size_t start_index = 0;
size_t end_index = pickers_.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (pickers_[mid].first > key) {
end_index = mid;
} else if (pickers_[mid].first < key) {
start_index = mid + 1;
} else {
index = mid + 1;
break;
}
}
if (index == 0) index = start_index;
GPR_ASSERT(pickers_[index].first > key);
// Delegate to the child picker.
return pickers_[index].second->Pick(args);
}
//
// WeightedTargetLb
//
WeightedTargetLb::WeightedTargetLb(Args args)
: LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] created", this);
}
}
WeightedTargetLb::~WeightedTargetLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] destroying weighted_target LB policy",
this);
}
}
void WeightedTargetLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this);
}
shutting_down_ = true;
targets_.clear();
}
void WeightedTargetLb::ResetBackoffLocked() {
for (auto& p : targets_) p.second->ResetBackoffLocked();
}
absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
if (shutting_down_) return absl::OkStatus();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
}
update_in_progress_ = true;
// Update config.
config_ = std::move(args.config);
// Deactivate the targets not in the new config.
for (const auto& p : targets_) {
const std::string& name = p.first;
WeightedChild* child = p.second.get();
if (config_->target_map().find(name) == config_->target_map().end()) {
child->DeactivateLocked();
}
}
// Update all children.
absl::StatusOr<HierarchicalAddressMap> address_map =
MakeHierarchicalAddressMap(args.addresses);
std::vector<std::string> errors;
for (const auto& p : config_->target_map()) {
const std::string& name = p.first;
const WeightedTargetLbConfig::ChildConfig& config = p.second;
auto& target = targets_[name];
// Create child if it does not already exist.
if (target == nullptr) {
target = MakeOrphanable<WeightedChild>(
Ref(DEBUG_LOCATION, "WeightedChild"), name);
}
absl::StatusOr<ServerAddressList> addresses;
if (address_map.ok()) {
addresses = std::move((*address_map)[name]);
} else {
addresses = address_map.status();
}
absl::Status status = target->UpdateLocked(config, std::move(addresses),
args.resolution_note, args.args);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("child ", name, ": ", status.ToString()));
}
}
update_in_progress_ = false;
if (config_->target_map().empty()) {
absl::Status status = absl::UnavailableError(absl::StrCat(
"no children in weighted_target policy: ", args.resolution_note));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
std::make_unique<TransientFailurePicker>(status));
return absl::OkStatus();
}
UpdateStateLocked();
// Return status.
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}
void WeightedTargetLb::UpdateStateLocked() {
// If we're in the process of propagating an update from our parent to
// our children, ignore any updates that come from the children. We
// will instead return a new picker once the update has been seen by
// all children. This avoids unnecessary picker churn while an update
// is being propagated to our children.
if (update_in_progress_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] scanning children to determine "
"connectivity state",
this);
}
// Construct lists of child pickers with associated weights, one for
// children that are in state READY and another for children that are
// in state TRANSIENT_FAILURE. Each child is represented by a portion of
// the range proportional to its weight, such that the total range is the
// sum of the weights of all children.
WeightedPicker::PickerList ready_picker_list;
uint64_t ready_end = 0;
WeightedPicker::PickerList tf_picker_list;
uint64_t tf_end = 0;
// Also count the number of children in CONNECTING and IDLE, to determine
// the aggregated state.
size_t num_connecting = 0;
size_t num_idle = 0;
for (const auto& p : targets_) {
const std::string& child_name = p.first;
const WeightedChild* child = p.second.get();
// Skip the targets that are not in the latest update.
if (config_->target_map().find(child_name) == config_->target_map().end()) {
continue;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] child=%s state=%s weight=%u picker=%p",
this, child_name.c_str(),
ConnectivityStateName(child->connectivity_state()),
child->weight(), child->picker_wrapper().get());
}
switch (child->connectivity_state()) {
case GRPC_CHANNEL_READY: {
GPR_ASSERT(child->weight() > 0);
ready_end += child->weight();
ready_picker_list.emplace_back(ready_end, child->picker_wrapper());
break;
}
case GRPC_CHANNEL_CONNECTING: {
++num_connecting;
break;
}
case GRPC_CHANNEL_IDLE: {
++num_idle;
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
GPR_ASSERT(child->weight() > 0);
tf_end += child->weight();
tf_picker_list.emplace_back(tf_end, child->picker_wrapper());
break;
}
default:
GPR_UNREACHABLE_CODE(return );
}
}
// Determine aggregated connectivity state.
grpc_connectivity_state connectivity_state;
if (!ready_picker_list.empty()) {
connectivity_state = GRPC_CHANNEL_READY;
} else if (num_connecting > 0) {
connectivity_state = GRPC_CHANNEL_CONNECTING;
} else if (num_idle > 0) {
connectivity_state = GRPC_CHANNEL_IDLE;
} else {
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s",
this, ConnectivityStateName(connectivity_state));
}
std::unique_ptr<SubchannelPicker> picker;
absl::Status status;
switch (connectivity_state) {
case GRPC_CHANNEL_READY:
picker = std::make_unique<WeightedPicker>(std::move(ready_picker_list));
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
picker =
std::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break;
default:
picker = std::make_unique<WeightedPicker>(std::move(tf_picker_list));
}
channel_control_helper()->UpdateState(connectivity_state, status,
std::move(picker));
}
//
// WeightedTargetLb::WeightedChild::DelayedRemovalTimer
//
WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
: weighted_child_(std::move(weighted_child)),
engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
timer_handle_ =
engine_->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
self->weighted_child_->weighted_target_policy_->work_serializer()->Run(
[self = std::move(self)] { self->OnTimerLocked(); },
DEBUG_LOCATION);
});
}
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
if (timer_handle_.has_value()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: cancelling "
"delayed removal timer",
weighted_child_->weighted_target_policy_.get(),
weighted_child_.get(), weighted_child_->name_.c_str());
}
engine_->Cancel(*timer_handle_);
}
Unref();
}
void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
GPR_ASSERT(timer_handle_.has_value());
timer_handle_.reset();
weighted_child_->weighted_target_policy_->targets_.erase(
weighted_child_->name_);
}
//
// WeightedTargetLb::WeightedChild
//
WeightedTargetLb::WeightedChild::WeightedChild(
RefCountedPtr<WeightedTargetLb> weighted_target_policy,
const std::string& name)
: weighted_target_policy_(std::move(weighted_target_policy)), name_(name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s",
weighted_target_policy_.get(), this, name_.c_str());
}
}
WeightedTargetLb::WeightedChild::~WeightedChild() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: destroying child",
weighted_target_policy_.get(), this, name_.c_str());
}
weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
}
void WeightedTargetLb::WeightedChild::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: shutting down child",
weighted_target_policy_.get(), this, name_.c_str());
}
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
grpc_pollset_set_del_pollset_set(
child_policy_->interested_parties(),
weighted_target_policy_->interested_parties());
child_policy_.reset();
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_wrapper_.reset();
delayed_removal_timer_.reset();
Unref();
}
OrphanablePtr<LoadBalancingPolicy>
WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
const ChannelArgs& args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = weighted_target_policy_->work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_weighted_target_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: Created new child "
"policy handler %p",
weighted_target_policy_.get(), this, name_.c_str(),
lb_policy.get());
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(
lb_policy->interested_parties(),
weighted_target_policy_->interested_parties());
return lb_policy;
}
absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<ServerAddressList> addresses,
const std::string& resolution_note, const ChannelArgs& args) {
if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.
if (weight_ != config.weight &&
GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO, "[weighted_target_lb %p] WeightedChild %p %s: weight=%u",
weighted_target_policy_.get(), this, name_.c_str(), config.weight);
}
weight_ = config.weight;
// Reactivate if needed.
if (delayed_removal_timer_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: reactivating",
weighted_target_policy_.get(), this, name_.c_str());
}
delayed_removal_timer_.reset();
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
}
// Construct update args.
UpdateArgs update_args;
update_args.config = config.config;
update_args.addresses = std::move(addresses);
update_args.resolution_note = resolution_note;
update_args.args = args;
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: Updating child "
"policy handler %p",
weighted_target_policy_.get(), this, name_.c_str(),
child_policy_.get());
}
return child_policy_->UpdateLocked(std::move(update_args));
}
void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked();
}
void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
// Cache the picker in the WeightedChild.
picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(std::move(picker));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: connectivity "
"state update: state=%s (%s) picker_wrapper=%p",
weighted_target_policy_.get(), this, name_.c_str(),
ConnectivityStateName(state), status.ToString().c_str(),
picker_wrapper_.get());
}
// If the child reports IDLE, immediately tell it to exit idle.
if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
// Decide what state to report for aggregation purposes.
// If the last recorded state was TRANSIENT_FAILURE and the new state
// is something other than READY, don't change the state.
if (connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
state == GRPC_CHANNEL_READY) {
connectivity_state_ = state;
}
// Notify the LB policy.
weighted_target_policy_->UpdateStateLocked();
}
void WeightedTargetLb::WeightedChild::DeactivateLocked() {
// If already deactivated, don't do that again.
if (weight_ == 0) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,
"[weighted_target_lb %p] WeightedChild %p %s: deactivating",
weighted_target_policy_.get(), this, name_.c_str());
}
// Set the child weight to 0 so that future picker won't contain this child.
weight_ = 0;
// Start a timer to delete the child.
delayed_removal_timer_ = MakeOrphanable<DelayedRemovalTimer>(
Ref(DEBUG_LOCATION, "DelayedRemovalTimer"));
}
//
// WeightedTargetLb::WeightedChild::Helper
//
RefCountedPtr<SubchannelInterface>
WeightedTargetLb::WeightedChild::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr;
return weighted_child_->weighted_target_policy_->channel_control_helper()
->CreateSubchannel(std::move(address), args);
}
void WeightedTargetLb::WeightedChild::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return;
weighted_child_->OnConnectivityStateUpdateLocked(state, status,
std::move(picker));
}
void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() {
if (weighted_child_->weighted_target_policy_->shutting_down_) return;
weighted_child_->weighted_target_policy_->channel_control_helper()
->RequestReresolution();
}
absl::string_view WeightedTargetLb::WeightedChild::Helper::GetAuthority() {
return weighted_child_->weighted_target_policy_->channel_control_helper()
->GetAuthority();
}
void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(
TraceSeverity severity, absl::string_view message) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return;
weighted_child_->weighted_target_policy_->channel_control_helper()
->AddTraceEvent(severity, message);
}
//
// factory
//
const JsonLoaderInterface* WeightedTargetLbConfig::ChildConfig::JsonLoader(
const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<ChildConfig>()
// Note: The config field requires custom parsing, so it's
// handled in JsonPostLoad() instead.
.Field("weight", &ChildConfig::weight)
.Finish();
return loader;
}
void WeightedTargetLbConfig::ChildConfig::JsonPostLoad(
const Json& json, const JsonArgs&, ValidationErrors* errors) {
ValidationErrors::ScopedField field(errors, ".childPolicy");
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors->AddError("field not present");
return;
}
auto lb_config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
it->second);
if (!lb_config.ok()) {
errors->AddError(lb_config.status().message());
return;
}
config = std::move(*lb_config);
}
const JsonLoaderInterface* WeightedTargetLbConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<WeightedTargetLbConfig>()
.Field("targets", &WeightedTargetLbConfig::target_map_)
.Finish();
return loader;
}
class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<WeightedTargetLb>(std::move(args));
} // namespace
absl::string_view name() const override { return kWeightedTarget; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& json) const override {
if (json.type() == Json::Type::JSON_NULL) {
// weighted_target was mentioned as a policy in the deprecated
// loadBalancingPolicy field or in the client API.
return absl::InvalidArgumentError(
"field:loadBalancingPolicy error:weighted_target policy requires "
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
return LoadRefCountedFromJson<WeightedTargetLbConfig>(
json, JsonArgs(), "errors validating weighted_target LB policy config");
}
}; // namespace grpc_core
} // namespace
void RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
std::make_unique<WeightedTargetLbFactory>());
}
} // namespace grpc_core