//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/ext/xds/xds_client.h"
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include "absl/container/inlined_vector.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/ext/xds/xds_http_filters.h"
#include "src/core/ext/xds/xds_listener.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/security/credentials/channel_creds_registry.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/lame_client.h"
#include "src/core/lib/uri/uri_parser.h"
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
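// Illustrative sketch only (not part of the build; assumes the BackOff
// semantics from src/core/lib/backoff/backoff.h): the constants above
// configure the ADS/LRS call retry policy so that consecutive failed call
// attempts back off as roughly 1s, 1.6s, 2.56s, ... capped at 120s, each
// perturbed by +/-20% jitter:
//
//   BackOff backoff(
//       BackOff::Options()
//           .set_initial_backoff(
//               Duration::Seconds(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
//           .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
//           .set_jitter(GRPC_XDS_RECONNECT_JITTER)
//           .set_max_backoff(
//               Duration::Seconds(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS)));
//
// See RetryableCall<T>'s constructor below for the actual configuration.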
namespace grpc_core {
TraceFlag grpc_xds_client_trace(false, "xds_client");
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
namespace {
Mutex* g_mu = nullptr;
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
} // namespace
class XdsClient::Notifier {
public:
// Helper template function to invoke `OnError()` on a list of watchers \a
// watchers_list within \a work_serializer. Works with all 4 resource types.
template <class T>
static void ScheduleNotifyWatchersOnErrorInWorkSerializer(
XdsClient* xds_client, const T& watchers_list, grpc_error_handle error,
const DebugLocation& location) {
xds_client->work_serializer_.Schedule(
[watchers_list, error]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnError(GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
},
location);
}
// Helper template function to invoke `OnResourceDoesNotExist()` on a list of
// watchers \a watchers_list within \a work_serializer. Works with all 4
// resource types.
template <class T>
static void ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer(
XdsClient* xds_client, const T& watchers_list,
const DebugLocation& location) {
xds_client->work_serializer_.Schedule(
[watchers_list]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnResourceDoesNotExist();
}
},
location);
}
};
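// Usage sketch (illustrative; mirrors the call sites below, e.g. in
// ResourceTimer::OnTimerLocked and AdsResponseParser::ParseResource): watcher
// notifications are deferred onto the XdsClient's WorkSerializer so that
// watcher callbacks never run while XdsClient::mu_ is held:
//
//   Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
//       xds_client, resource_state.watchers, error, DEBUG_LOCATION);
//
// The queued lambdas are executed by a later work_serializer_.DrainQueue()
// call made after the mutex has been released.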
//
// Internal class declarations
//
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
template <typename T>
class XdsClient::ChannelState::RetryableCall
: public InternallyRefCounted<RetryableCall<T>> {
public:
explicit RetryableCall(WeakRefCountedPtr<ChannelState> chand);
void Orphan() override;
void OnCallFinishedLocked();
T* calld() const { return calld_.get(); }
ChannelState* chand() const { return chand_.get(); }
bool IsCurrentCallOnChannel() const;
private:
void StartNewCallLocked();
void StartRetryTimerLocked();
static void OnRetryTimer(void* arg, grpc_error_handle error);
void OnRetryTimerLocked(grpc_error_handle error);
// The wrapped xds call that talks to the xds server. It's instantiated
// every time we start a new call. It's null during call retry backoff.
OrphanablePtr<T> calld_;
// The owning xds channel.
WeakRefCountedPtr<ChannelState> chand_;
// Retry state.
BackOff backoff_;
grpc_timer retry_timer_;
grpc_closure on_retry_timer_;
bool retry_timer_callback_pending_ = false;
bool shutting_down_ = false;
};
// Contains an ADS call to the xds server.
class XdsClient::ChannelState::AdsCallState
: public InternallyRefCounted<AdsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
~AdsCallState() override;
void Orphan() override;
RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void UnsubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name, bool delay_unsubscription)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool HasSubscribedResources() const;
private:
class AdsResponseParser : public XdsApi::AdsResponseParserInterface {
public:
struct Result {
const XdsResourceType* type;
std::string type_url;
std::string version;
std::string nonce;
std::vector<std::string> errors;
std::map<std::string /*authority*/, std::set<XdsResourceKey>>
resources_seen;
bool have_valid_resources = false;
};
explicit AdsResponseParser(AdsCallState* ads_call_state)
: ads_call_state_(ads_call_state) {}
absl::Status ProcessAdsResponseFields(AdsResponseFields fields) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
void ParseResource(const XdsEncodingContext& context, size_t idx,
absl::string_view type_url,
absl::string_view serialized_resource) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
Result TakeResult() { return std::move(result_); }
private:
XdsClient* xds_client() const { return ads_call_state_->xds_client(); }
AdsCallState* ads_call_state_;
const Timestamp update_time_ = ExecCtx::Get()->Now();
Result result_;
};
class ResourceTimer : public InternallyRefCounted<ResourceTimer> {
public:
ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
: type_(type), name_(name) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
void Orphan() override {
MaybeCancelTimer();
Unref(DEBUG_LOCATION, "Orphan");
}
void MaybeStartTimer(RefCountedPtr<AdsCallState> ads_calld) {
if (timer_started_) return;
timer_started_ = true;
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
grpc_timer_init(
&timer_,
ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
&timer_callback_);
}
void MaybeCancelTimer() {
if (timer_pending_) {
grpc_timer_cancel(&timer_);
timer_pending_ = false;
}
}
private:
static void OnTimer(void* arg, grpc_error_handle error) {
ResourceTimer* self = static_cast<ResourceTimer*>(arg);
{
MutexLock lock(&self->ads_calld_->xds_client()->mu_);
self->OnTimerLocked(GRPC_ERROR_REF(error));
}
self->ads_calld_->xds_client()->work_serializer_.DrainQueue();
self->ads_calld_.reset();
self->Unref(DEBUG_LOCATION, "timer");
}
void OnTimerLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
if (error == GRPC_ERROR_NONE && timer_pending_) {
timer_pending_ = false;
grpc_error_handle watcher_error =
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
"timeout obtaining resource {type=%s name=%s} from xds server",
type_->type_url(),
XdsClient::ConstructFullXdsResourceName(
name_.authority, type_->type_url(), name_.key)));
watcher_error = grpc_error_set_int(
watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] xds server %s: %s",
ads_calld_->xds_client(),
ads_calld_->chand()->server_.server_uri.c_str(),
grpc_error_std_string(watcher_error).c_str());
}
auto& authority_state =
ads_calld_->xds_client()->authority_state_map_[name_.authority];
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
ads_calld_->xds_client(), state.watchers, watcher_error,
DEBUG_LOCATION);
}
GRPC_ERROR_UNREF(error);
}
const XdsResourceType* type_;
const XdsResourceName name_;
RefCountedPtr<AdsCallState> ads_calld_;
bool timer_started_ = false;
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
};
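// Lifecycle sketch for ResourceTimer (illustrative only): when a resource
// name is first included in an ADS request, ResourceNamesForRequest() calls
// MaybeStartTimer(), which holds a ref until OnTimer() fires:
//
//   resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer"));
//
// If the server sends the resource before request_timeout_ elapses,
// AdsResponseParser::ParseResource() calls MaybeCancelTimer(); otherwise the
// resource is marked DOES_NOT_EXIST and its watchers are notified with an
// UNAVAILABLE error.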
struct ResourceTypeState {
~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
// Nonce and error for this resource type.
std::string nonce;
grpc_error_handle error = GRPC_ERROR_NONE;
// Subscribed resources of this type.
std::map<std::string /*authority*/,
std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
subscribed_resources;
};
void SendMessageLocked(const XdsResourceType* type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnRequestSent(void* arg, grpc_error_handle error);
void OnRequestSentLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnResponseReceived(void* arg, grpc_error_handle error);
bool OnResponseReceivedLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnStatusReceived(void* arg, grpc_error_handle error);
void OnStatusReceivedLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool IsCurrentCallOnChannel() const;
// Constructs a list of resource names of a given type for an ADS
// request. Also starts the timer for each resource if needed.
std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type);
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<AdsCallState>> parent_;
bool sent_initial_message_ = false;
bool seen_response_ = false;
// Always non-NULL.
grpc_call* call_;
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure on_request_sent_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
// Resource types for which requests need to be sent.
std::set<const XdsResourceType*> buffered_requests_;
// State for each resource type.
std::map<const XdsResourceType*, ResourceTypeState> state_map_;
};
// Contains an LRS call to the xds server.
class XdsClient::ChannelState::LrsCallState
: public InternallyRefCounted<LrsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
~LrsCallState() override;
void Orphan() override;
void MaybeStartReportingLocked();
RetryableCall<LrsCallState>* parent() { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
private:
// Reports client-side load stats to the LRS server at a fixed interval.
class Reporter : public InternallyRefCounted<Reporter> {
public:
Reporter(RefCountedPtr<LrsCallState> parent, Duration report_interval)
: parent_(std::move(parent)), report_interval_(report_interval) {
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
grpc_schedule_on_exec_ctx);
ScheduleNextReportLocked();
}
void Orphan() override;
private:
void ScheduleNextReportLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnNextReportTimer(void* arg, grpc_error_handle error);
bool OnNextReportTimerLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnReportDone(void* arg, grpc_error_handle error);
bool OnReportDoneLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool IsCurrentReporterOnCall() const {
return this == parent_->reporter_.get();
}
XdsClient* xds_client() const { return parent_->xds_client(); }
// The owning LRS call.
RefCountedPtr<LrsCallState> parent_;
// The load reporting state.
const Duration report_interval_;
bool last_report_counters_were_zero_ = false;
bool next_report_timer_callback_pending_ = false;
grpc_timer next_report_timer_;
grpc_closure on_next_report_timer_;
grpc_closure on_report_done_;
};
static void OnInitialRequestSent(void* arg, grpc_error_handle error);
void OnInitialRequestSentLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnResponseReceived(void* arg, grpc_error_handle error);
bool OnResponseReceivedLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
static void OnStatusReceived(void* arg, grpc_error_handle error);
void OnStatusReceivedLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
bool IsCurrentCallOnChannel() const;
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<LrsCallState>> parent_;
bool seen_response_ = false;
// Always non-NULL.
grpc_call* call_;
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure on_initial_request_sent_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
// Load reporting state.
bool send_all_clusters_ = false;
std::set<std::string> cluster_names_; // Asked for by the LRS server.
Duration load_reporting_interval_;
OrphanablePtr<Reporter> reporter_;
};
//
// XdsClient::ChannelState::StateWatcher
//
class XdsClient::ChannelState::StateWatcher
: public AsyncConnectivityStateWatcherInterface {
public:
explicit StateWatcher(WeakRefCountedPtr<ChannelState> parent)
: parent_(std::move(parent)) {}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
{
MutexLock lock(&parent_->xds_client_->mu_);
if (!parent_->shutting_down_ &&
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// In TRANSIENT_FAILURE. Notify all watchers of error.
gpr_log(GPR_INFO,
"[xds_client %p] xds channel for server %s in "
"state TRANSIENT_FAILURE: %s",
parent_->xds_client(), parent_->server_.server_uri.c_str(),
status.ToString().c_str());
parent_->xds_client_->NotifyOnErrorLocked(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds channel in TRANSIENT_FAILURE"));
}
}
parent_->xds_client()->work_serializer_.DrainQueue();
}
WeakRefCountedPtr<ChannelState> parent_;
};
//
// XdsClient::ChannelState
//
namespace {
grpc_channel* CreateXdsChannel(grpc_channel_args* args,
const XdsBootstrap::XdsServer& server) {
RefCountedPtr<grpc_channel_credentials> channel_creds =
CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds(
server.channel_creds_type, server.channel_creds_config);
return grpc_channel_create(server.server_uri.c_str(), channel_creds.get(),
args);
}
} // namespace
XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
const XdsBootstrap::XdsServer& server)
: DualRefCounted<ChannelState>(
GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
? "ChannelState"
: nullptr),
xds_client_(std::move(xds_client)),
server_(server) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
xds_client_.get(), server.server_uri.c_str());
}
channel_ = CreateXdsChannel(xds_client_->args_, server);
GPR_ASSERT(channel_ != nullptr);
StartConnectivityWatchLocked();
}
XdsClient::ChannelState::~ChannelState() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] destroying xds channel %p for server %s",
xds_client(), this, server_.server_uri.c_str());
}
grpc_channel_destroy(channel_);
xds_client_.reset(DEBUG_LOCATION, "ChannelState");
}
// This method should only ever be called while holding the lock, but we
// can't use an ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan()
// will be called from DualRefCounted::Unref, which cannot have a lock
// annotation for a lock in this subclass.
void XdsClient::ChannelState::Orphan() ABSL_NO_THREAD_SAFETY_ANALYSIS {
shutting_down_ = true;
CancelConnectivityWatchLocked();
// At this point, all strong refs have been released, so remove this entry
// from the channel map to prevent a subsequent subscription from trying to
// use this ChannelState while it is shutting down.
xds_client_->xds_server_channel_map_.erase(server_);
ads_calld_.reset();
lrs_calld_.reset();
}
XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
const {
return ads_calld_->calld();
}
XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
const {
return lrs_calld_->calld();
}
bool XdsClient::ChannelState::HasActiveAdsCall() const {
return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
}
void XdsClient::ChannelState::MaybeStartLrsCall() {
if (lrs_calld_ != nullptr) return;
lrs_calld_.reset(new RetryableCall<LrsCallState>(
WeakRef(DEBUG_LOCATION, "ChannelState+lrs")));
}
void XdsClient::ChannelState::StopLrsCallLocked() {
xds_client_->xds_load_report_server_map_.erase(server_);
lrs_calld_.reset();
}
namespace {
bool IsLameChannel(grpc_channel* channel) {
grpc_channel_element* elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
return elem->filter == &grpc_lame_filter;
}
} // namespace
void XdsClient::ChannelState::StartConnectivityWatchLocked() {
if (IsLameChannel(channel_)) {
xds_client()->NotifyOnErrorLocked(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds client has a lame channel"));
return;
}
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
GPR_ASSERT(client_channel != nullptr);
watcher_ = new StateWatcher(WeakRef(DEBUG_LOCATION, "ChannelState+watch"));
client_channel->AddConnectivityWatcher(
GRPC_CHANNEL_IDLE,
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
}
void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
if (IsLameChannel(channel_)) {
return;
}
ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
}
void XdsClient::ChannelState::SubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name) {
if (ads_calld_ == nullptr) {
// Start the ADS call if this is the first request.
ads_calld_.reset(new RetryableCall<AdsCallState>(
WeakRef(DEBUG_LOCATION, "ChannelState+ads")));
// Note: AdsCallState's ctor will automatically subscribe to all
// resources that the XdsClient already has watchers for, so we can
// return here.
return;
}
// If the ADS call is in backoff state, we don't need to do anything now
// because when the call is restarted it will resend all necessary requests.
if (ads_calld() == nullptr) return;
// Subscribe to this resource if the ADS call is active.
ads_calld()->SubscribeLocked(type, name);
}
void XdsClient::ChannelState::UnsubscribeLocked(const XdsResourceType* type,
const XdsResourceName& name,
bool delay_unsubscription) {
if (ads_calld_ != nullptr) {
auto* calld = ads_calld_->calld();
if (calld != nullptr) {
calld->UnsubscribeLocked(type, name, delay_unsubscription);
if (!calld->HasSubscribedResources()) {
ads_calld_.reset();
}
}
}
}
//
// XdsClient::ChannelState::RetryableCall<>
//
template <typename T>
XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
WeakRefCountedPtr<ChannelState> chand)
: chand_(std::move(chand)),
backoff_(BackOff::Options()
.set_initial_backoff(Duration::Seconds(
GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(Duration::Seconds(
GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
// Closure Initialization
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx);
StartNewCallLocked();
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
shutting_down_ = true;
calld_.reset();
if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
const bool seen_response = calld_->seen_response();
calld_.reset();
if (seen_response) {
// The call saw a response before the connection to the xds server was
// lost, so reset the backoff and restart the call immediately.
backoff_.Reset();
StartNewCallLocked();
} else {
// The call never got a response, so treat it as a failure to connect to
// the xds server and retry after a backoff delay.
StartRetryTimerLocked();
}
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
if (shutting_down_) return;
GPR_ASSERT(chand_->channel_ != nullptr);
GPR_ASSERT(calld_ == nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] xds server %s: start new call from retryable call %p",
chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
}
calld_ = MakeOrphanable<T>(
this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
if (shutting_down_) return;
const Timestamp next_attempt_time = backoff_.NextAttemptTime();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
Duration timeout =
std::max(next_attempt_time - ExecCtx::Get()->Now(), Duration::Zero());
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: call attempt failed; "
"retry timer will fire in %" PRId64 "ms.",
chand()->xds_client(), chand()->server_.server_uri.c_str(),
timeout.millis());
}
this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
retry_timer_callback_pending_ = true;
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
void* arg, grpc_error_handle error) {
RetryableCall* calld = static_cast<RetryableCall*>(arg);
{
MutexLock lock(&calld->chand_->xds_client()->mu_);
calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
}
calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
grpc_error_handle error) {
retry_timer_callback_pending_ = false;
if (!shutting_down_ && error == GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: retry timer fired (retryable "
"call: %p)",
chand()->xds_client(), chand()->server_.server_uri.c_str(), this);
}
StartNewCallLocked();
}
GRPC_ERROR_UNREF(error);
}
//
// XdsClient::ChannelState::AdsCallState::AdsResponseParser
//
absl::Status XdsClient::ChannelState::AdsCallState::AdsResponseParser::
ProcessAdsResponseFields(AdsResponseFields fields) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] xds server %s: received ADS response: type_url=%s, "
"version=%s, nonce=%s, num_resources=%" PRIuPTR,
ads_call_state_->xds_client(),
ads_call_state_->chand()->server_.server_uri.c_str(),
fields.type_url.c_str(), fields.version.c_str(), fields.nonce.c_str(),
fields.num_resources);
}
result_.type =
ads_call_state_->xds_client()->GetResourceTypeLocked(fields.type_url);
if (result_.type == nullptr) {
return absl::InvalidArgumentError(
absl::StrCat("unknown resource type ", fields.type_url));
}
result_.type_url = std::move(fields.type_url);
result_.version = std::move(fields.version);
result_.nonce = std::move(fields.nonce);
return absl::OkStatus();
}
namespace {
// Builds the resource metadata struct used by the methods that accept ADS
// results and by CSDS.
XdsApi::ResourceMetadata CreateResourceMetadataAcked(
std::string serialized_proto, std::string version, Timestamp update_time) {
XdsApi::ResourceMetadata resource_metadata;
resource_metadata.serialized_proto = std::move(serialized_proto);
resource_metadata.update_time = update_time;
resource_metadata.version = std::move(version);
resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
return resource_metadata;
}
// Update resource_metadata for NACK.
void UpdateResourceMetadataNacked(const std::string& version,
const std::string& details,
Timestamp update_time,
XdsApi::ResourceMetadata* resource_metadata) {
resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
resource_metadata->failed_version = version;
resource_metadata->failed_details = details;
resource_metadata->failed_update_time = update_time;
}
} // namespace
void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
const XdsEncodingContext& context, size_t idx, absl::string_view type_url,
absl::string_view serialized_resource) {
// Check the type_url of the resource.
bool is_v2 = false;
if (!result_.type->IsType(type_url, &is_v2)) {
result_.errors.emplace_back(
absl::StrCat("resource index ", idx, ": incorrect resource type ",
type_url, " (should be ", result_.type_url, ")"));
return;
}
// Parse the resource.
absl::StatusOr<XdsResourceType::DecodeResult> result =
result_.type->Decode(context, serialized_resource, is_v2);
if (!result.ok()) {
result_.errors.emplace_back(
absl::StrCat("resource index ", idx, ": ", result.status().ToString()));
return;
}
// Check the resource name.
auto resource_name =
XdsClient::ParseXdsResourceName(result->name, result_.type);
if (!resource_name.ok()) {
result_.errors.emplace_back(absl::StrCat(
"resource index ", idx, ": Cannot parse xDS resource name \"",
result->name, "\""));
return;
}
// Cancel resource-does-not-exist timer, if needed.
auto timer_it = ads_call_state_->state_map_.find(result_.type);
if (timer_it != ads_call_state_->state_map_.end()) {
auto it =
timer_it->second.subscribed_resources.find(resource_name->authority);
if (it != timer_it->second.subscribed_resources.end()) {
auto res_it = it->second.find(resource_name->key);
if (res_it != it->second.end()) {
res_it->second->MaybeCancelTimer();
}
}
}
// Lookup the authority in the cache.
auto authority_it =
xds_client()->authority_state_map_.find(resource_name->authority);
if (authority_it == xds_client()->authority_state_map_.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
// Found authority, so look up type.
AuthorityState& authority_state = authority_it->second;
auto type_it = authority_state.resource_map.find(result_.type);
if (type_it == authority_state.resource_map.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
auto& type_map = type_it->second;
// Found type, so look up resource key.
auto it = type_map.find(resource_name->key);
if (it == type_map.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
ResourceState& resource_state = it->second;
// If needed, record that we've seen this resource.
if (result_.type->AllResourcesRequiredInSotW()) {
result_.resources_seen[resource_name->authority].insert(resource_name->key);
}
// Update resource state based on whether the resource is valid.
if (!result->resource.ok()) {
result_.errors.emplace_back(absl::StrCat(
"resource index ", idx, ": ", result->name,
": validation error: ", result->resource.status().ToString()));
Notifier::ScheduleNotifyWatchersOnErrorInWorkSerializer(
xds_client(), resource_state.watchers,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"invalid resource: ", result->resource.status().ToString())),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
DEBUG_LOCATION);
UpdateResourceMetadataNacked(result_.version,
result->resource.status().ToString(),
update_time_, &resource_state.meta);
return;
}
// Resource is valid.
result_.have_valid_resources = true;
// If it didn't change, ignore it.
if (resource_state.resource != nullptr &&
result_.type->ResourcesEqual(resource_state.resource.get(),
result->resource->get())) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] %s resource %s identical to current, ignoring.",
xds_client(), result_.type_url.c_str(), result->name.c_str());
}
return;
}
// Update the resource state.
resource_state.resource = std::move(*result->resource);
resource_state.meta = CreateResourceMetadataAcked(
std::string(serialized_resource), result_.version, update_time_);
// Notify watchers.
auto& watchers_list = resource_state.watchers;
auto* value =
result_.type->CopyResource(resource_state.resource.get()).release();
xds_client()->work_serializer_.Schedule(
[watchers_list, value]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&xds_client()->work_serializer_) {
for (const auto& p : watchers_list) {
p.first->OnGenericResourceChanged(value);
}
delete value;
},
DEBUG_LOCATION);
}
//
// XdsClient::ChannelState::AdsCallState
//
XdsClient::ChannelState::AdsCallState::AdsCallState(
RefCountedPtr<RetryableCall<AdsCallState>> parent)
: InternallyRefCounted<AdsCallState>(
GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
? "AdsCallState"
: nullptr),
parent_(std::move(parent)) {
// Init the ADS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
// the polling entities from client_channel.
GPR_ASSERT(xds_client() != nullptr);
// Create a call with the specified method name.
const char* method =
chand()->server_.ShouldUseV3()
? "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
"StreamAggregatedResources"
: "/envoy.service.discovery.v2.AggregatedDiscoveryService/"
"StreamAggregatedResources";
call_ = grpc_channel_create_pollset_set_call(
chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
xds_client()->interested_parties_,
StaticSlice::FromStaticString(method).c_slice(), nullptr,
Timestamp::InfFuture(), nullptr);
GPR_ASSERT(call_ != nullptr);
// Init data associated with the call.
grpc_metadata_array_init(&initial_metadata_recv_);
grpc_metadata_array_init(&trailing_metadata_recv_);
// Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: starting ADS call "
"(calld: %p, call: %p)",
xds_client(), chand()->server_.server_uri.c_str(), this, call_);
}
// Create the ops.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
// Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), nullptr);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: send request message.
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
grpc_schedule_on_exec_ctx);
for (const auto& a : xds_client()->authority_state_map_) {
const std::string& authority = a.first;
// Skip authorities that are not using this xDS channel.
if (a.second.channel_state != chand()) continue;
for (const auto& t : a.second.resource_map) {
const XdsResourceType* type = t.first;
for (const auto& r : t.second) {
const XdsResourceKey& resource_key = r.first;
SubscribeLocked(type, {authority, resource_key});
}
}
}
// Op: recv initial metadata.
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Op: recv response.
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op->data.recv_status_on_client.status = &status_code_;
op->data.recv_status_on_client.status_details = &status_details_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
XdsClient::ChannelState::AdsCallState::~AdsCallState() {
grpc_metadata_array_destroy(&initial_metadata_recv_);
grpc_metadata_array_destroy(&trailing_metadata_recv_);
grpc_byte_buffer_destroy(send_message_payload_);
grpc_byte_buffer_destroy(recv_message_payload_);
grpc_slice_unref_internal(status_details_);
GPR_ASSERT(call_ != nullptr);
grpc_call_unref(call_);
}
void XdsClient::ChannelState::AdsCallState::Orphan() {
GPR_ASSERT(call_ != nullptr);
// If we are here because xds_client wants to cancel the call,
// on_status_received_ will complete the cancellation and clean up.
// Otherwise, we are here because xds_client has to orphan a failed call,
// in which case the cancellation below is a no-op.
grpc_call_cancel_internal(call_);
state_map_.clear();
// Note that the initial ref is held by on_status_received_, so the
// corresponding unref happens in on_status_received_ instead of here.
}
void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
const XdsResourceType* type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
// Buffer message sending if an existing message is in flight.
if (send_message_payload_ != nullptr) {
buffered_requests_.insert(type);
return;
}
auto& state = state_map_[type];
grpc_slice request_payload_slice;
request_payload_slice = xds_client()->api_.CreateAdsRequest(
chand()->server_,
chand()->server_.ShouldUseV3() ? type->type_url() : type->v2_type_url(),
chand()->resource_type_version_map_[type], state.nonce,
ResourceNamesForRequest(type), GRPC_ERROR_REF(state.error),
!sent_initial_message_);
sent_initial_message_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: sending ADS request: type=%s "
"version=%s nonce=%s error=%s",
xds_client(), chand()->server_.server_uri.c_str(),
std::string(type->type_url()).c_str(),
chand()->resource_type_version_map_[type].c_str(),
state.nonce.c_str(), grpc_error_std_string(state.error).c_str());
}
GRPC_ERROR_UNREF(state.error);
state.error = GRPC_ERROR_NONE;
// Create message payload.
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
// Send the message.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = send_message_payload_;
Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
grpc_schedule_on_exec_ctx);
grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR,
"[xds_client %p] xds server %s: error starting ADS send_message "
"batch on calld=%p: call_error=%d",
xds_client(), chand()->server_.server_uri.c_str(), this,
call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
const XdsResourceType* type, const XdsResourceName& name) {
auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
if (state == nullptr) {
state = MakeOrphanable<ResourceTimer>(type, name);
SendMessageLocked(type);
}
}
void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
const XdsResourceType* type, const XdsResourceName& name,
bool delay_unsubscription) {
auto& type_state_map = state_map_[type];
auto& authority_map = type_state_map.subscribed_resources[name.authority];
authority_map.erase(name.key);
if (authority_map.empty()) {
type_state_map.subscribed_resources.erase(name.authority);
}
if (!delay_unsubscription) SendMessageLocked(type);
}
bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
for (const auto& p : state_map_) {
if (!p.second.subscribed_resources.empty()) return true;
}
return false;
}
void XdsClient::ChannelState::AdsCallState::OnRequestSent(
void* arg, grpc_error_handle error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
{
MutexLock lock(&ads_calld->xds_client()->mu_);
ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
}
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
}
void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
grpc_error_handle error) {
if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
// Clean up the sent message.
grpc_byte_buffer_destroy(send_message_payload_);
send_message_payload_ = nullptr;
// Continue to send another pending message if any.
// TODO(roth): The current code to handle buffered messages has the
// advantage of sending only the most recent list of resource names for
// each resource type (no matter how many times that resource type has
// been requested to send while the current message sending is still
// pending). But its disadvantage is that we send the requests in fixed
// order of resource types. We need to fix this if we are seeing some
// resource type(s) starved due to frequent requests of other resource
// type(s).
auto it = buffered_requests_.begin();
if (it != buffered_requests_.end()) {
SendMessageLocked(*it);
buffered_requests_.erase(it);
}
}
GRPC_ERROR_UNREF(error);
}
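// Request-buffering sketch (illustrative): only one send_message op may be
// in flight on the ADS call at a time, so SendMessageLocked() records the
// resource type instead of sending when a payload is already pending:
//
//   SendMessageLocked(type);   // payload in flight -> buffered_requests_.insert(type)
//   // ... later, OnRequestSentLocked() pops one buffered type:
//   SendMessageLocked(*buffered_requests_.begin());
//
// This coalesces repeated requests for a type into a single message carrying
// the most recent resource-name list for that type.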
void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
void* arg, grpc_error_handle /* error */) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
bool done;
{
MutexLock lock(&ads_calld->xds_client()->mu_);
done = ads_calld->OnResponseReceivedLocked();
}
ads_calld->xds_client()->work_serializer_.DrainQueue();
if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
}
bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// Empty payload means the call was cancelled.
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
return true;
}
// Read the response.
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(recv_message_payload_);
recv_message_payload_ = nullptr;
// Parse and validate the response.
AdsResponseParser parser(this);
absl::Status status = xds_client()->api_.ParseAdsResponse(
chand()->server_, response_slice, &parser);
grpc_slice_unref_internal(response_slice);
if (!status.ok()) {
// Ignore unparsable response.
gpr_log(GPR_ERROR,
"[xds_client %p] xds server %s: error parsing ADS response (%s) "
"-- ignoring",
xds_client(), chand()->server_.server_uri.c_str(),
status.ToString().c_str());
} else {
AdsResponseParser::Result result = parser.TakeResult();
// Update nonce.
auto& state = state_map_[result.type];
state.nonce = result.nonce;
// If we got an error, set state.error so that we'll NACK the update.
if (!result.errors.empty()) {
std::string error = absl::StrJoin(result.errors, "; ");
gpr_log(
GPR_ERROR,
"[xds_client %p] xds server %s: ADS response invalid for resource "
"type %s version %s, will NACK: nonce=%s error=%s",
xds_client(), chand()->server_.server_uri.c_str(),
result.type_url.c_str(), result.version.c_str(), state.nonce.c_str(),
error.c_str());
GRPC_ERROR_UNREF(state.error);
state.error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_CPP_STRING(error),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
// Delete resources not seen in update if needed.
if (result.type->AllResourcesRequiredInSotW()) {
for (auto& a : xds_client()->authority_state_map_) {
const std::string& authority = a.first;
AuthorityState& authority_state = a.second;
// Skip authorities that are not using this xDS channel.
if (authority_state.channel_state != chand()) continue;
auto seen_authority_it = result.resources_seen.find(authority);
// Find this resource type.
auto type_it = authority_state.resource_map.find(result.type);
if (type_it == authority_state.resource_map.end()) continue;
// Iterate over resource ids.
for (auto& r : type_it->second) {
const XdsResourceKey& resource_key = r.first;
ResourceState& resource_state = r.second;
if (seen_authority_it == result.resources_seen.end() ||
seen_authority_it->second.find(resource_key) ==
seen_authority_it->second.end()) {
// If the resource was newly requested but has not yet been
// received, we don't want to generate an error for the watchers,
// because this ADS response may be in reaction to an earlier
// request that did not yet request the new resource, so its absence
// from the response does not necessarily indicate that the resource
// does not exist. For that case, we rely on the request timeout
// instead.
if (resource_state.resource == nullptr) continue;
resource_state.resource.reset();
Notifier::
ScheduleNotifyWatchersOnResourceDoesNotExistInWorkSerializer(
xds_client(), resource_state.watchers, DEBUG_LOCATION);
}
}
}
}
// If we had valid resources, update the version.
if (result.have_valid_resources) {
seen_response_ = true;
chand()->resource_type_version_map_[result.type] =
std::move(result.version);
// Start load reporting if needed.
auto& lrs_call = chand()->lrs_calld_;
if (lrs_call != nullptr) {
LrsCallState* lrs_calld = lrs_call->calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
}
}
// Send ACK or NACK.
SendMessageLocked(result.type);
}
if (xds_client()->shutting_down_) return true;
// Keep listening for updates.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &recv_message_payload_;
op.flags = 0;
op.reserved = nullptr;
GPR_ASSERT(call_ != nullptr);
// Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
return false;
}
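// ACK/NACK sketch (illustrative; the actual request is built by
// XdsApi::CreateAdsRequest in SendMessageLocked above): after a response for
// a resource type is processed, the next request for that type carries
//
//   version_info   = last version accepted for the type
//                    (only advanced when have_valid_resources was true)
//   response_nonce = the nonce from the response just processed
//   error_detail   = populated from state.error when validation failed (NACK)
//
// so a NACK still acknowledges receipt of the response (via the nonce), while
// the version only advances if the response contained a valid resource.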
void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
void* arg, grpc_error_handle error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
{
MutexLock lock(&ads_calld->xds_client()->mu_);
ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
}
ads_calld->xds_client()->work_serializer_.DrainQueue();
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
}
void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
grpc_error_handle error) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
char* status_details = grpc_slice_to_c_string(status_details_);
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: ADS call status received "
"(chand=%p, ads_calld=%p, call=%p): "
"status=%d, details='%s', error='%s'",
xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
call_, status_code_, status_details,
grpc_error_std_string(error).c_str());
gpr_free(status_details);
}
// Ignore status from a stale call.
if (IsCurrentCallOnChannel()) {
// Try to restart the call.
parent_->OnCallFinishedLocked();
// Send error to all watchers.
xds_client()->NotifyOnErrorLocked(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
}
GRPC_ERROR_UNREF(error);
}
bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
// If the retryable ADS call is null (which only happens when the xds channel
// is shutting down), all the ADS calls are stale.
if (chand()->ads_calld_ == nullptr) return false;
return this == chand()->ads_calld_->calld();
}
std::vector<std::string>
XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
const XdsResourceType* type) {
std::vector<std::string> resource_names;
auto it = state_map_.find(type);
if (it != state_map_.end()) {
for (auto& a : it->second.subscribed_resources) {
const std::string& authority = a.first;
for (auto& p : a.second) {
const XdsResourceKey& resource_key = p.first;
resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
authority, type->type_url(), resource_key));
OrphanablePtr<ResourceTimer>& resource_timer = p.second;
resource_timer->MaybeStartTimer(Ref(DEBUG_LOCATION, "ResourceTimer"));
}
}
}
return resource_names;
}
//
// XdsClient::ChannelState::LrsCallState::Reporter
//
void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
if (next_report_timer_callback_pending_) {
grpc_timer_cancel(&next_report_timer_);
}
}
void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked() {
const Timestamp next_report_time = ExecCtx::Get()->Now() + report_interval_;
grpc_timer_init(&next_report_timer_, next_report_time,
&on_next_report_timer_);
next_report_timer_callback_pending_ = true;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
void* arg, grpc_error_handle error) {
Reporter* self = static_cast<Reporter*>(arg);
bool done;
{
MutexLock lock(&self->xds_client()->mu_);
done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
}
if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
}
bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
grpc_error_handle error) {
next_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
GRPC_ERROR_UNREF(error);
return true;
}
return SendReportLocked();
}
namespace {
bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
for (const auto& p : snapshot) {
const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
if (!cluster_snapshot.dropped_requests.IsZero()) return false;
for (const auto& q : cluster_snapshot.locality_stats) {
const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
if (!locality_snapshot.IsZero()) return false;
}
}
return true;
}
} // namespace
bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshotLocked(parent_->chand()->server_,
parent_->send_all_clusters_,
parent_->cluster_names_);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
auto it = xds_client()->xds_load_report_server_map_.find(
parent_->chand()->server_);
if (it == xds_client()->xds_load_report_server_map_.end() ||
it->second.load_report_map.empty()) {
// Note: don't dereference `it` here, since it may be end() if the server
// entry has already been removed; stop the LRS call via the owning
// channel instead.
parent_->chand()->StopLrsCallLocked();
return true;
}
ScheduleNextReportLocked();
return false;
}
// Create a request that contains the snapshot.
grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsRequest(std::move(snapshot));
parent_->send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
// Send the report.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = parent_->send_message_payload_;
grpc_call_error call_error = grpc_call_start_batch_and_execute(
parent_->call_, &op, 1, &on_report_done_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR,
"[xds_client %p] xds server %s: error starting LRS send_message "
"batch on calld=%p: call_error=%d",
xds_client(), parent_->chand()->server_.server_uri.c_str(), this,
call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
return false;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
void* arg, grpc_error_handle error) {
Reporter* self = static_cast<Reporter*>(arg);
bool done;
{
MutexLock lock(&self->xds_client()->mu_);
done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
}
if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
}
bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
grpc_error_handle error) {
grpc_byte_buffer_destroy(parent_->send_message_payload_);
parent_->send_message_payload_ = nullptr;
// If there are no more registered stats to report, cancel the call.
auto it =
xds_client()->xds_load_report_server_map_.find(parent_->chand()->server_);
if (it == xds_client()->xds_load_report_server_map_.end() ||
it->second.load_report_map.empty()) {
// Note: don't dereference `it` here, since it may be end() if the server
// entry has already been removed; stop the LRS call via the owning
// channel instead.
parent_->chand()->StopLrsCallLocked();
GRPC_ERROR_UNREF(error);
return true;
}
if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
GRPC_ERROR_UNREF(error);
// If this reporter is no longer the current one on the call, the reason
// might be that it was orphaned and replaced by a new one due to a config
// update.
if (!IsCurrentReporterOnCall()) {
parent_->MaybeStartReportingLocked();
}
return true;
}
ScheduleNextReportLocked();
return false;
}
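// Reporting-loop sketch (illustrative): each Reporter cycles through
//
//   ScheduleNextReportLocked() -> OnNextReportTimer() ->
//   SendReportLocked() -> OnReportDone() -> ScheduleNextReportLocked() ...
//
// and tears itself down when a step observes that it is no longer
// parent_->reporter_ (e.g. after an LRS config update replaced it) or when
// there are no more registered load-report stats for this server.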
//
// XdsClient::ChannelState::LrsCallState
//
XdsClient::ChannelState::LrsCallState::LrsCallState(
RefCountedPtr<RetryableCall<LrsCallState>> parent)
: InternallyRefCounted<LrsCallState>(
GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
? "LrsCallState"
: nullptr),
parent_(std::move(parent)) {
// Init the LRS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
// the polling entities from client_channel.
GPR_ASSERT(xds_client() != nullptr);
const char* method =
chand()->server_.ShouldUseV3()
? "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"
: "/envoy.service.load_stats.v2.LoadReportingService/StreamLoadStats";
call_ = grpc_channel_create_pollset_set_call(
chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
xds_client()->interested_parties_,
Slice::FromStaticString(method).c_slice(), nullptr,
Timestamp::InfFuture(), nullptr);
GPR_ASSERT(call_ != nullptr);
// Init the request payload.
grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
// Init other data associated with the LRS call.
grpc_metadata_array_init(&initial_metadata_recv_);
grpc_metadata_array_init(&trailing_metadata_recv_);
// Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] xds server %s: starting LRS call (calld=%p, call=%p)",
xds_client(), chand()->server_.server_uri.c_str(), this, call_);
}
// Create the ops.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
// Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
// Op: send request message.
GPR_ASSERT(send_message_payload_ != nullptr);
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = send_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv initial metadata.
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Op: recv response.
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op->data.recv_status_on_client.status = &status_code_;
op->data.recv_status_on_client.status_details = &status_details_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
XdsClient::ChannelState::LrsCallState::~LrsCallState() {
grpc_metadata_array_destroy(&initial_metadata_recv_);
grpc_metadata_array_destroy(&trailing_metadata_recv_);
grpc_byte_buffer_destroy(send_message_payload_);
grpc_byte_buffer_destroy(recv_message_payload_);
grpc_slice_unref_internal(status_details_);
GPR_ASSERT(call_ != nullptr);
grpc_call_unref(call_);
}
void XdsClient::ChannelState::LrsCallState::Orphan() {
reporter_.reset();
GPR_ASSERT(call_ != nullptr);
// If we are here because xds_client wants to cancel the call,
// on_status_received_ will complete the cancellation and clean up.
// Otherwise, we are here because xds_client has to orphan a failed call,
// in which case the cancellation below is a no-op.
grpc_call_cancel_internal(call_);
// Note that the initial ref is held by on_status_received_, so the
// corresponding unref happens in on_status_received_ instead of here.
}
void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
// Don't start again if already started.
if (reporter_ != nullptr) return;
// Don't start if the previous send_message op (of the initial request or the
// last report of the previous reporter) hasn't completed.
if (send_message_payload_ != nullptr) return;
// Don't start if no LRS response has arrived.
if (!seen_response()) return;
// Don't start if the ADS call hasn't received any valid response. Note that
// in this case the channel must be the first (current) channel, even though
// its ADS call hasn't seen a response yet.
if (chand()->ads_calld_ == nullptr ||
chand()->ads_calld_->calld() == nullptr ||
!chand()->ads_calld_->calld()->seen_response()) {
return;
}
// Start reporting.
reporter_ = MakeOrphanable<Reporter>(
Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
void* arg, grpc_error_handle /*error*/) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
{
MutexLock lock(&lrs_calld->xds_client()->mu_);
lrs_calld->OnInitialRequestSentLocked();
}
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
// Clear the send_message_payload_.
grpc_byte_buffer_destroy(send_message_payload_);
send_message_payload_ = nullptr;
MaybeStartReportingLocked();
}
void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
void* arg, grpc_error_handle /*error*/) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
bool done;
{
MutexLock lock(&lrs_calld->xds_client()->mu_);
done = lrs_calld->OnResponseReceivedLocked();
}
if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
}
bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
// Empty payload means the call was cancelled.
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
return true;
}
// Read the response.
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(recv_message_payload_);
recv_message_payload_ = nullptr;
// This anonymous lambda is a hack to avoid using goto.
[&]() {
// Parse the response.
bool send_all_clusters = false;
std::set<std::string> new_cluster_names;
Duration new_load_reporting_interval;
grpc_error_handle parse_error = xds_client()->api_.ParseLrsResponse(
response_slice, &send_all_clusters, &new_cluster_names,
&new_load_reporting_interval);
if (parse_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"[xds_client %p] xds server %s: LRS response parsing failed: %s",
xds_client(), chand()->server_.server_uri.c_str(),
grpc_error_std_string(parse_error).c_str());
GRPC_ERROR_UNREF(parse_error);
return;
}
seen_response_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] xds server %s: LRS response received, %" PRIuPTR
" cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
"ms",
xds_client(), chand()->server_.server_uri.c_str(),
new_cluster_names.size(), send_all_clusters,
new_load_reporting_interval.millis());
size_t i = 0;
for (const auto& name : new_cluster_names) {
gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
xds_client(), i++, name.c_str());
}
}
if (new_load_reporting_interval <
Duration::Milliseconds(
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
new_load_reporting_interval = Duration::Milliseconds(
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: increased load_report_interval "
"to minimum value %dms",
xds_client(), chand()->server_.server_uri.c_str(),
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
}
}
// Ignore identical update.
if (send_all_clusters == send_all_clusters_ &&
cluster_names_ == new_cluster_names &&
load_reporting_interval_ == new_load_reporting_interval) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] xds server %s: incoming LRS response identical "
"to current, ignoring.",
xds_client(), chand()->server_.server_uri.c_str());
}
return;
}
// Stop current load reporting (if any) to adopt the new config.
reporter_.reset();
// Record the new config.
send_all_clusters_ = send_all_clusters;
cluster_names_ = std::move(new_cluster_names);
load_reporting_interval_ = new_load_reporting_interval;
// Try to start sending load reports.
MaybeStartReportingLocked();
}();
grpc_slice_unref_internal(response_slice);
if (xds_client()->shutting_down_) return true;
// Keep listening for LRS config updates.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &recv_message_payload_;
op.flags = 0;
op.reserved = nullptr;
GPR_ASSERT(call_ != nullptr);
// Reuse the "OnResponseReceivedLocked" ref taken in ctor.
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
return false;
}
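// Static callback invoked when the final call status is received. Handles
// the status under the XdsClient mutex and then releases the call's initial
// ref.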
void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
void* arg, grpc_error_handle error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
{
MutexLock lock(&lrs_calld->xds_client()->mu_);
lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
}
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
}
void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
grpc_error_handle error) {
GPR_ASSERT(call_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
char* status_details = grpc_slice_to_c_string(status_details_);
gpr_log(GPR_INFO,
"[xds_client %p] xds server %s: LRS call status received "
"(chand=%p, calld=%p, call=%p): "
"status=%d, details='%s', error='%s'",
xds_client(), chand()->server_.server_uri.c_str(), chand(), this,
call_, status_code_, status_details,
grpc_error_std_string(error).c_str());
gpr_free(status_details);
}
// Ignore status from a stale call.
if (IsCurrentCallOnChannel()) {
// Try to restart the call.
parent_->OnCallFinishedLocked();
}
GRPC_ERROR_UNREF(error);
}
bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
// If the retryable LRS call is null (which only happens when the xds channel
// is shutting down), all the LRS calls are stale.
if (chand()->lrs_calld_ == nullptr) return false;
return this == chand()->lrs_calld_->calld();
}
//
// XdsClient
//
namespace {
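// Returns the resource-does-not-exist request timeout from \a args,
// defaulting to 15 seconds.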
Duration GetRequestTimeout(const grpc_channel_args* args) {
return Duration::Milliseconds(grpc_channel_args_find_integer(
args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
{15000, 0, INT_MAX}));
}
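// Returns a copy of \a args with a 5-minute keepalive time added.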
grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) {
absl::InlinedVector<grpc_arg, 1> args_to_add = {
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
5 * 60 * GPR_MS_PER_SEC),
};
return grpc_channel_args_copy_and_add(args, args_to_add.data(),
args_to_add.size());
}
} // namespace
XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
const grpc_channel_args* args)
: DualRefCounted<XdsClient>(
GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
: nullptr),
bootstrap_(std::move(bootstrap)),
args_(ModifyChannelArgs(args)),
request_timeout_(GetRequestTimeout(args)),
interested_parties_(grpc_pollset_set_create()),
certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
bootstrap_->certificate_providers())),
api_(this, &grpc_xds_client_trace, bootstrap_->node(),
&bootstrap_->certificate_providers(), &symtab_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
}
// Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
// destroyed.
grpc_init();
}
XdsClient::~XdsClient() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
}
grpc_channel_args_destroy(args_);
grpc_pollset_set_destroy(interested_parties_);
// Balance the grpc_init() called in the constructor, now that the XdsClient
// is destroyed.
grpc_shutdown();
}
void XdsClient::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
}
{
MutexLock lock(g_mu);
if (g_xds_client == this) g_xds_client = nullptr;
}
{
MutexLock lock(&mu_);
shutting_down_ = true;
// Clear cache and any remaining watchers that may not have been cancelled.
authority_state_map_.clear();
invalid_watchers_.clear();
}
}
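// Returns the ChannelState for \a server, creating it (and caching it in
// xds_server_channel_map_) if it doesn't already exist.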
RefCountedPtr<XdsClient::ChannelState> XdsClient::GetOrCreateChannelStateLocked(
const XdsBootstrap::XdsServer& server) {
auto it = xds_server_channel_map_.find(server);
if (it != xds_server_channel_map_.end()) {
return it->second->Ref(DEBUG_LOCATION, "Authority");
}
// Channel not found, so create a new one.
auto channel_state = MakeRefCounted<ChannelState>(
WeakRef(DEBUG_LOCATION, "ChannelState"), server);
xds_server_channel_map_[server] = channel_state.get();
return channel_state;
}
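// Registers \a watcher for the resource identified by \a name. If the
// resource is already cached, the watcher is notified immediately via the
// work serializer; in either case, the resource is subscribed to on the
// channel for its authority, creating that channel if needed.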
void XdsClient::WatchResource(const XdsResourceType* type,
absl::string_view name,
RefCountedPtr<ResourceWatcherInterface> watcher) {
ResourceWatcherInterface* w = watcher.get();
// Lambda for handling failure cases.
auto fail = [&](grpc_error_handle error) mutable {
{
MutexLock lock(&mu_);
MaybeRegisterResourceTypeLocked(type);
invalid_watchers_[w] = watcher;
}
work_serializer_.Run(
// TODO(yashykt): When we move to C++14, capture watcher using
// std::move()
[watcher, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnError(error);
},
DEBUG_LOCATION);
};
auto resource_name = ParseXdsResourceName(name, type);
if (!resource_name.ok()) {
fail(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrFormat(
"Unable to parse resource name for listener %s", name)));
return;
}
// Find server to use.
const XdsBootstrap::XdsServer* xds_server = nullptr;
absl::string_view authority_name = resource_name->authority;
if (absl::ConsumePrefix(&authority_name, "xdstp:")) {
auto* authority = bootstrap_->LookupAuthority(std::string(authority_name));
if (authority == nullptr) {
fail(GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("authority \"", authority_name,
"\" not present in bootstrap config")));
return;
}
if (!authority->xds_servers.empty()) {
xds_server = &authority->xds_servers[0];
}
}
if (xds_server == nullptr) xds_server = &bootstrap_->server();
{
MutexLock lock(&mu_);
MaybeRegisterResourceTypeLocked(type);
AuthorityState& authority_state =
authority_state_map_[resource_name->authority];
ResourceState& resource_state =
authority_state.resource_map[type][resource_name->key];
resource_state.watchers[w] = watcher;
// If we already have a cached value for the resource, notify the new
// watcher immediately.
if (resource_state.resource != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] returning cached listener data for %s", this,
std::string(name).c_str());
}
auto* value = type->CopyResource(resource_state.resource.get()).release();
work_serializer_.Schedule(
[watcher, value]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnGenericResourceChanged(value);
delete value;
},
DEBUG_LOCATION);
}
// If the authority doesn't yet have a channel, set it, creating it if
// needed.
if (authority_state.channel_state == nullptr) {
authority_state.channel_state =
GetOrCreateChannelStateLocked(*xds_server);
}
authority_state.channel_state->SubscribeLocked(type, *resource_name);
}
work_serializer_.DrainQueue();
}
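// Cancels a watch previously started via WatchResource(). When the last
// watcher for a resource is removed, the resource is unsubscribed and any
// now-empty map entries (and unused channel state) are cleaned up.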
void XdsClient::CancelResourceWatch(const XdsResourceType* type,
absl::string_view name,
ResourceWatcherInterface* watcher,
bool delay_unsubscription) {
auto resource_name = ParseXdsResourceName(name, type);
MutexLock lock(&mu_);
if (!resource_name.ok()) {
invalid_watchers_.erase(watcher);
return;
}
if (shutting_down_) return;
// Find authority.
auto authority_it = authority_state_map_.find(resource_name->authority);
if (authority_it == authority_state_map_.end()) return;
AuthorityState& authority_state = authority_it->second;
// Find type map.
auto type_it = authority_state.resource_map.find(type);
if (type_it == authority_state.resource_map.end()) return;
auto& type_map = type_it->second;
// Find resource key.
auto resource_it = type_map.find(resource_name->key);
if (resource_it == type_map.end()) return;
ResourceState& resource_state = resource_it->second;
// Remove watcher.
resource_state.watchers.erase(watcher);
// Clean up empty map entries, if any.
if (resource_state.watchers.empty()) {
authority_state.channel_state->UnsubscribeLocked(type, *resource_name,
delay_unsubscription);
type_map.erase(resource_it);
if (type_map.empty()) {
authority_state.resource_map.erase(type_it);
if (authority_state.resource_map.empty()) {
authority_state.channel_state.reset();
}
}
}
}
void XdsClient::MaybeRegisterResourceTypeLocked(
const XdsResourceType* resource_type) {
auto it = resource_types_.find(resource_type->type_url());
if (it != resource_types_.end()) {
GPR_ASSERT(it->second == resource_type);
return;
}
resource_types_.emplace(resource_type->type_url(), resource_type);
v2_resource_types_.emplace(resource_type->v2_type_url(), resource_type);
resource_type->InitUpbSymtab(symtab_.ptr());
}
const XdsResourceType* XdsClient::GetResourceTypeLocked(
absl::string_view resource_type) {
auto it = resource_types_.find(resource_type);
if (it != resource_types_.end()) return it->second;
auto it2 = v2_resource_types_.find(resource_type);
if (it2 != v2_resource_types_.end()) return it2->second;
return nullptr;
}
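// Parses \a name into an XdsResourceName. Old-style (non-xdstp) names get
// the sentinel authority "old:". New-style names must be valid xdstp URIs
// whose path names the expected resource type (illustrative example:
// "xdstp://some.authority/envoy.config.listener.v3.Listener/foo"); their
// query params are canonicalized by sorting.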
absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
absl::string_view name, const XdsResourceType* type) {
// Old-style names have no authority; we store the sentinel authority "old:"
// to indicate that it's an old-style name.
if (!absl::StartsWith(name, "xdstp:")) {
return XdsResourceName{"old:", {std::string(name), {}}};
}
// New style name. Parse URI.
auto uri = URI::Parse(name);
if (!uri.ok()) return uri.status();
// Split the resource type off of the path to get the id.
std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
if (!type->IsType(path_parts.first, nullptr)) {
return absl::InvalidArgumentError(
"xdstp URI path must indicate valid xDS resource type");
}
// Canonicalize order of query params.
std::vector<URI::QueryParam> query_params;
for (const auto& p : uri->query_parameter_map()) {
query_params.emplace_back(
URI::QueryParam{std::string(p.first), std::string(p.second)});
}
return XdsResourceName{
absl::StrCat("xdstp:", uri->authority()),
{std::string(path_parts.second), std::move(query_params)}};
}
std::string XdsClient::ConstructFullXdsResourceName(
absl::string_view authority, absl::string_view resource_type,
const XdsResourceKey& key) {
if (absl::ConsumePrefix(&authority, "xdstp:")) {
auto uri = URI::Create("xdstp", std::string(authority),
absl::StrCat("/", resource_type, "/", key.id),
key.query_params, /*fragment=*/"");
GPR_ASSERT(uri.ok());
return uri->ToString();
}
// Old-style name.
return key.id;
}
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name) {
if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
MutexLock lock(&mu_);
// We jump through some hoops here to make sure that the const
// XdsBootstrap::XdsServer& and absl::string_views stored in the
// XdsClusterDropStats object point to the XdsBootstrap::XdsServer and
// strings in the xds_load_report_server_map_ and load_report_map keys, so
// that they have the same lifetime.
auto server_it =
xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first;
if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server);
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
.first;
LoadReportState& load_report_state = load_report_it->second;
RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
if (load_report_state.drop_stats != nullptr) {
cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
}
if (cluster_drop_stats == nullptr) {
if (load_report_state.drop_stats != nullptr) {
load_report_state.deleted_drop_stats +=
load_report_state.drop_stats->GetSnapshotAndReset();
}
cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
Ref(DEBUG_LOCATION, "DropStats"), server_it->first,
load_report_it->first.first /*cluster_name*/,
load_report_it->first.second /*eds_service_name*/);
load_report_state.drop_stats = cluster_drop_stats.get();
}
server_it->second.channel_state->MaybeStartLrsCall();
return cluster_drop_stats;
}
void XdsClient::RemoveClusterDropStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats) {
MutexLock lock(&mu_);
auto server_it = xds_load_report_server_map_.find(xds_server);
if (server_it == xds_load_report_server_map_.end()) return;
auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == server_it->second.load_report_map.end()) return;
LoadReportState& load_report_state = load_report_it->second;
if (load_report_state.drop_stats == cluster_drop_stats) {
// Record final snapshot in deleted_drop_stats, which will be
// added to the next load report.
load_report_state.deleted_drop_stats +=
load_report_state.drop_stats->GetSnapshotAndReset();
load_report_state.drop_stats = nullptr;
}
}
RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality) {
if (!bootstrap_->XdsServerExists(xds_server)) return nullptr;
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
MutexLock lock(&mu_);
// We jump through some hoops here to make sure that the const
// XdsBootstrap::XdsServer& and absl::string_views stored in the
// XdsClusterLocalityStats object point to the XdsBootstrap::XdsServer and
// strings in the xds_load_report_server_map_ and load_report_map keys, so
// that they have the same lifetime.
auto server_it =
xds_load_report_server_map_.emplace(xds_server, LoadReportServer()).first;
if (server_it->second.channel_state == nullptr) {
server_it->second.channel_state = GetOrCreateChannelStateLocked(xds_server);
}
auto load_report_it = server_it->second.load_report_map
.emplace(std::move(key), LoadReportState())
.first;
LoadReportState& load_report_state = load_report_it->second;
LoadReportState::LocalityState& locality_state =
load_report_state.locality_stats[locality];
RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
if (locality_state.locality_stats != nullptr) {
cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
}
if (cluster_locality_stats == nullptr) {
if (locality_state.locality_stats != nullptr) {
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
}
cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first,
load_report_it->first.first /*cluster_name*/,
load_report_it->first.second /*eds_service_name*/, std::move(locality));
locality_state.locality_stats = cluster_locality_stats.get();
}
server_it->second.channel_state->MaybeStartLrsCall();
return cluster_locality_stats;
}
void XdsClient::RemoveClusterLocalityStats(
const XdsBootstrap::XdsServer& xds_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) {
MutexLock lock(&mu_);
auto server_it = xds_load_report_server_map_.find(xds_server);
if (server_it == xds_load_report_server_map_.end()) return;
auto load_report_it = server_it->second.load_report_map.find(
std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
if (load_report_it == server_it->second.load_report_map.end()) return;
LoadReportState& load_report_state = load_report_it->second;
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
LoadReportState::LocalityState& locality_state = locality_it->second;
if (locality_state.locality_stats == cluster_locality_stats) {
// Record final snapshot in deleted_locality_stats, which will be
// added to the next load report.
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
locality_state.locality_stats = nullptr;
}
}
void XdsClient::ResetBackoff() {
MutexLock lock(&mu_);
for (auto& p : xds_server_channel_map_) {
grpc_channel_reset_connect_backoff(p.second->channel());
}
}
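// Notifies all watchers, across all authorities and resource types, of
// \a error via the work serializer.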
void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) {
std::set<RefCountedPtr<ResourceWatcherInterface>> watchers;
for (const auto& a : authority_state_map_) { // authority
for (const auto& t : a.second.resource_map) { // type
for (const auto& r : t.second) { // resource id
for (const auto& w : r.second.watchers) { // watchers
watchers.insert(w.second);
}
}
}
}
work_serializer_.Schedule(
// TODO(yashykt): When we move to C++14, capture watchers using
// std::move()
[watchers, error]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_) {
for (const auto& watcher : watchers) {
watcher->OnError(GRPC_ERROR_REF(error));
}
GRPC_ERROR_UNREF(error);
},
DEBUG_LOCATION);
}
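// Takes a snapshot of the load report data for \a xds_server, resetting the
// underlying stats objects and pruning entries that only held final
// snapshots from deleted stats objects.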
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
const XdsBootstrap::XdsServer& xds_server, bool send_all_clusters,
const std::set<std::string>& clusters) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
}
XdsApi::ClusterLoadReportMap snapshot_map;
auto server_it = xds_load_report_server_map_.find(xds_server);
if (server_it == xds_load_report_server_map_.end()) return snapshot_map;
auto& load_report_map = server_it->second.load_report_map;
for (auto load_report_it = load_report_map.begin();
load_report_it != load_report_map.end();) {
// Cluster key is cluster and EDS service name.
const auto& cluster_key = load_report_it->first;
LoadReportState& load_report = load_report_it->second;
// If the CDS response for a cluster indicates to use LRS but the
// LRS server does not say that it wants reports for this cluster,
// then we'll have stats objects here whose data we're not going to
// include in the load report. However, we still need to clear out
// the data from the stats objects, so that if the LRS server starts
// asking for the data in the future, we don't incorrectly include
// data from previous reporting intervals in that future report.
const bool record_stats =
send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
XdsApi::ClusterLoadReport snapshot;
// Aggregate drop stats.
snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
if (load_report.drop_stats != nullptr) {
snapshot.dropped_requests +=
load_report.drop_stats->GetSnapshotAndReset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
this, cluster_key.first.c_str(), cluster_key.second.c_str(),
load_report.drop_stats);
}
}
// Aggregate locality stats.
for (auto it = load_report.locality_stats.begin();
it != load_report.locality_stats.end();) {
const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
auto& locality_state = it->second;
XdsClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
locality_snapshot = std::move(locality_state.deleted_locality_stats);
if (locality_state.locality_stats != nullptr) {
locality_snapshot +=
locality_state.locality_stats->GetSnapshotAndReset();
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] cluster=%s eds_service_name=%s "
"locality=%s locality_stats=%p",
this, cluster_key.first.c_str(), cluster_key.second.c_str(),
locality_name->AsHumanReadableString().c_str(),
locality_state.locality_stats);
}
}
// If the only thing left in this entry was final snapshots from
// deleted locality stats objects, remove the entry.
if (locality_state.locality_stats == nullptr) {
it = load_report.locality_stats.erase(it);
} else {
++it;
}
}
// Compute load report interval.
const Timestamp now = ExecCtx::Get()->Now();
snapshot.load_report_interval = now - load_report.last_report_time;
load_report.last_report_time = now;
// Record snapshot.
if (record_stats) {
snapshot_map[cluster_key] = std::move(snapshot);
}
// If the only thing left in this entry was final snapshots from
// deleted stats objects, remove the entry.
if (load_report.locality_stats.empty() &&
load_report.drop_stats == nullptr) {
load_report_it = load_report_map.erase(load_report_it);
} else {
++load_report_it;
}
}
return snapshot_map;
}
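// Assembles a serialized client config dump covering every resource
// currently tracked in authority_state_map_.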
std::string XdsClient::DumpClientConfigBinary() {
MutexLock lock(&mu_);
XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
for (const auto& a : authority_state_map_) { // authority
const std::string& authority = a.first;
for (const auto& t : a.second.resource_map) { // type
const XdsResourceType* type = t.first;
auto& resource_metadata_map =
resource_type_metadata_map[type->type_url()];
for (const auto& r : t.second) { // resource id
const XdsResourceKey& resource_key = r.first;
const ResourceState& resource_state = r.second;
resource_metadata_map[ConstructFullXdsResourceName(
authority, type->type_url(), resource_key)] = &resource_state.meta;
}
}
}
// Assemble config dump messages.
return api_.AssembleClientConfig(resource_type_metadata_map);
}
//
// accessors for global state
//
void XdsClientGlobalInit() {
g_mu = new Mutex;
XdsHttpFilterRegistry::Init();
}
// TODO(roth): Find a better way to clear the fallback config that does
// not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS {
gpr_free(g_fallback_bootstrap_config);
g_fallback_bootstrap_config = nullptr;
delete g_mu;
g_mu = nullptr;
XdsHttpFilterRegistry::Shutdown();
}
namespace {
std::string GetBootstrapContents(const char* fallback_config,
grpc_error_handle* error) {
// First, try GRPC_XDS_BOOTSTRAP env var.
UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
if (path != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
"environment variable: %s",
path.get());
}
grpc_slice contents;
*error =
grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
if (*error != GRPC_ERROR_NONE) return "";
std::string contents_str(StringViewFromSlice(contents));
grpc_slice_unref_internal(contents);
return contents_str;
}
// Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
UniquePtr<char> env_config(gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
if (env_config != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
"environment variable");
}
return env_config.get();
}
// Finally, try fallback config.
if (fallback_config != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
}
return fallback_config;
}
// No bootstrap config found.
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
"not defined");
return "";
}
} // namespace
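// Returns an XdsClient instance. If a bootstrap config is passed via
// channel args (test only), a new local instance is created; otherwise the
// global instance is returned, creating it on demand from the bootstrap
// found by GetBootstrapContents().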
RefCountedPtr<XdsClient> XdsClient::GetOrCreate(const grpc_channel_args* args,
grpc_error_handle* error) {
RefCountedPtr<XdsClient> xds_client;
// If getting bootstrap from channel args, create a local XdsClient
// instance for the channel or server instead of using the global instance.
const char* bootstrap_config = grpc_channel_args_find_string(
args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
if (bootstrap_config != nullptr) {
std::unique_ptr<XdsBootstrap> bootstrap =
XdsBootstrap::Create(bootstrap_config, error);
if (*error == GRPC_ERROR_NONE) {
grpc_channel_args* xds_channel_args =
grpc_channel_args_find_pointer<grpc_channel_args>(
args,
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
return MakeRefCounted<XdsClient>(std::move(bootstrap), xds_channel_args);
}
return nullptr;
}
// Otherwise, use the global instance.
{
MutexLock lock(g_mu);
if (g_xds_client != nullptr) {
auto xds_client = g_xds_client->RefIfNonZero();
if (xds_client != nullptr) return xds_client;
}
// Find bootstrap contents.
std::string bootstrap_contents =
GetBootstrapContents(g_fallback_bootstrap_config, error);
if (*error != GRPC_ERROR_NONE) return nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
bootstrap_contents.c_str());
}
// Parse bootstrap.
std::unique_ptr<XdsBootstrap> bootstrap =
XdsBootstrap::Create(bootstrap_contents, error);
if (*error != GRPC_ERROR_NONE) return nullptr;
// Instantiate XdsClient.
xds_client =
MakeRefCounted<XdsClient>(std::move(bootstrap), g_channel_args);
g_xds_client = xds_client.get();
}
return xds_client;
}
namespace internal {
void SetXdsChannelArgsForTest(grpc_channel_args* args) {
MutexLock lock(g_mu);
g_channel_args = args;
}
void UnsetGlobalXdsClientForTest() {
MutexLock lock(g_mu);
g_xds_client = nullptr;
}
void SetXdsFallbackBootstrapConfig(const char* config) {
MutexLock lock(g_mu);
gpr_free(g_fallback_bootstrap_config);
g_fallback_bootstrap_config = gpr_strdup(config);
}
} // namespace internal
//
// embedding XdsClient in channel args
//
#define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
namespace {
void* XdsClientArgCopy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
return p;
}
void XdsClientArgDestroy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Unref(DEBUG_LOCATION, "channel arg");
}
int XdsClientArgCmp(void* p, void* q) { return QsortCompare(p, q); }
const grpc_arg_pointer_vtable kXdsClientArgVtable = {
XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
} // namespace
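// Returns a channel arg containing a pointer to this XdsClient, using the
// vtable above to ref/unref it as the arg is copied and destroyed.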
grpc_arg XdsClient::MakeChannelArg() const {
return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
const_cast<XdsClient*>(this),
&kXdsClientArgVtable);
}
RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
const grpc_channel_args& args) {
XdsClient* xds_client =
grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
if (xds_client == nullptr) return nullptr;
return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
}
} // namespace grpc_core
// The returned bytes may contain NUL (0) bytes, so we can't use a C string.
grpc_slice grpc_dump_xds_configs() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_error_handle error = GRPC_ERROR_NONE;
auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error);
if (error != GRPC_ERROR_NONE) {
// If we aren't using xDS, just return an empty slice.
GRPC_ERROR_UNREF(error);
return grpc_empty_slice();
}
return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary());
}