blob: 9148e4e64992094a89db96982dce96975db4fb6e [file] [log] [blame]
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h"
#include "src/core/ext/xds/certificate_provider_store.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_certificate_provider.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_resource_type_impl.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/matchers/matchers.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_distributor.h"
#include "src/core/lib/security/credentials/tls/grpc_tls_certificate_provider.h"
#include "src/core/lib/security/credentials/xds/xds_credentials.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
TraceFlag grpc_cds_lb_trace(false, "cds_lb");
namespace {
constexpr absl::string_view kCds = "cds_experimental";
constexpr int kMaxAggregateClusterRecursionDepth = 16;
// Config for this LB policy.
class CdsLbConfig : public LoadBalancingPolicy::Config {
public:
explicit CdsLbConfig(std::string cluster) : cluster_(std::move(cluster)) {}
const std::string& cluster() const { return cluster_; }
absl::string_view name() const override { return kCds; }
private:
std::string cluster_;
};
// CDS LB policy.
class CdsLb : public LoadBalancingPolicy {
public:
CdsLb(RefCountedPtr<GrpcXdsClient> xds_client, Args args);
absl::string_view name() const override { return kCds; }
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
void ExitIdleLocked() override;
private:
// Watcher for getting cluster data from XdsClient.
class ClusterWatcher : public XdsClusterResourceType::WatcherInterface {
public:
ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
: parent_(std::move(parent)), name_(std::move(name)) {}
void OnResourceChanged(XdsClusterResource cluster_data) override {
Ref().release(); // Ref held by lambda
parent_->work_serializer()->Run(
// TODO(roth): When we move to C++14, capture cluster_data with
// std::move().
[this, cluster_data]() mutable {
parent_->OnClusterChanged(name_, std::move(cluster_data));
Unref();
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
Ref().release(); // Ref held by lambda
parent_->work_serializer()->Run(
[this, status]() {
parent_->OnError(name_, status);
Unref();
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
Ref().release(); // Ref held by lambda
parent_->work_serializer()->Run(
[this]() {
parent_->OnResourceDoesNotExist(name_);
Unref();
},
DEBUG_LOCATION);
}
private:
RefCountedPtr<CdsLb> parent_;
std::string name_;
};
struct WatcherState {
// Pointer to watcher, to be used when cancelling.
// Not owned, so do not dereference.
ClusterWatcher* watcher = nullptr;
// Most recent update obtained from this watcher.
absl::optional<XdsClusterResource> update;
};
// Delegating helper to be passed to child policy.
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<CdsLb> parent) : parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<CdsLb> parent_;
};
~CdsLb() override;
void ShutdownLocked() override;
absl::StatusOr<bool> GenerateDiscoveryMechanismForCluster(
const std::string& name, int depth, Json::Array* discovery_mechanisms,
std::set<std::string>* clusters_added);
void OnClusterChanged(const std::string& name,
XdsClusterResource cluster_data);
void OnError(const std::string& name, absl::Status status);
void OnResourceDoesNotExist(const std::string& name);
absl::Status UpdateXdsCertificateProvider(
const std::string& cluster_name, const XdsClusterResource& cluster_data);
void CancelClusterDataWatch(absl::string_view cluster_name,
ClusterWatcher* watcher,
bool delay_unsubscription = false);
void MaybeDestroyChildPolicyLocked();
RefCountedPtr<CdsLbConfig> config_;
// Current channel args from the resolver.
ChannelArgs args_;
// The xds client.
RefCountedPtr<GrpcXdsClient> xds_client_;
// Maps from cluster name to the state for that cluster.
// The root of the tree is config_->cluster().
std::map<std::string, WatcherState> watchers_;
RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Internal state.
bool shutting_down_ = false;
};
//
// CdsLb::Helper
//
RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
ServerAddress address, const ChannelArgs& args) {
if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args);
}
void CdsLb::Helper::UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (parent_->shutting_down_ || parent_->child_policy_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] state updated by child: %s (%s)", this,
ConnectivityStateName(state), status.ToString().c_str());
}
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
}
void CdsLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] Re-resolution requested from child policy.",
parent_.get());
}
parent_->channel_control_helper()->RequestReresolution();
}
absl::string_view CdsLb::Helper::GetAuthority() {
return parent_->channel_control_helper()->GetAuthority();
}
void CdsLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (parent_->shutting_down_) return;
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// CdsLb
//
CdsLb::CdsLb(RefCountedPtr<GrpcXdsClient> xds_client, Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this,
xds_client_.get());
}
}
CdsLb::~CdsLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
}
}
void CdsLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] shutting down", this);
}
shutting_down_ = true;
MaybeDestroyChildPolicyLocked();
if (xds_client_ != nullptr) {
for (auto& watcher : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
watcher.first.c_str());
}
CancelClusterDataWatch(watcher.first, watcher.second.watcher,
/*delay_unsubscription=*/false);
}
watchers_.clear();
xds_client_.reset(DEBUG_LOCATION, "CdsLb");
}
args_ = ChannelArgs();
}
void CdsLb::MaybeDestroyChildPolicyLocked() {
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
}
void CdsLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
void CdsLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void CdsLb::UpdateLocked(UpdateArgs args) {
// Update config.
auto old_config = std::move(config_);
config_ = std::move(args.config);
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s", this,
config_->cluster().c_str());
}
// Update args.
args_ = std::move(args.args);
// If cluster name changed, cancel watcher and restart.
if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
if (old_config != nullptr) {
for (auto& watcher : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
watcher.first.c_str());
}
CancelClusterDataWatch(watcher.first, watcher.second.watcher,
/*delay_unsubscription=*/true);
}
watchers_.clear();
}
auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), config_->cluster());
watchers_[config_->cluster()].watcher = watcher.get();
XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(),
std::move(watcher));
}
}
// Generates the discovery mechanism config for the specified cluster name.
//
// If no CdsUpdate has been received for the cluster, starts the watcher
// if needed, and returns false. Otherwise, generates the discovery
// mechanism config, adds it to *discovery_mechanisms, and returns true.
//
// For aggregate clusters, may call itself recursively. Returns an
// error if depth exceeds kMaxAggregateClusterRecursionDepth.
absl::StatusOr<bool> CdsLb::GenerateDiscoveryMechanismForCluster(
const std::string& name, int depth, Json::Array* discovery_mechanisms,
std::set<std::string>* clusters_added) {
if (depth == kMaxAggregateClusterRecursionDepth) {
return absl::FailedPreconditionError(
"aggregate cluster graph exceeds max depth");
}
if (!clusters_added->insert(name).second) {
return true; // Discovery mechanism already added from some other branch.
}
auto& state = watchers_[name];
// Create a new watcher if needed.
if (state.watcher == nullptr) {
auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
name.c_str());
}
state.watcher = watcher.get();
XdsClusterResourceType::StartWatch(xds_client_.get(), name,
std::move(watcher));
return false;
}
// Don't have the update we need yet.
if (!state.update.has_value()) return false;
// For AGGREGATE clusters, recursively expand to child clusters.
if (state.update->cluster_type ==
XdsClusterResource::ClusterType::AGGREGATE) {
bool missing_cluster = false;
for (const std::string& child_name :
state.update->prioritized_cluster_names) {
auto result = GenerateDiscoveryMechanismForCluster(
child_name, depth + 1, discovery_mechanisms, clusters_added);
if (!result.ok()) return result;
if (!*result) missing_cluster = true;
}
return !missing_cluster;
}
Json::Object mechanism = {
{"clusterName", name},
{"max_concurrent_requests", state.update->max_concurrent_requests},
};
if (state.update->outlier_detection.has_value()) {
auto& outlier_detection_update = state.update->outlier_detection.value();
Json::Object outlier_detection;
outlier_detection["interval"] =
outlier_detection_update.interval.ToJsonString();
outlier_detection["baseEjectionTime"] =
outlier_detection_update.base_ejection_time.ToJsonString();
outlier_detection["maxEjectionTime"] =
outlier_detection_update.max_ejection_time.ToJsonString();
outlier_detection["maxEjectionPercent"] =
outlier_detection_update.max_ejection_percent;
if (outlier_detection_update.success_rate_ejection.has_value()) {
outlier_detection["successRateEjection"] = Json::Object{
{"stdevFactor",
outlier_detection_update.success_rate_ejection->stdev_factor},
{"enforcementPercentage",
outlier_detection_update.success_rate_ejection
->enforcement_percentage},
{"minimumHosts",
outlier_detection_update.success_rate_ejection->minimum_hosts},
{"requestVolume",
outlier_detection_update.success_rate_ejection->request_volume},
};
}
if (outlier_detection_update.failure_percentage_ejection.has_value()) {
outlier_detection["failurePercentageEjection"] = Json::Object{
{"threshold",
outlier_detection_update.failure_percentage_ejection->threshold},
{"enforcementPercentage",
outlier_detection_update.failure_percentage_ejection
->enforcement_percentage},
{"minimumHosts",
outlier_detection_update.failure_percentage_ejection->minimum_hosts},
{"requestVolume", outlier_detection_update
.failure_percentage_ejection->request_volume},
};
}
mechanism["outlierDetection"] = std::move(outlier_detection);
}
switch (state.update->cluster_type) {
case XdsClusterResource::ClusterType::EDS:
mechanism["type"] = "EDS";
if (!state.update->eds_service_name.empty()) {
mechanism["edsServiceName"] = state.update->eds_service_name;
}
break;
case XdsClusterResource::ClusterType::LOGICAL_DNS:
mechanism["type"] = "LOGICAL_DNS";
mechanism["dnsHostname"] = state.update->dns_hostname;
break;
default:
GPR_ASSERT(0);
break;
}
if (state.update->lrs_load_reporting_server.has_value()) {
mechanism["lrsLoadReportingServer"] = GrpcXdsBootstrap::XdsServerToJson(
*state.update->lrs_load_reporting_server);
}
discovery_mechanisms->emplace_back(std::move(mechanism));
return true;
}
void CdsLb::OnClusterChanged(const std::string& name,
XdsClusterResource cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(
GPR_INFO,
"[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
}
// Store the update in the map if we are still interested in watching this
// cluster (i.e., it is not cancelled already).
// If we've already deleted this entry, then this is an update notification
// that was scheduled before the deletion, so we can just ignore it.
auto it = watchers_.find(name);
if (it == watchers_.end()) return;
it->second.update = cluster_data;
// Take care of integration with new certificate code.
absl::Status status =
UpdateXdsCertificateProvider(name, it->second.update.value());
if (!status.ok()) {
return OnError(name, status);
}
// Scan the map starting from the root cluster to generate the list of
// discovery mechanisms. If we don't have some of the data we need (i.e., we
// just started up and not all watchers have returned data yet), then don't
// update the child policy at all.
Json::Array discovery_mechanisms;
std::set<std::string> clusters_added;
auto result = GenerateDiscoveryMechanismForCluster(
config_->cluster(), /*depth=*/0, &discovery_mechanisms, &clusters_added);
if (!result.ok()) {
return OnError(name, result.status());
}
if (*result) {
// LB policy is configured by aggregate cluster, not by the individual
// underlying cluster that we may be processing an update for.
auto it = watchers_.find(config_->cluster());
GPR_ASSERT(it != watchers_.end());
const std::string& lb_policy = it->second.update->lb_policy;
// Construct config for child policy.
Json::Object xds_lb_policy;
if (lb_policy == "RING_HASH") {
xds_lb_policy["RING_HASH"] = Json::Object{
{"min_ring_size", cluster_data.min_ring_size},
{"max_ring_size", cluster_data.max_ring_size},
};
} else {
xds_lb_policy["ROUND_ROBIN"] = Json::Object();
}
Json::Object child_config = {
{"xdsLbPolicy",
Json::Array{
xds_lb_policy,
}},
{"discoveryMechanisms", std::move(discovery_mechanisms)},
};
Json json = Json::Array{
Json::Object{
{"xds_cluster_resolver_experimental", std::move(child_config)},
},
};
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
std::string json_str = json.Dump(/*indent=*/1);
gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s",
this, json_str.c_str());
}
grpc_error_handle error = GRPC_ERROR_NONE;
auto config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
json);
if (!config.ok()) {
OnError(name, absl::UnavailableError(config.status().message()));
return;
}
// Create child policy if not already present.
if (child_policy_ == nullptr) {
LoadBalancingPolicy::Args args;
args.work_serializer = work_serializer();
args.args = args_;
args.channel_control_helper = absl::make_unique<Helper>(Ref());
child_policy_ =
CoreConfiguration::Get()
.lb_policy_registry()
.CreateLoadBalancingPolicy((*config)->name(), std::move(args));
if (child_policy_ == nullptr) {
OnError(name, absl::UnavailableError("failed to create child policy"));
return;
}
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
std::string((*config)->name()).c_str(), child_policy_.get());
}
}
// Update child policy.
UpdateArgs args;
args.config = std::move(*config);
if (xds_certificate_provider_ != nullptr) {
args.args = args_.SetObject(xds_certificate_provider_);
} else {
args.args = args_;
}
child_policy_->UpdateLocked(std::move(args));
}
// Remove entries in watchers_ for any clusters not in clusters_added
for (auto it = watchers_.begin(); it != watchers_.end();) {
const std::string& cluster_name = it->first;
if (clusters_added.find(cluster_name) != clusters_added.end()) {
++it;
continue;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
cluster_name.c_str());
}
CancelClusterDataWatch(cluster_name, it->second.watcher,
/*delay_unsubscription=*/false);
it = watchers_.erase(it);
}
}
void CdsLb::OnError(const std::string& name, absl::Status status) {
gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s",
this, name.c_str(), status.ToString().c_str());
// Go into TRANSIENT_FAILURE if we have not yet created the child
// policy (i.e., we have not yet received data from xds). Otherwise,
// we keep running with the data we had previously.
if (child_policy_ == nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(absl::UnavailableError(
absl::StrCat(name, ": ", status.ToString()))));
}
}
void CdsLb::OnResourceDoesNotExist(const std::string& name) {
gpr_log(GPR_ERROR,
"[cdslb %p] CDS resource for %s does not exist -- reporting "
"TRANSIENT_FAILURE",
this, name.c_str());
absl::Status status = absl::UnavailableError(
absl::StrCat("CDS resource \"", config_->cluster(), "\" does not exist"));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
MaybeDestroyChildPolicyLocked();
}
absl::Status CdsLb::UpdateXdsCertificateProvider(
const std::string& cluster_name, const XdsClusterResource& cluster_data) {
// Early out if channel is not configured to use xds security.
auto* channel_credentials = args_.GetObject<grpc_channel_credentials>();
if (channel_credentials == nullptr ||
channel_credentials->type() != XdsCredentials::Type()) {
xds_certificate_provider_ = nullptr;
return absl::OkStatus();
}
if (xds_certificate_provider_ == nullptr) {
xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
}
// Configure root cert.
absl::string_view root_provider_instance_name =
cluster_data.common_tls_context.certificate_validation_context
.ca_certificate_provider_instance.instance_name;
absl::string_view root_provider_cert_name =
cluster_data.common_tls_context.certificate_validation_context
.ca_certificate_provider_instance.certificate_name;
RefCountedPtr<XdsCertificateProvider> new_root_provider;
if (!root_provider_instance_name.empty()) {
new_root_provider =
xds_client_->certificate_provider_store()
.CreateOrGetCertificateProvider(root_provider_instance_name);
if (new_root_provider == nullptr) {
return absl::UnavailableError(
absl::StrCat("Certificate provider instance name: \"",
root_provider_instance_name, "\" not recognized."));
}
}
if (root_certificate_provider_ != new_root_provider) {
if (root_certificate_provider_ != nullptr &&
root_certificate_provider_->interested_parties() != nullptr) {
grpc_pollset_set_del_pollset_set(
interested_parties(),
root_certificate_provider_->interested_parties());
}
if (new_root_provider != nullptr &&
new_root_provider->interested_parties() != nullptr) {
grpc_pollset_set_add_pollset_set(interested_parties(),
new_root_provider->interested_parties());
}
root_certificate_provider_ = std::move(new_root_provider);
}
xds_certificate_provider_->UpdateRootCertNameAndDistributor(
cluster_name, root_provider_cert_name,
root_certificate_provider_ == nullptr
? nullptr
: root_certificate_provider_->distributor());
// Configure identity cert.
absl::string_view identity_provider_instance_name =
cluster_data.common_tls_context.tls_certificate_provider_instance
.instance_name;
absl::string_view identity_provider_cert_name =
cluster_data.common_tls_context.tls_certificate_provider_instance
.certificate_name;
RefCountedPtr<XdsCertificateProvider> new_identity_provider;
if (!identity_provider_instance_name.empty()) {
new_identity_provider =
xds_client_->certificate_provider_store()
.CreateOrGetCertificateProvider(identity_provider_instance_name);
if (new_identity_provider == nullptr) {
return absl::UnavailableError(
absl::StrCat("Certificate provider instance name: \"",
identity_provider_instance_name, "\" not recognized."));
}
}
if (identity_certificate_provider_ != new_identity_provider) {
if (identity_certificate_provider_ != nullptr &&
identity_certificate_provider_->interested_parties() != nullptr) {
grpc_pollset_set_del_pollset_set(
interested_parties(),
identity_certificate_provider_->interested_parties());
}
if (new_identity_provider != nullptr &&
new_identity_provider->interested_parties() != nullptr) {
grpc_pollset_set_add_pollset_set(
interested_parties(), new_identity_provider->interested_parties());
}
identity_certificate_provider_ = std::move(new_identity_provider);
}
xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
cluster_name, identity_provider_cert_name,
identity_certificate_provider_ == nullptr
? nullptr
: identity_certificate_provider_->distributor());
// Configure SAN matchers.
const std::vector<StringMatcher>& match_subject_alt_names =
cluster_data.common_tls_context.certificate_validation_context
.match_subject_alt_names;
xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
cluster_name, match_subject_alt_names);
return absl::OkStatus();
}
void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
ClusterWatcher* watcher,
bool delay_unsubscription) {
if (xds_certificate_provider_ != nullptr) {
std::string name(cluster_name);
xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "",
nullptr);
xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "",
nullptr);
xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
}
XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name, watcher,
delay_unsubscription);
}
//
// factory
//
class CdsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
auto xds_client =
args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION, "CdsLb");
if (xds_client == nullptr) {
gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate "
"cds LB policy");
return nullptr;
}
return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));
}
absl::string_view name() const override { return kCds; }
absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json& json) const override {
if (json.type() == Json::Type::JSON_NULL) {
// xds was mentioned as a policy in the deprecated loadBalancingPolicy
// field or in the client API.
return absl::InvalidArgumentError(
"field:loadBalancingPolicy error:cds policy requires configuration. "
"Please use loadBalancingConfig field of service config instead.");
}
std::vector<std::string> errors;
// cluster name.
std::string cluster;
auto it = json.object_value().find("cluster");
if (it == json.object_value().end()) {
errors.emplace_back("required field 'cluster' not present");
} else if (it->second.type() != Json::Type::STRING) {
errors.emplace_back("field:cluster error:type should be string");
} else {
cluster = it->second.string_value();
}
if (!errors.empty()) {
return absl::InvalidArgumentError(
absl::StrCat("errors parsing CDS LB policy config: [",
absl::StrJoin(errors, "; "), "]"));
}
return MakeRefCounted<CdsLbConfig>(std::move(cluster));
}
};
} // namespace
void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder) {
builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
absl::make_unique<CdsLbFactory>());
}
} // namespace grpc_core