blob: 49bff27695812a1fc37c8e873e6c2f15ebe479de [file] [log] [blame]
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <string.h>
#include "absl/strings/str_cat.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/xds/xds_certificate_provider.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/security/credentials/xds/xds_credentials.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_core {
TraceFlag grpc_cds_lb_trace(false, "cds_lb");
namespace {
constexpr char kCds[] = "cds_experimental";
// Config for this LB policy.
class CdsLbConfig : public LoadBalancingPolicy::Config {
public:
explicit CdsLbConfig(std::string cluster) : cluster_(std::move(cluster)) {}
const std::string& cluster() const { return cluster_; }
const char* name() const override { return kCds; }
private:
std::string cluster_;
};
// CDS LB policy.
class CdsLb : public LoadBalancingPolicy {
public:
CdsLb(RefCountedPtr<XdsClient> xds_client, Args args);
const char* name() const override { return kCds; }
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
void ExitIdleLocked() override;
private:
// Watcher for getting cluster data from XdsClient.
class ClusterWatcher : public XdsClient::ClusterWatcherInterface {
public:
ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
: parent_(std::move(parent)), name_(std::move(name)) {}
void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override {
new Notifier(parent_, name_, std::move(cluster_data));
}
void OnError(grpc_error_handle error) override {
new Notifier(parent_, name_, error);
}
void OnResourceDoesNotExist() override { new Notifier(parent_, name_); }
private:
class Notifier {
public:
Notifier(RefCountedPtr<CdsLb> parent, std::string name,
XdsApi::CdsUpdate update);
Notifier(RefCountedPtr<CdsLb> parent, std::string name,
grpc_error_handle error);
explicit Notifier(RefCountedPtr<CdsLb> parent, std::string name);
private:
enum Type { kUpdate, kError, kDoesNotExist };
static void RunInExecCtx(void* arg, grpc_error_handle error);
void RunInWorkSerializer(grpc_error_handle error);
RefCountedPtr<CdsLb> parent_;
std::string name_;
grpc_closure closure_;
XdsApi::CdsUpdate update_;
Type type_;
};
RefCountedPtr<CdsLb> parent_;
std::string name_;
};
struct WatcherState {
// Pointer to watcher, to be used when cancelling.
// Not owned, so do not dereference.
ClusterWatcher* watcher = nullptr;
// Most recent update obtained from this watcher.
absl::optional<XdsApi::CdsUpdate> update;
};
// Delegating helper to be passed to child policy.
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<CdsLb> parent) : parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<CdsLb> parent_;
};
~CdsLb() override;
void ShutdownLocked() override;
bool GenerateDiscoveryMechanismForCluster(
const std::string& name, Json::Array* discovery_mechanisms,
std::set<std::string>* clusters_needed);
void OnClusterChanged(const std::string& name,
XdsApi::CdsUpdate cluster_data);
void OnError(const std::string& name, grpc_error_handle error);
void OnResourceDoesNotExist(const std::string& name);
grpc_error_handle UpdateXdsCertificateProvider(
const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data);
void CancelClusterDataWatch(absl::string_view cluster_name,
XdsClient::ClusterWatcherInterface* watcher,
bool delay_unsubscription = false);
void MaybeDestroyChildPolicyLocked();
RefCountedPtr<CdsLbConfig> config_;
// Current channel args from the resolver.
const grpc_channel_args* args_ = nullptr;
// The xds client.
RefCountedPtr<XdsClient> xds_client_;
// Maps from cluster name to the state for that cluster.
// The root of the tree is config_->cluster().
std::map<std::string, WatcherState> watchers_;
RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
// Child LB policy.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Internal state.
bool shutting_down_ = false;
};
//
// CdsLb::ClusterWatcher::Notifier
//
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name,
XdsApi::CdsUpdate update)
: parent_(std::move(parent)),
name_(std::move(name)),
update_(std::move(update)),
type_(kUpdate) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name,
grpc_error_handle error)
: parent_(std::move(parent)), name_(std::move(name)), type_(kError) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
}
CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
std::string name)
: parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) {
GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg,
grpc_error_handle error) {
Notifier* self = static_cast<Notifier*>(arg);
GRPC_ERROR_REF(error);
self->parent_->work_serializer()->Run(
[self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
}
void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(
grpc_error_handle error) {
switch (type_) {
case kUpdate:
parent_->OnClusterChanged(name_, std::move(update_));
break;
case kError:
parent_->OnError(name_, error);
break;
case kDoesNotExist:
parent_->OnResourceDoesNotExist(name_);
break;
};
delete this;
}
//
// CdsLb::Helper
//
RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args);
}
void CdsLb::Helper::UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (parent_->shutting_down_ || parent_->child_policy_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO,
"[cdslb %p] state updated by child: %s message_state: (%s)", this,
ConnectivityStateName(state), status.ToString().c_str());
}
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
}
void CdsLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] Re-resolution requested from child policy.",
parent_.get());
}
parent_->channel_control_helper()->RequestReresolution();
}
void CdsLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (parent_->shutting_down_) return;
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// CdsLb
//
CdsLb::CdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this,
xds_client_.get());
}
}
CdsLb::~CdsLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
}
}
void CdsLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] shutting down", this);
}
shutting_down_ = true;
MaybeDestroyChildPolicyLocked();
if (xds_client_ != nullptr) {
for (auto& watcher : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
watcher.first.c_str());
}
CancelClusterDataWatch(watcher.first, watcher.second.watcher,
/*delay_unsubscription=*/false);
}
watchers_.clear();
xds_client_.reset(DEBUG_LOCATION, "CdsLb");
}
grpc_channel_args_destroy(args_);
args_ = nullptr;
}
void CdsLb::MaybeDestroyChildPolicyLocked() {
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
}
void CdsLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
void CdsLb::ExitIdleLocked() {
if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
}
void CdsLb::UpdateLocked(UpdateArgs args) {
// Update config.
auto old_config = std::move(config_);
config_ = std::move(args.config);
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s", this,
config_->cluster().c_str());
}
// Update args.
grpc_channel_args_destroy(args_);
args_ = args.args;
args.args = nullptr;
// If cluster name changed, cancel watcher and restart.
if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
if (old_config != nullptr) {
for (auto& watcher : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
watcher.first.c_str());
}
CancelClusterDataWatch(watcher.first, watcher.second.watcher,
/*delay_unsubscription=*/true);
}
watchers_.clear();
}
auto watcher = absl::make_unique<ClusterWatcher>(Ref(), config_->cluster());
watchers_[config_->cluster()].watcher = watcher.get();
xds_client_->WatchClusterData(config_->cluster(), std::move(watcher));
}
}
// This method will attempt to generate one or multiple entries of discovery
// mechanism recursively:
// For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be
// generated cluster name, type and other data from the CdsUpdate inserted into
// the entry and the entry appended to the array of entries.
// Note, discovery mechanism entry can be generated if an CdsUpdate is
// available; otherwise, just return false. For cluster type AGGREGATE,
// recursively call the method for each child cluster.
bool CdsLb::GenerateDiscoveryMechanismForCluster(
const std::string& name, Json::Array* discovery_mechanisms,
std::set<std::string>* clusters_needed) {
clusters_needed->insert(name);
auto& state = watchers_[name];
// Create a new watcher if needed.
if (state.watcher == nullptr) {
auto watcher = absl::make_unique<ClusterWatcher>(Ref(), name);
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
name.c_str());
}
state.watcher = watcher.get();
xds_client_->WatchClusterData(name, std::move(watcher));
return false;
}
// Don't have the update we need yet.
if (!state.update.has_value()) return false;
// For AGGREGATE clusters, recursively expand to child clusters.
if (state.update->cluster_type == XdsApi::CdsUpdate::ClusterType::AGGREGATE) {
bool missing_cluster = false;
for (const std::string& child_name :
state.update->prioritized_cluster_names) {
if (!GenerateDiscoveryMechanismForCluster(
child_name, discovery_mechanisms, clusters_needed)) {
missing_cluster = true;
}
}
return !missing_cluster;
}
std::string type;
switch (state.update->cluster_type) {
case XdsApi::CdsUpdate::ClusterType::EDS:
type = "EDS";
break;
case XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS:
type = "LOGICAL_DNS";
break;
default:
GPR_ASSERT(0);
break;
}
Json::Object mechanism = {
{"clusterName", name},
{"max_concurrent_requests", state.update->max_concurrent_requests},
{"type", std::move(type)},
};
if (!state.update->eds_service_name.empty()) {
mechanism["edsServiceName"] = state.update->eds_service_name;
}
if (state.update->lrs_load_reporting_server_name.has_value()) {
mechanism["lrsLoadReportingServerName"] =
state.update->lrs_load_reporting_server_name.value();
}
discovery_mechanisms->emplace_back(std::move(mechanism));
return true;
}
void CdsLb::OnClusterChanged(const std::string& name,
XdsApi::CdsUpdate cluster_data) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(
GPR_INFO,
"[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
}
// Store the update in the map if we are still interested in watching this
// cluster (i.e., it is not cancelled already).
// If we've already deleted this entry, then this is an update notification
// that was scheduled before the deletion, so we can just ignore it.
auto it = watchers_.find(name);
if (it == watchers_.end()) return;
it->second.update = cluster_data;
// Take care of integration with new certificate code.
grpc_error_handle error = GRPC_ERROR_NONE;
error = UpdateXdsCertificateProvider(name, it->second.update.value());
if (error != GRPC_ERROR_NONE) {
return OnError(name, error);
}
// Scan the map starting from the root cluster to generate the list of
// discovery mechanisms. If we don't have some of the data we need (i.e., we
// just started up and not all watchers have returned data yet), then don't
// update the child policy at all.
Json::Array discovery_mechanisms;
std::set<std::string> clusters_needed;
if (GenerateDiscoveryMechanismForCluster(
config_->cluster(), &discovery_mechanisms, &clusters_needed)) {
// Construct config for child policy.
Json::Object xds_lb_policy;
if (cluster_data.lb_policy == "RING_HASH") {
std::string hash_function;
switch (cluster_data.hash_function) {
case XdsApi::CdsUpdate::HashFunction::XX_HASH:
hash_function = "XX_HASH";
break;
case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
hash_function = "MURMUR_HASH_2";
break;
default:
GPR_ASSERT(0);
break;
}
xds_lb_policy["RING_HASH"] = Json::Object{
{"min_ring_size", cluster_data.min_ring_size},
{"max_ring_size", cluster_data.max_ring_size},
{"hash_function", hash_function},
};
} else {
xds_lb_policy["ROUND_ROBIN"] = Json::Object();
}
Json::Object child_config = {
{"xdsLbPolicy",
Json::Array{
xds_lb_policy,
}},
{"discoveryMechanisms", std::move(discovery_mechanisms)},
};
Json json = Json::Array{
Json::Object{
{"xds_cluster_resolver_experimental", std::move(child_config)},
},
};
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
std::string json_str = json.Dump(/*indent=*/1);
gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s",
this, json_str.c_str());
}
RefCountedPtr<LoadBalancingPolicy::Config> config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
if (error != GRPC_ERROR_NONE) {
OnError(name, error);
return;
}
// Create child policy if not already present.
if (child_policy_ == nullptr) {
LoadBalancingPolicy::Args args;
args.work_serializer = work_serializer();
args.args = args_;
args.channel_control_helper = absl::make_unique<Helper>(Ref());
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
config->name(), std::move(args));
if (child_policy_ == nullptr) {
OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"failed to create child policy"));
return;
}
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
config->name(), child_policy_.get());
}
}
// Update child policy.
UpdateArgs args;
args.config = std::move(config);
if (xds_certificate_provider_ != nullptr) {
grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
} else {
args.args = grpc_channel_args_copy(args_);
}
child_policy_->UpdateLocked(std::move(args));
}
// Remove entries in watchers_ for any clusters not in clusters_needed
for (auto it = watchers_.begin(); it != watchers_.end();) {
const std::string& cluster_name = it->first;
if (clusters_needed.find(cluster_name) != clusters_needed.end()) {
++it;
continue;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
cluster_name.c_str());
}
CancelClusterDataWatch(cluster_name, it->second.watcher,
/*delay_unsubscription=*/false);
it = watchers_.erase(it);
}
}
void CdsLb::OnError(const std::string& name, grpc_error_handle error) {
gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s",
this, name.c_str(), grpc_error_std_string(error).c_str());
// Go into TRANSIENT_FAILURE if we have not yet created the child
// policy (i.e., we have not yet received data from xds). Otherwise,
// we keep running with the data we had previously.
if (child_policy_ == nullptr) {
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
} else {
GRPC_ERROR_UNREF(error);
}
}
void CdsLb::OnResourceDoesNotExist(const std::string& name) {
gpr_log(GPR_ERROR,
"[cdslb %p] CDS resource for %s does not exist -- reporting "
"TRANSIENT_FAILURE",
this, name.c_str());
grpc_error_handle error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("CDS resource \"", config_->cluster(),
"\" does not exist")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
MaybeDestroyChildPolicyLocked();
}
grpc_error_handle CdsLb::UpdateXdsCertificateProvider(
const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data) {
// Early out if channel is not configured to use xds security.
grpc_channel_credentials* channel_credentials =
grpc_channel_credentials_find_in_args(args_);
if (channel_credentials == nullptr ||
channel_credentials->type() != kCredentialsTypeXds) {
xds_certificate_provider_ = nullptr;
return GRPC_ERROR_NONE;
}
if (xds_certificate_provider_ == nullptr) {
xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
}
// Configure root cert.
absl::string_view root_provider_instance_name =
cluster_data.common_tls_context.combined_validation_context
.validation_context_certificate_provider_instance.instance_name;
absl::string_view root_provider_cert_name =
cluster_data.common_tls_context.combined_validation_context
.validation_context_certificate_provider_instance.certificate_name;
RefCountedPtr<XdsCertificateProvider> new_root_provider;
if (!root_provider_instance_name.empty()) {
new_root_provider =
xds_client_->certificate_provider_store()
.CreateOrGetCertificateProvider(root_provider_instance_name);
if (new_root_provider == nullptr) {
return grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("Certificate provider instance name: \"",
root_provider_instance_name, "\" not recognized.")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
}
}
if (root_certificate_provider_ != new_root_provider) {
if (root_certificate_provider_ != nullptr &&
root_certificate_provider_->interested_parties() != nullptr) {
grpc_pollset_set_del_pollset_set(
interested_parties(),
root_certificate_provider_->interested_parties());
}
if (new_root_provider != nullptr &&
new_root_provider->interested_parties() != nullptr) {
grpc_pollset_set_add_pollset_set(interested_parties(),
new_root_provider->interested_parties());
}
root_certificate_provider_ = std::move(new_root_provider);
}
xds_certificate_provider_->UpdateRootCertNameAndDistributor(
cluster_name, root_provider_cert_name,
root_certificate_provider_ == nullptr
? nullptr
: root_certificate_provider_->distributor());
// Configure identity cert.
absl::string_view identity_provider_instance_name =
cluster_data.common_tls_context
.tls_certificate_certificate_provider_instance.instance_name;
absl::string_view identity_provider_cert_name =
cluster_data.common_tls_context
.tls_certificate_certificate_provider_instance.certificate_name;
RefCountedPtr<XdsCertificateProvider> new_identity_provider;
if (!identity_provider_instance_name.empty()) {
new_identity_provider =
xds_client_->certificate_provider_store()
.CreateOrGetCertificateProvider(identity_provider_instance_name);
if (new_identity_provider == nullptr) {
return grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("Certificate provider instance name: \"",
identity_provider_instance_name,
"\" not recognized.")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
}
}
if (identity_certificate_provider_ != new_identity_provider) {
if (identity_certificate_provider_ != nullptr &&
identity_certificate_provider_->interested_parties() != nullptr) {
grpc_pollset_set_del_pollset_set(
interested_parties(),
identity_certificate_provider_->interested_parties());
}
if (new_identity_provider != nullptr &&
new_identity_provider->interested_parties() != nullptr) {
grpc_pollset_set_add_pollset_set(
interested_parties(), new_identity_provider->interested_parties());
}
identity_certificate_provider_ = std::move(new_identity_provider);
}
xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
cluster_name, identity_provider_cert_name,
identity_certificate_provider_ == nullptr
? nullptr
: identity_certificate_provider_->distributor());
// Configure SAN matchers.
const std::vector<StringMatcher>& match_subject_alt_names =
cluster_data.common_tls_context.combined_validation_context
.default_validation_context.match_subject_alt_names;
xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
cluster_name, match_subject_alt_names);
return GRPC_ERROR_NONE;
}
void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
XdsClient::ClusterWatcherInterface* watcher,
bool delay_unsubscription) {
if (xds_certificate_provider_ != nullptr) {
std::string name(cluster_name);
xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "",
nullptr);
xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "",
nullptr);
xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
}
xds_client_->CancelClusterDataWatch(cluster_name, watcher,
delay_unsubscription);
}
//
// factory
//
class CdsLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
RefCountedPtr<XdsClient> xds_client =
XdsClient::GetFromChannelArgs(*args.args);
if (xds_client == nullptr) {
gpr_log(GPR_ERROR,
"XdsClient not present in channel args -- cannot instantiate "
"cds LB policy");
return nullptr;
}
return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));
}
const char* name() const override { return kCds; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error_handle* error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
// xds was mentioned as a policy in the deprecated loadBalancingPolicy
// field or in the client API.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:loadBalancingPolicy error:cds policy requires configuration. "
"Please use loadBalancingConfig field of service config instead.");
return nullptr;
}
std::vector<grpc_error_handle> error_list;
// cluster name.
std::string cluster;
auto it = json.object_value().find("cluster");
if (it == json.object_value().end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"required field 'cluster' not present"));
} else if (it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:cluster error:type should be string"));
} else {
cluster = it->second.string_value();
}
if (!error_list.empty()) {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("Cds Parser", &error_list);
return nullptr;
}
return MakeRefCounted<CdsLbConfig>(std::move(cluster));
}
};
} // namespace
} // namespace grpc_core
//
// Plugin registration
//
void grpc_lb_policy_cds_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::CdsLbFactory>());
}
void grpc_lb_policy_cds_shutdown() {}