blob: e3d9ced4c34e0726b118c8765ca3ff2774167943 [file] [log] [blame]
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include "absl/container/inlined_vector.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/xds/xds_api.h"
#include "src/core/ext/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"
// Backoff parameters for retrying a failed call to the xds server. They feed
// the BackOff::Options built in the RetryableCall<> constructor, where the
// two *_SECONDS values are multiplied by 1000.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): not referenced in the visible part of this file; presumably
// the lower bound, in milliseconds, for the load reporting interval -- confirm
// against the LRS response handling.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
namespace grpc_core {
// Trace flag registered under the name "xds_client" and constructed with
// `false`. Code in this file checks it with GRPC_TRACE_FLAG_ENABLED() before
// emitting verbose gpr_log() output.
TraceFlag grpc_xds_client_trace(false, "xds_client");
//
// Internal class declarations
//
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
//
// The member definitions in this file require T to be constructible from a
// RefCountedPtr<RetryableCall<T>> and to provide seen_response().
template <typename T>
class XdsClient::ChannelState::RetryableCall
: public InternallyRefCounted<RetryableCall<T>> {
public:
// Stores \a chand, configures the retry backoff and starts the first call
// right away.
explicit RetryableCall(RefCountedPtr<ChannelState> chand);
// Marks the wrapper as shutting down, drops the wrapped call, cancels a
// pending retry timer (if any) and releases one ref.
void Orphan() override;
// Drops the finished call. If that call had seen a response, the backoff is
// reset and a new call starts immediately; otherwise the retry timer is
// armed.
void OnCallFinishedLocked();
// Returns the wrapped call, or null while in retry backoff.
T* calld() const { return calld_.get(); }
// Returns a non-owning pointer to the owning xds channel.
ChannelState* chand() const { return chand_.get(); }
// NOTE(review): defined outside the visible part of this file.
bool IsCurrentCallOnChannel() const;
private:
// Creates a new T holding a ref to this wrapper. No-op when shutting down.
void StartNewCallLocked();
// Arms retry_timer_ for the next backoff attempt. No-op when shutting down.
void StartRetryTimerLocked();
// Timer closure entry point: takes the XdsClient mutex, runs
// OnRetryTimerLocked() and then releases the ref taken when the timer was
// armed.
static void OnRetryTimer(void* arg, grpc_error* error);
// Starts a new call if the timer fired without error and the wrapper is not
// shutting down. Consumes one ref to \a error.
void OnRetryTimerLocked(grpc_error* error);
// The wrapped xds call that talks to the xds server. It's instantiated
// every time we start a new call. It's null during call retry backoff.
OrphanablePtr<T> calld_;
// The owning xds channel.
RefCountedPtr<ChannelState> chand_;
// Retry state.
BackOff backoff_;
grpc_timer retry_timer_;
grpc_closure on_retry_timer_;
// True between arming retry_timer_ and its callback running.
bool retry_timer_callback_pending_ = false;
// Set by Orphan(); suppresses new calls and new retry timers.
bool shutting_down_ = false;
};
// Contains an ADS call to the xds server.
//
// Tracks, per resource type URL, the resources subscribed on this call
// together with the version/nonce/error used when building the next request.
class XdsClient::ChannelState::AdsCallState
: public InternallyRefCounted<AdsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
~AdsCallState() override;
void Orphan() override;
// Accessors that walk up the ownership chain: retryable call -> channel
// state -> XdsClient. All return non-owning pointers.
RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
// Adds the resource to the subscribed set for \a type_url and sends a
// request if it was not already subscribed.
void Subscribe(const std::string& type_url, const std::string& name);
// Removes the resource from the subscribed set for \a type_url; a request is
// sent right away unless \a delay_unsubscription is true.
void Unsubscribe(const std::string& type_url, const std::string& name,
bool delay_unsubscription);
// True if any resource type still has at least one subscribed resource.
bool HasSubscribedResources() const;
private:
// Per-resource subscription state. Owns a one-shot timer that reports a
// timeout error to the resource's watchers if the timer fires before
// Finish() is called.
class ResourceState : public InternallyRefCounted<ResourceState> {
public:
ResourceState(const std::string& type_url, const std::string& name)
: type_url_(type_url), name_(name) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
// Cancels the timer (if pending) and drops one ref.
void Orphan() override {
Finish();
Unref(DEBUG_LOCATION, "Orphan");
}
// Arms the timeout timer the first time it is called; later calls are
// no-ops. The timer holds its own ref ("timer"), released in OnTimer().
void Start(RefCountedPtr<AdsCallState> ads_calld) {
if (sent_) return;
sent_ = true;
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
grpc_timer_init(
&timer_,
ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
&timer_callback_);
}
// Cancels the timer if it is still pending. Clearing timer_pending_ here
// also makes a callback that is already queued skip the timeout report.
void Finish() {
if (timer_pending_) {
grpc_timer_cancel(&timer_);
timer_pending_ = false;
}
}
private:
// Timer closure entry point. Runs OnTimerLocked() under the XdsClient
// mutex, then drops the ADS call ref and the timer's ref outside the lock.
static void OnTimer(void* arg, grpc_error* error) {
ResourceState* self = static_cast<ResourceState*>(arg);
{
MutexLock lock(&self->ads_calld_->xds_client()->mu_);
self->OnTimerLocked(GRPC_ERROR_REF(error));
}
self->ads_calld_.reset();
self->Unref(DEBUG_LOCATION, "timer");
}
// Consumes one ref to \a error. When the timer fired without error and is
// still marked pending, builds a timeout error and passes a ref to it to
// every watcher registered for this resource in the XdsClient map matching
// type_url_.
void OnTimerLocked(grpc_error* error) {
if (error == GRPC_ERROR_NONE && timer_pending_) {
timer_pending_ = false;
grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrFormat(
"timeout obtaining resource {type=%s name=%s} from xds server",
type_url_, name_)
.c_str());
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
grpc_error_string(watcher_error));
}
// NOTE(review): the lookups below use operator[]; if these are std::map
// (declared elsewhere) a missing entry would be default-inserted.
if (type_url_ == XdsApi::kLdsTypeUrl) {
ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
} else if (type_url_ == XdsApi::kRdsTypeUrl) {
RouteConfigState& state =
ads_calld_->xds_client()->route_config_map_[name_];
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
} else if (type_url_ == XdsApi::kCdsTypeUrl) {
ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
} else if (type_url_ == XdsApi::kEdsTypeUrl) {
EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
for (const auto& p : state.watchers) {
p.first->OnError(GRPC_ERROR_REF(watcher_error));
}
} else {
GPR_UNREACHABLE_CODE(return );
}
// Each watcher received its own ref; drop the local one.
GRPC_ERROR_UNREF(watcher_error);
}
GRPC_ERROR_UNREF(error);
}
const std::string type_url_;
const std::string name_;
// Set by Start(); cleared again at the end of OnTimer().
RefCountedPtr<AdsCallState> ads_calld_;
// True once Start() has run.
bool sent_ = false;
// True while the timer is armed and has neither fired nor been cancelled.
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
};
// State kept per resource type URL. Owns one ref to `error`.
struct ResourceTypeState {
~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
// Version, nonce, and error for this resource type.
std::string version;
std::string nonce;
grpc_error* error = GRPC_ERROR_NONE;
// Subscribed resources of this type.
std::map<std::string /* name */, OrphanablePtr<ResourceState>>
subscribed_resources;
};
// Sends a request for \a type_url, or records it in buffered_requests_ when
// a send is already in flight.
void SendMessageLocked(const std::string& type_url);
// Apply a validated update of the given resource type to the XdsClient cache
// and notify the affected watchers.
void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
// Batch-completion callbacks. The static functions are the closure entry
// points; the *Locked variants are defined outside the visible part of this
// file.
static void OnRequestSent(void* arg, grpc_error* error);
void OnRequestSentLocked(grpc_error* error);
static void OnResponseReceived(void* arg, grpc_error* error);
bool OnResponseReceivedLocked();
static void OnStatusReceived(void* arg, grpc_error* error);
void OnStatusReceivedLocked(grpc_error* error);
bool IsCurrentCallOnChannel() const;
// NOTE(review): defined outside the visible part of this file; the returned
// string_views presumably point at the keys of subscribed_resources --
// confirm before keeping the result past a map mutation.
std::set<absl::string_view> ResourceNamesForRequest(
const std::string& type_url);
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<AdsCallState>> parent_;
// Becomes true after the first request has been built; its negation is
// passed to CreateAdsRequest() in SendMessageLocked().
bool sent_initial_message_ = false;
bool seen_response_ = false;
// Always non-NULL.
grpc_call* call_;
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
// send_message
// Non-null while a send is in flight (checked by SendMessageLocked()).
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure on_request_sent_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
// Resource types for which requests need to be sent.
std::set<std::string /*type_url*/> buffered_requests_;
// State for each resource type.
std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
};
// Contains an LRS call to the xds server.
class XdsClient::ChannelState::LrsCallState
: public InternallyRefCounted<LrsCallState> {
public:
// The ctor and dtor should not be used directly.
explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
~LrsCallState() override;
void Orphan() override;
// NOTE(review): defined outside the visible part of this file; presumably
// creates reporter_ once the preconditions for reporting hold -- confirm.
void MaybeStartReportingLocked();
// Accessors that walk up the ownership chain: retryable call -> channel
// state -> XdsClient. All return non-owning pointers.
RetryableCall<LrsCallState>* parent() { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }
private:
// Reports client-side load stats according to a fixed interval.
class Reporter : public InternallyRefCounted<Reporter> {
public:
// Initializes both closures and immediately schedules the first report.
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
: parent_(std::move(parent)), report_interval_(report_interval) {
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
grpc_schedule_on_exec_ctx);
ScheduleNextReportLocked();
}
void Orphan() override;
private:
// The members below are defined outside the visible part of this file.
void ScheduleNextReportLocked();
static void OnNextReportTimer(void* arg, grpc_error* error);
bool OnNextReportTimerLocked(grpc_error* error);
void SendReportLocked();
static void OnReportDone(void* arg, grpc_error* error);
bool OnReportDoneLocked(grpc_error* error);
// True while this reporter is still the one installed on the owning call.
bool IsCurrentReporterOnCall() const {
return this == parent_->reporter_.get();
}
XdsClient* xds_client() const { return parent_->xds_client(); }
// The owning LRS call.
RefCountedPtr<LrsCallState> parent_;
// The load reporting state.
const grpc_millis report_interval_;
bool last_report_counters_were_zero_ = false;
bool next_report_timer_callback_pending_ = false;
grpc_timer next_report_timer_;
grpc_closure on_next_report_timer_;
grpc_closure on_report_done_;
};
// Batch-completion callbacks. The static functions are the closure entry
// points; the *Locked variants are defined outside the visible part of this
// file.
static void OnInitialRequestSent(void* arg, grpc_error* error);
void OnInitialRequestSentLocked();
static void OnResponseReceived(void* arg, grpc_error* error);
bool OnResponseReceivedLocked();
static void OnStatusReceived(void* arg, grpc_error* error);
void OnStatusReceivedLocked(grpc_error* error);
bool IsCurrentCallOnChannel() const;
// The owning RetryableCall<>.
RefCountedPtr<RetryableCall<LrsCallState>> parent_;
bool seen_response_ = false;
// Always non-NULL.
grpc_call* call_;
// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure on_initial_request_sent_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure on_response_received_;
// recv_trailing_metadata
grpc_metadata_array trailing_metadata_recv_;
grpc_status_code status_code_;
grpc_slice status_details_;
grpc_closure on_status_received_;
// Load reporting state.
bool send_all_clusters_ = false;
std::set<std::string> cluster_names_; // Asked for by the LRS server.
grpc_millis load_reporting_interval_ = 0;
// The active reporter, if any (compared against in
// Reporter::IsCurrentReporterOnCall()).
OrphanablePtr<Reporter> reporter_;
};
//
// XdsClient::ChannelState::StateWatcher
//
// Connectivity watcher installed on the xds channel. Holds a ref to the
// owning ChannelState and forwards TRANSIENT_FAILURE to the XdsClient's
// watchers as an error.
class XdsClient::ChannelState::StateWatcher
    : public AsyncConnectivityStateWatcherInterface {
 public:
  explicit StateWatcher(RefCountedPtr<ChannelState> parent)
      : parent_(std::move(parent)) {}

 private:
  // Invoked on every connectivity state change of the xds channel. Runs under
  // the XdsClient mutex.
  void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                 const absl::Status& status) override {
    MutexLock lock(&parent_->xds_client_->mu_);
    // Nothing to report once the channel state is being torn down.
    if (parent_->shutting_down_) return;
    // Only TRANSIENT_FAILURE is surfaced to the watchers.
    if (new_state != GRPC_CHANNEL_TRANSIENT_FAILURE) return;
    XdsClient* const client = parent_->xds_client();
    gpr_log(GPR_INFO,
            "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
            "status_message:(%s)",
            client, status.ToString().c_str());
    client->NotifyOnErrorLocked(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "xds channel in TRANSIENT_FAILURE"));
  }

  RefCountedPtr<ChannelState> parent_;
};
//
// XdsClient::ChannelState
//
namespace {
// Builds the channel args for the xds channel from the parent channel's
// \a args: a fixed set of parent args is stripped and three xds-specific
// integer args are added. The result comes from
// grpc_channel_args_copy_and_add_and_remove().
grpc_channel_args* BuildXdsChannelArgs(const grpc_channel_args& args) {
  // Parent-channel args that must not be inherited.
  static const char* removed_arg_names[] = {
      // The channel built here should use the default LB policy
      // (pick_first), not the parent's.
      GRPC_ARG_LB_POLICY_NAME,
      // The parent's service config carries the LB config; dropping it
      // avoids using xds recursively.
      GRPC_ARG_SERVICE_CONFIG,
      // The server URI differs from the parent channel's. The client channel
      // factory re-adds this arg with the right value.
      GRPC_ARG_SERVER_URI,
      // The authority should come from the target authority table (see
      // \a ModifyXdsChannelArgs) rather than from the parent channel.
      GRPC_ARG_DEFAULT_AUTHORITY,
      // As with \a GRPC_ARG_DEFAULT_AUTHORITY: this is a stand-alone channel
      // and must not inherit the parent's SSL target name override.
      GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
      // The parent's channelz node is not passed down; this channel gets its
      // own.
      GRPC_ARG_CHANNELZ_CHANNEL_NODE,
      // Keepalive interval: replaced by the explicit value added below.
      GRPC_ARG_KEEPALIVE_TIME_MS,
  };
  const size_t num_removed = GPR_ARRAY_SIZE(removed_arg_names);
  // Args specific to the xds channel.
  absl::InlinedVector<grpc_arg, 3> added_args;
  // Keepalive interval.
  added_args.push_back(grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 5 * 60 * GPR_MS_PER_SEC));
  // Tell channelz this is an internal channel.
  added_args.push_back(grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1));
  // Marks the target as an xds server.
  // TODO(roth): Once we figure out our fallback and credentials story,
  // decide whether this is actually needed. Note that it's currently
  // used by the fake security connector as well.
  added_args.push_back(grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_SERVER), 1));
  // Copy the parent args, applying the removals and additions.
  return grpc_channel_args_copy_and_add_and_remove(
      &args, removed_arg_names, num_removed, added_args.data(),
      added_args.size());
}
} // namespace
// Wraps \a channel on behalf of \a xds_client. \a channel must be non-null;
// it is destroyed in ~ChannelState(). The connectivity watch is started
// before the constructor returns.
XdsClient::ChannelState::ChannelState(RefCountedPtr<XdsClient> xds_client,
grpc_channel* channel)
: InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
xds_client_(std::move(xds_client)),
channel_(channel) {
GPR_ASSERT(channel_ != nullptr);
StartConnectivityWatchLocked();
}
// Destroys the underlying channel, then releases the ref to the XdsClient.
XdsClient::ChannelState::~ChannelState() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
this);
}
grpc_channel_destroy(channel_);
// Released explicitly so that the unref carries a debug location and reason.
xds_client_.reset(DEBUG_LOCATION, "ChannelState");
}
// Shuts the channel state down: flags shutdown, stops the connectivity watch,
// drops the ADS and LRS retryable calls and finally releases one ref.
void XdsClient::ChannelState::Orphan() {
// Checked by StateWatcher before it reports TRANSIENT_FAILURE.
shutting_down_ = true;
CancelConnectivityWatchLocked();
ads_calld_.reset();
lrs_calld_.reset();
Unref(DEBUG_LOCATION, "ChannelState+orphaned");
}
// Returns the ADS call wrapped by the retryable ADS call; null while that
// wrapper is in retry backoff. The wrapper itself must exist: it is
// dereferenced unconditionally.
XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
    const {
  auto* retryable_ads_call = ads_calld_.get();
  return retryable_ads_call->calld();
}
// Returns the LRS call wrapped by the retryable LRS call; null while that
// wrapper is in retry backoff. The wrapper itself must exist: it is
// dereferenced unconditionally.
XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
    const {
  auto* retryable_lrs_call = lrs_calld_.get();
  return retryable_lrs_call->calld();
}
// Returns true iff an ADS call is currently in progress, i.e. the retryable
// ADS wrapper exists and is not waiting in retry backoff.
//
// The wrapper is null before the first Subscribe() and again after
// Unsubscribe() drops it once nothing is subscribed, so it has to be checked
// before it is dereferenced.
bool XdsClient::ChannelState::HasActiveAdsCall() const {
  return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
}
// Creates the retryable LRS call unless one already exists. The new wrapper
// holds a ref to this channel state.
void XdsClient::ChannelState::MaybeStartLrsCall() {
  if (lrs_calld_ == nullptr) {
    auto* retryable_lrs_call = new RetryableCall<LrsCallState>(
        Ref(DEBUG_LOCATION, "ChannelState+lrs"));
    lrs_calld_.reset(retryable_lrs_call);
  }
}
// Drops the retryable LRS call, if any.
void XdsClient::ChannelState::StopLrsCall() {
  lrs_calld_.reset();
}
void XdsClient::ChannelState::StartConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
grpc_client_channel_start_connectivity_watch(
client_channel_elem, GRPC_CHANNEL_IDLE,
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
}
void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
}
void XdsClient::ChannelState::Subscribe(const std::string& type_url,
const std::string& name) {
if (ads_calld_ == nullptr) {
// Start the ADS call if this is the first request.
ads_calld_.reset(new RetryableCall<AdsCallState>(
Ref(DEBUG_LOCATION, "ChannelState+ads")));
// Note: AdsCallState's ctor will automatically subscribe to all
// resources that the XdsClient already has watchers for, so we can
// return here.
return;
}
// If the ADS call is in backoff state, we don't need to do anything now
// because when the call is restarted it will resend all necessary requests.
if (ads_calld() == nullptr) return;
// Subscribe to this resource if the ADS call is active.
ads_calld()->Subscribe(type_url, name);
}
void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
const std::string& name,
bool delay_unsubscription) {
if (ads_calld_ != nullptr) {
auto* calld = ads_calld_->calld();
if (calld != nullptr) {
calld->Unsubscribe(type_url, name, delay_unsubscription);
if (!calld->HasSubscribedResources()) ads_calld_.reset();
}
}
}
//
// XdsClient::ChannelState::RetryableCall<>
//
// Stores \a chand, configures the retry backoff from the GRPC_XDS_* macros
// (the *_SECONDS values are multiplied by 1000 here) and starts the first
// call before returning.
template <typename T>
XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
RefCountedPtr<ChannelState> chand)
: chand_(std::move(chand)),
backoff_(
BackOff::Options()
.set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
1000)
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
// Closure Initialization
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx);
// Takes a ref on this object for the newly created call.
StartNewCallLocked();
}
// Shuts the retryable call down: flags shutdown so no new call or timer is
// started, drops the wrapped call, cancels a pending retry timer and releases
// one ref.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
shutting_down_ = true;
calld_.reset();
// The ref taken for the timer is released by OnRetryTimer(), not here.
if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
// Called when the wrapped call has finished. Drops the call and decides how
// to restart it, based on whether the call ever saw a response.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
  // Read the flag before the call object is released.
  const bool had_response = calld_->seen_response();
  calld_.reset();
  if (!had_response) {
    // We failed to connect to the xds server: retry later.
    StartRetryTimerLocked();
    return;
  }
  // We lost the connection to the xds server: reset the backoff and restart
  // the call immediately.
  backoff_.Reset();
  StartNewCallLocked();
}
// Creates a new wrapped call that holds a ref to this retryable call. Does
// nothing when shutting down. There must be no wrapped call at this point.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(chand_->channel_ != nullptr);
  GPR_ASSERT(calld_ == nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    ChannelState* const channel_state = chand();
    gpr_log(GPR_INFO,
            "[xds_client %p] Start new call from retryable call (chand: %p, "
            "retryable call: %p)",
            channel_state->xds_client(), channel_state, this);
  }
  auto self_ref = this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call");
  calld_ = MakeOrphanable<T>(std::move(self_ref));
}
// Arms the retry timer for the next backoff attempt. The timer holds its own
// ref to this object, released in OnRetryTimer(). Does nothing when shutting
// down.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
  if (shutting_down_) return;
  const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    // Remaining time until the timer fires, clamped at zero for logging.
    grpc_millis timeout = next_attempt_time - ExecCtx::Get()->Now();
    if (timeout < 0) timeout = 0;
    ChannelState* const channel_state = chand();
    gpr_log(GPR_INFO,
            "[xds_client %p] Failed to connect to xds server (chand: %p) "
            "retry timer will fire in %" PRId64 "ms.",
            channel_state->xds_client(), channel_state, timeout);
  }
  this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
  grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
  retry_timer_callback_pending_ = true;
}
// Retry timer closure. \a arg is the RetryableCall that armed the timer. The
// locked handler runs under the XdsClient mutex; the ref taken when the timer
// was armed is released after the mutex has been dropped.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
    void* arg, grpc_error* error) {
  auto* self = static_cast<RetryableCall*>(arg);
  {
    MutexLock lock(&self->chand_->xds_client()->mu_);
    // The locked handler consumes the extra error ref.
    self->OnRetryTimerLocked(GRPC_ERROR_REF(error));
  }
  self->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
}
// Handles the retry timer callback under the XdsClient mutex. Starts a new
// call only if the timer fired without error and the retryable call is not
// shutting down. Consumes one ref to \a error.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
    grpc_error* error) {
  retry_timer_callback_pending_ = false;
  const bool should_restart = !shutting_down_ && error == GRPC_ERROR_NONE;
  if (should_restart) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      ChannelState* const channel_state = chand();
      gpr_log(
          GPR_INFO,
          "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
          channel_state->xds_client(), channel_state, this);
    }
    StartNewCallLocked();
  }
  GRPC_ERROR_UNREF(error);
}
//
// XdsClient::ChannelState::AdsCallState
//
// Creates the ADS call on the xds channel and starts three batches on it:
// send-initial-metadata, recv-initial-metadata + recv-message, and
// recv-status. In between, every resource the XdsClient currently has in its
// listener/route-config/cluster/endpoint maps is subscribed, which sends the
// initial requests.
XdsClient::ChannelState::AdsCallState::AdsCallState(
RefCountedPtr<RetryableCall<AdsCallState>> parent)
: InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
parent_(std::move(parent)) {
// Init the ADS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
// the polling entities from client_channel.
GPR_ASSERT(xds_client() != nullptr);
// Create a call with the specified method name.
// The v3 or v2 method path is selected from the bootstrap server config.
const auto& method =
xds_client()->bootstrap_->server().ShouldUseV3()
? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
: GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
call_ = grpc_channel_create_pollset_set_call(
chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
xds_client()->interested_parties_, method, nullptr,
GRPC_MILLIS_INF_FUTURE, nullptr);
GPR_ASSERT(call_ != nullptr);
// Init data associated with the call.
grpc_metadata_array_init(&initial_metadata_recv_);
grpc_metadata_array_init(&trailing_metadata_recv_);
// Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
"call: %p)",
xds_client(), chand(), this, call_);
}
// Create the ops.
// The same ops array is reused for each batch; `op` is rewound to `ops`
// before a new batch is filled in.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
// Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
// No completion closure is passed for this batch.
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: send request message.
// The requests themselves are sent by Subscribe() via SendMessageLocked().
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
grpc_schedule_on_exec_ctx);
for (const auto& p : xds_client()->listener_map_) {
Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
}
for (const auto& p : xds_client()->route_config_map_) {
Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
}
for (const auto& p : xds_client()->cluster_map_) {
Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
}
for (const auto& p : xds_client()->endpoint_map_) {
Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
}
// Op: recv initial metadata.
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Op: recv response.
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Extra ref for the pending on_response_received_ callback.
Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op->data.recv_status_on_client.status = &status_code_;
op->data.recv_status_on_client.status_details = &status_details_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
// Releases everything owned by the call state: both metadata arrays, the
// send/recv byte buffers, the status details slice and finally the call.
XdsClient::ChannelState::AdsCallState::~AdsCallState() {
grpc_metadata_array_destroy(&initial_metadata_recv_);
grpc_metadata_array_destroy(&trailing_metadata_recv_);
grpc_byte_buffer_destroy(send_message_payload_);
grpc_byte_buffer_destroy(recv_message_payload_);
grpc_slice_unref_internal(status_details_);
GPR_ASSERT(call_ != nullptr);
grpc_call_unref(call_);
}
// Cancels the call and drops all per-type subscription state. Does not
// release a ref itself (see the note at the end of the body).
void XdsClient::ChannelState::AdsCallState::Orphan() {
GPR_ASSERT(call_ != nullptr);
// If we are here because xds_client wants to cancel the call,
// on_status_received_ will complete the cancellation and clean up. Otherwise,
// we are here because xds_client has to orphan a failed call, then the
// following cancellation will be a no-op.
grpc_call_cancel_internal(call_);
// Clearing the map destroys the OrphanablePtr<ResourceState> entries it
// holds.
state_map_.clear();
// Note that the initial ref is held by on_status_received_. So the
// corresponding unref happens in on_status_received_ instead of here.
}
// Sends an ADS request for \a type_url carrying the currently subscribed
// resource names together with the version, nonce and error recorded for that
// type. If another request is still in flight, the type URL is only recorded
// in buffered_requests_ and nothing is sent now.
//
// The error stored for the type is handed to the request and then cleared.
// For a type URL other than LDS/RDS/CDS/EDS the state entry created here is
// removed again once the request has been built and logged.
void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
    const std::string& type_url) {
  // Buffer message sending if an existing message is in flight.
  if (send_message_payload_ != nullptr) {
    buffered_requests_.insert(type_url);
    return;
  }
  auto& state = state_map_[type_url];
  std::set<absl::string_view> resource_names =
      ResourceNamesForRequest(type_url);
  grpc_slice request_payload_slice = xds_client()->api_.CreateAdsRequest(
      type_url, resource_names, state.version, state.nonce,
      GRPC_ERROR_REF(state.error), !sent_initial_message_);
  sent_initial_message_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
            "error=%s resources=%s",
            xds_client(), type_url.c_str(), state.version.c_str(),
            state.nonce.c_str(), grpc_error_string(state.error),
            absl::StrJoin(resource_names, " ").c_str());
  }
  // The error has been reported to the server; clear it.
  GRPC_ERROR_UNREF(state.error);
  state.error = GRPC_ERROR_NONE;
  // Drop the entry for an unsupported type. This must come after the last use
  // of `state` (and of `resource_names`): erasing destroys the object `state`
  // refers to, so touching it afterwards would be a use-after-free.
  if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
      type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
    state_map_.erase(type_url);
  }
  // Create message payload.
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  // Send the message.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_SEND_MESSAGE;
  op.data.send_message.send_message = send_message_payload_;
  // Extra ref for the pending on_request_sent_ callback.
  Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
  GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
  if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
    // Log the details first; the assertion below then aborts.
    gpr_log(GPR_ERROR,
            "[xds_client %p] calld=%p call_error=%d sending ADS message",
            xds_client(), this, call_error);
    GPR_ASSERT(GRPC_CALL_OK == call_error);
  }
}
void XdsClient::ChannelState::AdsCallState::Subscribe(
const std::string& type_url, const std::string& name) {
auto& state = state_map_[type_url].subscribed_resources[name];
if (state == nullptr) {
state = MakeOrphanable<ResourceState>(type_url, name);
SendMessageLocked(type_url);
}
}
void XdsClient::ChannelState::AdsCallState::Unsubscribe(
const std::string& type_url, const std::string& name,
bool delay_unsubscription) {
state_map_[type_url].subscribed_resources.erase(name);
if (!delay_unsubscription) SendMessageLocked(type_url);
}
bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
for (const auto& p : state_map_) {
if (!p.second.subscribed_resources.empty()) return true;
}
return false;
}
// Applies an LDS update to the XdsClient cache and notifies watchers, in
// three passes:
//   1. For every listener in the update: stop its request timer, store the
//      update unless it is identical to the cached one, and notify the
//      listener's watchers of the change.
//   2. For every subscribed listener missing from the update that previously
//      had a cached value: drop the value and report "does not exist".
//   3. For every subscribed RDS resource no longer named by any listener in
//      this update: drop its cached value and report "does not exist".
void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
    XdsApi::LdsUpdateMap lds_update_map) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] LDS update received containing %" PRIuPTR
            " resources",
            xds_client(), lds_update_map.size());
  }
  ResourceTypeState& lds_state = state_map_[XdsApi::kLdsTypeUrl];
  std::set<std::string> rds_resource_names_seen;
  // Pass 1: listeners present in the update.
  for (auto& update_entry : lds_update_map) {
    const std::string& listener_name = update_entry.first;
    XdsApi::LdsUpdate& lds_update = update_entry.second;
    // The resource was received, so its request timer is no longer needed.
    OrphanablePtr<ResourceState>& resource =
        lds_state.subscribed_resources[listener_name];
    if (resource != nullptr) resource->Finish();
    const bool names_route_config = !lds_update.route_config_name.empty();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
              xds_client(), listener_name.c_str(),
              (names_route_config ? lds_update.route_config_name.c_str()
                                  : "<inlined>"));
      if (lds_update.rds_update.has_value()) {
        gpr_log(GPR_INFO, "RouteConfiguration: %s",
                lds_update.rds_update->ToString().c_str());
      }
    }
    // Record the RDS resource names seen.
    if (names_route_config) {
      rds_resource_names_seen.insert(lds_update.route_config_name);
    }
    ListenerState& listener_state = xds_client()->listener_map_[listener_name];
    // Ignore identical update.
    const bool unchanged = listener_state.update.has_value() &&
                           *listener_state.update == lds_update;
    if (unchanged) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] LDS update for %s identical to current, "
                "ignoring.",
                xds_client(), listener_name.c_str());
      }
      continue;
    }
    // Update the listener state, then notify watchers.
    listener_state.update = std::move(lds_update);
    for (const auto& watcher_entry : listener_state.watchers) {
      watcher_entry.first->OnListenerChanged(*listener_state.update);
    }
  }
  // Pass 2: for any subscribed resource that is not present in the update,
  // remove it from the cache and notify watchers that it does not exist.
  for (const auto& subscription : lds_state.subscribed_resources) {
    const std::string& listener_name = subscription.first;
    if (lds_update_map.find(listener_name) != lds_update_map.end()) continue;
    ListenerState& listener_state = xds_client()->listener_map_[listener_name];
    // If the resource was newly requested but has not yet been received,
    // we don't want to generate an error for the watchers, because this LDS
    // response may be in reaction to an earlier request that did not yet
    // request the new resource, so its absence from the response does not
    // necessarily indicate that the resource does not exist.
    // For that case, we rely on the request timeout instead.
    if (!listener_state.update.has_value()) continue;
    listener_state.update.reset();
    for (const auto& watcher_entry : listener_state.watchers) {
      watcher_entry.first->OnResourceDoesNotExist();
    }
  }
  // Pass 3: for any RDS resource that is no longer referred to by any LDS
  // resources, remove it from the cache and notify watchers that it
  // does not exist.
  ResourceTypeState& rds_state = state_map_[XdsApi::kRdsTypeUrl];
  for (const auto& subscription : rds_state.subscribed_resources) {
    const std::string& rds_resource_name = subscription.first;
    if (rds_resource_names_seen.find(rds_resource_name) !=
        rds_resource_names_seen.end()) {
      continue;
    }
    RouteConfigState& route_config_state =
        xds_client()->route_config_map_[rds_resource_name];
    route_config_state.update.reset();
    for (const auto& watcher_entry : route_config_state.watchers) {
      watcher_entry.first->OnResourceDoesNotExist();
    }
  }
}
// Applies a validated RDS response to the client's route config cache.
// For each entry in rds_update_map this finishes the matching per-resource
// state (if one exists), skips entries equal to the cached value, and
// otherwise stores the new value and notifies every registered watcher.
void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
    XdsApi::RdsUpdateMap rds_update_map) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] RDS update received containing %" PRIuPTR
            " resources",
            xds_client(), rds_update_map.size());
  }
  auto& rds_type_state = state_map_[XdsApi::kRdsTypeUrl];
  for (auto& entry : rds_update_map) {
    const std::string& resource_name = entry.first;
    XdsApi::RdsUpdate& incoming = entry.second;
    // The resource has arrived: finish its per-resource state, if present.
    auto& resource_state = rds_type_state.subscribed_resources[resource_name];
    if (resource_state != nullptr) resource_state->Finish();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
              incoming.ToString().c_str());
    }
    RouteConfigState& cached = xds_client()->route_config_map_[resource_name];
    const bool unchanged =
        cached.update.has_value() && *cached.update == incoming;
    if (unchanged) {
      // Watchers already have this exact value; nothing to report.
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] RDS resource identical to current, ignoring",
                xds_client());
      }
      continue;
    }
    // Replace the cached value, then fan it out to all watchers.
    cached.update = std::move(incoming);
    for (const auto& watcher_entry : cached.watchers) {
      watcher_entry.first->OnRouteConfigChanged(*cached.update);
    }
  }
}
// Applies a validated CDS response to the client's cluster cache.
// Three passes are made:
//   1. Store each changed cluster and notify its watchers, recording which
//      EDS resource name each cluster refers to.
//   2. For subscribed clusters missing from the response that had a cached
//      value, drop the value and report "does not exist".
//   3. For subscribed EDS resources that no cluster in this response refers
//      to, drop the cached value and report "does not exist".
void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
    XdsApi::CdsUpdateMap cds_update_map) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] CDS update received containing %" PRIuPTR
            " resources",
            xds_client(), cds_update_map.size());
  }
  auto& cds_type_state = state_map_[XdsApi::kCdsTypeUrl];
  std::set<std::string> referenced_eds_names;
  // Pass 1: apply every cluster contained in the response.
  for (auto& entry : cds_update_map) {
    const std::string& name = entry.first;
    XdsApi::CdsUpdate& incoming = entry.second;
    // The resource has arrived: finish its per-resource state, if present.
    auto& resource_state = cds_type_state.subscribed_resources[name];
    if (resource_state != nullptr) resource_state->Finish();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO,
              "[xds_client %p] cluster=%s: eds_service_name=%s, "
              "lrs_load_reporting_server_name=%s",
              xds_client(), name.c_str(), incoming.eds_service_name.c_str(),
              incoming.lrs_load_reporting_server_name.has_value()
                  ? incoming.lrs_load_reporting_server_name.value().c_str()
                  : "(N/A)");
    }
    // When eds_service_name is empty, the cluster name itself is recorded
    // as the EDS resource name.
    if (incoming.eds_service_name.empty()) {
      referenced_eds_names.insert(name);
    } else {
      referenced_eds_names.insert(incoming.eds_service_name);
    }
    ClusterState& cached = xds_client()->cluster_map_[name];
    const bool unchanged =
        cached.update.has_value() && *cached.update == incoming;
    if (unchanged) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] CDS update identical to current, ignoring.",
                xds_client());
      }
      continue;
    }
    // Replace the cached value, then fan it out to all watchers.
    cached.update = std::move(incoming);
    for (const auto& watcher_entry : cached.watchers) {
      watcher_entry.first->OnClusterChanged(cached.update.value());
    }
  }
  // Pass 2: subscribed clusters that the response did not mention.
  for (const auto& subscription : cds_type_state.subscribed_resources) {
    const std::string& name = subscription.first;
    if (cds_update_map.find(name) != cds_update_map.end()) continue;
    ClusterState& cached = xds_client()->cluster_map_[name];
    // A resource with no cached value may simply be newly requested: this
    // response could answer an earlier request that did not yet name it, so
    // its absence proves nothing.  The request timeout covers that case.
    if (!cached.update.has_value()) continue;
    cached.update.reset();
    for (const auto& watcher_entry : cached.watchers) {
      watcher_entry.first->OnResourceDoesNotExist();
    }
  }
  // Pass 3: EDS resources no longer referred to by any cluster seen above.
  auto& eds_type_state = state_map_[XdsApi::kEdsTypeUrl];
  for (const auto& subscription : eds_type_state.subscribed_resources) {
    const std::string& eds_name = subscription.first;
    if (referenced_eds_names.find(eds_name) != referenced_eds_names.end()) {
      continue;
    }
    EndpointState& cached = xds_client()->endpoint_map_[eds_name];
    cached.update.reset();
    for (const auto& watcher_entry : cached.watchers) {
      watcher_entry.first->OnResourceDoesNotExist();
    }
  }
}
// Applies a validated EDS response to the client's endpoint cache.
// For each entry in eds_update_map this finishes the matching per-resource
// state (if one exists), skips entries equal to the cached value, and
// otherwise stores the new value and notifies every registered watcher.
void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
    XdsApi::EdsUpdateMap eds_update_map) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] EDS update received containing %" PRIuPTR
            " resources",
            xds_client(), eds_update_map.size());
  }
  auto& eds_type_state = state_map_[XdsApi::kEdsTypeUrl];
  for (auto& entry : eds_update_map) {
    const std::string& name = entry.first;
    XdsApi::EdsUpdate& incoming = entry.second;
    // The resource has arrived: finish its per-resource state, if present.
    auto& resource_state = eds_type_state.subscribed_resources[name];
    if (resource_state != nullptr) resource_state->Finish();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
              name.c_str(), incoming.ToString().c_str());
    }
    EndpointState& cached = xds_client()->endpoint_map_[name];
    const bool unchanged =
        cached.update.has_value() && *cached.update == incoming;
    if (unchanged) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] EDS update identical to current, ignoring.",
                xds_client());
      }
      continue;
    }
    // Replace the cached endpoint data, then fan it out to all watchers.
    cached.update = std::move(incoming);
    for (const auto& watcher_entry : cached.watchers) {
      watcher_entry.first->OnEndpointChanged(cached.update.value());
    }
  }
}
void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
{
MutexLock lock(&ads_calld->xds_client()->mu_);
ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
}
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
}
// Handles completion of a send-message batch.  If this call is still the
// current one and the send succeeded, the sent payload is released and the
// next buffered request (if any) is sent.  Always unrefs `error`.
void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
    grpc_error* error) {
  if (!IsCurrentCallOnChannel() || error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Clean up the sent message.
  grpc_byte_buffer_destroy(send_message_payload_);
  send_message_payload_ = nullptr;
  // Continue to send another pending message if any.
  // TODO(roth): The current code to handle buffered messages has the
  // advantage of sending only the most recent list of resource names for
  // each resource type (no matter how many times that resource type has
  // been requested to send while the current message sending is still
  // pending). But its disadvantage is that we send the requests in fixed
  // order of resource types. We need to fix this if we are seeing some
  // resource type(s) starved due to frequent requests of other resource
  // type(s).
  if (!buffered_requests_.empty()) {
    auto next_request = buffered_requests_.begin();
    SendMessageLocked(*next_request);
    buffered_requests_.erase(next_request);
  }
  GRPC_ERROR_UNREF(error);
}
// Closure entry point invoked when a recv-message batch completes.
// Runs the locked handler under the XdsClient mutex.  The
// "ADS+OnResponseReceivedLocked" ref is dropped only when the handler
// reports that it did not start another receive.
void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
    void* arg, grpc_error* /* error */) {
  auto* self = static_cast<AdsCallState*>(arg);
  bool release_ref;
  {
    MutexLock lock(&self->xds_client()->mu_);
    release_ref = self->OnResponseReceivedLocked();
  }
  if (release_ref) {
    self->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
  }
}
bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// Empty payload means the call was cancelled.
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
return true;
}
// Read the response.
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(recv_message_payload_);
recv_message_payload_ = nullptr;
// Parse and validate the response.
XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
grpc_slice_unref_internal(response_slice);
if (result.type_url.empty()) {
// Ignore unparsable response.
gpr_log(GPR_ERROR,
"[xds_client %p] Error parsing ADS response (%s) -- ignoring",
xds_client(), grpc_error_string(result.parse_error));
GRPC_ERROR_UNREF(result.parse_error);
} else {
// Update nonce.
auto& state = state_map_[result.type_url];
state.nonce = std::move(result.nonce);
// NACK or ACK the response.
if (result.parse_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(state.error);
state.error = result.parse_error;
// NACK unacceptable update.
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response invalid for resource type %s "
"version %s, will NACK: nonce=%s error=%s",
xds_client(), result.type_url.c_str(), result.version.c_str(),
state.nonce.c_str(), grpc_error_string(result.parse_error));
SendMessageLocked(result.type_url);
} else {
seen_response_ = true;
// Accept the ADS response according to the type_url.
if (result.type_url == XdsApi::kLdsTypeUrl) {
AcceptLdsUpdate(std::move(result.lds_update_map));
} else if (result.type_url == XdsApi::kRdsTypeUrl) {
AcceptRdsUpdate(std::move(result.rds_update_map));
} else if (result.type_url == XdsApi::kCdsTypeUrl) {
AcceptCdsUpdate(std::move(result.cds_update_map));
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
AcceptEdsUpdate(std::move(result.eds_update_map));
}
state.version = std::move(result.version);
// ACK the update.
SendMessageLocked(result.type_url);
// Start load reporting if needed.
auto& lrs_call = chand()->lrs_calld_;
if (lrs_call != nullptr) {
LrsCallState* lrs_calld = lrs_call->calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
}
}
}
if (xds_client()->shutting_down_) return true;
// Keep listening for updates.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &recv_message_payload_;
op.flags = 0;
op.reserved = nullptr;
GPR_ASSERT(call_ != nullptr);
// Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
return false;
}
void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
{
MutexLock lock(&ads_calld->xds_client()->mu_);
ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
}
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
}
// Handles the final status of the ADS call.  Logs it when tracing is on.
// If this call is still the current one, tells the retryable-call parent
// that the call finished and reports an error to all watchers.
// Always unrefs `error`.
void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
    grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    char* details_cstr = grpc_slice_to_c_string(status_details_);
    gpr_log(GPR_INFO,
            "[xds_client %p] ADS call status received. Status = %d, details "
            "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
            xds_client(), status_code_, details_cstr, chand(), this, call_,
            grpc_error_string(error));
    gpr_free(details_cstr);
  }
  // Status from a stale call is ignored.
  if (IsCurrentCallOnChannel()) {
    // Try to restart the call.
    parent_->OnCallFinishedLocked();
    // Send error to all watchers.
    xds_client()->NotifyOnErrorLocked(
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
  }
  GRPC_ERROR_UNREF(error);
}
// Returns true iff this object is the call currently held by the channel's
// retryable ADS call.  A null retryable ADS call (which only happens when
// the xds channel is shutting down) makes every ADS call stale.
bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
  return chand()->ads_calld_ != nullptr &&
         this == chand()->ads_calld_->calld();
}
std::set<absl::string_view>
XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
const std::string& type_url) {
std::set<absl::string_view> resource_names;
auto it = state_map_.find(type_url);
if (it != state_map_.end()) {
for (auto& p : it->second.subscribed_resources) {
resource_names.insert(p.first);
OrphanablePtr<ResourceState>& state = p.second;
state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
}
}
return resource_names;
}
//
// XdsClient::ChannelState::LrsCallState::Reporter
//
// Cancels the next-report timer if its callback is still pending.
void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
  if (!next_report_timer_callback_pending_) return;
  grpc_timer_cancel(&next_report_timer_);
}
void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked() {
const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
grpc_timer_init(&next_report_timer_, next_report_time,
&on_next_report_timer_);
next_report_timer_callback_pending_ = true;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
bool done;
{
MutexLock lock(&self->xds_client()->mu_);
done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
}
if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
}
// Handles expiry (or cancellation) of the next-report timer.
// Sends a report and returns false when the timer fired cleanly and this
// reporter is still current; otherwise unrefs `error` and returns true.
bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
    grpc_error* error) {
  next_report_timer_callback_pending_ = false;
  const bool should_report =
      error == GRPC_ERROR_NONE && IsCurrentReporterOnCall();
  if (should_report) {
    SendReportLocked();
    return false;
  }
  GRPC_ERROR_UNREF(error);
  return true;
}
namespace {

// Returns true iff every cluster report in `snapshot` has no positive
// dropped-request count and only locality stats for which IsZero() holds.
// An empty snapshot yields true.
bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
  for (const auto& cluster_entry : snapshot) {
    const XdsApi::ClusterLoadReport& report = cluster_entry.second;
    for (const auto& drop_entry : report.dropped_requests) {
      if (drop_entry.second > 0) return false;
    }
    for (const auto& locality_entry : report.locality_stats) {
      const XdsClusterLocalityStats::Snapshot& stats = locality_entry.second;
      if (!stats.IsZero()) return false;
    }
  }
  return true;
}

}  // namespace
void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
parent_->cluster_names_);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
ScheduleNextReportLocked();
return;
}
// Create a request that contains the snapshot.
grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsRequest(std::move(snapshot));
parent_->send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
// Send the report.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = parent_->send_message_payload_;
grpc_call_error call_error = grpc_call_start_batch_and_execute(
parent_->call_, &op, 1, &on_report_done_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR,
"[xds_client %p] calld=%p call_error=%d sending client load report",
xds_client(), this, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
bool done;
{
MutexLock lock(&self->xds_client()->mu_);
done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
}
if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
}
// Handles completion of a load-report send.
// Returns false after scheduling the next report; returns true when this
// reporter is finished (no stats left, send error, or reporter replaced).
// Always consumes `error`.
bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
    grpc_error* error) {
  // The payload of the completed send is no longer needed.
  grpc_byte_buffer_destroy(parent_->send_message_payload_);
  parent_->send_message_payload_ = nullptr;
  // If there are no more registered stats to report, cancel the call.
  if (xds_client()->load_report_map_.empty()) {
    parent_->chand()->StopLrsCall();
    GRPC_ERROR_UNREF(error);
    return true;
  }
  // Normal case: the send succeeded and this reporter is still current.
  if (error == GRPC_ERROR_NONE && IsCurrentReporterOnCall()) {
    ScheduleNextReportLocked();
    return false;
  }
  GRPC_ERROR_UNREF(error);
  // If this reporter is no longer the current one on the call, the reason
  // might be that it was orphaned for a new one due to config update.
  if (!IsCurrentReporterOnCall()) {
    parent_->MaybeStartReportingLocked();
  }
  return true;
}
//
// XdsClient::ChannelState::LrsCallState
//
// Creates the LRS call on the xds channel and starts three batches on it:
//   1. SEND_INITIAL_METADATA + SEND_MESSAGE (the initial LRS request),
//      completing in on_initial_request_sent_;
//   2. RECV_INITIAL_METADATA + RECV_MESSAGE, completing in
//      on_response_received_;
//   3. RECV_STATUS_ON_CLIENT, completing in on_status_received_.
// Batches 1 and 2 each take (and release into the closure) an extra ref on
// this object; batch 3 relies on the initial ref.
// Every grpc_call_start_batch_and_execute() result is asserted to be
// GRPC_CALL_OK.
XdsClient::ChannelState::LrsCallState::LrsCallState(
RefCountedPtr<RetryableCall<LrsCallState>> parent)
: InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
parent_(std::move(parent)) {
// Init the LRS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
// the polling entities from client_channel.
GPR_ASSERT(xds_client() != nullptr);
// Pick the v3 or v2 StreamLoadStats method path based on the bootstrap
// server configuration.
const auto& method =
xds_client()->bootstrap_->server().ShouldUseV3()
? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
: GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
// The call is created with an infinite deadline (GRPC_MILLIS_INF_FUTURE).
call_ = grpc_channel_create_pollset_set_call(
chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
xds_client()->interested_parties_, method, nullptr,
GRPC_MILLIS_INF_FUTURE, nullptr);
GPR_ASSERT(call_ != nullptr);
// Init the request payload.
grpc_slice request_payload_slice =
xds_client()->api_.CreateLrsInitialRequest();
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
// The local slice ref is dropped once the byte buffer has been created.
grpc_slice_unref_internal(request_payload_slice);
// Init other data associated with the LRS call.
grpc_metadata_array_init(&initial_metadata_recv_);
grpc_metadata_array_init(&trailing_metadata_recv_);
// Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
"call: %p)",
xds_client(), chand(), this, call_);
}
// Create the ops.
// The ops array is zeroed once here and then reused for all three
// batches; each batch rewinds `op` to the start and overwrites the fields
// it needs.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
// Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
// Op: send request message.
GPR_ASSERT(send_message_payload_ != nullptr);
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = send_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This ref is released by OnInitialRequestSent() when the batch completes.
Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
grpc_schedule_on_exec_ctx);
// (op - ops) is the number of ops filled in for this batch.
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_initial_request_sent_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv initial metadata.
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Op: recv response.
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This ref is reused by each subsequent receive and released by
// OnResponseReceived() once the locked handler reports it is done.
Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op->data.recv_status_on_client.status = &status_code_;
op->data.recv_status_on_client.status_details = &status_details_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
// Releases everything the call owns: both metadata arrays, any remaining
// send/receive payloads, the status details slice, and finally the ref on
// the underlying call.
XdsClient::ChannelState::LrsCallState::~LrsCallState() {
  // Metadata arrays initialized in the constructor.
  grpc_metadata_array_destroy(&initial_metadata_recv_);
  grpc_metadata_array_destroy(&trailing_metadata_recv_);
  // Message payloads that may still be held.
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(status_details_);
  // The call itself must exist for the whole lifetime of this object.
  GPR_ASSERT(call_ != nullptr);
  grpc_call_unref(call_);
}
// Stops load reporting and cancels the underlying call.  The initial ref is
// not released here; it is held by on_status_received_ and dropped there.
void XdsClient::ChannelState::LrsCallState::Orphan() {
  reporter_.reset();
  GPR_ASSERT(call_ != nullptr);
  // Two situations lead here.  If xds_client wants to cancel the call, then
  // on_status_received_ completes the cancellation and cleans up.  If
  // xds_client is orphaning a call that already failed, the cancellation
  // below is a no-op.
  grpc_call_cancel_internal(call_);
}
// Creates the Reporter if, and only if, all preconditions hold:
//   - no reporter exists yet,
//   - no send_message op is still outstanding,
//   - an LRS response has been seen, and
//   - the channel's current ADS call exists and has seen a response.
void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
  // Already reporting.
  if (reporter_ != nullptr) return;
  // The previous send_message op (the initial request, or the last report
  // of the previous reporter) has not completed yet.
  if (send_message_payload_ != nullptr) return;
  // No LRS response has arrived.
  if (!seen_response()) return;
  // The ADS call must exist and must have received a valid response.
  if (chand()->ads_calld_ == nullptr) return;
  auto* ads_call = chand()->ads_calld_->calld();
  if (ads_call == nullptr || !ads_call->seen_response()) return;
  // All preconditions hold: start reporting.
  reporter_ = MakeOrphanable<Reporter>(
      Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
}
// Closure entry point invoked when the initial-request batch completes.
// Runs the locked handler under the XdsClient mutex, then drops the
// "LRS+OnInitialRequestSentLocked" ref after the lock has been released.
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
    void* arg, grpc_error* /*error*/) {
  auto* self = static_cast<LrsCallState*>(arg);
  {
    MutexLock lock(&self->xds_client()->mu_);
    self->OnInitialRequestSentLocked();
  }
  self->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
}
// Releases the payload of the initial request and then tries to start load
// reporting, which is gated on the send payload being cleared.
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
  grpc_byte_buffer_destroy(send_message_payload_);
  send_message_payload_ = nullptr;
  MaybeStartReportingLocked();
}
// Closure entry point invoked when a recv-message batch completes.
// Runs the locked handler under the XdsClient mutex.  The
// "LRS+OnResponseReceivedLocked" ref is dropped only when the handler
// reports that it did not start another receive.
void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
    void* arg, grpc_error* /*error*/) {
  auto* self = static_cast<LrsCallState*>(arg);
  bool release_ref;
  {
    MutexLock lock(&self->xds_client()->mu_);
    release_ref = self->OnResponseReceivedLocked();
  }
  if (release_ref) {
    self->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
  }
}
bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
// Empty payload means the call was cancelled.
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
return true;
}
// Read the response.
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(recv_message_payload_);
recv_message_payload_ = nullptr;
// This anonymous lambda is a hack to avoid the usage of goto.
[&]() {
// Parse the response.
bool send_all_clusters = false;
std::set<std::string> new_cluster_names;
grpc_millis new_load_reporting_interval;
grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
response_slice, &send_all_clusters, &new_cluster_names,
&new_load_reporting_interval);
if (parse_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"[xds_client %p] LRS response parsing failed. error=%s",
xds_client(), grpc_error_string(parse_error));
GRPC_ERROR_UNREF(parse_error);
return;
}
seen_response_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] LRS response received, %" PRIuPTR
" cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
"ms",
xds_client(), new_cluster_names.size(), send_all_clusters,
new_load_reporting_interval);
size_t i = 0;
for (const auto& name : new_cluster_names) {
gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
xds_client(), i++, name.c_str());
}
}
if (new_load_reporting_interval <
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
new_load_reporting_interval =
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] Increased load_report_interval to minimum "
"value %dms",
xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
}
}
// Ignore identical update.
if (send_all_clusters == send_all_clusters_ &&
cluster_names_ == new_cluster_names &&
load_reporting_interval_ == new_load_reporting_interval) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] Incoming LRS response identical to current, "
"ignoring.",
xds_client());
}
return;
}
// Stop current load reporting (if any) to adopt the new config.
reporter_.reset();
// Record the new config.
send_all_clusters_ = send_all_clusters;
cluster_names_ = std::move(new_cluster_names);
load_reporting_interval_ = new_load_reporting_interval;
// Try starting sending load report.
MaybeStartReportingLocked();
}();
grpc_slice_unref_internal(response_slice);
if (xds_client()->shutting_down_) return true;
// Keep listening for LRS config updates.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &recv_message_payload_;
op.flags = 0;
op.reserved = nullptr;
GPR_ASSERT(call_ != nullptr);
// Reuse the "OnResponseReceivedLocked" ref taken in ctor.
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
return false;
}
void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
{
MutexLock lock(&lrs_calld->xds_client()->mu_);
lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
}
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
}
// Handles the final status of the LRS call.  Logs it when tracing is on.
// If this call is still the current one, tells the retryable-call parent
// that the call finished.  Always unrefs `error`.
void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
    grpc_error* error) {
  GPR_ASSERT(call_ != nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    char* details_cstr = grpc_slice_to_c_string(status_details_);
    gpr_log(GPR_INFO,
            "[xds_client %p] LRS call status received. Status = %d, details "
            "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
            xds_client(), status_code_, details_cstr, chand(), this, call_,
            grpc_error_string(error));
    gpr_free(details_cstr);
  }
  // Status from a stale call is ignored.
  if (IsCurrentCallOnChannel()) {
    // A current call implies the client is not shutting down.
    GPR_ASSERT(!xds_client()->shutting_down_);
    // Try to restart the call.
    parent_->OnCallFinishedLocked();
  }
  GRPC_ERROR_UNREF(error);
}
// Returns true iff this object is the call currently held by the channel's
// retryable LRS call.  A null retryable LRS call (which only happens when
// the xds channel is shutting down) makes every LRS call stale.
bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
  return chand()->lrs_calld_ != nullptr &&
         this == chand()->lrs_calld_->calld();
}
//
// XdsClient
//
namespace {

// Looks up GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS in `args`.
// NOTE(review): the option triple is presumably {default, min, max}, i.e. a
// 15000 ms default bounded to [0, INT_MAX] -- confirm against
// grpc_integer_options.
grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
  return grpc_channel_args_find_integer(
      &args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
      {15000, 0, INT_MAX});
}

// Creates the channel to the xds server named in `bootstrap`.
// The bootstrap's channel_creds entries are scanned in order and the first
// recognized type ("google_default", "insecure" or "fake") decides how the
// channel is built.  When none is recognized, *error is set and nullptr is
// returned.
grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
                               const grpc_channel_args& args,
                               grpc_error** error) {
  RefCountedPtr<grpc_channel_credentials> selected_creds;
  for (const auto& candidate : bootstrap.server().channel_creds) {
    const auto& type = candidate.type;
    if (type == "google_default") {
      selected_creds.reset(grpc_google_default_credentials_create(nullptr));
      break;
    }
    if (type == "insecure") {
      // Insecure channels need no credentials object; build one directly
      // with the caller's args untouched.
      return grpc_insecure_channel_create(
          bootstrap.server().server_uri.c_str(), &args, nullptr);
    }
    if (type == "fake") {
      selected_creds.reset(grpc_fake_transport_security_credentials_create());
      break;
    }
  }
  if (selected_creds == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "no supported credential types found");
    return nullptr;
  }
  // Strip any channel credentials arg from the args before creating the
  // secure channel with the selected credentials.
  const char* arg_to_remove = GRPC_ARG_CHANNEL_CREDENTIALS;
  grpc_channel_args* stripped_args =
      grpc_channel_args_copy_and_remove(&args, &arg_to_remove, 1);
  grpc_channel* secure_channel = grpc_secure_channel_create(
      selected_creds.get(), bootstrap.server().server_uri.c_str(),
      stripped_args, nullptr);
  grpc_channel_args_destroy(stripped_args);
  return secure_channel;
}

}  // namespace
// Constructs the client: reads the bootstrap file, creates the channel to
// the xds server, links it into channelz under the parent channel node
// (when both nodes exist), and creates the ChannelState.
// On failure *error is set, the problem is logged, and construction stops
// early, leaving chand_ unset.
XdsClient::XdsClient(const grpc_channel_args& channel_args, grpc_error** error)
    : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
      request_timeout_(GetRequestTimeout(channel_args)),
      interested_parties_(grpc_pollset_set_create()),
      bootstrap_(
          XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
      api_(this, &grpc_xds_client_trace, bootstrap_.get()) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
  }
  // The bootstrap file was read in the initializer list; check its outcome.
  if (*error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
            this, grpc_error_string(*error));
    return;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
            bootstrap_->server().server_uri.c_str());
  }
  // Build the xds-specific channel args, create the channel, and release
  // the args again.
  grpc_channel_args* xds_args = BuildXdsChannelArgs(channel_args);
  grpc_channel* xds_channel = CreateXdsChannel(*bootstrap_, *xds_args, error);
  grpc_channel_args_destroy(xds_args);
  if (*error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this,
            grpc_error_string(*error));
    return;
  }
  // Add channelz linkage.
  channelz::ChannelNode* xds_node =
      grpc_channel_get_channelz_node(xds_channel);
  channelz::ChannelNode* parent_node =
      grpc_channel_args_find_pointer<channelz::ChannelNode>(
          &channel_args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  if (xds_node != nullptr && parent_node != nullptr) {
    parent_node->AddChildChannel(xds_node->uuid());
    // Keep a ref to the parent node so the linkage can be removed later.
    parent_channelz_node_ = parent_node->Ref();
  }
  // Create ChannelState object.
  chand_ = MakeOrphanable<ChannelState>(
      Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), xds_channel);
}
// Destroys the pollset set created in the constructor.
XdsClient::~XdsClient() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
  }
  grpc_pollset_set_destroy(interested_parties_);
}
void XdsClient::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
}
{
MutexLock lock(&mu_);
shutting_down_ = true;
// Remove channelz linkage.
if (parent_channelz_node_ != nullptr) {
channelz::ChannelNode* xds_channelz_node =
grpc_channel_get_channelz_node(chand_->channel());
GPR_ASSERT(xds_channelz_node != nullptr);
parent_channelz_node_->RemoveChildChannel(xds_channelz_node->uuid());
}
// Orphan ChannelState object.
chand_.reset();
// We do not clear cluster_map_ and endpoint_map_ if the xds client was
// created by the XdsResolver because the maps contain refs for watchers
// which in turn hold refs to the loadbalancing policies. At this point, it
// is possible for ADS calls to be in progress. Unreffing the loadbalancing
// policies before those calls are done would lead to issues such as
// https://github.com/grpc/grpc/issues/20928.
if (!listener_map_.empty()) {
cluster_map_.clear();
endpoint_map_.clear();
}
}
Unref(DEBUG_LOCATION, "XdsClient::Orphan()");
}
// Registers `watcher` for the listener named `listener_name`, taking
// ownership of it.  If an LDS update is already cached for that name, the
// watcher is notified immediately.  The name is then subscribed on the
// channel.
void XdsClient::WatchListenerData(
    absl::string_view listener_name,
    std::unique_ptr<ListenerWatcherInterface> watcher) {
  std::string name = std::string(listener_name);
  MutexLock lock(&mu_);
  ListenerState& state = listener_map_[name];
  // Keep the raw pointer: it is the map key and is still needed after
  // ownership moves into the map.
  ListenerWatcherInterface* raw_watcher = watcher.get();
  state.watchers[raw_watcher] = std::move(watcher);
  // Replay the cached LDS update, if any, to the new watcher.
  if (state.update.has_value()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
              this, name.c_str());
    }
    raw_watcher->OnListenerChanged(*state.update);
  }
  chand_->Subscribe(XdsApi::kLdsTypeUrl, name);
}
// Cancels a watch previously registered for `listener_name`.
//
// Does nothing if the client is shutting down, if no state exists for
// `listener_name`, or if `watcher` is not registered under that name. When
// the last watcher for the name is removed, the map entry is erased and the
// resource is unsubscribed via chand_, forwarding `delay_unsubscription`.
void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
                                        ListenerWatcherInterface* watcher,
                                        bool delay_unsubscription) {
  MutexLock lock(&mu_);
  if (shutting_down_) return;
  std::string listener_name_str = std::string(listener_name);
  // Look the name up with find() instead of operator[]: operator[] would
  // default-insert an empty ListenerState for an unknown name, and that
  // entry would never be removed.
  auto listener_it = listener_map_.find(listener_name_str);
  if (listener_it == listener_map_.end()) return;
  ListenerState& listener_state = listener_it->second;
  auto it = listener_state.watchers.find(watcher);
  if (it == listener_state.watchers.end()) return;
  listener_state.watchers.erase(it);
  if (listener_state.watchers.empty()) {
    // listener_state dangles after this erase; it is not used again.
    listener_map_.erase(listener_it);
    chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
                        delay_unsubscription);
  }
}
// Registers `watcher` for RDS updates on `route_config_name`; ownership of
// the watcher moves into route_config_map_. If an update is already cached
// for that route config, the watcher is notified before this call returns
// (while mu_ is held). The resource is then subscribed to via chand_.
void XdsClient::WatchRouteConfigData(
    absl::string_view route_config_name,
    std::unique_ptr<RouteConfigWatcherInterface> watcher) {
  std::string name(route_config_name);
  MutexLock lock(&mu_);
  RouteConfigState& state = route_config_map_[name];
  // Capture the raw pointer before the move: it is the key in the watcher
  // map and the handle used for the immediate notification below.
  RouteConfigWatcherInterface* const raw_watcher = watcher.get();
  state.watchers[raw_watcher] = std::move(watcher);
  if (state.update.has_value()) {
    // An RDS update has already been received; replay it to the new watcher.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO,
              "[xds_client %p] returning cached route config data for %s", this,
              name.c_str());
    }
    raw_watcher->OnRouteConfigChanged(*state.update);
  }
  chand_->Subscribe(XdsApi::kRdsTypeUrl, name);
}
// Cancels a watch previously registered for `route_config_name`.
//
// Does nothing if the client is shutting down, if no state exists for
// `route_config_name`, or if `watcher` is not registered under that name.
// When the last watcher for the name is removed, the map entry is erased and
// the resource is unsubscribed via chand_, forwarding `delay_unsubscription`.
void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
                                           RouteConfigWatcherInterface* watcher,
                                           bool delay_unsubscription) {
  MutexLock lock(&mu_);
  if (shutting_down_) return;
  std::string route_config_name_str = std::string(route_config_name);
  // Look the name up with find() instead of operator[]: operator[] would
  // default-insert an empty RouteConfigState for an unknown name, and that
  // entry would never be removed.
  auto route_config_it = route_config_map_.find(route_config_name_str);
  if (route_config_it == route_config_map_.end()) return;
  RouteConfigState& route_config_state = route_config_it->second;
  auto it = route_config_state.watchers.find(watcher);
  if (it == route_config_state.watchers.end()) return;
  route_config_state.watchers.erase(it);
  if (route_config_state.watchers.empty()) {
    // route_config_state dangles after this erase; it is not used again.
    route_config_map_.erase(route_config_it);
    chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
                        delay_unsubscription);
  }
}
// Registers `watcher` for CDS updates on `cluster_name`; ownership of the
// watcher moves into cluster_map_. If an update is already cached for that
// cluster, the watcher is notified before this call returns (while mu_ is
// held). The resource is then subscribed to via chand_.
void XdsClient::WatchClusterData(
    absl::string_view cluster_name,
    std::unique_ptr<ClusterWatcherInterface> watcher) {
  std::string name(cluster_name);
  MutexLock lock(&mu_);
  ClusterState& state = cluster_map_[name];
  // Capture the raw pointer before the move: it is the key in the watcher
  // map and the handle used for the immediate notification below.
  ClusterWatcherInterface* const raw_watcher = watcher.get();
  state.watchers[raw_watcher] = std::move(watcher);
  if (state.update.has_value()) {
    // A CDS update has already been received; replay it to the new watcher.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
              this, name.c_str());
    }
    raw_watcher->OnClusterChanged(*state.update);
  }
  chand_->Subscribe(XdsApi::kCdsTypeUrl, name);
}
// Cancels a watch previously registered for `cluster_name`.
//
// Does nothing if the client is shutting down, if no state exists for
// `cluster_name`, or if `watcher` is not registered under that name. When
// the last watcher for the name is removed, the map entry is erased and the
// resource is unsubscribed via chand_, forwarding `delay_unsubscription`.
void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
                                       ClusterWatcherInterface* watcher,
                                       bool delay_unsubscription) {
  MutexLock lock(&mu_);
  if (shutting_down_) return;
  std::string cluster_name_str = std::string(cluster_name);
  // Look the name up with find() instead of operator[]: operator[] would
  // default-insert an empty ClusterState for an unknown name, and that entry
  // would never be removed.
  auto cluster_it = cluster_map_.find(cluster_name_str);
  if (cluster_it == cluster_map_.end()) return;
  ClusterState& cluster_state = cluster_it->second;
  auto it = cluster_state.watchers.find(watcher);
  if (it == cluster_state.watchers.end()) return;
  cluster_state.watchers.erase(it);
  if (cluster_state.watchers.empty()) {
    // cluster_state dangles after this erase; it is not used again.
    cluster_map_.erase(cluster_it);
    chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
                        delay_unsubscription);
  }
}
// Registers `watcher` for EDS updates on `eds_service_name`; ownership of the
// watcher moves into endpoint_map_. If an update is already cached for that
// name, the watcher is notified before this call returns (while mu_ is
// held). The resource is then subscribed to via chand_.
void XdsClient::WatchEndpointData(
    absl::string_view eds_service_name,
    std::unique_ptr<EndpointWatcherInterface> watcher) {
  std::string name(eds_service_name);
  MutexLock lock(&mu_);
  EndpointState& state = endpoint_map_[name];
  // Capture the raw pointer before the move: it is the key in the watcher
  // map and the handle used for the immediate notification below.
  EndpointWatcherInterface* const raw_watcher = watcher.get();
  state.watchers[raw_watcher] = std::move(watcher);
  if (state.update.has_value()) {
    // An EDS update has already been received; replay it to the new watcher.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
              this, name.c_str());
    }
    raw_watcher->OnEndpointChanged(*state.update);
  }
  chand_->Subscribe(XdsApi::kEdsTypeUrl, name);
}
// Cancels a watch previously registered for `eds_service_name`.
//
// Does nothing if the client is shutting down, if no state exists for
// `eds_service_name`, or if `watcher` is not registered under that name.
// When the last watcher for the name is removed, the map entry is erased and
// the resource is unsubscribed via chand_, forwarding `delay_unsubscription`.
void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
                                        EndpointWatcherInterface* watcher,
                                        bool delay_unsubscription) {
  MutexLock lock(&mu_);
  if (shutting_down_) return;
  std::string eds_service_name_str = std::string(eds_service_name);
  // Look the name up with find() instead of operator[]: operator[] would
  // default-insert an empty EndpointState for an unknown name, and that entry
  // would never be removed.
  auto endpoint_it = endpoint_map_.find(eds_service_name_str);
  if (endpoint_it == endpoint_map_.end()) return;
  EndpointState& endpoint_state = endpoint_it->second;
  auto it = endpoint_state.watchers.find(watcher);
  if (it == endpoint_state.watchers.end()) return;
  endpoint_state.watchers.erase(it);
  if (endpoint_state.watchers.empty()) {
    // endpoint_state dangles after this erase; it is not used again.
    endpoint_map_.erase(endpoint_it);
    chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
                        delay_unsubscription);
  }
}
// Creates a drop-stats object for the (cluster_name, eds_service_name) pair,
// records it in load_report_map_, asks chand_ to start the LRS call if
// needed, and returns the new object to the caller.
RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
    absl::string_view lrs_server, absl::string_view cluster_name,
    absl::string_view eds_service_name) {
  // TODO(roth): When we add support for direct federation, use the
  // server name specified in lrs_server.
  auto map_key =
      std::make_pair(std::string(cluster_name), std::string(eds_service_name));
  MutexLock lock(&mu_);
  // Insert (or locate) the map entry first. The string_views handed to the
  // XdsClusterDropStats object are taken from the key stored in
  // load_report_map_, so that they share the key's lifetime.
  auto& entry =
      *load_report_map_.emplace(std::move(map_key), LoadReportState()).first;
  const std::string& stored_cluster_name = entry.first.first;
  const std::string& stored_eds_service_name = entry.first.second;
  auto stats = MakeRefCounted<XdsClusterDropStats>(
      Ref(DEBUG_LOCATION, "DropStats"), lrs_server, stored_cluster_name,
      stored_eds_service_name);
  // The map tracks only a raw pointer; the returned RefCountedPtr is the ref.
  entry.second.drop_stats.insert(stats.get());
  chand_->MaybeStartLrsCall();
  return stats;
}
// Unregisters `cluster_drop_stats` from the load-report entry for
// (cluster_name, eds_service_name). Its final snapshot is folded into
// deleted_drop_stats first. No-op if the entry or the object is not found.
void XdsClient::RemoveClusterDropStats(
    absl::string_view /*lrs_server*/, absl::string_view cluster_name,
    absl::string_view eds_service_name,
    XdsClusterDropStats* cluster_drop_stats) {
  MutexLock lock(&mu_);
  const auto report_it = load_report_map_.find(
      std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
  if (report_it == load_report_map_.end()) return;
  LoadReportState& state = report_it->second;
  // TODO(roth): When we add support for direct federation, use the
  // server name specified in lrs_server.
  const auto stats_it = state.drop_stats.find(cluster_drop_stats);
  if (stats_it == state.drop_stats.end()) return;
  // Record final drop stats in deleted_drop_stats, which will be
  // added to the next load report.
  for (const auto& category_count :
       cluster_drop_stats->GetSnapshotAndReset()) {
    state.deleted_drop_stats[category_count.first] += category_count.second;
  }
  state.drop_stats.erase(stats_it);
}
// Creates a locality-stats object for `locality` within the
// (cluster_name, eds_service_name) pair, records it in load_report_map_,
// asks chand_ to start the LRS call if needed, and returns the new object.
RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
    absl::string_view lrs_server, absl::string_view cluster_name,
    absl::string_view eds_service_name,
    RefCountedPtr<XdsLocalityName> locality) {
  // TODO(roth): When we add support for direct federation, use the
  // server name specified in lrs_server.
  auto map_key =
      std::make_pair(std::string(cluster_name), std::string(eds_service_name));
  MutexLock lock(&mu_);
  // Insert (or locate) the map entry first. The string_views handed to the
  // XdsClusterLocalityStats object are taken from the key stored in
  // load_report_map_, so that they share the key's lifetime.
  auto& entry =
      *load_report_map_.emplace(std::move(map_key), LoadReportState()).first;
  const std::string& stored_cluster_name = entry.first.first;
  const std::string& stored_eds_service_name = entry.first.second;
  // `locality` is copied here and only moved from in the statement after.
  auto stats = MakeRefCounted<XdsClusterLocalityStats>(
      Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server, stored_cluster_name,
      stored_eds_service_name, locality);
  auto& locality_state = entry.second.locality_stats[std::move(locality)];
  locality_state.locality_stats.insert(stats.get());
  chand_->MaybeStartLrsCall();
  return stats;
}
// Unregisters `cluster_locality_stats` from the entry for `locality` under
// (cluster_name, eds_service_name). Its final snapshot is appended to
// deleted_locality_stats first. No-op if any lookup along the way fails.
void XdsClient::RemoveClusterLocalityStats(
    absl::string_view /*lrs_server*/, absl::string_view cluster_name,
    absl::string_view eds_service_name,
    const RefCountedPtr<XdsLocalityName>& locality,
    XdsClusterLocalityStats* cluster_locality_stats) {
  MutexLock lock(&mu_);
  const auto report_it = load_report_map_.find(
      std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
  if (report_it == load_report_map_.end()) return;
  LoadReportState& state = report_it->second;
  // TODO(roth): When we add support for direct federation, use the
  // server name specified in lrs_server.
  const auto locality_it = state.locality_stats.find(locality);
  if (locality_it == state.locality_stats.end()) return;
  auto& live_stats = locality_it->second.locality_stats;
  const auto stats_it = live_stats.find(cluster_locality_stats);
  if (stats_it == live_stats.end()) return;
  // Record final snapshot in deleted_locality_stats, which will be
  // added to the next load report.
  locality_it->second.deleted_locality_stats.emplace_back(
      cluster_locality_stats->GetSnapshotAndReset());
  live_stats.erase(stats_it);
}
// Resets the connect backoff of the channel held by chand_. Does nothing
// when chand_ is null.
void XdsClient::ResetBackoff() {
  MutexLock lock(&mu_);
  if (chand_ == nullptr) return;
  grpc_channel_reset_connect_backoff(chand_->channel());
}
// Delivers `error` to every watcher in the listener, route config, cluster
// and endpoint maps. Each watcher is handed its own ref via GRPC_ERROR_REF;
// the ref passed in by the caller is released at the end.
void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
  // LDS watchers.
  for (const auto& listener_entry : listener_map_) {
    for (const auto& watcher_entry : listener_entry.second.watchers) {
      watcher_entry.first->OnError(GRPC_ERROR_REF(error));
    }
  }
  // RDS watchers.
  for (const auto& route_config_entry : route_config_map_) {
    for (const auto& watcher_entry : route_config_entry.second.watchers) {
      watcher_entry.first->OnError(GRPC_ERROR_REF(error));
    }
  }
  // CDS watchers.
  for (const auto& cluster_entry : cluster_map_) {
    for (const auto& watcher_entry : cluster_entry.second.watchers) {
      watcher_entry.first->OnError(GRPC_ERROR_REF(error));
    }
  }
  // EDS watchers.
  for (const auto& endpoint_entry : endpoint_map_) {
    for (const auto& watcher_entry : endpoint_entry.second.watchers) {
      watcher_entry.first->OnError(GRPC_ERROR_REF(error));
    }
  }
  GRPC_ERROR_UNREF(error);
}
// Builds the load report to send, draining every stats object on the way.
//
// For each entry in load_report_map_ the drop and locality stats are
// snapshotted and reset. The snapshot is added to the result only when
// `send_all_clusters` is true or the entry's cluster name is in `clusters`.
// Entries that no longer have any live stats objects are erased.
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
    bool send_all_clusters, const std::set<std::string>& clusters) {
  XdsApi::ClusterLoadReportMap result;
  auto report_it = load_report_map_.begin();
  while (report_it != load_report_map_.end()) {
    // The key is the (cluster name, EDS service name) pair.
    const auto& key = report_it->first;
    LoadReportState& state = report_it->second;
    // Stats are drained even for clusters the LRS server has not asked
    // about. Otherwise, if the server starts asking for them later, data
    // from earlier reporting intervals would leak into that future report.
    const bool record_stats =
        send_all_clusters || clusters.count(key.first) > 0;
    XdsApi::ClusterLoadReport report;
    // Drop stats: start from the totals of already-deleted objects, then add
    // the live ones.
    report.dropped_requests = std::move(state.deleted_drop_stats);
    for (const auto& live_drop_stats : state.drop_stats) {
      for (const auto& category_count :
           live_drop_stats->GetSnapshotAndReset()) {
        report.dropped_requests[category_count.first] += category_count.second;
      }
    }
    // Locality stats: merge live and recently deleted objects per locality.
    auto locality_it = state.locality_stats.begin();
    while (locality_it != state.locality_stats.end()) {
      auto& locality_state = locality_it->second;
      XdsClusterLocalityStats::Snapshot& merged =
          report.locality_stats[locality_it->first];
      for (const auto& live_locality_stats : locality_state.locality_stats) {
        merged += live_locality_stats->GetSnapshotAndReset();
      }
      // Add final snapshots from recently deleted locality stats objects.
      for (const auto& final_snapshot :
           locality_state.deleted_locality_stats) {
        merged += final_snapshot;
      }
      locality_state.deleted_locality_stats.clear();
      // Drop the locality entry once no live stats objects remain in it.
      if (locality_state.locality_stats.empty()) {
        locality_it = state.locality_stats.erase(locality_it);
      } else {
        ++locality_it;
      }
    }
    if (record_stats) {
      // The reporting interval is the time since this entry's last report.
      const grpc_millis now = ExecCtx::Get()->Now();
      report.load_report_interval = now - state.last_report_time;
      state.last_report_time = now;
      result[key] = std::move(report);
    }
    // Drop the whole entry once it has no live stats objects of either kind.
    // `key` refers into the entry, so it is not used past this point.
    if (state.locality_stats.empty() && state.drop_stats.empty()) {
      report_it = load_report_map_.erase(report_it);
    } else {
      ++report_it;
    }
  }
  return result;
}
void* XdsClient::ChannelArgCopy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
return p;
}
void XdsClient::ChannelArgDestroy(void* p) {
XdsClient* xds_client = static_cast<XdsClient*>(p);
xds_client->Unref(DEBUG_LOCATION, "channel arg");
}
// Channel-arg "compare" hook: compares the two pointer values with GPR_ICMP.
int XdsClient::ChannelArgCmp(void* p, void* q) {
  return GPR_ICMP(p, q);
}
// Pointer-arg vtable used by MakeChannelArg(): the copy, destroy and compare
// hooks, in that order.
const grpc_arg_pointer_vtable XdsClient::kXdsClientVtable = {
    ChannelArgCopy,
    ChannelArgDestroy,
    ChannelArgCmp,
};
// Wraps this XdsClient in a pointer channel arg named GRPC_ARG_XDS_CLIENT
// that uses kXdsClientVtable. The const_casts are needed because
// grpc_channel_arg_pointer_create() is given non-const pointers here.
grpc_arg XdsClient::MakeChannelArg() const {
  char* const arg_name = const_cast<char*>(GRPC_ARG_XDS_CLIENT);
  XdsClient* const self = const_cast<XdsClient*>(this);
  return grpc_channel_arg_pointer_create(arg_name, self, &kXdsClientVtable);
}
// Looks up the GRPC_ARG_XDS_CLIENT pointer arg in `args`. Returns a new ref
// to the XdsClient if the arg is present, or nullptr otherwise.
RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
    const grpc_channel_args& args) {
  XdsClient* const client =
      grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
  if (client != nullptr) {
    return client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
  }
  return nullptr;
}
// Returns the result of grpc_channel_args_copy_and_remove() on `args` with
// GRPC_ARG_XDS_CLIENT as the single name to remove.
grpc_channel_args* XdsClient::RemoveFromChannelArgs(
    const grpc_channel_args& args) {
  const char* names_to_remove[] = {GRPC_ARG_XDS_CLIENT};
  return grpc_channel_args_copy_and_remove(&args, names_to_remove, 1);
}
} // namespace grpc_core