blob: 55ee8ebb62732ca6e09861958e4f21d60002e353 [file] [log] [blame]
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
/// Implementation of the gRPC LB policy.
///
/// This policy takes as input a list of resolved addresses, which must
/// include at least one balancer address.
///
/// An internal channel (\a lb_channel_) is created for the addresses
/// from that are balancers. This channel behaves just like a regular
/// channel that uses pick_first to select from the list of balancer
/// addresses.
///
/// When we get our initial update, we instantiate the internal *streaming*
/// call to the LB server (whichever address pick_first chose). The call
/// will be complete when either the balancer sends status or when we cancel
/// the call (e.g., because we are shutting down). In needed, we retry the
/// call. If we received at least one valid message from the server, a new
/// call attempt will be made immediately; otherwise, we apply back-off
/// delays between attempts.
///
/// We maintain an internal round_robin policy instance for distributing
/// requests across backends. Whenever we receive a new serverlist from
/// the balancer, we update the round_robin policy with the new list of
/// addresses. If we cannot communicate with the balancer on startup,
/// however, we may enter fallback mode, in which case we will populate
/// the child policy's addresses from the backend addresses returned by the
/// resolver.
///
/// Once a child policy instance is in place (and getting updated as described),
/// calls for a pick, a ping, or a cancellation will be serviced right
/// away by forwarding them to the child policy instance. Any time there's no
/// child policy available (i.e., right after the creation of the gRPCLB
/// policy), pick requests are queued.
///
/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
/// high level design and details.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include "absl/container/inlined_vector.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "absl/strings/strip.h"
#include "upb/upb.hpp"
#include <grpc/byte_buffer_reader.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
#define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000
namespace grpc_core {
TraceFlag grpc_lb_glb_trace(false, "glb");
const char kGrpcLbAddressAttributeKey[] = "grpclb";
namespace {
constexpr char kGrpclb[] = "grpclb";
class GrpcLbConfig : public LoadBalancingPolicy::Config {
public:
GrpcLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
std::string service_name)
: child_policy_(std::move(child_policy)),
service_name_(std::move(service_name)) {}
const char* name() const override { return kGrpclb; }
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
const std::string& service_name() const { return service_name_; }
private:
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string service_name_;
};
class GrpcLb : public LoadBalancingPolicy {
public:
explicit GrpcLb(Args args);
const char* name() const override { return kGrpclb; }
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
private:
/// Contains a call to the LB server and all the data related to the call.
class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
public:
explicit BalancerCallState(
RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
~BalancerCallState() override;
// It's the caller's responsibility to ensure that Orphan() is called from
// inside the combiner.
void Orphan() override;
void StartQuery();
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
bool seen_initial_response() const { return seen_initial_response_; }
bool seen_serverlist() const { return seen_serverlist_; }
private:
GrpcLb* grpclb_policy() const {
return static_cast<GrpcLb*>(grpclb_policy_.get());
}
void ScheduleNextClientLoadReportLocked();
void SendClientLoadReportLocked();
static void MaybeSendClientLoadReport(void* arg, grpc_error_handle error);
static void ClientLoadReportDone(void* arg, grpc_error_handle error);
static void OnInitialRequestSent(void* arg, grpc_error_handle error);
static void OnBalancerMessageReceived(void* arg, grpc_error_handle error);
static void OnBalancerStatusReceived(void* arg, grpc_error_handle error);
void MaybeSendClientLoadReportLocked(grpc_error_handle error);
void ClientLoadReportDoneLocked(grpc_error_handle error);
void OnInitialRequestSentLocked();
void OnBalancerMessageReceivedLocked();
void OnBalancerStatusReceivedLocked(grpc_error_handle error);
// The owning LB policy.
RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
// The streaming call to the LB server. Always non-NULL.
grpc_call* lb_call_ = nullptr;
// recv_initial_metadata
grpc_metadata_array lb_initial_metadata_recv_;
// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
grpc_closure lb_on_initial_request_sent_;
// recv_message
grpc_byte_buffer* recv_message_payload_ = nullptr;
grpc_closure lb_on_balancer_message_received_;
bool seen_initial_response_ = false;
bool seen_serverlist_ = false;
// recv_trailing_metadata
grpc_closure lb_on_balancer_status_received_;
grpc_metadata_array lb_trailing_metadata_recv_;
grpc_status_code lb_call_status_;
grpc_slice lb_call_status_details_;
// The stats for client-side load reporting associated with this LB call.
// Created after the first serverlist is received.
RefCountedPtr<GrpcLbClientStats> client_stats_;
Duration client_stats_report_interval_;
grpc_timer client_load_report_timer_;
bool client_load_report_timer_callback_pending_ = false;
bool last_client_load_report_counters_were_zero_ = false;
bool client_load_report_is_due_ = false;
// The closure used for either the load report timer or the callback for
// completion of sending the load report.
grpc_closure client_load_report_closure_;
};
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
RefCountedPtr<GrpcLb> lb_policy, std::string lb_token,
RefCountedPtr<GrpcLbClientStats> client_stats)
: DelegatingSubchannel(std::move(subchannel)),
lb_policy_(std::move(lb_policy)),
lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
~SubchannelWrapper() override {
if (!lb_policy_->shutting_down_) {
lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
}
}
const std::string& lb_token() const { return lb_token_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
private:
RefCountedPtr<GrpcLb> lb_policy_;
std::string lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
class TokenAndClientStatsAttribute
: public ServerAddress::AttributeInterface {
public:
TokenAndClientStatsAttribute(std::string lb_token,
RefCountedPtr<GrpcLbClientStats> client_stats)
: lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<TokenAndClientStatsAttribute>(lb_token_,
client_stats_);
}
int Cmp(const AttributeInterface* other_base) const override {
const TokenAndClientStatsAttribute* other =
static_cast<const TokenAndClientStatsAttribute*>(other_base);
int r = lb_token_.compare(other->lb_token_);
if (r != 0) return r;
return QsortCompare(client_stats_.get(), other->client_stats_.get());
}
std::string ToString() const override {
return absl::StrFormat("lb_token=\"%s\" client_stats=%p", lb_token_,
client_stats_.get());
}
const std::string& lb_token() const { return lb_token_; }
RefCountedPtr<GrpcLbClientStats> client_stats() const {
return client_stats_;
}
private:
std::string lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
class Serverlist : public RefCounted<Serverlist> {
public:
// Takes ownership of serverlist.
explicit Serverlist(std::vector<GrpcLbServer> serverlist)
: serverlist_(std::move(serverlist)) {}
bool operator==(const Serverlist& other) const;
const std::vector<GrpcLbServer>& serverlist() const { return serverlist_; }
// Returns a text representation suitable for logging.
std::string AsText() const;
// Extracts all non-drop entries into a ServerAddressList.
ServerAddressList GetServerAddressList(
GrpcLbClientStats* client_stats) const;
// Returns true if the serverlist contains at least one drop entry and
// no backend address entries.
bool ContainsAllDropEntries() const;
// Returns the LB token to use for a drop, or null if the call
// should not be dropped.
//
// Note: This is called from the picker, so it will be invoked in
// the channel's data plane mutex, NOT the control plane
// work_serializer. It should not be accessed by any other part of the LB
// policy.
const char* ShouldDrop();
private:
std::vector<GrpcLbServer> serverlist_;
// Guarded by the channel's data plane mutex, NOT the control
// plane work_serializer. It should not be accessed by anything but the
// picker via the ShouldDrop() method.
size_t drop_index_ = 0;
};
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<Serverlist> serverlist,
std::unique_ptr<SubchannelPicker> child_picker,
RefCountedPtr<GrpcLbClientStats> client_stats)
: serverlist_(std::move(serverlist)),
child_picker_(std::move(child_picker)),
client_stats_(std::move(client_stats)) {}
PickResult Pick(PickArgs args) override;
private:
// Serverlist to be used for determining drops.
RefCountedPtr<Serverlist> serverlist_;
std::unique_ptr<SubchannelPicker> child_picker_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<GrpcLb> parent)
: parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override;
absl::string_view GetAuthority() override;
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override;
private:
RefCountedPtr<GrpcLb> parent_;
};
class StateWatcher : public AsyncConnectivityStateWatcherInterface {
public:
explicit StateWatcher(RefCountedPtr<GrpcLb> parent)
: AsyncConnectivityStateWatcherInterface(parent->work_serializer()),
parent_(std::move(parent)) {}
~StateWatcher() override { parent_.reset(DEBUG_LOCATION, "StateWatcher"); }
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
if (parent_->fallback_at_startup_checks_pending_ &&
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[grpclb %p] balancer channel in state:TRANSIENT_FAILURE (%s); "
"entering fallback mode",
parent_.get(), status.ToString().c_str());
parent_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&parent_->lb_fallback_timer_);
parent_->fallback_mode_ = true;
parent_->CreateOrUpdateChildPolicyLocked();
// Cancel the watch, since we don't care about the channel state once we
// go into fallback mode.
parent_->CancelBalancerChannelConnectivityWatchLocked();
}
}
RefCountedPtr<GrpcLb> parent_;
};
~GrpcLb() override;
void ShutdownLocked() override;
// Helper functions used in UpdateLocked().
void UpdateBalancerChannelLocked(const grpc_channel_args& args);
void CancelBalancerChannelConnectivityWatchLocked();
// Methods for dealing with fallback state.
void MaybeEnterFallbackModeAfterStartup();
static void OnFallbackTimer(void* arg, grpc_error_handle error);
void OnFallbackTimerLocked(grpc_error_handle error);
// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error);
void OnBalancerCallRetryTimerLocked(grpc_error_handle error);
// Methods for dealing with the child policy.
grpc_channel_args* CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer);
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const grpc_channel_args* args);
void CreateOrUpdateChildPolicyLocked();
// Subchannel caching.
void CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel);
void StartSubchannelCacheTimerLocked();
static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error);
void OnSubchannelCacheTimerLocked(grpc_error_handle error);
// Who the client is trying to communicate with.
std::string server_name_;
// Configurations for the policy.
RefCountedPtr<GrpcLbConfig> config_;
// Current channel args from the resolver.
grpc_channel_args* args_ = nullptr;
// Internal state.
bool shutting_down_ = false;
// The channel for communicating with the LB server.
grpc_channel* lb_channel_ = nullptr;
StateWatcher* watcher_ = nullptr;
// Response generator to inject address updates into lb_channel_.
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// Parent channelz node.
RefCountedPtr<channelz::ChannelNode> parent_channelz_node_;
// The data associated with the current LB call. It holds a ref to this LB
// policy. It's initialized every time we query for backends. It's reset to
// NULL whenever the current LB call is no longer needed (e.g., the LB policy
// is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
// contains a non-NULL lb_call_.
OrphanablePtr<BalancerCallState> lb_calld_;
// Timeout for the LB call. 0 means no deadline.
const Duration lb_call_timeout_;
// Balancer call retry state.
BackOff lb_call_backoff_;
bool retry_timer_callback_pending_ = false;
grpc_timer lb_call_retry_timer_;
grpc_closure lb_on_call_retry_;
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
RefCountedPtr<Serverlist> serverlist_;
// Whether we're in fallback mode.
bool fallback_mode_ = false;
// The backend addresses from the resolver.
absl::StatusOr<ServerAddressList> fallback_backend_addresses_;
// The last resolution note from our parent.
// To be passed to child policy when fallback_backend_addresses_ is empty.
std::string resolution_note_;
// State for fallback-at-startup checks.
// Timeout after startup after which we will go into fallback mode if
// we have not received a serverlist from the balancer.
const Duration fallback_at_startup_timeout_;
bool fallback_at_startup_checks_pending_ = false;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Child policy in state READY.
bool child_policy_ready_ = false;
// Deleted subchannel caching.
const Duration subchannel_cache_interval_;
std::map<Timestamp /*deletion time*/,
std::vector<RefCountedPtr<SubchannelInterface>>>
cached_subchannels_;
grpc_timer subchannel_cache_timer_;
grpc_closure on_subchannel_cache_timer_;
bool subchannel_cache_timer_pending_ = false;
};
//
// GrpcLb::Serverlist
//
bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
return serverlist_ == other.serverlist_;
}
void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
memset(addr, 0, sizeof(*addr));
if (server.drop) return;
const uint16_t netorder_port = grpc_htons(static_cast<uint16_t>(server.port));
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
if (server.ip_size == 4) {
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
addr4->sin_family = GRPC_AF_INET;
memcpy(&addr4->sin_addr, server.ip_addr, server.ip_size);
addr4->sin_port = netorder_port;
} else if (server.ip_size == 16) {
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
grpc_sockaddr_in6* addr6 =
reinterpret_cast<grpc_sockaddr_in6*>(&addr->addr);
addr6->sin6_family = GRPC_AF_INET6;
memcpy(&addr6->sin6_addr, server.ip_addr, server.ip_size);
addr6->sin6_port = netorder_port;
}
}
std::string GrpcLb::Serverlist::AsText() const {
std::vector<std::string> entries;
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
std::string ipport;
if (server.drop) {
ipport = "(drop)";
} else {
grpc_resolved_address addr;
ParseServer(server, &addr);
ipport = grpc_sockaddr_to_string(&addr, false);
}
entries.push_back(absl::StrFormat(" %" PRIuPTR ": %s token=%s\n", i,
ipport, server.load_balance_token));
}
return absl::StrJoin(entries, "");
}
bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
if (server.drop) return false;
if (GPR_UNLIKELY(server.port >> 16 != 0)) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %" PRIuPTR
" of serverlist. Ignoring.",
server.port, idx);
}
return false;
}
if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
" of serverlist. Ignoring",
server.ip_size, idx);
}
return false;
}
return true;
}
// Returns addresses extracted from the serverlist.
ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
GrpcLbClientStats* client_stats) const {
RefCountedPtr<GrpcLbClientStats> stats;
if (client_stats != nullptr) stats = client_stats->Ref();
ServerAddressList addresses;
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
if (!IsServerValid(server, i, false)) continue;
// Address processing.
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
const size_t lb_token_length = strnlen(
server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
std::string lb_token(server.load_balance_token, lb_token_length);
if (lb_token.empty()) {
gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
grpc_sockaddr_to_uri(&addr).c_str());
}
// Attach attribute to address containing LB token and stats object.
std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
attributes;
attributes[kGrpcLbAddressAttributeKey] =
absl::make_unique<TokenAndClientStatsAttribute>(std::move(lb_token),
stats);
// Add address.
addresses.emplace_back(addr, /*args=*/nullptr, std::move(attributes));
}
return addresses;
}
bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
if (serverlist_.empty()) return false;
for (const GrpcLbServer& server : serverlist_) {
if (!server.drop) return false;
}
return true;
}
const char* GrpcLb::Serverlist::ShouldDrop() {
if (serverlist_.empty()) return nullptr;
GrpcLbServer& server = serverlist_[drop_index_];
drop_index_ = (drop_index_ + 1) % serverlist_.size();
return server.drop ? server.load_balance_token : nullptr;
}
//
// GrpcLb::Picker
//
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// Check if we should drop the call.
const char* drop_token =
serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
if (drop_token != nullptr) {
// Update client load reporting stats to indicate the number of
// dropped calls. Note that we have to do this here instead of in
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
if (client_stats_ != nullptr) {
client_stats_->AddCallDropped(drop_token);
}
return PickResult::Drop(
absl::UnavailableError("drop directed by grpclb balancer"));
}
// Forward pick to child policy.
PickResult result = child_picker_->Pick(args);
// If pick succeeded, add LB token to initial metadata.
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
const SubchannelWrapper* subchannel_wrapper =
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
// Encode client stats object into metadata for use by
// client_load_reporting filter.
GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
if (client_stats != nullptr) {
client_stats->Ref().release(); // Ref passed via metadata.
// The metadata value is a hack: we pretend the pointer points to
// a string and rely on the client_load_reporting filter to know
// how to interpret it.
args.initial_metadata->Add(
GrpcLbClientStatsMetadata::key(),
absl::string_view(reinterpret_cast<const char*>(client_stats), 0));
// Update calls-started.
client_stats->AddCallStarted();
}
// Encode the LB token in metadata.
// Create a new copy on the call arena, since the subchannel list
// may get refreshed between when we return this pick and when the
// initial metadata goes out on the wire.
if (!subchannel_wrapper->lb_token().empty()) {
char* lb_token = static_cast<char*>(
args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1));
strcpy(lb_token, subchannel_wrapper->lb_token().c_str());
args.initial_metadata->Add(LbTokenMetadata::key(), lb_token);
}
// Unwrap subchannel to pass up to the channel.
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
return result;
}
//
// GrpcLb::Helper
//
RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr;
const TokenAndClientStatsAttribute* attribute =
static_cast<const TokenAndClientStatsAttribute*>(
address.GetAttribute(kGrpcLbAddressAttributeKey));
if (attribute == nullptr) {
gpr_log(GPR_ERROR,
"[grpclb %p] no TokenAndClientStatsAttribute for address %p",
parent_.get(), address.ToString().c_str());
abort();
}
std::string lb_token = attribute->lb_token();
RefCountedPtr<GrpcLbClientStats> client_stats = attribute->client_stats();
return MakeRefCounted<SubchannelWrapper>(
parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args),
parent_->Ref(DEBUG_LOCATION, "SubchannelWrapper"), std::move(lb_token),
std::move(client_stats));
}
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) {
if (parent_->shutting_down_) return;
// Record whether child policy reports READY.
parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
// Enter fallback mode if needed.
parent_->MaybeEnterFallbackModeAfterStartup();
// We pass the serverlist to the picker so that it can handle drops.
// However, we don't want to handle drops in the case where the child
// policy is reporting a state other than READY (unless we are
// dropping *all* calls), because we don't want to process drops for picks
// that yield a QUEUE result; this would result in dropping too many calls,
// since we will see the queued picks multiple times, and we'd consider each
// one a separate call for the drop calculation. So in this case, we pass
// a null serverlist to the picker, which tells it not to do drops.
RefCountedPtr<Serverlist> serverlist;
if (state == GRPC_CHANNEL_READY ||
(parent_->serverlist_ != nullptr &&
parent_->serverlist_->ContainsAllDropEntries())) {
serverlist = parent_->serverlist_;
}
RefCountedPtr<GrpcLbClientStats> client_stats;
if (parent_->lb_calld_ != nullptr &&
parent_->lb_calld_->client_stats() != nullptr) {
client_stats = parent_->lb_calld_->client_stats()->Ref();
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p helper %p] state=%s (%s) wrapping child "
"picker %p (serverlist=%p, client_stats=%p)",
parent_.get(), this, ConnectivityStateName(state),
status.ToString().c_str(), picker.get(), serverlist.get(),
client_stats.get());
}
parent_->channel_control_helper()->UpdateState(
state, status,
absl::make_unique<Picker>(std::move(serverlist), std::move(picker),
std::move(client_stats)));
}
void GrpcLb::Helper::RequestReresolution() {
if (parent_->shutting_down_) return;
// If we are talking to a balancer, we expect to get updated addresses
// from the balancer, so we can ignore the re-resolution request from
// the child policy. Otherwise, pass the re-resolution request up to the
// channel.
if (parent_->lb_calld_ == nullptr ||
!parent_->lb_calld_->seen_initial_response()) {
parent_->channel_control_helper()->RequestReresolution();
}
}
absl::string_view GrpcLb::Helper::GetAuthority() {
return parent_->channel_control_helper()->GetAuthority();
}
void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity,
absl::string_view message) {
if (parent_->shutting_down_) return;
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// GrpcLb::BalancerCallState
//
GrpcLb::BalancerCallState::BalancerCallState(
RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
: InternallyRefCounted<BalancerCallState>(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace) ? "BalancerCallState"
: nullptr),
grpclb_policy_(std::move(parent_grpclb_policy)) {
GPR_ASSERT(grpclb_policy_ != nullptr);
GPR_ASSERT(!grpclb_policy()->shutting_down_);
// Init the LB call. Note that the LB call will progress every time there's
// activity in grpclb_policy_->interested_parties(), which is comprised of
// the polling entities from client_channel.
GPR_ASSERT(!grpclb_policy()->server_name_.empty());
// Closure Initialization
GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSent, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
OnBalancerMessageReceived, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, OnBalancerStatusReceived,
this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
this, grpc_schedule_on_exec_ctx);
const Timestamp deadline =
grpclb_policy()->lb_call_timeout_ == Duration::Zero()
? Timestamp::InfFuture()
: ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_;
lb_call_ = grpc_channel_create_pollset_set_call(
grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
grpclb_policy_->interested_parties(),
Slice::FromStaticString("/grpc.lb.v1.LoadBalancer/BalanceLoad").c_slice(),
nullptr, deadline, nullptr);
// Init the LB call request payload.
upb::Arena arena;
grpc_slice request_payload_slice = GrpcLbRequestCreate(
grpclb_policy()->config_->service_name().empty()
? grpclb_policy()->server_name_.c_str()
: grpclb_policy()->config_->service_name().c_str(),
arena.ptr());
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
// Init other data associated with the LB call.
grpc_metadata_array_init(&lb_initial_metadata_recv_);
grpc_metadata_array_init(&lb_trailing_metadata_recv_);
}
GrpcLb::BalancerCallState::~BalancerCallState() {
GPR_ASSERT(lb_call_ != nullptr);
grpc_call_unref(lb_call_);
grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
grpc_byte_buffer_destroy(send_message_payload_);
grpc_byte_buffer_destroy(recv_message_payload_);
grpc_slice_unref_internal(lb_call_status_details_);
}
void GrpcLb::BalancerCallState::Orphan() {
GPR_ASSERT(lb_call_ != nullptr);
// If we are here because grpclb_policy wants to cancel the call,
// lb_on_balancer_status_received_ will complete the cancellation and clean
// up. Otherwise, we are here because grpclb_policy has to orphan a failed
// call, then the following cancellation will be a no-op.
grpc_call_cancel_internal(lb_call_);
if (client_load_report_timer_callback_pending_) {
grpc_timer_cancel(&client_load_report_timer_);
}
// Note that the initial ref is hold by lb_on_balancer_status_received_
// instead of the caller of this function. So the corresponding unref happens
// in lb_on_balancer_status_received_ instead of here.
}
void GrpcLb::BalancerCallState::StartQuery() {
GPR_ASSERT(lb_call_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
grpclb_policy_.get(), this, lb_call_);
}
// Create the ops.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
// Op: send initial metadata.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
// Op: send request message.
GPR_ASSERT(send_message_payload_ != nullptr);
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = send_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
self.release();
call_error = grpc_call_start_batch_and_execute(lb_call_, ops,
static_cast<size_t>(op - ops),
&lb_on_initial_request_sent_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv initial metadata.
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&lb_initial_metadata_recv_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Op: recv response.
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_message_payload_;
op->flags = 0;
op->reserved = nullptr;
op++;
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
self = Ref(DEBUG_LOCATION, "on_message_received");
self.release();
call_error = grpc_call_start_batch_and_execute(
lb_call_, ops, static_cast<size_t>(op - ops),
&lb_on_balancer_message_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Op: recv server status.
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata =
&lb_trailing_metadata_recv_;
op->data.recv_status_on_client.status = &lb_call_status_;
op->data.recv_status_on_client.status_details = &lb_call_status_details_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This callback signals the end of the LB call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
call_error = grpc_call_start_batch_and_execute(
lb_call_, ops, static_cast<size_t>(op - ops),
&lb_on_balancer_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
// InvalidateNow to avoid getting stuck re-initializing this timer
// in a loop while draining the currently-held WorkSerializer.
// Also see https://github.com/grpc/grpc/issues/26079.
ExecCtx::Get()->InvalidateNow();
const Timestamp next_client_load_report_time =
ExecCtx::Get()->Now() + client_stats_report_interval_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, MaybeSendClientLoadReport,
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
&client_load_report_closure_);
client_load_report_timer_callback_pending_ = true;
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(
void* arg, grpc_error_handle error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld, error]() { lb_calld->MaybeSendClientLoadReportLocked(error); },
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
grpc_error_handle error) {
client_load_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) {
Unref(DEBUG_LOCATION, "client_load_report");
GRPC_ERROR_UNREF(error);
return;
}
// If we've already sent the initial request, then we can go ahead and send
// the load report. Otherwise, we need to wait until the initial request has
// been sent to send this (see OnInitialRequestSentLocked()).
if (send_message_payload_ == nullptr) {
SendClientLoadReportLocked();
} else {
client_load_report_is_due_ = true;
}
}
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
// Get snapshot of stats.
int64_t num_calls_started;
int64_t num_calls_finished;
int64_t num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_known_received;
std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
client_stats_->Get(&num_calls_started, &num_calls_finished,
&num_calls_finished_with_client_failed_to_send,
&num_calls_finished_known_received, &drop_token_counts);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (num_calls_started == 0 && num_calls_finished == 0 &&
num_calls_finished_with_client_failed_to_send == 0 &&
num_calls_finished_known_received == 0 &&
(drop_token_counts == nullptr || drop_token_counts->empty())) {
if (last_client_load_report_counters_were_zero_) {
ScheduleNextClientLoadReportLocked();
return;
}
last_client_load_report_counters_were_zero_ = true;
} else {
last_client_load_report_counters_were_zero_ = false;
}
// Populate load report.
upb::Arena arena;
grpc_slice request_payload_slice = GrpcLbLoadReportRequestCreate(
num_calls_started, num_calls_finished,
num_calls_finished_with_client_failed_to_send,
num_calls_finished_known_received, drop_token_counts.get(), arena.ptr());
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
// Send the report.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = send_message_payload_;
GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDone, this,
grpc_schedule_on_exec_ctx);
grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &client_load_report_closure_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
gpr_log(GPR_ERROR,
"[grpclb %p] lb_calld=%p call_error=%d sending client load report",
grpclb_policy_.get(), this, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
}
void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
grpc_error_handle error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld, error]() { lb_calld->ClientLoadReportDoneLocked(error); },
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(
grpc_error_handle error) {
grpc_byte_buffer_destroy(send_message_payload_);
send_message_payload_ = nullptr;
if (error != GRPC_ERROR_NONE || this != grpclb_policy()->lb_calld_.get()) {
Unref(DEBUG_LOCATION, "client_load_report");
GRPC_ERROR_UNREF(error);
return;
}
ScheduleNextClientLoadReportLocked();
}
void GrpcLb::BalancerCallState::OnInitialRequestSent(
void* arg, grpc_error_handle /*error*/) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld]() { lb_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::OnInitialRequestSentLocked() {
grpc_byte_buffer_destroy(send_message_payload_);
send_message_payload_ = nullptr;
// If we attempted to send a client load report before the initial request was
// sent (and this lb_calld is still in use), send the load report now.
if (client_load_report_is_due_ && this == grpclb_policy()->lb_calld_.get()) {
SendClientLoadReportLocked();
client_load_report_is_due_ = false;
}
Unref(DEBUG_LOCATION, "on_initial_request_sent");
}
void GrpcLb::BalancerCallState::OnBalancerMessageReceived(
void* arg, grpc_error_handle /*error*/) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld]() { lb_calld->OnBalancerMessageReceivedLocked(); },
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
// Null payload means the LB call was cancelled.
if (this != grpclb_policy()->lb_calld_.get() ||
recv_message_payload_ == nullptr) {
Unref(DEBUG_LOCATION, "on_message_received");
return;
}
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(recv_message_payload_);
recv_message_payload_ = nullptr;
GrpcLbResponse response;
upb::Arena arena;
if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
(response.type == response.INITIAL && seen_initial_response_)) {
char* response_slice_str =
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
gpr_log(GPR_ERROR,
"[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
"Ignoring.",
grpclb_policy(), this, response_slice_str);
gpr_free(response_slice_str);
} else {
switch (response.type) {
case response.INITIAL: {
if (response.client_stats_report_interval != Duration::Zero()) {
client_stats_report_interval_ = std::max(
Duration::Seconds(1), response.client_stats_report_interval);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Received initial LB response "
"message; client load reporting interval = %" PRId64
" milliseconds",
grpclb_policy(), this,
client_stats_report_interval_.millis());
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Received initial LB response "
"message; client load reporting NOT enabled",
grpclb_policy(), this);
}
seen_initial_response_ = true;
break;
}
case response.SERVERLIST: {
GPR_ASSERT(lb_call_ != nullptr);
auto serverlist_wrapper =
MakeRefCounted<Serverlist>(std::move(response.serverlist));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
" servers received:\n%s",
grpclb_policy(), this,
serverlist_wrapper->serverlist().size(),
serverlist_wrapper->AsText().c_str());
}
seen_serverlist_ = true;
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (client_stats_report_interval_ > Duration::Zero() &&
client_stats_ == nullptr) {
client_stats_ = MakeRefCounted<GrpcLbClientStats>();
// Ref held by callback.
Ref(DEBUG_LOCATION, "client_load_report").release();
ScheduleNextClientLoadReportLocked();
}
// Check if the serverlist differs from the previous one.
if (grpclb_policy()->serverlist_ != nullptr &&
*grpclb_policy()->serverlist_ == *serverlist_wrapper) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Incoming server list identical "
"to current, ignoring.",
grpclb_policy(), this);
}
} else { // New serverlist.
// Dispose of the fallback.
// TODO(roth): Ideally, we should stay in fallback mode until we
// know that we can reach at least one of the backends in the new
// serverlist. Unfortunately, we can't do that, since we need to
// send the new addresses to the child policy in order to determine
// if they are reachable, and if we don't exit fallback mode now,
// CreateOrUpdateChildPolicyLocked() will use the fallback
// addresses instead of the addresses from the new serverlist.
// However, if we can't reach any of the servers in the new
// serverlist, then the child policy will never switch away from
// the fallback addresses, but the grpclb policy will still think
// that we're not in fallback mode, which means that we won't send
// updates to the child policy when the fallback addresses are
// updated by the resolver. This is sub-optimal, but the only way
// to fix it is to maintain a completely separate child policy for
// fallback mode, and that's more work than we want to put into
// the grpclb implementation at this point, since we're deprecating
// it in favor of the xds policy. We will implement this the
// right way in the xds policy instead.
if (grpclb_policy()->fallback_mode_) {
gpr_log(GPR_INFO,
"[grpclb %p] Received response from balancer; exiting "
"fallback mode",
grpclb_policy());
grpclb_policy()->fallback_mode_ = false;
}
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
// Update the serverlist in the GrpcLb instance. This serverlist
// instance will be destroyed either upon the next update or when the
// GrpcLb instance is destroyed.
grpclb_policy()->serverlist_ = std::move(serverlist_wrapper);
grpclb_policy()->CreateOrUpdateChildPolicyLocked();
}
break;
}
case response.FALLBACK: {
if (!grpclb_policy()->fallback_mode_) {
gpr_log(GPR_INFO,
"[grpclb %p] Entering fallback mode as requested by balancer",
grpclb_policy());
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
grpclb_policy()->fallback_mode_ = true;
grpclb_policy()->CreateOrUpdateChildPolicyLocked();
// Reset serverlist, so that if the balancer exits fallback
// mode by sending the same serverlist we were previously
// using, we don't incorrectly ignore it as a duplicate.
grpclb_policy()->serverlist_.reset();
}
break;
}
}
}
grpc_slice_unref_internal(response_slice);
if (!grpclb_policy()->shutting_down_) {
// Keep listening for serverlist updates.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &recv_message_payload_;
op.flags = 0;
op.reserved = nullptr;
// Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
lb_call_, &op, 1, &lb_on_balancer_message_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
} else {
Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
}
}
void GrpcLb::BalancerCallState::OnBalancerStatusReceived(
void* arg, grpc_error_handle error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
(void)GRPC_ERROR_REF(error); // owned by lambda
lb_calld->grpclb_policy()->work_serializer()->Run(
[lb_calld, error]() { lb_calld->OnBalancerStatusReceivedLocked(error); },
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
grpc_error_handle error) {
GPR_ASSERT(lb_call_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
char* status_details = grpc_slice_to_c_string(lb_call_status_details_);
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Status from LB server received. "
"Status = %d, details = '%s', (lb_call: %p), error '%s'",
grpclb_policy(), this, lb_call_status_, status_details, lb_call_,
grpc_error_std_string(error).c_str());
gpr_free(status_details);
}
GRPC_ERROR_UNREF(error);
// If this lb_calld is still in use, this call ended because of a failure so
// we want to retry connecting. Otherwise, we have deliberately ended this
// call and no further action is required.
if (this == grpclb_policy()->lb_calld_.get()) {
// If the fallback-at-startup checks are pending, go into fallback mode
// immediately. This short-circuits the timeout for the fallback-at-startup
// case.
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
GPR_ASSERT(!seen_serverlist_);
gpr_log(GPR_INFO,
"[grpclb %p] Balancer call finished without receiving "
"serverlist; entering fallback mode",
grpclb_policy());
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy()->fallback_mode_ = true;
grpclb_policy()->CreateOrUpdateChildPolicyLocked();
} else {
// This handles the fallback-after-startup case.
grpclb_policy()->MaybeEnterFallbackModeAfterStartup();
}
grpclb_policy()->lb_calld_.reset();
GPR_ASSERT(!grpclb_policy()->shutting_down_);
grpclb_policy()->channel_control_helper()->RequestReresolution();
if (seen_initial_response_) {
// If we lose connection to the LB server, reset the backoff and restart
// the LB call immediately.
grpclb_policy()->lb_call_backoff_.Reset();
grpclb_policy()->StartBalancerCallLocked();
} else {
// If this LB call fails establishing any connection to the LB server,
// retry later.
grpclb_policy()->StartBalancerCallRetryTimerLocked();
}
}
Unref(DEBUG_LOCATION, "lb_call_ended");
}
//
// helper code for creating balancer channel
//
ServerAddressList ExtractBalancerAddresses(const grpc_channel_args& args) {
const ServerAddressList* addresses =
FindGrpclbBalancerAddressesInChannelArgs(args);
if (addresses != nullptr) return *addresses;
return ServerAddressList();
}
/* Returns the channel args for the LB channel, used to create a bidirectional
* stream for the reception of load balancing updates.
*
* Inputs:
* - \a response_generator: in order to propagate updates from the resolver
* above the grpclb policy.
* - \a args: other args inherited from the grpclb policy. */
grpc_channel_args* BuildBalancerChannelArgs(
FakeResolverResponseGenerator* response_generator,
const grpc_channel_args* args) {
// Channel args to remove.
static const char* args_to_remove[] = {
// LB policy name, since we want to use the default (pick_first) in
// the LB channel.
GRPC_ARG_LB_POLICY_NAME,
// Strip out the service config, since we don't want the LB policy
// config specified for the parent channel to affect the LB channel.
GRPC_ARG_SERVICE_CONFIG,
// The channel arg for the server URI, since that will be different for
// the LB channel than for the parent channel. The client channel
// factory will re-add this arg with the right value.
GRPC_ARG_SERVER_URI,
// The fake resolver response generator, because we are replacing it
// with the one from the grpclb policy, used to propagate updates to
// the LB channel.
GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
// The LB channel should use the authority indicated by the target
// authority table (see \a ModifyGrpclbBalancerChannelArgs),
// as opposed to the authority from the parent channel.
GRPC_ARG_DEFAULT_AUTHORITY,
// Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
// treated as a stand-alone channel and not inherit this argument from the
// args of the parent channel.
GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
// Don't want to pass down channelz node from parent; the balancer
// channel will get its own.
GRPC_ARG_CHANNELZ_CHANNEL_NODE,
// Remove the channel args for channel credentials and replace it
// with a version that does not contain call credentials. The loadbalancer
// is not necessarily trusted to handle bearer token credentials.
GRPC_ARG_CHANNEL_CREDENTIALS,
};
// Create channel args for channel credentials that does not contain bearer
// token credentials.
grpc_channel_credentials* channel_credentials =
grpc_channel_credentials_find_in_args(args);
GPR_ASSERT(channel_credentials != nullptr);
RefCountedPtr<grpc_channel_credentials> creds_sans_call_creds =
channel_credentials->duplicate_without_call_credentials();
GPR_ASSERT(creds_sans_call_creds != nullptr);
// Channel args to add.
absl::InlinedVector<grpc_arg, 4> args_to_add = {
// The fake resolver response generator, which we use to inject
// address updates into the LB channel.
FakeResolverResponseGenerator::MakeChannelArg(response_generator),
// A channel arg indicating the target is a grpclb load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
// Tells channelz that this is an internal channel.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
// A channel args for new channel credentials that does not contain bearer
// tokens.
grpc_channel_credentials_to_arg(creds_sans_call_creds.get()),
};
return grpc_channel_args_copy_and_add_and_remove(
args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
args_to_add.size());
}
//
// ctor and dtor
//
std::string GetServerNameFromChannelArgs(const grpc_channel_args* args) {
const char* server_uri =
grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(server_uri != nullptr);
absl::StatusOr<URI> uri = URI::Parse(server_uri);
GPR_ASSERT(uri.ok() && !uri->path().empty());
return std::string(absl::StripPrefix(uri->path(), "/"));
}
GrpcLb::GrpcLb(Args args)
: LoadBalancingPolicy(std::move(args)),
server_name_(GetServerNameFromChannelArgs(args.args)),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_timeout_(Duration::Milliseconds(grpc_channel_args_find_integer(
args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX}))),
lb_call_backoff_(
BackOff::Options()
.set_initial_backoff(Duration::Seconds(
GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS))
.set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_max_backoff(Duration::Seconds(
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS))),
fallback_at_startup_timeout_(
Duration::Milliseconds(grpc_channel_args_find_integer(
args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS,
{GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}))),
subchannel_cache_interval_(
Duration::Milliseconds(grpc_channel_args_find_integer(
args.args, GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS,
{GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS, 0,
INT_MAX}))) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] Will use '%s' as the server name for LB request.",
this, server_name_.c_str());
}
// Closure Initialization
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this,
nullptr);
}
GrpcLb::~GrpcLb() { grpc_channel_args_destroy(args_); }
void GrpcLb::ShutdownLocked() {
shutting_down_ = true;
lb_calld_.reset();
if (subchannel_cache_timer_pending_) {
subchannel_cache_timer_pending_ = false;
grpc_timer_cancel(&subchannel_cache_timer_);
}
cached_subchannels_.clear();
if (retry_timer_callback_pending_) {
grpc_timer_cancel(&lb_call_retry_timer_);
}
if (fallback_at_startup_checks_pending_) {
fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&lb_fallback_timer_);
CancelBalancerChannelConnectivityWatchLocked();
}
if (child_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
interested_parties());
child_policy_.reset();
}
// We destroy the LB channel here instead of in our destructor because
// destroying the channel triggers a last callback to
// OnBalancerChannelConnectivityChangedLocked(), and we need to be
// alive when that callback is invoked.
if (lb_channel_ != nullptr) {
if (parent_channelz_node_ != nullptr) {
channelz::ChannelNode* child_channelz_node =
grpc_channel_get_channelz_node(lb_channel_);
GPR_ASSERT(child_channelz_node != nullptr);
parent_channelz_node_->RemoveChildChannel(child_channelz_node->uuid());
}
grpc_channel_destroy(lb_channel_);
lb_channel_ = nullptr;
}
}
//
// public methods
//
void GrpcLb::ResetBackoffLocked() {
if (lb_channel_ != nullptr) {
grpc_channel_reset_connect_backoff(lb_channel_);
}
if (child_policy_ != nullptr) {
child_policy_->ResetBackoffLocked();
}
}
void GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
// Update fallback address list.
fallback_backend_addresses_ = std::move(args.addresses);
if (fallback_backend_addresses_.ok()) {
// Add null LB token attributes.
for (ServerAddress& address : *fallback_backend_addresses_) {
address = address.WithAttribute(
kGrpcLbAddressAttributeKey,
absl::make_unique<TokenAndClientStatsAttribute>("", nullptr));
}
}
resolution_note_ = std::move(args.resolution_note);
// Update balancer channel.
UpdateBalancerChannelLocked(*args.args);
// Update the existing child policy, if any.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks
// and the balancer call.
if (is_initial_update) {
fallback_at_startup_checks_pending_ = true;
// Start timer.
Timestamp deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_);
GPR_ASSERT(client_channel != nullptr);
// Ref held by callback.
watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "StateWatcher"));
client_channel->AddConnectivityWatcher(
GRPC_CHANNEL_IDLE,
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
// Start balancer call.
StartBalancerCallLocked();
}
}
//
// helpers for UpdateLocked()
//
void GrpcLb::UpdateBalancerChannelLocked(const grpc_channel_args& args) {
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
grpc_arg new_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_LB_POLICY_NAME), const_cast<char*>("grpclb"));
grpc_channel_args_destroy(args_);
args_ = grpc_channel_args_copy_and_add_and_remove(
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
// Construct args for balancer channel.
ServerAddressList balancer_addresses = ExtractBalancerAddresses(args);
grpc_channel_args* lb_channel_args =
BuildBalancerChannelArgs(response_generator_.get(), &args);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
std::string uri_str = absl::StrCat("fake:///", server_name_);
grpc_channel_credentials* creds =
grpc_channel_credentials_find_in_args(lb_channel_args);
GPR_ASSERT(creds != nullptr);
const char* arg_to_remove = GRPC_ARG_CHANNEL_CREDENTIALS;
grpc_channel_args* new_args =
grpc_channel_args_copy_and_remove(lb_channel_args, &arg_to_remove, 1);
lb_channel_ = grpc_channel_create(uri_str.c_str(), creds, new_args);
GPR_ASSERT(lb_channel_ != nullptr);
grpc_channel_args_destroy(new_args);
// Set up channelz linkage.
channelz::ChannelNode* child_channelz_node =
grpc_channel_get_channelz_node(lb_channel_);
channelz::ChannelNode* parent_channelz_node =
grpc_channel_args_find_pointer<channelz::ChannelNode>(
&args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
parent_channelz_node_ = parent_channelz_node->Ref();
}
}
// Propagate updates to the LB channel (pick_first) through the fake
// resolver.
Resolver::Result result;
result.addresses = std::move(balancer_addresses);
result.args = lb_channel_args;
response_generator_->SetResponse(std::move(result));
}
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
ClientChannel* client_channel = ClientChannel::GetFromChannel(lb_channel_);
GPR_ASSERT(client_channel != nullptr);
client_channel->RemoveConnectivityWatcher(watcher_);
}
//
// code for balancer channel and call
//
void GrpcLb::StartBalancerCallLocked() {
GPR_ASSERT(lb_channel_ != nullptr);
if (shutting_down_) return;
// Init the LB call data.
GPR_ASSERT(lb_calld_ == nullptr);
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
this, lb_channel_, lb_calld_.get());
}
lb_calld_->StartQuery();
}
void GrpcLb::StartBalancerCallRetryTimerLocked() {
Timestamp next_try = lb_call_backoff_.NextAttemptTime();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
Duration timeout = next_try - ExecCtx::Get()->Now();
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
this, timeout.millis());
} else {
gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
this);
}
}
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
self.release();
retry_timer_callback_pending_ = true;
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
grpclb_policy->work_serializer()->Run(
[grpclb_policy, error]() {
grpclb_policy->OnBalancerCallRetryTimerLocked(error);
},
DEBUG_LOCATION);
}
void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) {
retry_timer_callback_pending_ = false;
if (!shutting_down_ && error == GRPC_ERROR_NONE && lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
}
StartBalancerCallLocked();
}
Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
GRPC_ERROR_UNREF(error);
}
//
// code for handling fallback mode
//
void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
// Enter fallback mode if all of the following are true:
// - We are not currently in fallback mode.
// - We are not currently waiting for the initial fallback timeout.
// - We are not currently in contact with the balancer.
// - The child policy is not in state READY.
if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
(lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
!child_policy_ready_) {
gpr_log(GPR_INFO,
"[grpclb %p] lost contact with balancer and backends from "
"most recent serverlist; entering fallback mode",
this);
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
}
void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
(void)GRPC_ERROR_REF(error); // ref owned by lambda
grpclb_policy->work_serializer()->Run(
[grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); },
DEBUG_LOCATION);
}
void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (fallback_at_startup_checks_pending_ && !shutting_down_ &&
error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
this);
fallback_at_startup_checks_pending_ = false;
CancelBalancerChannelConnectivityWatchLocked();
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
Unref(DEBUG_LOCATION, "on_fallback_timer");
GRPC_ERROR_UNREF(error);
}
//
// code for interacting with the child policy
//
grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer) {
absl::InlinedVector<grpc_arg, 2> args_to_add;
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer));
if (is_backend_from_grpclb_load_balancer) {
args_to_add.emplace_back(grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
}
return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
args_to_add.size());
}
OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
const grpc_channel_args* args) {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper = absl::make_unique<Helper>(Ref());
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_glb_trace);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Created new child policy handler (%p)", this,
lb_policy.get());
}
// Add the gRPC LB's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
// activity on gRPC LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
interested_parties());
return lb_policy;
}
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
UpdateArgs update_args;
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new child policy will fail the picks.
update_args.addresses = fallback_backend_addresses_;
if (fallback_backend_addresses_.ok() &&
fallback_backend_addresses_->empty()) {
update_args.resolution_note = absl::StrCat(
"grpclb in fallback mode without any balancer addresses: ",
resolution_note_);
}
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
}
update_args.args =
CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
GPR_ASSERT(update_args.args != nullptr);
update_args.config = config_->child_policy();
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(update_args.args);
}
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Updating child policy handler %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(std::move(update_args));
}
//
// subchannel caching
//
void GrpcLb::CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel) {
Timestamp deletion_time = ExecCtx::Get()->Now() + subchannel_cache_interval_;
cached_subchannels_[deletion_time].push_back(std::move(subchannel));
if (!subchannel_cache_timer_pending_) {
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release();
subchannel_cache_timer_pending_ = true;
StartSubchannelCacheTimerLocked();
}
}
void GrpcLb::StartSubchannelCacheTimerLocked() {
GPR_ASSERT(!cached_subchannels_.empty());
grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first,
&on_subchannel_cache_timer_);
}
void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) {
auto* self = static_cast<GrpcLb*>(arg);
(void)GRPC_ERROR_REF(error);
self->work_serializer()->Run(
[self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); },
DEBUG_LOCATION);
}
void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
if (subchannel_cache_timer_pending_ && error == GRPC_ERROR_NONE) {
auto it = cached_subchannels_.begin();
if (it != cached_subchannels_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] removing %" PRIuPTR " subchannels from cache",
this, it->second.size());
}
cached_subchannels_.erase(it);
}
if (!cached_subchannels_.empty()) {
StartSubchannelCacheTimerLocked();
return;
}
subchannel_cache_timer_pending_ = false;
}
Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer");
GRPC_ERROR_UNREF(error);
}
//
// factory
//
class GrpcLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<GrpcLb>(std::move(args));
}
const char* name() const override { return kGrpclb; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error_handle* error) const override {
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
if (json.type() == Json::Type::JSON_NULL) {
return MakeRefCounted<GrpcLbConfig>(nullptr, "");
}
std::vector<grpc_error_handle> error_list;
Json child_policy_config_json_tmp;
const Json* child_policy_config_json;
std::string service_name;
auto it = json.object_value().find("serviceName");
if (it != json.object_value().end()) {
const Json& service_name_json = it->second;
if (service_name_json.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:serviceName error:type should be string"));
} else {
service_name = service_name_json.string_value();
}
}
it = json.object_value().find("childPolicy");
if (it == json.object_value().end()) {
child_policy_config_json_tmp = Json::Array{Json::Object{
{"round_robin", Json::Object()},
}};
child_policy_config_json = &child_policy_config_json_tmp;
} else {
child_policy_config_json = &it->second;
}
grpc_error_handle parse_error = GRPC_ERROR_NONE;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config =
LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
*child_policy_config_json, &parse_error);
if (parse_error != GRPC_ERROR_NONE) {
std::vector<grpc_error_handle> child_errors;
child_errors.push_back(parse_error);
error_list.push_back(
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
}
if (error_list.empty()) {
return MakeRefCounted<GrpcLbConfig>(std::move(child_policy_config),
std::move(service_name));
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
return nullptr;
}
}
};
} // namespace
} // namespace grpc_core
//
// Plugin registration
//
void grpc_lb_policy_grpclb_init() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::GrpcLbFactory>());
}
void grpc_lb_policy_grpclb_shutdown() {}
namespace grpc_core {
void RegisterGrpcLbLoadReportingFilter(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
const grpc_channel_args* args = builder->channel_args();
const grpc_arg* channel_arg =
grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
strcmp(channel_arg->value.string, "grpclb") == 0) {
// TODO(roth): When we get around to re-attempting
// https://github.com/grpc/grpc/pull/16214, we should try to keep
// this filter at the very top of the subchannel stack, since that
// will minimize the number of metadata elements that the filter
// needs to iterate through to find the ClientStats object.
builder->PrependFilter(&grpc_client_load_reporting_filter, nullptr);
}
return true;
});
}
} // namespace grpc_core