blob: d4733e3ee26c88e9ccf15ddc32409a2f2d7fc9f3 [file] [log] [blame]
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_replace.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "re2/re2.h"
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/slice/slice.h"
#define XXH_INLINE_ALL
#include "xxhash.h"
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
#include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client_grpc.h"
#include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/ext/xds/xds_route_config.h"
#include "src/core/ext/xds/xds_routing.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resolver/resolver.h"
#include "src/core/lib/resolver/resolver_factory.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
UniqueTypeName XdsClusterAttribute::TypeName() {
static UniqueTypeName::Factory kFactory("xds_cluster_name");
return kFactory.Create();
}
namespace {
std::string GetDefaultAuthorityInternal(const URI& uri) {
// Obtain the authority to use for the data plane connections, which is
// also used to select the right VirtualHost from the RouteConfiguration.
// We need to take the part of the URI path following the last
// "/" character or the entire path if the path contains no "/" character.
size_t pos = uri.path().find_last_of('/');
if (pos == uri.path().npos) return uri.path();
return uri.path().substr(pos + 1);
}
std::string GetDataPlaneAuthority(const ChannelArgs& args, const URI& uri) {
absl::optional<std::string> authority =
args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
if (authority.has_value()) return std::move(*authority);
return GetDefaultAuthorityInternal(uri);
}
//
// XdsResolver
//
class XdsResolver : public Resolver {
public:
explicit XdsResolver(ResolverArgs args)
: work_serializer_(std::move(args.work_serializer)),
result_handler_(std::move(args.result_handler)),
args_(std::move(args.args)),
interested_parties_(args.pollset_set),
uri_(std::move(args.uri)),
data_plane_authority_(GetDataPlaneAuthority(args_, uri_)),
channel_id_(absl::Uniform<uint64_t>(absl::BitGen())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(
GPR_INFO,
"[xds_resolver %p] created for URI %s; data plane authority is %s",
this, uri_.ToString().c_str(), data_plane_authority_.c_str());
}
}
~XdsResolver() override {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
}
}
void StartLocked() override;
void ShutdownLocked() override;
void ResetBackoffLocked() override {
if (xds_client_ != nullptr) xds_client_->ResetBackoff();
}
private:
class ListenerWatcher : public XdsListenerResourceType::WatcherInterface {
public:
explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(XdsListenerResource listener) override {
RefCountedPtr<ListenerWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self), listener = std::move(listener)]() mutable {
self->resolver_->OnListenerUpdate(std::move(listener));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
RefCountedPtr<ListenerWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self), status = std::move(status)]() mutable {
self->resolver_->OnError(self->resolver_->lds_resource_name_,
std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
RefCountedPtr<ListenerWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self)]() {
self->resolver_->OnResourceDoesNotExist(
absl::StrCat(self->resolver_->lds_resource_name_,
": xDS listener resource does not exist"));
},
DEBUG_LOCATION);
}
private:
RefCountedPtr<XdsResolver> resolver_;
};
class RouteConfigWatcher
: public XdsRouteConfigResourceType::WatcherInterface {
public:
explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
: resolver_(std::move(resolver)) {}
void OnResourceChanged(XdsRouteConfigResource route_config) override {
RefCountedPtr<RouteConfigWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self),
route_config = std::move(route_config)]() mutable {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnRouteConfigUpdate(std::move(route_config));
},
DEBUG_LOCATION);
}
void OnError(absl::Status status) override {
RefCountedPtr<RouteConfigWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self), status = std::move(status)]() mutable {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnError(self->resolver_->route_config_name_,
std::move(status));
},
DEBUG_LOCATION);
}
void OnResourceDoesNotExist() override {
RefCountedPtr<RouteConfigWatcher> self = Ref();
resolver_->work_serializer_->Run(
[self = std::move(self)]() {
if (self != self->resolver_->route_config_watcher_) return;
self->resolver_->OnResourceDoesNotExist(absl::StrCat(
self->resolver_->route_config_name_,
": xDS route configuration resource does not exist"));
},
DEBUG_LOCATION);
}
private:
RefCountedPtr<XdsResolver> resolver_;
};
// An entry in the map of clusters that need to be present in the LB
// policy config. The map holds a weak ref. One strong ref is held by
// the ConfigSelector, and another is held by each call assigned to
// the cluster by the ConfigSelector. The ref for each call is held
// until the call is committed. When the strong refs go away, we hop
// back into the WorkSerializer to remove the entry from the map.
class ClusterState : public DualRefCounted<ClusterState> {
public:
ClusterState(RefCountedPtr<XdsResolver> resolver,
absl::string_view cluster_name)
: resolver_(std::move(resolver)), cluster_name_(cluster_name) {}
void Orphan() override {
auto* resolver = resolver_.get();
resolver->work_serializer_->Run(
[resolver = std::move(resolver_)]() {
resolver->MaybeRemoveUnusedClusters();
},
DEBUG_LOCATION);
}
const std::string& cluster_name() const { return cluster_name_; }
private:
RefCountedPtr<XdsResolver> resolver_;
std::string cluster_name_;
};
// A map containing cluster refs held by the XdsConfigSelector. A ref to
// this map will be taken by each call processed by the XdsConfigSelector,
// stored in a the call's call attributes, and later unreffed
// by the ClusterSelection filter.
class XdsClusterMap : public RefCounted<XdsClusterMap> {
public:
explicit XdsClusterMap(
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters)
: clusters_(std::move(clusters)) {}
bool operator==(const XdsClusterMap& other) const {
return clusters_ == other.clusters_;
}
RefCountedPtr<ClusterState> Find(absl::string_view name) const {
auto it = clusters_.find(name);
if (it == clusters_.end()) {
return nullptr;
}
return it->second;
}
private:
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
};
class XdsConfigSelector : public ConfigSelector {
public:
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
absl::Status* status);
~XdsConfigSelector() override;
const char* name() const override { return "XdsConfigSelector"; }
bool Equals(const ConfigSelector* other) const override {
const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
// Don't need to compare resolver_, since that will always be the same.
return route_table_ == other_xds->route_table_ &&
*cluster_map_ == *other_xds->cluster_map_;
}
absl::Status GetCallConfig(GetCallConfigArgs args) override;
std::vector<const grpc_channel_filter*> GetFilters() override {
return filters_;
}
private:
struct Route {
struct ClusterWeightState {
uint32_t range_end;
absl::string_view cluster;
RefCountedPtr<ServiceConfig> method_config;
bool operator==(const ClusterWeightState& other) const;
};
XdsRouteConfigResource::Route route;
RefCountedPtr<ServiceConfig> method_config;
std::vector<ClusterWeightState> weighted_cluster_state;
bool operator==(const Route& other) const;
};
using RouteTable = std::vector<Route>;
class RouteListIterator;
absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateMethodConfig(
const XdsRouteConfigResource::Route& route,
const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
cluster_weight);
RefCountedPtr<XdsResolver> resolver_;
RouteTable route_table_;
RefCountedPtr<XdsClusterMap> cluster_map_;
std::vector<const grpc_channel_filter*> filters_;
};
class ClusterSelectionFilter : public ChannelFilter {
public:
const static grpc_channel_filter kFilter;
static absl::StatusOr<ClusterSelectionFilter> Create(
const ChannelArgs& /* unused */, ChannelFilter::Args filter_args) {
return ClusterSelectionFilter(filter_args);
}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override {
auto* service_config_call_data =
static_cast<ClientChannelServiceConfigCallData*>(
GetContext<grpc_call_context_element>()
[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA]
.value);
GPR_ASSERT(service_config_call_data != nullptr);
auto* cluster_data = static_cast<XdsClusterMapAttribute*>(
service_config_call_data->GetCallAttribute(
XdsClusterMapAttribute::TypeName()));
auto* cluster_name_attribute = static_cast<XdsClusterAttribute*>(
service_config_call_data->GetCallAttribute(
XdsClusterAttribute::TypeName()));
if (cluster_data != nullptr && cluster_name_attribute != nullptr) {
auto cluster =
cluster_data->LockAndGetCluster(cluster_name_attribute->cluster());
if (cluster != nullptr) {
service_config_call_data->SetOnCommit(
[cluster = std::move(cluster)]() mutable { cluster.reset(); });
}
}
return next_promise_factory(std::move(call_args));
}
private:
explicit ClusterSelectionFilter(ChannelFilter::Args filter_args)
: filter_args_(filter_args) {}
ChannelFilter::Args filter_args_;
};
RefCountedPtr<ClusterState> GetOrCreateClusterState(
absl::string_view cluster_name) {
auto it = cluster_state_map_.find(cluster_name);
if (it == cluster_state_map_.end()) {
auto cluster = MakeRefCounted<ClusterState>(Ref(), cluster_name);
cluster_state_map_.emplace(cluster->cluster_name(), cluster->WeakRef());
return cluster;
}
return it->second->Ref();
}
class XdsClusterMapAttribute
: public ServiceConfigCallData::CallAttributeInterface {
public:
static UniqueTypeName TypeName() {
static UniqueTypeName::Factory factory("xds_cluster_lb_data");
return factory.Create();
}
explicit XdsClusterMapAttribute(RefCountedPtr<XdsClusterMap> cluster_map)
: cluster_map_(std::move(cluster_map)) {}
// This method can be called only once. The first call will release the
// reference to the cluster map, and subsequent calls will return nullptr.
RefCountedPtr<ClusterState> LockAndGetCluster(
absl::string_view cluster_name) {
if (cluster_map_ == nullptr) {
return nullptr;
}
auto cluster = cluster_map_->Find(cluster_name);
cluster_map_.reset();
return cluster;
}
UniqueTypeName type() const override { return TypeName(); }
private:
RefCountedPtr<XdsClusterMap> cluster_map_;
};
void OnListenerUpdate(XdsListenerResource listener);
void OnRouteConfigUpdate(XdsRouteConfigResource rds_update);
void OnError(absl::string_view context, absl::Status status);
void OnResourceDoesNotExist(std::string context);
absl::StatusOr<RefCountedPtr<ServiceConfig>> CreateServiceConfig();
void GenerateResult();
void MaybeRemoveUnusedClusters();
uint64_t channel_id() const { return channel_id_; }
std::shared_ptr<WorkSerializer> work_serializer_;
std::unique_ptr<ResultHandler> result_handler_;
ChannelArgs args_;
grpc_pollset_set* interested_parties_;
URI uri_;
RefCountedPtr<GrpcXdsClient> xds_client_;
std::string lds_resource_name_;
std::string data_plane_authority_;
uint64_t channel_id_;
ListenerWatcher* listener_watcher_ = nullptr;
// This will not contain the RouteConfiguration, even if it comes with the
// LDS response; instead, the relevant VirtualHost from the
// RouteConfiguration will be saved in current_virtual_host_.
XdsListenerResource::HttpConnectionManager current_listener_;
std::string route_config_name_;
RouteConfigWatcher* route_config_watcher_ = nullptr;
absl::optional<XdsRouteConfigResource::VirtualHost> current_virtual_host_;
std::map<std::string /*cluster_specifier_plugin_name*/,
std::string /*LB policy config*/>
cluster_specifier_plugin_map_;
std::map<absl::string_view, WeakRefCountedPtr<ClusterState>>
cluster_state_map_;
};
//
// XdsResolver::XdsConfigSelector::Route
//
bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) {
if (sc1 == nullptr) return sc2 == nullptr;
if (sc2 == nullptr) return false;
return sc1->json_string() == sc2->json_string();
}
const grpc_channel_filter XdsResolver::ClusterSelectionFilter::kFilter =
MakePromiseBasedFilter<ClusterSelectionFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata>(
"cluster_selection_filter");
bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==(
const ClusterWeightState& other) const {
return range_end == other.range_end && cluster == other.cluster &&
MethodConfigsEqual(method_config.get(), other.method_config.get());
}
bool XdsResolver::XdsConfigSelector::Route::operator==(
const Route& other) const {
return route == other.route &&
weighted_cluster_state == other.weighted_cluster_state &&
MethodConfigsEqual(method_config.get(), other.method_config.get());
}
// Implementation of XdsRouting::RouteListIterator for getting the matching
// route for a request.
class XdsResolver::XdsConfigSelector::RouteListIterator
: public XdsRouting::RouteListIterator {
public:
explicit RouteListIterator(const RouteTable* route_table)
: route_table_(route_table) {}
size_t Size() const override { return route_table_->size(); }
const XdsRouteConfigResource::Route::Matchers& GetMatchersForRoute(
size_t index) const override {
return (*route_table_)[index].route.matchers;
}
private:
const RouteTable* route_table_;
};
//
// XdsResolver::XdsConfigSelector
//
XdsResolver::XdsConfigSelector::XdsConfigSelector(
RefCountedPtr<XdsResolver> resolver, absl::Status* status)
: resolver_(std::move(resolver)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
resolver_.get(), this);
}
// 1. Construct the route table
// 2 Update resolver's cluster state map
// 3. Construct cluster list to hold on to entries in the cluster state
// map.
// Reserve the necessary entries up-front to avoid reallocation as we add
// elements. This is necessary because the string_view in the entry's
// weighted_cluster_state field points to the memory in the route field, so
// moving the entry in a reallocation will cause the string_view to point to
// invalid data.
route_table_.reserve(resolver_->current_virtual_host_->routes.size());
std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters;
auto maybe_add_cluster = [&](absl::string_view cluster_name) {
if (clusters.find(cluster_name) != clusters.end()) return;
auto cluster_state = resolver_->GetOrCreateClusterState(cluster_name);
absl::string_view name = cluster_state->cluster_name();
clusters.emplace(name, std::move(cluster_state));
};
for (auto& route : resolver_->current_virtual_host_->routes) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
resolver_.get(), this, route.ToString().c_str());
}
route_table_.emplace_back();
auto& route_entry = route_table_.back();
route_entry.route = route;
auto* route_action =
absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
&route_entry.route.action);
if (route_action != nullptr) {
// If the route doesn't specify a timeout, set its timeout to the global
// one.
if (!route_action->max_stream_duration.has_value()) {
route_action->max_stream_duration =
resolver_->current_listener_.http_max_stream_duration;
}
Match(
route_action->action,
// cluster name
[&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
cluster_name) {
auto result = CreateMethodConfig(route_entry.route, nullptr);
if (!result.ok()) {
*status = result.status();
return;
}
route_entry.method_config = std::move(*result);
maybe_add_cluster(
absl::StrCat("cluster:", cluster_name.cluster_name));
},
// WeightedClusters
[&](const std::vector<
XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
weighted_clusters) {
uint32_t end = 0;
for (const auto& weighted_cluster : weighted_clusters) {
Route::ClusterWeightState cluster_weight_state;
auto result =
CreateMethodConfig(route_entry.route, &weighted_cluster);
if (!result.ok()) {
*status = result.status();
return;
}
cluster_weight_state.method_config = std::move(*result);
end += weighted_cluster.weight;
cluster_weight_state.range_end = end;
cluster_weight_state.cluster = weighted_cluster.name;
route_entry.weighted_cluster_state.push_back(
std::move(cluster_weight_state));
maybe_add_cluster(
absl::StrCat("cluster:", weighted_cluster.name));
}
},
// ClusterSpecifierPlugin
[&](const XdsRouteConfigResource::Route::RouteAction::
ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
auto result = CreateMethodConfig(route_entry.route, nullptr);
if (!result.ok()) {
*status = result.status();
return;
}
route_entry.method_config = std::move(*result);
maybe_add_cluster(absl::StrCat(
"cluster_specifier_plugin:",
cluster_specifier_plugin_name.cluster_specifier_plugin_name));
});
if (!status->ok()) return;
}
}
cluster_map_ = MakeRefCounted<XdsClusterMap>(std::move(clusters));
// Populate filter list.
const auto& http_filter_registry =
static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
.http_filter_registry();
for (const auto& http_filter : resolver_->current_listener_.http_filters) {
// Find filter. This is guaranteed to succeed, because it's checked
// at config validation time in the XdsApi code.
const XdsHttpFilterImpl* filter_impl =
http_filter_registry.GetFilterForType(
http_filter.config.config_proto_type_name);
GPR_ASSERT(filter_impl != nullptr);
// Add C-core filter to list.
if (filter_impl->channel_filter() != nullptr) {
filters_.push_back(filter_impl->channel_filter());
}
}
filters_.push_back(&ClusterSelectionFilter::kFilter);
}
XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
resolver_.get(), this);
}
cluster_map_.reset();
resolver_->MaybeRemoveUnusedClusters();
}
absl::StatusOr<RefCountedPtr<ServiceConfig>>
XdsResolver::XdsConfigSelector::CreateMethodConfig(
const XdsRouteConfigResource::Route& route,
const XdsRouteConfigResource::Route::RouteAction::ClusterWeight*
cluster_weight) {
std::vector<std::string> fields;
const auto& route_action =
absl::get<XdsRouteConfigResource::Route::RouteAction>(route.action);
// Set retry policy if any.
if (route_action.retry_policy.has_value() &&
!route_action.retry_policy->retry_on.Empty()) {
std::vector<std::string> retry_parts;
retry_parts.push_back(absl::StrFormat(
"\"retryPolicy\": {\n"
" \"maxAttempts\": %d,\n"
" \"initialBackoff\": \"%s\",\n"
" \"maxBackoff\": \"%s\",\n"
" \"backoffMultiplier\": 2,\n",
route_action.retry_policy->num_retries + 1,
route_action.retry_policy->retry_back_off.base_interval.ToJsonString(),
route_action.retry_policy->retry_back_off.max_interval.ToJsonString()));
std::vector<std::string> code_parts;
if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) {
code_parts.push_back(" \"CANCELLED\"");
}
if (route_action.retry_policy->retry_on.Contains(
GRPC_STATUS_DEADLINE_EXCEEDED)) {
code_parts.push_back(" \"DEADLINE_EXCEEDED\"");
}
if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) {
code_parts.push_back(" \"INTERNAL\"");
}
if (route_action.retry_policy->retry_on.Contains(
GRPC_STATUS_RESOURCE_EXHAUSTED)) {
code_parts.push_back(" \"RESOURCE_EXHAUSTED\"");
}
if (route_action.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) {
code_parts.push_back(" \"UNAVAILABLE\"");
}
retry_parts.push_back(
absl::StrFormat(" \"retryableStatusCodes\": [\n %s ]\n",
absl::StrJoin(code_parts, ",\n")));
retry_parts.push_back(" }");
fields.emplace_back(absl::StrJoin(retry_parts, ""));
}
// Set timeout.
if (route_action.max_stream_duration.has_value() &&
(route_action.max_stream_duration != Duration::Zero())) {
fields.emplace_back(
absl::StrFormat(" \"timeout\": \"%s\"",
route_action.max_stream_duration->ToJsonString()));
}
// Handle xDS HTTP filters.
auto result = XdsRouting::GeneratePerHTTPFilterConfigs(
static_cast<const GrpcXdsBootstrap&>(resolver_->xds_client_->bootstrap())
.http_filter_registry(),
resolver_->current_listener_.http_filters,
resolver_->current_virtual_host_.value(), route, cluster_weight,
resolver_->args_);
if (!result.ok()) return result.status();
for (const auto& p : result->per_filter_configs) {
fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n",
absl::StrJoin(p.second, ",\n"),
"\n ]"));
}
// Construct service config.
if (!fields.empty()) {
std::string json = absl::StrCat(
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" {}\n"
" ],\n"
" ",
absl::StrJoin(fields, ",\n"),
"\n } ]\n"
"}");
return ServiceConfigImpl::Create(result->args, json.c_str());
}
return nullptr;
}
absl::optional<uint64_t> HeaderHashHelper(
const XdsRouteConfigResource::Route::RouteAction::HashPolicy::Header&
header_policy,
grpc_metadata_batch* initial_metadata) {
std::string value_buffer;
absl::optional<absl::string_view> header_value = XdsRouting::GetHeaderValue(
initial_metadata, header_policy.header_name, &value_buffer);
if (!header_value.has_value()) return absl::nullopt;
if (header_policy.regex != nullptr) {
// If GetHeaderValue() did not already store the value in
// value_buffer, copy it there now, so we can modify it.
if (header_value->data() != value_buffer.data()) {
value_buffer = std::string(*header_value);
}
RE2::GlobalReplace(&value_buffer, *header_policy.regex,
header_policy.regex_substitution);
header_value = value_buffer;
}
return XXH64(header_value->data(), header_value->size(), 0);
}
absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
GetCallConfigArgs args) {
Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata());
GPR_ASSERT(path != nullptr);
auto route_index = XdsRouting::GetRouteForRequest(
RouteListIterator(&route_table_), path->as_string_view(),
args.initial_metadata);
if (!route_index.has_value()) {
return absl::UnavailableError(
"No matching route found in xDS route config");
}
auto& entry = route_table_[*route_index];
// Found a route match
const auto* route_action =
absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
&entry.route.action);
if (route_action == nullptr) {
return absl::UnavailableError("Matching route has inappropriate action");
}
std::string cluster_name;
RefCountedPtr<ServiceConfig> method_config;
Match(
route_action->action,
// cluster name
[&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
action_cluster_name) {
cluster_name =
absl::StrCat("cluster:", action_cluster_name.cluster_name);
method_config = entry.method_config;
},
// WeightedClusters
[&](const std::vector<
XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
/*weighted_clusters*/) {
const uint32_t key = absl::Uniform<uint32_t>(
absl::BitGen(), 0, entry.weighted_cluster_state.back().range_end);
// Find the index in weighted clusters corresponding to key.
size_t mid = 0;
size_t start_index = 0;
size_t end_index = entry.weighted_cluster_state.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (entry.weighted_cluster_state[mid].range_end > key) {
end_index = mid;
} else if (entry.weighted_cluster_state[mid].range_end < key) {
start_index = mid + 1;
} else {
index = mid + 1;
break;
}
}
if (index == 0) index = start_index;
GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
cluster_name = absl::StrCat(
"cluster:", entry.weighted_cluster_state[index].cluster);
method_config = entry.weighted_cluster_state[index].method_config;
},
// ClusterSpecifierPlugin
[&](const XdsRouteConfigResource::Route::RouteAction::
ClusterSpecifierPluginName& cluster_specifier_plugin_name) {
cluster_name = absl::StrCat(
"cluster_specifier_plugin:",
cluster_specifier_plugin_name.cluster_specifier_plugin_name);
method_config = entry.method_config;
});
auto cluster = cluster_map_->Find(cluster_name);
GPR_ASSERT(cluster != nullptr);
// Generate a hash.
absl::optional<uint64_t> hash;
for (const auto& hash_policy : route_action->hash_policies) {
absl::optional<uint64_t> new_hash = Match(
hash_policy.policy,
[&](const XdsRouteConfigResource::Route::RouteAction::HashPolicy::
Header& header) {
return HeaderHashHelper(header, args.initial_metadata);
},
[&](const XdsRouteConfigResource::Route::RouteAction::HashPolicy::
ChannelId&) -> absl::optional<uint64_t> {
return resolver_->channel_id();
});
if (new_hash.has_value()) {
// Rotating the old value prevents duplicate hash rules from cancelling
// each other out and preserves all of the entropy
const uint64_t old_value =
hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
hash = old_value ^ new_hash.value();
}
// If the policy is a terminal policy and a hash has been generated,
// ignore the rest of the hash policies.
if (hash_policy.terminal && hash.has_value()) {
break;
}
}
if (!hash.has_value()) {
hash = absl::Uniform<uint64_t>(absl::BitGen());
}
// Populate service config call data.
if (method_config != nullptr) {
auto* parsed_method_configs =
method_config->GetMethodParsedConfigVector(grpc_empty_slice());
args.service_config_call_data->SetServiceConfig(std::move(method_config),
parsed_method_configs);
}
args.service_config_call_data->SetCallAttribute(
args.arena->New<XdsClusterAttribute>(cluster->cluster_name()));
std::string hash_string = absl::StrCat(hash.value());
char* hash_value =
static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
memcpy(hash_value, hash_string.c_str(), hash_string.size());
hash_value[hash_string.size()] = '\0';
args.service_config_call_data->SetCallAttribute(
args.arena->New<RequestHashAttribute>(hash_value));
args.service_config_call_data->SetCallAttribute(
args.arena->ManagedNew<XdsClusterMapAttribute>(cluster_map_));
return absl::OkStatus();
}
//
// XdsResolver
//
void XdsResolver::StartLocked() {
auto xds_client = GrpcXdsClient::GetOrCreate(args_, "xds resolver");
if (!xds_client.ok()) {
gpr_log(GPR_ERROR,
"Failed to create xds client -- channel will remain in "
"TRANSIENT_FAILURE: %s",
xds_client.status().ToString().c_str());
absl::Status status = absl::UnavailableError(absl::StrCat(
"Failed to create XdsClient: ", xds_client.status().message()));
Result result;
result.addresses = status;
result.service_config = std::move(status);
result.args = args_;
result_handler_->ReportResult(std::move(result));
return;
}
xds_client_ = std::move(*xds_client);
std::string resource_name_fragment(absl::StripPrefix(uri_.path(), "/"));
if (!uri_.authority().empty()) {
// target_uri.authority is set case
const auto* authority_config =
static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
xds_client_->bootstrap().LookupAuthority(uri_.authority()));
if (authority_config == nullptr) {
absl::Status status = absl::UnavailableError(
absl::StrCat("Invalid target URI -- authority not found for ",
uri_.authority().c_str()));
Result result;
result.addresses = status;
result.service_config = std::move(status);
result.args = args_;
result_handler_->ReportResult(std::move(result));
return;
}
std::string name_template =
authority_config->client_listener_resource_name_template();
if (name_template.empty()) {
name_template = absl::StrCat(
"xdstp://", URI::PercentEncodeAuthority(uri_.authority()),
"/envoy.config.listener.v3.Listener/%s");
}
lds_resource_name_ = absl::StrReplaceAll(
name_template,
{{"%s", URI::PercentEncodePath(resource_name_fragment)}});
} else {
// target_uri.authority not set
absl::string_view name_template =
static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
.client_default_listener_resource_name_template();
if (name_template.empty()) {
name_template = "%s";
}
if (absl::StartsWith(name_template, "xdstp:")) {
resource_name_fragment = URI::PercentEncodePath(resource_name_fragment);
}
lds_resource_name_ =
absl::StrReplaceAll(name_template, {{"%s", resource_name_fragment}});
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] Started with lds_resource_name %s.",
this, lds_resource_name_.c_str());
}
grpc_pollset_set_add_pollset_set(
static_cast<GrpcXdsClient*>(xds_client_.get())->interested_parties(),
interested_parties_);
auto watcher = MakeRefCounted<ListenerWatcher>(Ref());
listener_watcher_ = watcher.get();
XdsListenerResourceType::StartWatch(xds_client_.get(), lds_resource_name_,
std::move(watcher));
}
void XdsResolver::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
}
if (xds_client_ != nullptr) {
if (listener_watcher_ != nullptr) {
XdsListenerResourceType::CancelWatch(
xds_client_.get(), lds_resource_name_, listener_watcher_,
/*delay_unsubscription=*/false);
}
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/false);
}
grpc_pollset_set_del_pollset_set(
static_cast<GrpcXdsClient*>(xds_client_.get())->interested_parties(),
interested_parties_);
xds_client_.reset(DEBUG_LOCATION, "xds resolver");
}
}
void XdsResolver::OnListenerUpdate(XdsListenerResource listener) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
}
if (xds_client_ == nullptr) return;
auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
&listener.listener);
if (hcm == nullptr) {
return OnError(lds_resource_name_,
absl::UnavailableError("not an API listener"));
}
current_listener_ = std::move(*hcm);
MatchMutable(
&current_listener_.route_config,
// RDS resource name
[&](std::string* rds_name) {
// If the RDS name changed, update the RDS watcher.
// Note that this will be true on the initial update, because
// route_config_name_ will be empty.
if (route_config_name_ != *rds_name) {
// If we already had a watch (i.e., if the previous config had
// a different RDS name), stop the previous watch.
// There will be no previous watch if either (a) this is the
// initial resource update or (b) the previous Listener had an
// inlined RouteConfig.
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_,
/*delay_unsubscription=*/true);
route_config_watcher_ = nullptr;
}
// Start watch for the new RDS resource name.
route_config_name_ = std::move(*rds_name);
auto watcher = MakeRefCounted<RouteConfigWatcher>(Ref());
route_config_watcher_ = watcher.get();
XdsRouteConfigResourceType::StartWatch(
xds_client_.get(), route_config_name_, std::move(watcher));
} else {
// RDS resource name has not changed, so no watch needs to be
// updated, but we still need to propagate any changes in the
// HCM config (e.g., the list of HTTP filters).
GenerateResult();
}
},
// inlined RouteConfig
[&](XdsRouteConfigResource* route_config) {
// If the previous update specified an RDS resource instead of
// having an inlined RouteConfig, we need to cancel the RDS watch.
if (route_config_watcher_ != nullptr) {
XdsRouteConfigResourceType::CancelWatch(
xds_client_.get(), route_config_name_, route_config_watcher_);
route_config_watcher_ = nullptr;
route_config_name_.clear();
}
OnRouteConfigUpdate(std::move(*route_config));
});
}
namespace {
class VirtualHostListIterator : public XdsRouting::VirtualHostListIterator {
public:
explicit VirtualHostListIterator(
const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts)
: virtual_hosts_(virtual_hosts) {}
size_t Size() const override { return virtual_hosts_->size(); }
const std::vector<std::string>& GetDomainsForVirtualHost(
size_t index) const override {
return (*virtual_hosts_)[index].domains;
}
private:
const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts_;
};
} // namespace
void XdsResolver::OnRouteConfigUpdate(XdsRouteConfigResource rds_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
}
if (xds_client_ == nullptr) {
return;
}
// Find the relevant VirtualHost from the RouteConfiguration.
auto vhost_index = XdsRouting::FindVirtualHostForDomain(
VirtualHostListIterator(&rds_update.virtual_hosts),
data_plane_authority_);
if (!vhost_index.has_value()) {
OnError(
route_config_name_.empty() ? lds_resource_name_ : route_config_name_,
absl::UnavailableError(absl::StrCat("could not find VirtualHost for ",
data_plane_authority_,
" in RouteConfiguration")));
return;
}
// Save the virtual host in the resolver.
current_virtual_host_ = std::move(rds_update.virtual_hosts[*vhost_index]);
cluster_specifier_plugin_map_ =
std::move(rds_update.cluster_specifier_plugin_map);
// Send a new result to the channel.
GenerateResult();
}
void XdsResolver::OnError(absl::string_view context, absl::Status status) {
gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s: %s",
this, std::string(context).c_str(), status.ToString().c_str());
if (xds_client_ == nullptr) return;
status =
absl::UnavailableError(absl::StrCat(context, ": ", status.ToString()));
Result result;
result.addresses = status;
result.service_config = std::move(status);
// Need to explicitly convert to the right RefCountedPtr<> type for
// use with ChannelArgs::SetObject().
RefCountedPtr<GrpcXdsClient> xds_client =
xds_client_->Ref(DEBUG_LOCATION, "xds resolver result");
result.args = args_.SetObject(std::move(xds_client));
result_handler_->ReportResult(std::move(result));
}
void XdsResolver::OnResourceDoesNotExist(std::string context) {
gpr_log(GPR_ERROR,
"[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
"update and returning empty service config",
this);
if (xds_client_ == nullptr) {
return;
}
current_virtual_host_.reset();
Result result;
result.addresses.emplace();
result.service_config = ServiceConfigImpl::Create(args_, "{}");
GPR_ASSERT(result.service_config.ok());
result.resolution_note = std::move(context);
result.args = args_;
result_handler_->ReportResult(std::move(result));
}
absl::StatusOr<RefCountedPtr<ServiceConfig>>
XdsResolver::CreateServiceConfig() {
std::vector<std::string> clusters;
for (const auto& cluster : cluster_state_map_) {
absl::string_view child_name = cluster.first;
if (absl::ConsumePrefix(&child_name, "cluster_specifier_plugin:")) {
clusters.push_back(absl::StrFormat(
" \"%s\":{\n"
" \"childPolicy\": %s\n"
" }",
cluster.first,
cluster_specifier_plugin_map_[std::string(child_name)]));
} else {
absl::ConsumePrefix(&child_name, "cluster:");
clusters.push_back(
absl::StrFormat(" \"%s\":{\n"
" \"childPolicy\":[ {\n"
" \"cds_experimental\":{\n"
" \"cluster\": \"%s\"\n"
" }\n"
" } ]\n"
" }",
cluster.first, child_name));
}
}
std::vector<std::string> config_parts;
config_parts.push_back(
"{\n"
" \"loadBalancingConfig\":[\n"
" { \"xds_cluster_manager_experimental\":{\n"
" \"children\":{\n");
config_parts.push_back(absl::StrJoin(clusters, ",\n"));
config_parts.push_back(
" }\n"
" } }\n"
" ]\n"
"}");
std::string json = absl::StrJoin(config_parts, "");
return ServiceConfigImpl::Create(args_, json.c_str());
}
void XdsResolver::GenerateResult() {
if (!current_virtual_host_.has_value()) return;
// First create XdsConfigSelector, which may add new entries to the cluster
// state map, and then CreateServiceConfig for LB policies.
absl::Status status;
auto config_selector = MakeRefCounted<XdsConfigSelector>(Ref(), &status);
if (!status.ok()) {
OnError("could not create ConfigSelector",
absl::UnavailableError(status.message()));
return;
}
Result result;
result.addresses.emplace();
result.service_config = CreateServiceConfig();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
result.service_config.ok()
? std::string((*result.service_config)->json_string()).c_str()
: result.service_config.status().ToString().c_str());
}
// Need to explicitly convert to the right RefCountedPtr<> type for
// use with ChannelArgs::SetObject().
RefCountedPtr<GrpcXdsClient> xds_client =
xds_client_->Ref(DEBUG_LOCATION, "xds resolver result");
result.args =
args_.SetObject(std::move(xds_client)).SetObject(config_selector);
result_handler_->ReportResult(std::move(result));
}
void XdsResolver::MaybeRemoveUnusedClusters() {
bool update_needed = false;
for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
if (cluster_state != nullptr) {
++it;
} else {
update_needed = true;
it = cluster_state_map_.erase(it);
}
}
if (update_needed && xds_client_ != nullptr) {
// Send a new result to the channel.
GenerateResult();
}
}
//
// Factory
//
class XdsResolverFactory : public ResolverFactory {
public:
absl::string_view scheme() const override { return "xds"; }
bool IsValidUri(const URI& uri) const override {
if (uri.path().empty() || uri.path().back() == '/') {
gpr_log(GPR_ERROR,
"URI path does not contain valid data plane authority");
return false;
}
return true;
}
std::string GetDefaultAuthority(const URI& uri) const override {
return GetDefaultAuthorityInternal(uri);
}
OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
if (!IsValidUri(args.uri)) return nullptr;
return MakeOrphanable<XdsResolver>(std::move(args));
}
};
} // namespace
void RegisterXdsResolver(CoreConfiguration::Builder* builder) {
builder->resolver_registry()->RegisterResolverFactory(
std::make_unique<XdsResolverFactory>());
}
} // namespace grpc_core