blob: fdb6d98233a119a6fe064ffd20c2e416cf8c1582 [file] [log] [blame]
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
namespace {
//
// global circuit breaker atomic map
//
class CircuitBreakerCallCounterMap {
public:
using Key =
std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
class CallCounter : public RefCounted<CallCounter> {
public:
explicit CallCounter(Key key) : key_(std::move(key)) {}
~CallCounter() override;
uint32_t Load() {
return concurrent_requests_.load(std::memory_order_seq_cst);
}
uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
void Decrement() { concurrent_requests_.fetch_sub(1); }
private:
Key key_;
std::atomic<uint32_t> concurrent_requests_{0};
};
RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
const std::string& eds_service_name);
private:
Mutex mu_;
std::map<Key, CallCounter*> map_ ABSL_GUARDED_BY(mu_);
};
CircuitBreakerCallCounterMap* const g_call_counter_map =
new CircuitBreakerCallCounterMap;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
const std::string& eds_service_name) {
Key key(cluster, eds_service_name);
RefCountedPtr<CallCounter> result;
MutexLock lock(&mu_);
auto it = map_.find(key);
if (it == map_.end()) {
it = map_.insert({key, nullptr}).first;
} else {
result = it->second->RefIfNonZero();
}
if (result == nullptr) {
result = MakeRefCounted<CallCounter>(std::move(key));
it->second = result.get();
}
return result;
}
CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
MutexLock lock(&g_call_counter_map->mu_);
auto it = g_call_counter_map->map_.find(key_);
if (it != g_call_counter_map->map_.end() && it->second == this) {
g_call_counter_map->map_.erase(it);
}
}
//
// LB policy
//
constexpr absl::string_view kXdsClusterImpl = "xds_cluster_impl_experimental";
// Config for xDS Cluster Impl LB policy.
class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
public:
XdsClusterImplLbConfig(
RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string cluster_name, std::string eds_service_name,
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server,
uint32_t max_concurrent_requests,
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config)
: child_policy_(std::move(child_policy)),
cluster_name_(std::move(cluster_name)),
eds_service_name_(std::move(eds_service_name)),
lrs_load_reporting_server_(std::move(lrs_load_reporting_server)),
max_concurrent_requests_(max_concurrent_requests),
drop_config_(std::move(drop_config)) {}
absl::string_view name() const override { return kXdsClusterImpl; }
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
const std::string& cluster_name() const { return cluster_name_; }
const std::string& eds_service_name() const { return eds_service_name_; }
const absl::optional<XdsBootstrap::XdsServer>& lrs_load_reporting_server()
const {
return lrs_load_reporting_server_;
};
uint32_t max_concurrent_requests() const { return max_concurrent_requests_; }
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config() const {
return drop_config_;
}
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string cluster_name_;
std::string eds_service_name_;
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server_;
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
};
// xDS Cluster Impl LB policy.
class XdsClusterImplLb : public LoadBalancingPolicy {
public:
XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args);
absl::string_view name() const override { return kXdsClusterImpl; }
void UpdateLocked(UpdateArgs args) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
private:
class StatsSubchannelWrapper : public DelegatingSubchannel {
public:
StatsSubchannelWrapper(
RefCountedPtr<SubchannelInterface> wrapped_subchannel,
RefCountedPtr<XdsClusterLocalityStats> locality_stats)
: DelegatingSubchannel(std::move(wrapped_subchannel)),
locality_stats_(std::move(locality_stats)) {}
XdsClusterLocalityStats* locality_stats() const {
return locality_stats_.get();
}
private:
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
};
// A simple wrapper for ref-counting a picker from the child policy.
class RefCountedPicker : public RefCounted<RefCountedPicker> {
public:
explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
: picker_(std::move(picker)) {}
PickResult Pick(PickArgs args) { return picker_->Pick(args); }
private:
std::unique_ptr<SubchannelPicker> picker_;
};
// A picker that wraps the picker from the child to perform drops.
class Picker : public SubchannelPicker {
public:
Picker(XdsClusterImplLb* xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker);
PickResult Pick(PickArgs args) override;
private:
class SubchannelCallTracker;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
uint32_t max_concurrent_requests_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<RefCountedPicker> picker_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
: xds_cluster_impl_policy_(std::move(xds_cluster_impl_policy)) {}
~Helper() override {
xds_cluster_impl_policy_.reset(DEBUG_LOCATION, "Helper");
}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy_;
};
~XdsClusterImplLb() override;
void ShutdownLocked() override;
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
void UpdateChildPolicyLocked(absl::StatusOr<ServerAddressList> addresses,
std::string resolution_note,
const ChannelArgs& args);
void MaybeUpdatePickerLocked();
// Current config from the resolver.
RefCountedPtr<XdsClusterImplLbConfig> config_;
// Current concurrent number of requests.
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
// Internal state.
bool shutting_down_ = false;
// The xds client.
RefCountedPtr<XdsClient> xds_client_;
// The stats for client-side load reporting.
RefCountedPtr<XdsClusterDropStats> drop_stats_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Latest state and picker reported by the child policy.
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr<RefCountedPicker> picker_;
};
//
// XdsClusterImplLb::Picker::SubchannelCallTracker
//
class XdsClusterImplLb::Picker::SubchannelCallTracker
: public LoadBalancingPolicy::SubchannelCallTrackerInterface {
public:
SubchannelCallTracker(
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker,
RefCountedPtr<XdsClusterLocalityStats> locality_stats,
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
: original_subchannel_call_tracker_(
std::move(original_subchannel_call_tracker)),
locality_stats_(std::move(locality_stats)),
call_counter_(std::move(call_counter)) {}
~SubchannelCallTracker() override {
locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
GPR_DEBUG_ASSERT(!started_);
}
void Start() override {
// Increment number of calls in flight.
call_counter_->Increment();
// Record a call started.
if (locality_stats_ != nullptr) {
locality_stats_->AddCallStarted();
}
// Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
original_subchannel_call_tracker_->Start();
}
#ifndef NDEBUG
started_ = true;
#endif
}
void Finish(FinishArgs args) override {
// Delegate if needed.
if (original_subchannel_call_tracker_ != nullptr) {
original_subchannel_call_tracker_->Finish(args);
}
// Record call completion for load reporting.
if (locality_stats_ != nullptr) {
locality_stats_->AddCallFinished(!args.status.ok());
}
// Decrement number of calls in flight.
call_counter_->Decrement();
#ifndef NDEBUG
started_ = false;
#endif
}
private:
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker_;
RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
#ifndef NDEBUG
bool started_ = false;
#endif
};
//
// XdsClusterImplLb::Picker
//
XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
RefCountedPtr<RefCountedPicker> picker)
: call_counter_(xds_cluster_impl_lb->call_counter_),
max_concurrent_requests_(
xds_cluster_impl_lb->config_->max_concurrent_requests()),
drop_config_(xds_cluster_impl_lb->config_->drop_config()),
drop_stats_(xds_cluster_impl_lb->drop_stats_),
picker_(std::move(picker)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
xds_cluster_impl_lb, this);
}
}
LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) {
// Handle EDS drops.
const std::string* drop_category;
if (drop_config_->ShouldDrop(&drop_category)) {
if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
return PickResult::Drop(absl::UnavailableError(
absl::StrCat("EDS-configured drop: ", *drop_category)));
}
// Check if we exceeded the max concurrent requests circuit breaking limit.
// Note: We check the value here, but we don't actually increment the
// counter for the current request until the channel calls the subchannel
// call tracker's Start() method. This means that we may wind up
// allowing more concurrent requests than the configured limit.
if (call_counter_->Load() >= max_concurrent_requests_) {
if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
}
// If we're not dropping the call, we should always have a child picker.
if (picker_ == nullptr) { // Should never happen.
return PickResult::Fail(absl::InternalError(
"xds_cluster_impl picker not given any child picker"));
}
// Not dropping, so delegate to child picker.
PickResult result = picker_->Pick(args);
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
RefCountedPtr<XdsClusterLocalityStats> locality_stats;
if (drop_stats_ != nullptr) { // If load reporting is enabled.
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
// Handle load reporting.
locality_stats = subchannel_wrapper->locality_stats()->Ref(
DEBUG_LOCATION, "SubchannelCallTracker");
// Unwrap subchannel to pass back up the stack.
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Inject subchannel call tracker to record call completion.
complete_pick->subchannel_call_tracker =
absl::make_unique<SubchannelCallTracker>(
std::move(complete_pick->subchannel_call_tracker),
std::move(locality_stats),
call_counter_->Ref(DEBUG_LOCATION, "SubchannelCallTracker"));
} else {
// TODO(roth): We should ideally also record call failures here in the case
// where a pick fails. This is challenging, because we don't know which
// picks are for wait_for_ready RPCs or how many times we'll return a
// failure for the same wait_for_ready RPC.
}
return result;
}
//
// XdsClusterImplLb
//
XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
this, xds_client_.get());
}
}
XdsClusterImplLb::~XdsClusterImplLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
this);
}
}
void XdsClusterImplLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
}
shutting_down_ = true;
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
// Drop our ref to the child's picker, in case it's holding a ref to
// the child.
picker_.reset();
drop_stats_.reset();
xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
}
void XdsClusterImplLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void XdsClusterImplLb::ResetBackoffLocked() {
// The XdsClient will have its backoff reset by the xds resolver, so we
// don't need to do it here.
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
}
// Update config.
const bool is_initial_update = config_ == nullptr;
auto old_config = std::move(config_);
config_ = std::move(args.config);
// On initial update, create drop stats.
if (is_initial_update) {
if (config_->lrs_load_reporting_server().has_value()) {
drop_stats_ = xds_client_->AddClusterDropStats(
config_->lrs_load_reporting_server().value(), config_->cluster_name(),
config_->eds_service_name());
if (drop_stats_ == nullptr) {
gpr_log(GPR_ERROR,
"[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
"LRS server %s, cluster %s, EDS service name %s, load "
"reporting for drops will not be done.",
this, config_->lrs_load_reporting_server()->server_uri.c_str(),
config_->cluster_name().c_str(),
config_->eds_service_name().c_str());
}
}
call_counter_ = g_call_counter_map->GetOrCreate(
config_->cluster_name(), config_->eds_service_name());
} else {
// Cluster name, EDS service name, and LRS server name should never
// change, because the xds_cluster_resolver policy above us should be
// swapped out if that happens.
GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
GPR_ASSERT(config_->lrs_load_reporting_server() ==
old_config->lrs_load_reporting_server());
}
// Update picker if max_concurrent_requests has changed.
if (is_initial_update || config_->max_concurrent_requests() !=
old_config->max_concurrent_requests()) {
MaybeUpdatePickerLocked();
}
// Update child policy.
UpdateChildPolicyLocked(std::move(args.addresses),
std::move(args.resolution_note), args.args);
}
void XdsClusterImplLb::MaybeUpdatePickerLocked() {
// If we're dropping all calls, report READY, regardless of what (or
// whether) the child has reported.
if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
auto drop_picker = absl::make_unique<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity (drop all): "
"state=READY picker=%p",
this, drop_picker.get());
}
channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
std::move(drop_picker));
return;
}
// Otherwise, update only if we have a child picker.
if (picker_ != nullptr) {
auto drop_picker = absl::make_unique<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] updating connectivity: state=%s "
"status=(%s) picker=%p",
this, ConnectivityStateName(state_), status_.ToString().c_str(),
drop_picker.get());
}
channel_control_helper()->UpdateState(state_, status_,
std::move(drop_picker));
}
}
OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
const ChannelArgs& args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_xds_cluster_impl_lb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] Created new child policy handler %p",
this, lb_policy.get());
}
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void XdsClusterImplLb::UpdateChildPolicyLocked(
absl::StatusOr<ServerAddressList> addresses, std::string resolution_note,
const ChannelArgs& args) {
// Create policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
}
// Construct update args.
UpdateArgs update_args;
update_args.addresses = std::move(addresses);
update_args.resolution_note = std::move(resolution_note);
update_args.config = config_->child_policy();
update_args.args =
args.Set(GRPC_ARG_XDS_CLUSTER_NAME, config_->cluster_name());
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
}
//
// XdsClusterImplLb::Helper
//
RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the EdsPicker.
if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
.has_value()) {
RefCountedPtr<XdsLocalityName> locality_name;
auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
if (attribute != nullptr) {
const auto* locality_attr =
static_cast<const XdsLocalityAttribute*>(attribute);
locality_name = locality_attr->locality_name();
}
RefCountedPtr<XdsClusterLocalityStats> locality_stats =
xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats(
xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
.value(),
xds_cluster_impl_policy_->config_->cluster_name(),
xds_cluster_impl_policy_->config_->eds_service_name(),
std::move(locality_name));
if (locality_stats != nullptr) {
return MakeRefCounted<StatsSubchannelWrapper>(
xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args),
std::move(locality_stats));
}
gpr_log(GPR_ERROR,
"[xds_cluster_impl_lb %p] Failed to get locality stats object for "
"LRS server %s, cluster %s, EDS service name %s; load reports will "
"not be generated (not wrapping subchannel)",
this,
xds_cluster_impl_policy_->config_->lrs_load_reporting_server()
->server_uri.c_str(),
xds_cluster_impl_policy_->config_->cluster_name().c_str(),
xds_cluster_impl_policy_->config_->eds_service_name().c_str());
}
// Load reporting not enabled, so don't wrap the subchannel.
return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
}
void XdsClusterImplLb::Helper::UpdateState(
grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (xds_cluster_impl_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
"[xds_cluster_impl_lb %p] child connectivity state update: "
"state=%s (%s) "
"picker=%p",
xds_cluster_impl_policy_.get(), ConnectivityStateName(state),
status.ToString().c_str(), picker.get());
}
// Save the state and picker.
xds_cluster_impl_policy_->state_ = state;
xds_cluster_impl_policy_->status_ = status;
xds_cluster_impl_policy_->picker_ =
MakeRefCounted<RefCountedPicker>(std::move(picker));
// Wrap the picker and return it to the channel.
xds_cluster_impl_policy_->MaybeUpdatePickerLocked();
}
void XdsClusterImplLb::Helper::RequestReresolution() {
if (xds_cluster_impl_policy_->shutting_down_) return;
xds_cluster_impl_policy_->channel_control_helper()->RequestReresolution();
}
absl::string_view XdsClusterImplLb::Helper::GetAuthority() {
return xds_cluster_impl_policy_->channel_control_helper()->GetAuthority();
}
void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (xds_cluster_impl_policy_->shutting_down_) return;
xds_cluster_impl_policy_->channel_control_helper()->AddTraceEvent(severity,
message);
}
//
// factory
//
class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION,
"XdsClusterImplLb");
if (xds_client == nullptr) {
gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate "
"xds_cluster_impl LB policy");
return nullptr;
}
return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
std::move(args));
}
absl::string_view name() const override { return kXdsClusterImpl; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& json) const override {
if (json.type() == Json::Type::JSON_NULL) {
// This policy was configured in the deprecated loadBalancingPolicy
// field or in the client API.
return absl::InvalidArgumentError(
"field:loadBalancingPolicy error:xds_cluster_impl policy requires "
"configuration. Please use loadBalancingConfig field of service "
"config instead.");
}
std::vector<std::string> errors;
// Child policy.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
auto it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
errors.emplace_back("field:childPolicy error:required field missing");
} else {
auto config = CoreConfiguration::Get()
.lb_policy_registry()
.ParseLoadBalancingConfig(it->second);
if (!config.ok()) {
errors.emplace_back(absl::StrCat("field:childPolicy error:",
config.status().message()));
} else {
child_policy = std::move(*config);
}
}
// Cluster name.
std::string cluster_name;
it = json.object_value().find("clusterName");
if (it == json.object_value().end()) {
errors.emplace_back("field:clusterName error:required field missing");
} else if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("field:clusterName error:type should be string");
} else {
cluster_name = it->second.string_value();
}
// EDS service name.
std::string eds_service_name;
it = json.object_value().find("edsServiceName");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("field:edsServiceName error:type should be string");
} else {
eds_service_name = it->second.string_value();
}
}
// LRS load reporting server name.
absl::optional<XdsBootstrap::XdsServer> lrs_load_reporting_server;
it = json.object_value().find("lrsLoadReportingServer");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::OBJECT) {
errors.emplace_back(
"field:lrsLoadReportingServer error:type should be object");
} else {
grpc_error_handle parser_error;
lrs_load_reporting_server = GrpcXdsBootstrap::XdsServerParse(
it->second.object_value(), &parser_error);
if (!GRPC_ERROR_IS_NONE(parser_error)) {
errors.emplace_back(
absl::StrCat("error parsing lrs_load_reporting_server: ",
grpc_error_std_string(parser_error)));
GRPC_ERROR_UNREF(parser_error);
}
}
}
// Max concurrent requests.
uint32_t max_concurrent_requests = 1024;
it = json.object_value().find("maxConcurrentRequests");
if (it != json.object_value().end()) {
if (it->second.type() != Json::Type::NUMBER) {
errors.emplace_back(
"field:max_concurrent_requests error:must be of type number");
} else {
max_concurrent_requests =
gpr_parse_nonnegative_int(it->second.string_value().c_str());
}
}
// Drop config.
auto drop_config = MakeRefCounted<XdsEndpointResource::DropConfig>();
it = json.object_value().find("dropCategories");
if (it == json.object_value().end()) {
errors.emplace_back("field:dropCategories error:required field missing");
} else {
absl::Status status = ParseDropCategories(it->second, drop_config.get());
if (!status.ok()) errors.emplace_back(status.message());
}
if (!errors.empty()) {
return absl::InvalidArgumentError(absl::StrCat(
"errors parseing xds_cluster_impl_experimental LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
}
return MakeRefCounted<XdsClusterImplLbConfig>(
std::move(child_policy), std::move(cluster_name),
std::move(eds_service_name), std::move(lrs_load_reporting_server),
max_concurrent_requests, std::move(drop_config));
}
private:
static absl::Status ParseDropCategories(
const Json& json, XdsEndpointResource::DropConfig* drop_config) {
if (json.type() != Json::Type::ARRAY) {
return absl::InvalidArgumentError("dropCategories field is not an array");
}
std::vector<std::string> errors;
for (size_t i = 0; i < json.array_value().size(); ++i) {
const Json& entry = json.array_value()[i];
absl::Status status = ParseDropCategory(entry, drop_config);
if (!status.ok()) {
errors.emplace_back(
absl::StrCat("error parsing index ", i, ": ", status.message()));
}
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("errors parsing dropCategories field: [",
absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}
static absl::Status ParseDropCategory(
const Json& json, XdsEndpointResource::DropConfig* drop_config) {
if (json.type() != Json::Type::OBJECT) {
return absl::InvalidArgumentError(
"dropCategories entry is not an object");
}
std::vector<std::string> errors;
std::string category;
auto it = json.object_value().find("category");
if (it == json.object_value().end()) {
errors.emplace_back("\"category\" field not present");
} else if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("\"category\" field is not a string");
} else {
category = it->second.string_value();
}
uint32_t requests_per_million = 0;
it = json.object_value().find("requests_per_million");
if (it == json.object_value().end()) {
errors.emplace_back("\"requests_per_million\" field is not present");
} else if (it->second.type() != Json::Type::NUMBER) {
errors.emplace_back("\"requests_per_million\" field is not a number");
} else {
requests_per_million =
gpr_parse_nonnegative_int(it->second.string_value().c_str());
}
if (!errors.empty()) {
return absl::InvalidArgumentError(absl::StrJoin(errors, "; "));
}
drop_config->AddCategory(std::move(category), requests_per_million);
return absl::OkStatus();
}
};
} // namespace
void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
absl::make_unique<XdsClusterImplLbFactory>());
}
} // namespace grpc_core