| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| /// Implementation of the gRPC LB policy. |
| /// |
| /// This policy takes as input a list of resolved addresses, which must |
| /// include at least one balancer address. |
| /// |
/// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers. This channel behaves just like a regular
/// channel that uses pick_first to select from the list of balancer
/// addresses.
| /// |
/// When we get our initial update, we instantiate the internal *streaming*
/// call to the LB server (whichever address pick_first chose). The call
/// will be complete when either the balancer sends status or we cancel
/// the call (e.g., because we are shutting down). If needed, we retry the
/// call. If we received at least one valid message from the server, a new
/// call attempt will be made immediately; otherwise, we apply back-off
/// delays between attempts.
| /// |
| /// We maintain an internal child policy (round_robin) instance for distributing |
| /// requests across backends. Whenever we receive a new serverlist from |
| /// the balancer, we update the child policy with the new list of |
| /// addresses. |
| /// |
| /// Once a child policy instance is in place (and getting updated as |
/// described), calls for a pick or a cancellation will be serviced right away
| /// by forwarding them to the child policy instance. Any time there's no child |
| /// policy available (i.e., right after the creation of the xDS policy), pick |
| /// requests are added to a list of pending picks to be flushed and serviced |
| /// when the child policy instance becomes available. |
| /// |
/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
/// high-level design and details.
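///
/// A rough sketch of the ownership hierarchy implemented below (details
/// elided):
///
///   XdsLb
///   +-- LbChannelState (lb_chand_ / pending_lb_chand_)
///   |   +-- RetryableLbCall<EdsCallState> --> EdsCallState
///   |   +-- RetryableLbCall<LrsCallState> --> LrsCallState --> Reporter
///   +-- PriorityList
///       +-- LocalityMap (one per priority)
///           +-- Locality (one per locality; owns the child policy)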
| |
| // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when |
| // using that endpoint. Because of various transitive includes in uv.h, |
| // including windows.h on Windows, uv.h must be included before other system |
| // headers. Therefore, sockaddr.h must always be included first. |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/lib/iomgr/sockaddr.h" |
| #include "src/core/lib/iomgr/socket_utils.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| #include <string.h> |
| |
| #include <grpc/byte_buffer_reader.h> |
| #include <grpc/grpc.h> |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/string_util.h> |
| #include <grpc/support/time.h> |
| |
| #include "include/grpc/support/alloc.h" |
| #include "src/core/ext/filters/client_channel/client_channel.h" |
| #include "src/core/ext/filters/client_channel/lb_policy.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" |
| #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
| #include "src/core/ext/filters/client_channel/parse_address.h" |
| #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" |
| #include "src/core/ext/filters/client_channel/server_address.h" |
| #include "src/core/ext/filters/client_channel/service_config.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/channel_stack.h" |
| #include "src/core/lib/gpr/string.h" |
| #include "src/core/lib/gprpp/manual_constructor.h" |
| #include "src/core/lib/gprpp/map.h" |
| #include "src/core/lib/gprpp/memory.h" |
| #include "src/core/lib/gprpp/orphanable.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/iomgr/combiner.h" |
| #include "src/core/lib/iomgr/sockaddr.h" |
| #include "src/core/lib/iomgr/sockaddr_utils.h" |
| #include "src/core/lib/iomgr/timer.h" |
| #include "src/core/lib/slice/slice_hash_table.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/slice/slice_string_helpers.h" |
| #include "src/core/lib/surface/call.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/surface/channel_init.h" |
| #include "src/core/lib/transport/static_metadata.h" |
| |
| #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1 |
| #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6 |
| #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120 |
| #define GRPC_XDS_RECONNECT_JITTER 0.2 |
| #define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000 |
| #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000 |
| #define GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS (15 * 60 * 1000) |
| #define GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS 10000 |
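
// With the backoff settings above, the LB call retry timer starts at roughly
// 1 second and grows by 1.6x per failed attempt (~1s, ~1.6s, ~2.56s, ...),
// each delay jittered by up to 20%, capped at 120 seconds.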
| |
| namespace grpc_core { |
| |
| TraceFlag grpc_lb_xds_trace(false, "xds"); |
| |
| namespace { |
| |
| constexpr char kXds[] = "xds_experimental"; |
| |
| class ParsedXdsConfig : public LoadBalancingPolicy::Config { |
| public: |
| ParsedXdsConfig(const char* balancer_name, |
| RefCountedPtr<LoadBalancingPolicy::Config> child_policy, |
| RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy) |
| : balancer_name_(balancer_name), |
| child_policy_(std::move(child_policy)), |
| fallback_policy_(std::move(fallback_policy)) {} |
| |
| const char* name() const override { return kXds; } |
| |
  const char* balancer_name() const { return balancer_name_; }
| |
| RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const { |
| return child_policy_; |
| } |
| |
| RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy() const { |
| return fallback_policy_; |
| } |
| |
| private: |
| const char* balancer_name_ = nullptr; |
| RefCountedPtr<LoadBalancingPolicy::Config> child_policy_; |
| RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_; |
| }; |
| |
| class XdsLb : public LoadBalancingPolicy { |
| public: |
| explicit XdsLb(Args args); |
| |
| const char* name() const override { return kXds; } |
| |
| void UpdateLocked(UpdateArgs args) override; |
| void ResetBackoffLocked() override; |
| |
| private: |
| // Contains a channel to the LB server and all the data related to the |
| // channel. Holds a ref to the xds policy. |
| class LbChannelState : public InternallyRefCounted<LbChannelState> { |
| public: |
| // An LB call wrapper that can restart a call upon failure. Holds a ref to |
| // the LB channel. The template parameter is the kind of wrapped LB call. |
| template <typename T> |
| class RetryableLbCall : public InternallyRefCounted<RetryableLbCall<T>> { |
| public: |
| explicit RetryableLbCall(RefCountedPtr<LbChannelState> lb_chand); |
| |
| void Orphan() override; |
| |
| void OnCallFinishedLocked(); |
| |
| T* lb_calld() const { return lb_calld_.get(); } |
| LbChannelState* lb_chand() const { return lb_chand_.get(); } |
| |
| private: |
| void StartNewCallLocked(); |
| void StartRetryTimerLocked(); |
| static void OnRetryTimerLocked(void* arg, grpc_error* error); |
| |
| // The wrapped LB call that talks to the LB server. It's instantiated |
| // every time we start a new call. It's null during call retry backoff. |
| OrphanablePtr<T> lb_calld_; |
      // The owning LB channel.
| RefCountedPtr<LbChannelState> lb_chand_; |
| |
| // Retry state. |
| BackOff backoff_; |
| grpc_timer retry_timer_; |
| grpc_closure on_retry_timer_; |
| bool retry_timer_callback_pending_ = false; |
| |
| bool shutting_down_ = false; |
| }; |
| |
| // Contains an EDS call to the LB server. |
| class EdsCallState : public InternallyRefCounted<EdsCallState> { |
| public: |
| // The ctor and dtor should not be used directly. |
| explicit EdsCallState( |
| RefCountedPtr<RetryableLbCall<EdsCallState>> parent); |
| ~EdsCallState() override; |
| |
| void Orphan() override; |
| |
| RetryableLbCall<EdsCallState>* parent() const { return parent_.get(); } |
| LbChannelState* lb_chand() const { return parent_->lb_chand(); } |
| XdsLb* xdslb_policy() const { return lb_chand()->xdslb_policy(); } |
| bool seen_response() const { return seen_response_; } |
| |
| private: |
| static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
| static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
| |
| bool IsCurrentCallOnChannel() const; |
| |
| // The owning RetryableLbCall<>. |
| RefCountedPtr<RetryableLbCall<EdsCallState>> parent_; |
| bool seen_response_ = false; |
| |
| // Always non-NULL. |
| grpc_call* lb_call_; |
| |
| // recv_initial_metadata |
| grpc_metadata_array initial_metadata_recv_; |
| |
| // send_message |
| grpc_byte_buffer* send_message_payload_ = nullptr; |
| |
| // recv_message |
| grpc_byte_buffer* recv_message_payload_ = nullptr; |
| grpc_closure on_response_received_; |
| |
| // recv_trailing_metadata |
| grpc_metadata_array trailing_metadata_recv_; |
| grpc_status_code status_code_; |
| grpc_slice status_details_; |
| grpc_closure on_status_received_; |
| }; |
| |
| // Contains an LRS call to the LB server. |
| class LrsCallState : public InternallyRefCounted<LrsCallState> { |
| public: |
| // The ctor and dtor should not be used directly. |
| explicit LrsCallState( |
| RefCountedPtr<RetryableLbCall<LrsCallState>> parent); |
| ~LrsCallState() override; |
| |
| void Orphan() override; |
| |
| void MaybeStartReportingLocked(); |
| |
| RetryableLbCall<LrsCallState>* parent() { return parent_.get(); } |
| LbChannelState* lb_chand() const { return parent_->lb_chand(); } |
| XdsLb* xdslb_policy() const { return lb_chand()->xdslb_policy(); } |
| bool seen_response() const { return seen_response_; } |
| |
| private: |
      // Reports client-side load stats at a fixed interval.
| class Reporter : public InternallyRefCounted<Reporter> { |
| public: |
| Reporter(RefCountedPtr<LrsCallState> parent, |
| grpc_millis report_interval) |
| : parent_(std::move(parent)), report_interval_(report_interval) { |
| GRPC_CLOSURE_INIT( |
| &on_next_report_timer_, OnNextReportTimerLocked, this, |
| grpc_combiner_scheduler(xdslb_policy()->combiner())); |
| GRPC_CLOSURE_INIT( |
| &on_report_done_, OnReportDoneLocked, this, |
| grpc_combiner_scheduler(xdslb_policy()->combiner())); |
| ScheduleNextReportLocked(); |
| } |
| |
| void Orphan() override; |
| |
| private: |
| void ScheduleNextReportLocked(); |
| static void OnNextReportTimerLocked(void* arg, grpc_error* error); |
| void SendReportLocked(); |
| static void OnReportDoneLocked(void* arg, grpc_error* error); |
| |
| bool IsCurrentReporterOnCall() const { |
| return this == parent_->reporter_.get(); |
| } |
| XdsLb* xdslb_policy() const { return parent_->xdslb_policy(); } |
| |
| // The owning LRS call. |
| RefCountedPtr<LrsCallState> parent_; |
| |
| // The load reporting state. |
| const grpc_millis report_interval_; |
| bool last_report_counters_were_zero_ = false; |
| bool next_report_timer_callback_pending_ = false; |
| grpc_timer next_report_timer_; |
| grpc_closure on_next_report_timer_; |
| grpc_closure on_report_done_; |
| }; |
| |
| static void OnInitialRequestSentLocked(void* arg, grpc_error* error); |
| static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
| static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
| |
| bool IsCurrentCallOnChannel() const; |
| |
| // The owning RetryableLbCall<>. |
| RefCountedPtr<RetryableLbCall<LrsCallState>> parent_; |
| bool seen_response_ = false; |
| |
| // Always non-NULL. |
| grpc_call* lb_call_; |
| |
| // recv_initial_metadata |
| grpc_metadata_array initial_metadata_recv_; |
| |
| // send_message |
| grpc_byte_buffer* send_message_payload_ = nullptr; |
| grpc_closure on_initial_request_sent_; |
| |
| // recv_message |
| grpc_byte_buffer* recv_message_payload_ = nullptr; |
| grpc_closure on_response_received_; |
| |
| // recv_trailing_metadata |
| grpc_metadata_array trailing_metadata_recv_; |
| grpc_status_code status_code_; |
| grpc_slice status_details_; |
| grpc_closure on_status_received_; |
| |
| // Load reporting state. |
| UniquePtr<char> cluster_name_; |
| grpc_millis load_reporting_interval_ = 0; |
| OrphanablePtr<Reporter> reporter_; |
| }; |
| |
| LbChannelState(RefCountedPtr<XdsLb> xdslb_policy, const char* balancer_name, |
| const grpc_channel_args& args); |
| ~LbChannelState(); |
| |
| void Orphan() override; |
| |
| grpc_channel* channel() const { return channel_; } |
| XdsLb* xdslb_policy() const { return xdslb_policy_.get(); } |
| EdsCallState* eds_calld() const { return eds_calld_->lb_calld(); } |
| LrsCallState* lrs_calld() const { return lrs_calld_->lb_calld(); } |
| |
| bool IsCurrentChannel() const { |
| return this == xdslb_policy_->lb_chand_.get(); |
| } |
| bool IsPendingChannel() const { |
| return this == xdslb_policy_->pending_lb_chand_.get(); |
| } |
| bool HasActiveEdsCall() const { return eds_calld_->lb_calld() != nullptr; } |
| |
| void StartConnectivityWatchLocked(); |
| void CancelConnectivityWatchLocked(); |
| static void OnConnectivityChangedLocked(void* arg, grpc_error* error); |
| |
| private: |
| // The owning LB policy. |
| RefCountedPtr<XdsLb> xdslb_policy_; |
| |
| // The channel and its status. |
| grpc_channel* channel_; |
| bool shutting_down_ = false; |
| grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE; |
| grpc_closure on_connectivity_changed_; |
| |
| // The retryable XDS calls to the LB server. |
| OrphanablePtr<RetryableLbCall<EdsCallState>> eds_calld_; |
| OrphanablePtr<RetryableLbCall<LrsCallState>> lrs_calld_; |
| }; |
| |
  // We need this wrapper for the following reasons:
  // 1. To process per-locality load reporting.
  // 2. Since pickers are UniquePtrs, we use this RefCounted wrapper so that
  //    both the xds picker and the locality can hold references to it.
| class PickerWrapper : public RefCounted<PickerWrapper> { |
| public: |
| PickerWrapper(UniquePtr<SubchannelPicker> picker, |
| RefCountedPtr<XdsClientStats::LocalityStats> locality_stats) |
| : picker_(std::move(picker)), |
| locality_stats_(std::move(locality_stats)) { |
| locality_stats_->RefByPicker(); |
| } |
| ~PickerWrapper() { locality_stats_->UnrefByPicker(); } |
| |
| PickResult Pick(PickArgs args); |
| |
| private: |
| static void RecordCallCompletion( |
| void* arg, grpc_error* error, |
| LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata, |
| LoadBalancingPolicy::CallState* call_state); |
| |
| UniquePtr<SubchannelPicker> picker_; |
| RefCountedPtr<XdsClientStats::LocalityStats> locality_stats_; |
| }; |
| |
| // The picker will use a stateless weighting algorithm to pick the locality to |
| // use for each request. |
| class Picker : public SubchannelPicker { |
| public: |
| // Maintains a weighted list of pickers from each locality that is in ready |
| // state. The first element in the pair represents the end of a range |
| // proportional to the locality's weight. The start of the range is the |
| // previous value in the vector and is 0 for the first element. |
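    // For example, given two hypothetical localities A and B with weights 2
    // and 3, the list would be {{2, picker_A}, {5, picker_B}}: keys in
    // [0, 2) pick A, and keys in [2, 5) pick B.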
| using PickerList = |
| InlinedVector<Pair<uint32_t, RefCountedPtr<PickerWrapper>>, 1>; |
| Picker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers) |
| : xds_policy_(std::move(xds_policy)), |
| pickers_(std::move(pickers)), |
| drop_config_(xds_policy_->drop_config_) {} |
| |
| PickResult Pick(PickArgs args) override; |
| |
| private: |
    // Calls the picker of the locality whose range contains the key.
| PickResult PickFromLocality(const uint32_t key, PickArgs args); |
| |
| RefCountedPtr<XdsLb> xds_policy_; |
| PickerList pickers_; |
| RefCountedPtr<XdsDropConfig> drop_config_; |
| }; |
| |
| class FallbackHelper : public ChannelControlHelper { |
| public: |
| explicit FallbackHelper(RefCountedPtr<XdsLb> parent) |
| : parent_(std::move(parent)) {} |
| |
| ~FallbackHelper() { parent_.reset(DEBUG_LOCATION, "FallbackHelper"); } |
| |
| RefCountedPtr<SubchannelInterface> CreateSubchannel( |
| const grpc_channel_args& args) override; |
| void UpdateState(grpc_connectivity_state state, |
| UniquePtr<SubchannelPicker> picker) override; |
| void RequestReresolution() override; |
| void AddTraceEvent(TraceSeverity severity, StringView message) override; |
| |
| void set_child(LoadBalancingPolicy* child) { child_ = child; } |
| |
| private: |
| bool CalledByPendingFallback() const; |
| bool CalledByCurrentFallback() const; |
| |
| RefCountedPtr<XdsLb> parent_; |
| LoadBalancingPolicy* child_ = nullptr; |
| }; |
| |
  // There is only one PriorityList instance, which has the same lifetime as
  // the XdsLb instance.
| class PriorityList { |
| public: |
| // Each LocalityMap holds a ref to the XdsLb. |
| class LocalityMap : public InternallyRefCounted<LocalityMap> { |
| public: |
| // Each Locality holds a ref to the LocalityMap it is in. |
| class Locality : public InternallyRefCounted<Locality> { |
| public: |
| Locality(RefCountedPtr<LocalityMap> locality_map, |
| RefCountedPtr<XdsLocalityName> name); |
| ~Locality(); |
| |
| void UpdateLocked(uint32_t locality_weight, |
| ServerAddressList serverlist); |
| void ShutdownLocked(); |
| void ResetBackoffLocked(); |
| void DeactivateLocked(); |
| void Orphan() override; |
| |
| grpc_connectivity_state connectivity_state() const { |
| return connectivity_state_; |
| } |
| uint32_t weight() const { return weight_; } |
| RefCountedPtr<PickerWrapper> picker_wrapper() const { |
| return picker_wrapper_; |
| } |
| |
| void set_locality_map(RefCountedPtr<LocalityMap> locality_map) { |
| locality_map_ = std::move(locality_map); |
| } |
| |
| private: |
| class Helper : public ChannelControlHelper { |
| public: |
| explicit Helper(RefCountedPtr<Locality> locality) |
| : locality_(std::move(locality)) {} |
| |
| ~Helper() { locality_.reset(DEBUG_LOCATION, "Helper"); } |
| |
| RefCountedPtr<SubchannelInterface> CreateSubchannel( |
| const grpc_channel_args& args) override; |
| void UpdateState(grpc_connectivity_state state, |
| UniquePtr<SubchannelPicker> picker) override; |
| void RequestReresolution() override; |
| void AddTraceEvent(TraceSeverity severity, |
| StringView message) override; |
| void set_child(LoadBalancingPolicy* child) { child_ = child; } |
| |
| private: |
| bool CalledByPendingChild() const; |
| bool CalledByCurrentChild() const; |
| |
| RefCountedPtr<Locality> locality_; |
| LoadBalancingPolicy* child_ = nullptr; |
| }; |
| |
| // Methods for dealing with the child policy. |
| OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
| const char* name, const grpc_channel_args* args); |
| grpc_channel_args* CreateChildPolicyArgsLocked( |
| const grpc_channel_args* args); |
| |
| static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
| |
| XdsLb* xds_policy() const { return locality_map_->xds_policy(); } |
| |
| // The owning locality map. |
| RefCountedPtr<LocalityMap> locality_map_; |
| |
| RefCountedPtr<XdsLocalityName> name_; |
| OrphanablePtr<LoadBalancingPolicy> child_policy_; |
| OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; |
| RefCountedPtr<PickerWrapper> picker_wrapper_; |
| grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; |
| uint32_t weight_; |
| |
| // States for delayed removal. |
| grpc_timer delayed_removal_timer_; |
| grpc_closure on_delayed_removal_timer_; |
| bool delayed_removal_timer_callback_pending_ = false; |
| bool shutdown_ = false; |
| }; |
| |
| LocalityMap(RefCountedPtr<XdsLb> xds_policy, uint32_t priority); |
| |
| void UpdateLocked( |
| const XdsPriorityListUpdate::LocalityMap& locality_map_update); |
| void ResetBackoffLocked(); |
| void UpdateXdsPickerLocked(); |
| OrphanablePtr<Locality> ExtractLocalityLocked( |
| const RefCountedPtr<XdsLocalityName>& name); |
| void DeactivateLocked(); |
| // Returns true if this locality map becomes the currently used one (i.e., |
| // its priority is selected) after reactivation. |
| bool MaybeReactivateLocked(); |
| void MaybeCancelFailoverTimerLocked(); |
| |
| void Orphan() override; |
| |
| XdsLb* xds_policy() const { return xds_policy_.get(); } |
| uint32_t priority() const { return priority_; } |
| grpc_connectivity_state connectivity_state() const { |
| return connectivity_state_; |
| } |
| bool failover_timer_callback_pending() const { |
| return failover_timer_callback_pending_; |
| } |
| |
| private: |
| void OnLocalityStateUpdateLocked(); |
| void UpdateConnectivityStateLocked(); |
| static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
| static void OnFailoverTimerLocked(void* arg, grpc_error* error); |
| |
| PriorityList* priority_list() const { |
| return &xds_policy_->priority_list_; |
| } |
| const XdsPriorityListUpdate& priority_list_update() const { |
| return xds_policy_->priority_list_update_; |
| } |
| const XdsPriorityListUpdate::LocalityMap* locality_map_update() const { |
| return xds_policy_->priority_list_update_.Find(priority_); |
| } |
| |
| RefCountedPtr<XdsLb> xds_policy_; |
| |
| Map<RefCountedPtr<XdsLocalityName>, OrphanablePtr<Locality>, |
| XdsLocalityName::Less> |
| localities_; |
| const uint32_t priority_; |
| grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; |
| |
| // States for delayed removal. |
| grpc_timer delayed_removal_timer_; |
| grpc_closure on_delayed_removal_timer_; |
| bool delayed_removal_timer_callback_pending_ = false; |
| |
| // States of failover. |
| grpc_timer failover_timer_; |
| grpc_closure on_failover_timer_; |
| bool failover_timer_callback_pending_ = false; |
| }; |
| |
| explicit PriorityList(XdsLb* xds_policy) : xds_policy_(xds_policy) {} |
| |
| void UpdateLocked(); |
| void ResetBackoffLocked(); |
| void ShutdownLocked(); |
| void UpdateXdsPickerLocked(); |
| |
| const XdsPriorityListUpdate& priority_list_update() const { |
| return xds_policy_->priority_list_update_; |
| } |
| uint32_t current_priority() const { return current_priority_; } |
| |
| private: |
| void MaybeCreateLocalityMapLocked(uint32_t priority); |
| void FailoverOnConnectionFailureLocked(); |
| void FailoverOnDisconnectionLocked(uint32_t failed_priority); |
| void SwitchToHigherPriorityLocked(uint32_t priority); |
| void DeactivatePrioritiesLowerThan(uint32_t priority); |
| OrphanablePtr<LocalityMap::Locality> ExtractLocalityLocked( |
| const RefCountedPtr<XdsLocalityName>& name, uint32_t exclude_priority); |
| // Callers should make sure the priority list is non-empty. |
| uint32_t LowestPriority() const { |
| return static_cast<uint32_t>(priorities_.size()) - 1; |
| } |
| bool Contains(uint32_t priority) { return priority < priorities_.size(); } |
| |
| XdsLb* xds_policy_; |
| |
| // The list of locality maps, indexed by priority. P0 is the highest |
| // priority. |
| InlinedVector<OrphanablePtr<LocalityMap>, 2> priorities_; |
| // The priority that is being used. |
| uint32_t current_priority_ = UINT32_MAX; |
| }; |
| |
| ~XdsLb(); |
| |
| void ShutdownLocked() override; |
| |
| // Helper function used in UpdateLocked(). |
| void ProcessAddressesAndChannelArgsLocked(ServerAddressList addresses, |
| const grpc_channel_args& args); |
| |
  // Parses the xds config given the JSON node of the first child of
  // XdsConfig. If parsing succeeds, updates \a balancer_name_, and updates
  // \a child_policy_config_ and \a fallback_policy_config_ if they are also
  // found. Does nothing upon failure.
| void ParseLbConfig(const ParsedXdsConfig* xds_config); |
| |
| LbChannelState* LatestLbChannel() const { |
| return pending_lb_chand_ != nullptr ? pending_lb_chand_.get() |
| : lb_chand_.get(); |
| } |
| |
| // Methods for dealing with fallback state. |
| void MaybeCancelFallbackAtStartupChecks(); |
| static void OnFallbackTimerLocked(void* arg, grpc_error* error); |
| void UpdateFallbackPolicyLocked(); |
| OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked( |
| const char* name, const grpc_channel_args* args); |
| void MaybeExitFallbackMode(); |
| |
| // Name of the backend server to connect to. |
| const char* server_name_ = nullptr; |
| |
| // Name of the balancer to connect to. |
| UniquePtr<char> balancer_name_; |
| |
| // Current channel args from the resolver. |
| grpc_channel_args* args_ = nullptr; |
| |
| // Internal state. |
| bool shutting_down_ = false; |
| |
| // The channel for communicating with the LB server. |
| OrphanablePtr<LbChannelState> lb_chand_; |
| OrphanablePtr<LbChannelState> pending_lb_chand_; |
| |
| // Timeout in milliseconds for the LB call. 0 means no deadline. |
| const grpc_millis lb_call_timeout_ms_; |
| |
| // Whether the checks for fallback at startup are ALL pending. There are |
| // several cases where this can be reset: |
| // 1. The fallback timer fires, we enter fallback mode. |
| // 2. Before the fallback timer fires, the LB channel becomes |
| // TRANSIENT_FAILURE or the LB call fails, we enter fallback mode. |
| // 3. Before the fallback timer fires, if any child policy in the locality map |
| // becomes READY, we cancel the fallback timer. |
| bool fallback_at_startup_checks_pending_ = false; |
  // Timeout in milliseconds before using fallback backend addresses.
  // 0 means not using fallback.
| const grpc_millis lb_fallback_timeout_ms_; |
| // The backend addresses from the resolver. |
| ServerAddressList fallback_backend_addresses_; |
| // Fallback timer. |
| grpc_timer lb_fallback_timer_; |
| grpc_closure lb_on_fallback_; |
| |
| // The policy to use for the fallback backends. |
| RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_config_; |
| // Non-null iff we are in fallback mode. |
| OrphanablePtr<LoadBalancingPolicy> fallback_policy_; |
| OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_; |
| |
| // The policy to use for the backends. |
| RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_; |
| const grpc_millis locality_retention_interval_ms_; |
| const grpc_millis locality_map_failover_timeout_ms_; |
| // A list of locality maps indexed by priority. |
| PriorityList priority_list_; |
| // The update for priority_list_. |
| XdsPriorityListUpdate priority_list_update_; |
| |
| // The config for dropping calls. |
| RefCountedPtr<XdsDropConfig> drop_config_; |
| |
| // The stats for client-side load reporting. |
| XdsClientStats client_stats_; |
| }; |
| |
| // |
| // XdsLb::PickerWrapper::Pick |
| // |
| |
| LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick( |
| LoadBalancingPolicy::PickArgs args) { |
| // Forward the pick to the picker returned from the child policy. |
| PickResult result = picker_->Pick(args); |
| if (result.type != PickResult::PICK_COMPLETE || |
| result.subchannel == nullptr || locality_stats_ == nullptr) { |
| return result; |
| } |
| // Record a call started. |
| locality_stats_->AddCallStarted(); |
| // Intercept the recv_trailing_metadata op to record call completion. |
| result.recv_trailing_metadata_ready = RecordCallCompletion; |
| result.recv_trailing_metadata_ready_user_data = |
| locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release(); |
| return result; |
| } |
| |
| // Note that the following callback does not run in either the control plane |
| // combiner or the data plane combiner. |
| void XdsLb::PickerWrapper::RecordCallCompletion( |
| void* arg, grpc_error* error, |
| LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata, |
| LoadBalancingPolicy::CallState* call_state) { |
| XdsClientStats::LocalityStats* locality_stats = |
| static_cast<XdsClientStats::LocalityStats*>(arg); |
| const bool call_failed = error != GRPC_ERROR_NONE; |
| locality_stats->AddCallFinished(call_failed); |
| locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call"); |
| } |
| |
| // |
| // XdsLb::Picker |
| // |
| |
| XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) { |
| // Handle drop. |
| const UniquePtr<char>* drop_category; |
| if (drop_config_->ShouldDrop(&drop_category)) { |
| xds_policy_->client_stats_.AddCallDropped(*drop_category); |
| PickResult result; |
| result.type = PickResult::PICK_COMPLETE; |
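    // result.subchannel is left unset; a completed pick without a subchannel
    // fails the RPC, which is how the drop takes effect.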
| return result; |
| } |
| // Generate a random number in [0, total weight). |
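  // (The range end stored in the last element of pickers_ is the total
  // weight across all localities.)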
| const uint32_t key = rand() % pickers_[pickers_.size() - 1].first; |
  // Forward the pick to whichever locality owns the range in which the
  // random number falls.
| return PickFromLocality(key, args); |
| } |
| |
| XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key, |
| PickArgs args) { |
| size_t mid = 0; |
| size_t start_index = 0; |
| size_t end_index = pickers_.size() - 1; |
| size_t index = 0; |
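  // Binary search for the first picker whose range end (exclusive) is
  // greater than the key; a key equal to a range end belongs to the next
  // range.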
| while (end_index > start_index) { |
| mid = (start_index + end_index) / 2; |
| if (pickers_[mid].first > key) { |
| end_index = mid; |
| } else if (pickers_[mid].first < key) { |
| start_index = mid + 1; |
| } else { |
| index = mid + 1; |
| break; |
| } |
| } |
| if (index == 0) index = start_index; |
| GPR_ASSERT(pickers_[index].first > key); |
| return pickers_[index].second->Pick(args); |
| } |
| |
| // |
| // XdsLb::FallbackHelper |
| // |
| |
| bool XdsLb::FallbackHelper::CalledByPendingFallback() const { |
| GPR_ASSERT(child_ != nullptr); |
| return child_ == parent_->pending_fallback_policy_.get(); |
| } |
| |
| bool XdsLb::FallbackHelper::CalledByCurrentFallback() const { |
| GPR_ASSERT(child_ != nullptr); |
| return child_ == parent_->fallback_policy_.get(); |
| } |
| |
| RefCountedPtr<SubchannelInterface> XdsLb::FallbackHelper::CreateSubchannel( |
| const grpc_channel_args& args) { |
| if (parent_->shutting_down_ || |
| (!CalledByPendingFallback() && !CalledByCurrentFallback())) { |
| return nullptr; |
| } |
| return parent_->channel_control_helper()->CreateSubchannel(args); |
| } |
| |
| void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state, |
| UniquePtr<SubchannelPicker> picker) { |
| if (parent_->shutting_down_) return; |
| // If this request is from the pending fallback policy, ignore it until |
| // it reports READY, at which point we swap it into place. |
| if (CalledByPendingFallback()) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log( |
| GPR_INFO, |
| "[xdslb %p helper %p] pending fallback policy %p reports state=%s", |
| parent_.get(), this, parent_->pending_fallback_policy_.get(), |
| grpc_connectivity_state_name(state)); |
| } |
| if (state != GRPC_CHANNEL_READY) return; |
| grpc_pollset_set_del_pollset_set( |
| parent_->fallback_policy_->interested_parties(), |
| parent_->interested_parties()); |
| parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_); |
| } else if (!CalledByCurrentFallback()) { |
| // This request is from an outdated fallback policy, so ignore it. |
| return; |
| } |
| parent_->channel_control_helper()->UpdateState(state, std::move(picker)); |
| } |
| |
| void XdsLb::FallbackHelper::RequestReresolution() { |
| if (parent_->shutting_down_) return; |
| const LoadBalancingPolicy* latest_fallback_policy = |
| parent_->pending_fallback_policy_ != nullptr |
| ? parent_->pending_fallback_policy_.get() |
| : parent_->fallback_policy_.get(); |
| if (child_ != latest_fallback_policy) return; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Re-resolution requested from the fallback policy (%p).", |
| parent_.get(), child_); |
| } |
| GPR_ASSERT(parent_->lb_chand_ != nullptr); |
| parent_->channel_control_helper()->RequestReresolution(); |
| } |
| |
| void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity, |
| StringView message) { |
| if (parent_->shutting_down_ || |
| (!CalledByPendingFallback() && !CalledByCurrentFallback())) { |
| return; |
| } |
| parent_->channel_control_helper()->AddTraceEvent(severity, message); |
| } |
| |
| // |
| // XdsLb::LbChannelState |
| // |
| |
| XdsLb::LbChannelState::LbChannelState(RefCountedPtr<XdsLb> xdslb_policy, |
| const char* balancer_name, |
| const grpc_channel_args& args) |
| : InternallyRefCounted<LbChannelState>(&grpc_lb_xds_trace), |
| xdslb_policy_(std::move(xdslb_policy)) { |
| GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChangedLocked, |
| this, grpc_combiner_scheduler(xdslb_policy_->combiner())); |
| channel_ = CreateXdsBalancerChannel(balancer_name, args); |
| GPR_ASSERT(channel_ != nullptr); |
| eds_calld_.reset(New<RetryableLbCall<EdsCallState>>( |
| Ref(DEBUG_LOCATION, "LbChannelState+eds"))); |
| lrs_calld_.reset(New<RetryableLbCall<LrsCallState>>( |
| Ref(DEBUG_LOCATION, "LbChannelState+lrs"))); |
| } |
| |
| XdsLb::LbChannelState::~LbChannelState() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Destroying LB channel %p", xdslb_policy(), |
| this); |
| } |
| grpc_channel_destroy(channel_); |
| } |
| |
| void XdsLb::LbChannelState::Orphan() { |
| shutting_down_ = true; |
| eds_calld_.reset(); |
| lrs_calld_.reset(); |
| Unref(DEBUG_LOCATION, "LbChannelState+orphaned"); |
| } |
| |
| void XdsLb::LbChannelState::StartConnectivityWatchLocked() { |
| grpc_channel_element* client_channel_elem = |
| grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_)); |
| GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); |
| // Ref held by callback. |
| Ref(DEBUG_LOCATION, "LbChannelState+start_watch").release(); |
| grpc_client_channel_watch_connectivity_state( |
| client_channel_elem, |
| grpc_polling_entity_create_from_pollset_set( |
| xdslb_policy_->interested_parties()), |
| &connectivity_, &on_connectivity_changed_, nullptr); |
| } |
| |
| void XdsLb::LbChannelState::CancelConnectivityWatchLocked() { |
| grpc_channel_element* client_channel_elem = |
| grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_)); |
| GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); |
| grpc_client_channel_watch_connectivity_state( |
| client_channel_elem, |
| grpc_polling_entity_create_from_pollset_set( |
| xdslb_policy_->interested_parties()), |
| nullptr, &on_connectivity_changed_, nullptr); |
| } |
| |
| void XdsLb::LbChannelState::OnConnectivityChangedLocked(void* arg, |
| grpc_error* error) { |
| LbChannelState* self = static_cast<LbChannelState*>(arg); |
| if (!self->shutting_down_ && |
| self->xdslb_policy_->fallback_at_startup_checks_pending_) { |
| if (self->connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| // Not in TRANSIENT_FAILURE. Renew connectivity watch. |
| grpc_channel_element* client_channel_elem = |
| grpc_channel_stack_last_element( |
| grpc_channel_get_channel_stack(self->channel_)); |
| GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); |
| grpc_client_channel_watch_connectivity_state( |
| client_channel_elem, |
| grpc_polling_entity_create_from_pollset_set( |
| self->xdslb_policy_->interested_parties()), |
| &self->connectivity_, &self->on_connectivity_changed_, nullptr); |
| return; // Early out so we don't drop the ref below. |
| } |
| // In TRANSIENT_FAILURE. Cancel the fallback timer and go into |
| // fallback mode immediately. |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; " |
| "entering fallback mode", |
| self); |
| self->xdslb_policy_->fallback_at_startup_checks_pending_ = false; |
| grpc_timer_cancel(&self->xdslb_policy_->lb_fallback_timer_); |
| self->xdslb_policy_->UpdateFallbackPolicyLocked(); |
| } |
| // Done watching connectivity state, so drop ref. |
| self->Unref(DEBUG_LOCATION, "LbChannelState+watch_done"); |
| } |
| |
| // |
| // XdsLb::LbChannelState::RetryableLbCall<> |
| // |
| |
| template <typename T> |
| XdsLb::LbChannelState::RetryableLbCall<T>::RetryableLbCall( |
| RefCountedPtr<LbChannelState> lb_chand) |
| : lb_chand_(std::move(lb_chand)), |
| backoff_( |
| BackOff::Options() |
| .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS * |
| 1000) |
| .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) |
| .set_jitter(GRPC_XDS_RECONNECT_JITTER) |
| .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { |
| GRPC_CLOSURE_INIT( |
| &on_retry_timer_, OnRetryTimerLocked, this, |
| grpc_combiner_scheduler(lb_chand_->xdslb_policy()->combiner())); |
| StartNewCallLocked(); |
| } |
| |
| template <typename T> |
| void XdsLb::LbChannelState::RetryableLbCall<T>::Orphan() { |
| shutting_down_ = true; |
| lb_calld_.reset(); |
| if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_); |
| this->Unref(DEBUG_LOCATION, "RetryableLbCall+orphaned"); |
| } |
| |
| template <typename T> |
| void XdsLb::LbChannelState::RetryableLbCall<T>::OnCallFinishedLocked() { |
| const bool seen_response = lb_calld_->seen_response(); |
| lb_calld_.reset(); |
| if (seen_response) { |
| // If we lost connection to the LB server, reset backoff and restart the LB |
| // call immediately. |
| backoff_.Reset(); |
| StartNewCallLocked(); |
| } else { |
| // If we failed to connect to the LB server, retry later. |
| StartRetryTimerLocked(); |
| } |
| } |
| |
| template <typename T> |
| void XdsLb::LbChannelState::RetryableLbCall<T>::StartNewCallLocked() { |
| if (shutting_down_) return; |
| GPR_ASSERT(lb_chand_->channel_ != nullptr); |
| GPR_ASSERT(lb_calld_ == nullptr); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Start new call from retryable call (lb_chand: %p, " |
| "retryable call: %p)", |
| lb_chand()->xdslb_policy(), lb_chand(), this); |
| } |
| lb_calld_ = MakeOrphanable<T>( |
| this->Ref(DEBUG_LOCATION, "RetryableLbCall+start_new_call")); |
| } |
| |
| template <typename T> |
| void XdsLb::LbChannelState::RetryableLbCall<T>::StartRetryTimerLocked() { |
| if (shutting_down_) return; |
| const grpc_millis next_attempt_time = backoff_.NextAttemptTime(); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0); |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Failed to connect to LB server (lb_chand: %p) " |
| "retry timer will fire in %" PRId64 "ms.", |
| lb_chand()->xdslb_policy(), lb_chand(), timeout); |
| } |
| this->Ref(DEBUG_LOCATION, "RetryableLbCall+retry_timer_start").release(); |
| grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_); |
| retry_timer_callback_pending_ = true; |
| } |
| |
| template <typename T> |
| void XdsLb::LbChannelState::RetryableLbCall<T>::OnRetryTimerLocked( |
| void* arg, grpc_error* error) { |
| RetryableLbCall* lb_calld = static_cast<RetryableLbCall*>(arg); |
| lb_calld->retry_timer_callback_pending_ = false; |
| if (!lb_calld->shutting_down_ && error == GRPC_ERROR_NONE) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Retry timer fires (lb_chand: %p, retryable call: %p)", |
| lb_calld->lb_chand()->xdslb_policy(), lb_calld->lb_chand(), |
| lb_calld); |
| } |
| lb_calld->StartNewCallLocked(); |
| } |
| lb_calld->Unref(DEBUG_LOCATION, "RetryableLbCall+retry_timer_done"); |
| } |
| |
| // |
| // XdsLb::LbChannelState::EdsCallState |
| // |
| |
| XdsLb::LbChannelState::EdsCallState::EdsCallState( |
| RefCountedPtr<RetryableLbCall<EdsCallState>> parent) |
| : InternallyRefCounted<EdsCallState>(&grpc_lb_xds_trace), |
| parent_(std::move(parent)) { |
| // Init the LB call. Note that the LB call will progress every time there's |
  // activity in xdslb_policy()->interested_parties(), which comprises
| // the polling entities from client_channel. |
| GPR_ASSERT(xdslb_policy() != nullptr); |
| GPR_ASSERT(xdslb_policy()->server_name_ != nullptr); |
| GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0'); |
| const grpc_millis deadline = |
| xdslb_policy()->lb_call_timeout_ms_ == 0 |
| ? GRPC_MILLIS_INF_FUTURE |
| : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_; |
| // Create an LB call with the specified method name. |
| lb_call_ = grpc_channel_create_pollset_set_call( |
| lb_chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, |
| xdslb_policy()->interested_parties(), |
| GRPC_MDSTR_SLASH_ENVOY_DOT_API_DOT_V2_DOT_ENDPOINTDISCOVERYSERVICE_SLASH_STREAMENDPOINTS, |
| nullptr, deadline, nullptr); |
| GPR_ASSERT(lb_call_ != nullptr); |
| // Init the LB call request payload. |
| grpc_slice request_payload_slice = |
| XdsEdsRequestCreateAndEncode(xdslb_policy()->server_name_); |
| send_message_payload_ = |
| grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
| grpc_slice_unref_internal(request_payload_slice); |
| // Init other data associated with the LB call. |
| grpc_metadata_array_init(&initial_metadata_recv_); |
| grpc_metadata_array_init(&trailing_metadata_recv_); |
| GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this, |
| grpc_combiner_scheduler(xdslb_policy()->combiner())); |
| GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this, |
| grpc_combiner_scheduler(xdslb_policy()->combiner())); |
| // Start the call. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Starting EDS call (lb_chand: %p, lb_calld: %p, " |
| "lb_call: %p)", |
| xdslb_policy(), lb_chand(), this, lb_call_); |
| } |
| // Create the ops. |
| grpc_call_error call_error; |
| grpc_op ops[3]; |
| memset(ops, 0, sizeof(ops)); |
| // Op: send initial metadata. |
| grpc_op* op = ops; |
| op->op = GRPC_OP_SEND_INITIAL_METADATA; |
| op->data.send_initial_metadata.count = 0; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // Op: send request message. |
| GPR_ASSERT(send_message_payload_ != nullptr); |
| op->op = GRPC_OP_SEND_MESSAGE; |
| op->data.send_message.send_message = send_message_payload_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| call_error = grpc_call_start_batch_and_execute(lb_call_, ops, |
| (size_t)(op - ops), nullptr); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| // Op: recv initial metadata. |
| op = ops; |
| op->op = GRPC_OP_RECV_INITIAL_METADATA; |
| op->data.recv_initial_metadata.recv_initial_metadata = |
| &initial_metadata_recv_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // Op: recv response. |
| op->op = GRPC_OP_RECV_MESSAGE; |
| op->data.recv_message.recv_message = &recv_message_payload_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| Ref(DEBUG_LOCATION, "EDS+OnResponseReceivedLocked").release(); |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &on_response_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| // Op: recv server status. |
| op = ops; |
| op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
| op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; |
| op->data.recv_status_on_client.status = &status_code_; |
| op->data.recv_status_on_client.status_details = &status_details_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // This callback signals the end of the LB call, so it relies on the initial |
| // ref instead of a new ref. When it's invoked, it's the initial ref that is |
| // unreffed. |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &on_status_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } |
| |
| XdsLb::LbChannelState::EdsCallState::~EdsCallState() { |
| grpc_metadata_array_destroy(&initial_metadata_recv_); |
| grpc_metadata_array_destroy(&trailing_metadata_recv_); |
| grpc_byte_buffer_destroy(send_message_payload_); |
| grpc_byte_buffer_destroy(recv_message_payload_); |
| grpc_slice_unref_internal(status_details_); |
| GPR_ASSERT(lb_call_ != nullptr); |
| grpc_call_unref(lb_call_); |
| } |
| |
| void XdsLb::LbChannelState::EdsCallState::Orphan() { |
| GPR_ASSERT(lb_call_ != nullptr); |
  // If we are here because xdslb_policy wants to cancel the call,
  // on_status_received_ will complete the cancellation and clean up.
  // Otherwise, we are here because xdslb_policy has to orphan a failed call,
  // in which case the cancellation below is a no-op.
  grpc_call_cancel(lb_call_, nullptr);
  // Note that the initial ref is held by on_status_received_, so the
  // corresponding unref happens there instead of here.
| } |
| |
| void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( |
| void* arg, grpc_error* error) { |
| EdsCallState* eds_calld = static_cast<EdsCallState*>(arg); |
| LbChannelState* lb_chand = eds_calld->lb_chand(); |
| XdsLb* xdslb_policy = eds_calld->xdslb_policy(); |
| // Empty payload means the LB call was cancelled. |
| if (!eds_calld->IsCurrentCallOnChannel() || |
| eds_calld->recv_message_payload_ == nullptr) { |
| eds_calld->Unref(DEBUG_LOCATION, "EDS+OnResponseReceivedLocked"); |
| return; |
| } |
| // Read the response. |
| grpc_byte_buffer_reader bbr; |
| grpc_byte_buffer_reader_init(&bbr, eds_calld->recv_message_payload_); |
| grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
| grpc_byte_buffer_reader_destroy(&bbr); |
| grpc_byte_buffer_destroy(eds_calld->recv_message_payload_); |
| eds_calld->recv_message_payload_ = nullptr; |
| // TODO(juanlishen): When we convert this to use the xds protocol, the |
| // balancer will send us a fallback timeout such that we should go into |
| // fallback mode if we have lost contact with the balancer after a certain |
| // period of time. We will need to save the timeout value here, and then |
| // when the balancer call ends, we will need to start a timer for the |
| // specified period of time, and if the timer fires, we go into fallback |
| // mode. We will also need to cancel the timer when we receive a serverlist |
| // from the balancer. |
  // This anonymous lambda is a hack to avoid the use of goto.
| [&]() { |
| // Parse the response. |
| XdsUpdate update; |
| grpc_error* parse_error = |
| XdsEdsResponseDecodeAndParse(response_slice, &update); |
| if (parse_error != GRPC_ERROR_NONE) { |
| gpr_log(GPR_ERROR, "[xdslb %p] EDS response parsing failed. error=%s", |
| xdslb_policy, grpc_error_string(parse_error)); |
| GRPC_ERROR_UNREF(parse_error); |
| return; |
| } |
| if (update.priority_list_update.empty() && !update.drop_all) { |
| char* response_slice_str = |
| grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); |
| gpr_log(GPR_ERROR, |
| "[xdslb %p] EDS response '%s' doesn't contain any valid locality " |
| "but doesn't require to drop all calls. Ignoring.", |
| xdslb_policy, response_slice_str); |
| gpr_free(response_slice_str); |
| return; |
| } |
| eds_calld->seen_response_ = true; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] EDS response with %" PRIuPTR |
| " priorities and %" PRIuPTR |
| " drop categories received (drop_all=%d)", |
| xdslb_policy, update.priority_list_update.size(), |
| update.drop_config->drop_category_list().size(), update.drop_all); |
| for (size_t priority = 0; priority < update.priority_list_update.size(); |
| ++priority) { |
| const auto* locality_map_update = |
| update.priority_list_update.Find(static_cast<uint32_t>(priority)); |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Priority %" PRIuPTR " contains %" PRIuPTR |
| " localities", |
| xdslb_policy, priority, locality_map_update->size()); |
| size_t locality_count = 0; |
| for (const auto& p : locality_map_update->localities) { |
| const auto& locality = p.second; |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Priority %" PRIuPTR ", locality %" PRIuPTR |
| " %s contains %" PRIuPTR " server addresses", |
| xdslb_policy, priority, locality_count, |
| locality.name->AsHumanReadableString(), |
| locality.serverlist.size()); |
| for (size_t i = 0; i < locality.serverlist.size(); ++i) { |
| char* ipport; |
| grpc_sockaddr_to_string(&ipport, &locality.serverlist[i].address(), |
| false); |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Priority %" PRIuPTR ", locality %" PRIuPTR |
| " %s, server address %" PRIuPTR ": %s", |
| xdslb_policy, priority, locality_count, |
| locality.name->AsHumanReadableString(), i, ipport); |
| gpr_free(ipport); |
| } |
| ++locality_count; |
| } |
| for (size_t i = 0; i < update.drop_config->drop_category_list().size(); |
| ++i) { |
| const XdsDropConfig::DropCategory& drop_category = |
| update.drop_config->drop_category_list()[i]; |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Drop category %s has drop rate %d per million", |
| xdslb_policy, drop_category.name.get(), |
| drop_category.parts_per_million); |
| } |
| } |
| } |
| // Pending LB channel receives a response; promote it. |
    // Note that this call can't be on a discarded pending channel, because
    // such a channel has no current call, and we have already verified that
    // this call is a current call.
| if (!lb_chand->IsCurrentChannel()) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Pending LB channel %p receives EDS response; " |
| "promoting it to replace current LB channel %p", |
| xdslb_policy, lb_chand, xdslb_policy->lb_chand_.get()); |
| } |
      // TODO(juanlishen): Maybe promote the pending LB channel when the
      // response results in a READY locality map.
| xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_); |
| } |
| // At this point, lb_chand must be the current LB channel, so try to start |
| // load reporting. |
| LrsCallState* lrs_calld = lb_chand->lrs_calld_->lb_calld(); |
| if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); |
| // If the balancer tells us to drop all the calls, we should exit fallback |
| // mode immediately. |
| if (update.drop_all) xdslb_policy->MaybeExitFallbackMode(); |
| // Update the drop config. |
| const bool drop_config_changed = |
| xdslb_policy->drop_config_ == nullptr || |
| *xdslb_policy->drop_config_ != *update.drop_config; |
| xdslb_policy->drop_config_ = std::move(update.drop_config); |
| // Ignore identical locality update. |
| if (xdslb_policy->priority_list_update_ == update.priority_list_update) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Incoming locality update identical to current, " |
| "ignoring. (drop_config_changed=%d)", |
| xdslb_policy, drop_config_changed); |
| } |
| if (drop_config_changed) { |
| xdslb_policy->priority_list_.UpdateXdsPickerLocked(); |
| } |
| return; |
| } |
| // Update the priority list. |
| xdslb_policy->priority_list_update_ = |
| std::move(update.priority_list_update); |
| xdslb_policy->priority_list_.UpdateLocked(); |
| }(); |
| grpc_slice_unref_internal(response_slice); |
| if (xdslb_policy->shutting_down_) { |
| eds_calld->Unref(DEBUG_LOCATION, |
| "EDS+OnResponseReceivedLocked+xds_shutdown"); |
| return; |
| } |
| // Keep listening for serverlist updates. |
| grpc_op op; |
| memset(&op, 0, sizeof(op)); |
| op.op = GRPC_OP_RECV_MESSAGE; |
| op.data.recv_message.recv_message = &eds_calld->recv_message_payload_; |
| op.flags = 0; |
| op.reserved = nullptr; |
| GPR_ASSERT(eds_calld->lb_call_ != nullptr); |
| // Reuse the "EDS+OnResponseReceivedLocked" ref taken in ctor. |
| const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
| eds_calld->lb_call_, &op, 1, &eds_calld->on_response_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } |
| |
| void XdsLb::LbChannelState::EdsCallState::OnStatusReceivedLocked( |
| void* arg, grpc_error* error) { |
| EdsCallState* eds_calld = static_cast<EdsCallState*>(arg); |
| LbChannelState* lb_chand = eds_calld->lb_chand(); |
| XdsLb* xdslb_policy = eds_calld->xdslb_policy(); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| char* status_details = grpc_slice_to_c_string(eds_calld->status_details_); |
| gpr_log(GPR_INFO, |
| "[xdslb %p] EDS call status received. Status = %d, details " |
| "= '%s', (lb_chand: %p, eds_calld: %p, lb_call: %p), error '%s'", |
| xdslb_policy, eds_calld->status_code_, status_details, lb_chand, |
| eds_calld, eds_calld->lb_call_, grpc_error_string(error)); |
| gpr_free(status_details); |
| } |
| // Ignore status from a stale call. |
| if (eds_calld->IsCurrentCallOnChannel()) { |
| // Because this call is the current one on the channel, the channel can't |
| // have been swapped out; otherwise, the call should have been reset. |
| GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel()); |
| if (lb_chand != xdslb_policy->LatestLbChannel()) { |
| // This channel must be the current one and there is a pending one. Swap |
| // in the pending one and we are done. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Promoting pending LB channel %p to replace " |
| "current LB channel %p", |
| xdslb_policy, lb_chand, xdslb_policy->lb_chand_.get()); |
| } |
| xdslb_policy->lb_chand_ = std::move(xdslb_policy->pending_lb_chand_); |
| } else { |
| // This channel is the most recently created one. Try to restart the call |
| // and reresolve. |
| eds_calld->parent_->OnCallFinishedLocked(); |
| xdslb_policy->channel_control_helper()->RequestReresolution(); |
| // If the fallback-at-startup checks are pending, go into fallback mode |
| // immediately. This short-circuits the timeout for the |
| // fallback-at-startup case. |
| if (xdslb_policy->fallback_at_startup_checks_pending_) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Balancer call finished; entering fallback mode", |
| xdslb_policy); |
| xdslb_policy->fallback_at_startup_checks_pending_ = false; |
| grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_); |
| lb_chand->CancelConnectivityWatchLocked(); |
| xdslb_policy->UpdateFallbackPolicyLocked(); |
| } |
| } |
| } |
| eds_calld->Unref(DEBUG_LOCATION, "EDS+OnStatusReceivedLocked"); |
| } |
| |
| bool XdsLb::LbChannelState::EdsCallState::IsCurrentCallOnChannel() const { |
| // If the retryable EDS call is null (which only happens when the LB channel |
| // is shutting down), all the EDS calls are stale. |
| if (lb_chand()->eds_calld_ == nullptr) return false; |
| return this == lb_chand()->eds_calld_->lb_calld(); |
| } |
| |
| // |
| // XdsLb::LbChannelState::LrsCallState::Reporter |
| // |
| |
| void XdsLb::LbChannelState::LrsCallState::Reporter::Orphan() { |
| if (next_report_timer_callback_pending_) { |
| grpc_timer_cancel(&next_report_timer_); |
| } |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::Reporter::ScheduleNextReportLocked() { |
| const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_; |
| grpc_timer_init(&next_report_timer_, next_report_time, |
| &on_next_report_timer_); |
| next_report_timer_callback_pending_ = true; |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( |
| void* arg, grpc_error* error) { |
| Reporter* self = static_cast<Reporter*>(arg); |
| self->next_report_timer_callback_pending_ = false; |
| if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) { |
| self->Unref(DEBUG_LOCATION, "Reporter+timer"); |
| return; |
| } |
| self->SendReportLocked(); |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::Reporter::SendReportLocked() { |
| // Create a request that contains the load report. |
| grpc_slice request_payload_slice = XdsLrsRequestCreateAndEncode( |
| parent_->cluster_name_.get(), &xdslb_policy()->client_stats_); |
| // Skip client load report if the counters were all zero in the last |
| // report and they are still zero in this one. |
| const bool old_val = last_report_counters_were_zero_; |
| last_report_counters_were_zero_ = static_cast<bool>( |
| grpc_slice_eq(request_payload_slice, grpc_empty_slice())); |
| if (old_val && last_report_counters_were_zero_) { |
| ScheduleNextReportLocked(); |
| return; |
| } |
| parent_->send_message_payload_ = |
| grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
| grpc_slice_unref_internal(request_payload_slice); |
| // Send the report. |
| grpc_op op; |
| memset(&op, 0, sizeof(op)); |
| op.op = GRPC_OP_SEND_MESSAGE; |
| op.data.send_message.send_message = parent_->send_message_payload_; |
| grpc_call_error call_error = grpc_call_start_batch_and_execute( |
| parent_->lb_call_, &op, 1, &on_report_done_); |
| if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { |
| gpr_log(GPR_ERROR, |
| "[xdslb %p] lb_calld=%p call_error=%d sending client load report", |
| xdslb_policy(), this, call_error); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::Reporter::OnReportDoneLocked( |
| void* arg, grpc_error* error) { |
| Reporter* self = static_cast<Reporter*>(arg); |
| grpc_byte_buffer_destroy(self->parent_->send_message_payload_); |
| self->parent_->send_message_payload_ = nullptr; |
| if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) { |
// If this reporter is no longer the current one on the call, the reason
// might be that it was orphaned and replaced by a new one due to a config
// update.
| if (!self->IsCurrentReporterOnCall()) { |
| self->parent_->MaybeStartReportingLocked(); |
| } |
| self->Unref(DEBUG_LOCATION, "Reporter+report_done"); |
| return; |
| } |
| self->ScheduleNextReportLocked(); |
| } |
| |
| // |
| // XdsLb::LbChannelState::LrsCallState |
| // |
| |
| XdsLb::LbChannelState::LrsCallState::LrsCallState( |
| RefCountedPtr<RetryableLbCall<LrsCallState>> parent) |
| : InternallyRefCounted<LrsCallState>(&grpc_lb_xds_trace), |
| parent_(std::move(parent)) { |
// Init the LB call. Note that the LB call will progress every time there's
// activity in xdslb_policy()->interested_parties(), which comprises the
// polling entities from client_channel.
| GPR_ASSERT(xdslb_policy() != nullptr); |
| GPR_ASSERT(xdslb_policy()->server_name_ != nullptr); |
| GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0'); |
| const grpc_millis deadline = |
| xdslb_policy()->lb_call_timeout_ms_ == 0 |
| ? GRPC_MILLIS_INF_FUTURE |
| : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_; |
| lb_call_ = grpc_channel_create_pollset_set_call( |
| lb_chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, |
| xdslb_policy()->interested_parties(), |
| GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS, |
| nullptr, deadline, nullptr); |
| GPR_ASSERT(lb_call_ != nullptr); |
| // Init the LB call request payload. |
| grpc_slice request_payload_slice = |
| XdsLrsRequestCreateAndEncode(xdslb_policy()->server_name_); |
| send_message_payload_ = |
| grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
| grpc_slice_unref_internal(request_payload_slice); |
| // Init other data associated with the LRS call. |
| grpc_metadata_array_init(&initial_metadata_recv_); |
| grpc_metadata_array_init(&trailing_metadata_recv_); |
| GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSentLocked, this, |
| grpc_combiner_scheduler(xdslb_policy()->combiner())); |
| GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this, |
| grpc_combiner_scheduler(xdslb_policy()->combiner())); |
| GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this, |
| grpc_combiner_scheduler(xdslb_policy()->combiner())); |
| // Start the call. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Starting LRS call (lb_chand: %p, lb_calld: %p, " |
| "lb_call: %p)", |
| xdslb_policy(), lb_chand(), this, lb_call_); |
| } |
| // Create the ops. |
| grpc_call_error call_error; |
| grpc_op ops[3]; |
| memset(ops, 0, sizeof(ops)); |
| // Op: send initial metadata. |
| grpc_op* op = ops; |
| op->op = GRPC_OP_SEND_INITIAL_METADATA; |
| op->data.send_initial_metadata.count = 0; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // Op: send request message. |
| GPR_ASSERT(send_message_payload_ != nullptr); |
| op->op = GRPC_OP_SEND_MESSAGE; |
| op->data.send_message.send_message = send_message_payload_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release(); |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &on_initial_request_sent_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| // Op: recv initial metadata. |
| op = ops; |
| op->op = GRPC_OP_RECV_INITIAL_METADATA; |
| op->data.recv_initial_metadata.recv_initial_metadata = |
| &initial_metadata_recv_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // Op: recv response. |
| op->op = GRPC_OP_RECV_MESSAGE; |
| op->data.recv_message.recv_message = &recv_message_payload_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release(); |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &on_response_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| // Op: recv server status. |
| op = ops; |
| op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
| op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; |
| op->data.recv_status_on_client.status = &status_code_; |
| op->data.recv_status_on_client.status_details = &status_details_; |
| op->flags = 0; |
| op->reserved = nullptr; |
| op++; |
| // This callback signals the end of the LB call, so it relies on the initial |
| // ref instead of a new ref. When it's invoked, it's the initial ref that is |
| // unreffed. |
| call_error = grpc_call_start_batch_and_execute( |
| lb_call_, ops, (size_t)(op - ops), &on_status_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } |
| |
| XdsLb::LbChannelState::LrsCallState::~LrsCallState() { |
| grpc_metadata_array_destroy(&initial_metadata_recv_); |
| grpc_metadata_array_destroy(&trailing_metadata_recv_); |
| grpc_byte_buffer_destroy(send_message_payload_); |
| grpc_byte_buffer_destroy(recv_message_payload_); |
| grpc_slice_unref_internal(status_details_); |
| GPR_ASSERT(lb_call_ != nullptr); |
| grpc_call_unref(lb_call_); |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::Orphan() { |
| reporter_.reset(); |
| GPR_ASSERT(lb_call_ != nullptr); |
// If we are here because xdslb_policy wants to cancel the call,
// on_status_received_ will complete the cancellation and clean up.
// Otherwise, we are here because xdslb_policy has to orphan a failed call,
// in which case the following cancellation is a no-op.
| grpc_call_cancel(lb_call_, nullptr); |
// Note that the initial ref is held by on_status_received_, so the
// corresponding unref happens in on_status_received_ instead of here.
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::MaybeStartReportingLocked() { |
| // Don't start if this is not the current call on the current channel. |
| if (!IsCurrentCallOnChannel() || !lb_chand()->IsCurrentChannel()) return; |
| // Don't start again if already started. |
| if (reporter_ != nullptr) return; |
| // Don't start if the previous send_message op (of the initial request or the |
| // last report of the previous reporter) hasn't completed. |
| if (send_message_payload_ != nullptr) return; |
| // Don't start if no LRS response has arrived. |
| if (!seen_response()) return; |
// Don't start if the EDS call hasn't received any valid response. Note that
// in this case we must be on the first channel, because a pending channel is
// only swapped in as the current one after its EDS call has seen a response.
| EdsCallState* eds_calld = lb_chand()->eds_calld_->lb_calld(); |
| if (eds_calld == nullptr || !eds_calld->seen_response()) return; |
| // Start reporting. |
| lb_chand()->xdslb_policy_->client_stats_.MaybeInitLastReportTime(); |
| reporter_ = MakeOrphanable<Reporter>( |
| Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_); |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::OnInitialRequestSentLocked( |
| void* arg, grpc_error* error) { |
| LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
| // Clear the send_message_payload_. |
| grpc_byte_buffer_destroy(lrs_calld->send_message_payload_); |
| lrs_calld->send_message_payload_ = nullptr; |
| lrs_calld->MaybeStartReportingLocked(); |
| lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::OnResponseReceivedLocked( |
| void* arg, grpc_error* error) { |
| LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
| XdsLb* xdslb_policy = lrs_calld->xdslb_policy(); |
| // Empty payload means the LB call was cancelled. |
| if (!lrs_calld->IsCurrentCallOnChannel() || |
| lrs_calld->recv_message_payload_ == nullptr) { |
| lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); |
| return; |
| } |
| // Read the response. |
| grpc_byte_buffer_reader bbr; |
| grpc_byte_buffer_reader_init(&bbr, lrs_calld->recv_message_payload_); |
| grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
| grpc_byte_buffer_reader_destroy(&bbr); |
| grpc_byte_buffer_destroy(lrs_calld->recv_message_payload_); |
| lrs_calld->recv_message_payload_ = nullptr; |
// This anonymous lambda is a hack to avoid the use of goto.
| [&]() { |
| // Parse the response. |
| UniquePtr<char> new_cluster_name; |
| grpc_millis new_load_reporting_interval; |
| grpc_error* parse_error = XdsLrsResponseDecodeAndParse( |
| response_slice, &new_cluster_name, &new_load_reporting_interval); |
| if (parse_error != GRPC_ERROR_NONE) { |
| gpr_log(GPR_ERROR, "[xdslb %p] LRS response parsing failed. error=%s", |
| xdslb_policy, grpc_error_string(parse_error)); |
| GRPC_ERROR_UNREF(parse_error); |
| return; |
| } |
| lrs_calld->seen_response_ = true; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] LRS response received, cluster_name=%s, " |
| "load_report_interval=%" PRId64 "ms", |
| xdslb_policy, new_cluster_name.get(), |
| new_load_reporting_interval); |
| } |
| if (new_load_reporting_interval < |
| GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) { |
| new_load_reporting_interval = |
| GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log( |
| GPR_INFO, |
| "[xdslb %p] Increased load_report_interval to minimum value %dms", |
| xdslb_policy, GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); |
| } |
| } |
| // Ignore identical update. |
| if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval && |
| strcmp(lrs_calld->cluster_name_.get(), new_cluster_name.get()) == 0) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Incoming LRS response identical to current, " |
| "ignoring.", |
| xdslb_policy); |
| } |
| return; |
| } |
| // Stop current load reporting (if any) to adopt the new config. |
| lrs_calld->reporter_.reset(); |
| // Record the new config. |
| lrs_calld->cluster_name_ = std::move(new_cluster_name); |
| lrs_calld->load_reporting_interval_ = new_load_reporting_interval; |
// Try to start sending load reports.
| lrs_calld->MaybeStartReportingLocked(); |
| }(); |
| grpc_slice_unref_internal(response_slice); |
| if (xdslb_policy->shutting_down_) { |
| lrs_calld->Unref(DEBUG_LOCATION, |
| "LRS+OnResponseReceivedLocked+xds_shutdown"); |
| return; |
| } |
| // Keep listening for LRS config updates. |
| grpc_op op; |
| memset(&op, 0, sizeof(op)); |
| op.op = GRPC_OP_RECV_MESSAGE; |
| op.data.recv_message.recv_message = &lrs_calld->recv_message_payload_; |
| op.flags = 0; |
| op.reserved = nullptr; |
| GPR_ASSERT(lrs_calld->lb_call_ != nullptr); |
| // Reuse the "OnResponseReceivedLocked" ref taken in ctor. |
| const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
| lrs_calld->lb_call_, &op, 1, &lrs_calld->on_response_received_); |
| GPR_ASSERT(GRPC_CALL_OK == call_error); |
| } |
| |
| void XdsLb::LbChannelState::LrsCallState::OnStatusReceivedLocked( |
| void* arg, grpc_error* error) { |
| LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
| XdsLb* xdslb_policy = lrs_calld->xdslb_policy(); |
| LbChannelState* lb_chand = lrs_calld->lb_chand(); |
| GPR_ASSERT(lrs_calld->lb_call_ != nullptr); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| char* status_details = grpc_slice_to_c_string(lrs_calld->status_details_); |
| gpr_log(GPR_INFO, |
| "[xdslb %p] LRS call status received. Status = %d, details " |
| "= '%s', (lb_chand: %p, lb_calld: %p, lb_call: %p), error '%s'", |
| xdslb_policy, lrs_calld->status_code_, status_details, lb_chand, |
| lrs_calld, lrs_calld->lb_call_, grpc_error_string(error)); |
| gpr_free(status_details); |
| } |
| // Ignore status from a stale call. |
| if (lrs_calld->IsCurrentCallOnChannel()) { |
| // Because this call is the current one on the channel, the channel can't |
| // have been swapped out; otherwise, the call should have been reset. |
| GPR_ASSERT(lb_chand->IsCurrentChannel() || lb_chand->IsPendingChannel()); |
| GPR_ASSERT(!xdslb_policy->shutting_down_); |
| if (lb_chand == xdslb_policy->LatestLbChannel()) { |
| // This channel is the most recently created one. Try to restart the call |
| // and reresolve. |
| lrs_calld->parent_->OnCallFinishedLocked(); |
| xdslb_policy->channel_control_helper()->RequestReresolution(); |
| } |
| } |
| lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); |
| } |
| |
| bool XdsLb::LbChannelState::LrsCallState::IsCurrentCallOnChannel() const { |
| // If the retryable LRS call is null (which only happens when the LB channel |
| // is shutting down), all the LRS calls are stale. |
| if (lb_chand()->lrs_calld_ == nullptr) return false; |
| return this == lb_chand()->lrs_calld_->lb_calld(); |
| } |
| |
| // |
| // helper code for creating balancer channel |
| // |
| |
| // Returns the channel args for the LB channel, used to create a bidirectional |
| // stream for the reception of load balancing updates. |
| grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) { |
| static const char* args_to_remove[] = { |
| // LB policy name, since we want to use the default (pick_first) in |
| // the LB channel. |
| GRPC_ARG_LB_POLICY_NAME, |
| // The service config that contains the LB config. We don't want to |
| // recursively use xds in the LB channel. |
| GRPC_ARG_SERVICE_CONFIG, |
| // The channel arg for the server URI, since that will be different for |
| // the LB channel than for the parent channel. The client channel |
| // factory will re-add this arg with the right value. |
| GRPC_ARG_SERVER_URI, |
| // The LB channel should use the authority indicated by the target |
| // authority table (see \a ModifyXdsBalancerChannelArgs), |
| // as opposed to the authority from the parent channel. |
| GRPC_ARG_DEFAULT_AUTHORITY, |
| // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be |
| // treated as a stand-alone channel and not inherit this argument from the |
| // args of the parent channel. |
| GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, |
| // Don't want to pass down channelz node from parent; the balancer |
| // channel will get its own. |
| GRPC_ARG_CHANNELZ_CHANNEL_NODE, |
| }; |
| // Channel args to add. |
| InlinedVector<grpc_arg, 2> args_to_add; |
// A channel arg indicating that the target is an xds load balancer.
| args_to_add.emplace_back(grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1)); |
| // The parent channel's channelz uuid. |
| channelz::ChannelNode* channelz_node = nullptr; |
| const grpc_arg* arg = |
| grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE); |
| if (arg != nullptr && arg->type == GRPC_ARG_POINTER && |
| arg->value.pointer.p != nullptr) { |
| channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p); |
| args_to_add.emplace_back( |
| channelz::MakeParentUuidArg(channelz_node->uuid())); |
| } |
| // Construct channel args. |
| grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( |
| args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(), |
| args_to_add.size()); |
| // Make any necessary modifications for security. |
| return ModifyXdsBalancerChannelArgs(new_args); |
| } |
| |
| // |
| // ctor and dtor |
| // |
| |
| XdsLb::XdsLb(Args args) |
| : LoadBalancingPolicy(std::move(args)), |
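// Note: the xds policy reuses the grpclb call timeout channel arg
// (GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS) for its own LB call deadline.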
| lb_call_timeout_ms_(grpc_channel_args_find_integer( |
| args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX})), |
| lb_fallback_timeout_ms_(grpc_channel_args_find_integer( |
| args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, |
| {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})), |
| locality_retention_interval_ms_(grpc_channel_args_find_integer( |
| args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS, |
| {GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})), |
| locality_map_failover_timeout_ms_(grpc_channel_args_find_integer( |
| args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, |
| {GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})), |
| priority_list_(this) { |
| // Record server name. |
| const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); |
| const char* server_uri = grpc_channel_arg_get_string(arg); |
| GPR_ASSERT(server_uri != nullptr); |
| grpc_uri* uri = grpc_uri_parse(server_uri, true); |
| GPR_ASSERT(uri->path[0] != '\0'); |
| server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Will use '%s' as the server name for LB request.", this, |
| server_name_); |
| } |
| grpc_uri_destroy(uri); |
| } |
| |
| XdsLb::~XdsLb() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this); |
| } |
| gpr_free((void*)server_name_); |
| grpc_channel_args_destroy(args_); |
| } |
| |
| void XdsLb::ShutdownLocked() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] shutting down", this); |
| } |
| shutting_down_ = true; |
| if (fallback_at_startup_checks_pending_) { |
| grpc_timer_cancel(&lb_fallback_timer_); |
| } |
| priority_list_.ShutdownLocked(); |
| if (fallback_policy_ != nullptr) { |
| grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), |
| interested_parties()); |
| } |
| if (pending_fallback_policy_ != nullptr) { |
| grpc_pollset_set_del_pollset_set( |
| pending_fallback_policy_->interested_parties(), interested_parties()); |
| } |
| fallback_policy_.reset(); |
| pending_fallback_policy_.reset(); |
| // We reset the LB channels here instead of in our destructor because they |
| // hold refs to XdsLb. |
| lb_chand_.reset(); |
| pending_lb_chand_.reset(); |
| } |
| |
| // |
| // public methods |
| // |
| |
| void XdsLb::ResetBackoffLocked() { |
| if (lb_chand_ != nullptr) { |
| grpc_channel_reset_connect_backoff(lb_chand_->channel()); |
| } |
| if (pending_lb_chand_ != nullptr) { |
| grpc_channel_reset_connect_backoff(pending_lb_chand_->channel()); |
| } |
| priority_list_.ResetBackoffLocked(); |
| if (fallback_policy_ != nullptr) { |
| fallback_policy_->ResetBackoffLocked(); |
| } |
| if (pending_fallback_policy_ != nullptr) { |
| pending_fallback_policy_->ResetBackoffLocked(); |
| } |
| } |
| |
| void XdsLb::ProcessAddressesAndChannelArgsLocked( |
| ServerAddressList addresses, const grpc_channel_args& args) { |
| // Update fallback address list. |
| fallback_backend_addresses_ = std::move(addresses); |
| // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, |
| // since we use this to trigger the client_load_reporting filter. |
| static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; |
| grpc_arg new_arg = grpc_channel_arg_string_create( |
| (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"xds"); |
| grpc_channel_args_destroy(args_); |
| args_ = grpc_channel_args_copy_and_add_and_remove( |
| &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); |
| // Construct args for balancer channel. |
| grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(&args); |
| // Create an LB channel if we don't have one yet or the balancer name has |
| // changed from the last received one. |
| bool create_lb_channel = lb_chand_ == nullptr; |
| if (lb_chand_ != nullptr) { |
| UniquePtr<char> last_balancer_name( |
| grpc_channel_get_target(LatestLbChannel()->channel())); |
| create_lb_channel = |
| strcmp(last_balancer_name.get(), balancer_name_.get()) != 0; |
| } |
| if (create_lb_channel) { |
| OrphanablePtr<LbChannelState> lb_chand = MakeOrphanable<LbChannelState>( |
| Ref(DEBUG_LOCATION, "XdsLb+LbChannelState"), balancer_name_.get(), |
| *lb_channel_args); |
| if (lb_chand_ == nullptr || !lb_chand_->HasActiveEdsCall()) { |
| GPR_ASSERT(pending_lb_chand_ == nullptr); |
| // If we do not have a working LB channel yet, use the newly created one. |
| lb_chand_ = std::move(lb_chand); |
| } else { |
// Otherwise, wait for the new LB channel to be ready before swapping it in.
| pending_lb_chand_ = std::move(lb_chand); |
| } |
| } |
| grpc_channel_args_destroy(lb_channel_args); |
| } |
| |
| void XdsLb::ParseLbConfig(const ParsedXdsConfig* xds_config) { |
| if (xds_config == nullptr || xds_config->balancer_name() == nullptr) return; |
// TODO(yashykt): Does this need to be a gpr_strdup?
| // TODO(juanlishen): Read balancer name from bootstrap file. |
| balancer_name_ = UniquePtr<char>(gpr_strdup(xds_config->balancer_name())); |
| child_policy_config_ = xds_config->child_policy(); |
| fallback_policy_config_ = xds_config->fallback_policy(); |
| } |
| |
| void XdsLb::UpdateLocked(UpdateArgs args) { |
| const bool is_initial_update = lb_chand_ == nullptr; |
| ParseLbConfig(static_cast<const ParsedXdsConfig*>(args.config.get())); |
| if (balancer_name_ == nullptr) { |
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing failed.", this);
| return; |
| } |
| ProcessAddressesAndChannelArgsLocked(std::move(args.addresses), *args.args); |
| priority_list_.UpdateLocked(); |
| // Update the existing fallback policy. The fallback policy config and/or the |
| // fallback addresses may be new. |
| if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); |
| // If this is the initial update, start the fallback-at-startup checks. |
| if (is_initial_update) { |
| grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; |
| Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure |
| GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this, |
| grpc_combiner_scheduler(combiner())); |
| fallback_at_startup_checks_pending_ = true; |
| grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); |
| // Start watching the channel's connectivity state. If the channel |
| // goes into state TRANSIENT_FAILURE, we go into fallback mode even if |
| // the fallback timeout has not elapsed. |
| lb_chand_->StartConnectivityWatchLocked(); |
| } |
| } |
| |
| // |
| // fallback-related methods |
| // |
| |
| void XdsLb::MaybeCancelFallbackAtStartupChecks() { |
| if (!fallback_at_startup_checks_pending_) return; |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Cancelling fallback timer and LB channel connectivity " |
| "watch", |
| this); |
| grpc_timer_cancel(&lb_fallback_timer_); |
| lb_chand_->CancelConnectivityWatchLocked(); |
| fallback_at_startup_checks_pending_ = false; |
| } |
| |
| void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { |
| XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); |
| // If some fallback-at-startup check is done after the timer fires but before |
| // this callback actually runs, don't fall back. |
| if (xdslb_policy->fallback_at_startup_checks_pending_ && |
| !xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Child policy not ready after fallback timeout; " |
| "entering fallback mode", |
| xdslb_policy); |
| } |
| xdslb_policy->fallback_at_startup_checks_pending_ = false; |
| xdslb_policy->UpdateFallbackPolicyLocked(); |
| xdslb_policy->lb_chand_->CancelConnectivityWatchLocked(); |
| } |
| xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); |
| } |
| |
| void XdsLb::UpdateFallbackPolicyLocked() { |
| if (shutting_down_) return; |
| // Construct update args. |
| UpdateArgs update_args; |
| update_args.addresses = fallback_backend_addresses_; |
| update_args.config = fallback_policy_config_ == nullptr |
| ? nullptr |
| : fallback_policy_config_->Ref(); |
| update_args.args = grpc_channel_args_copy(args_); |
// If the fallback policy name changes, we need to create a new fallback
// policy. When this happens, we leave fallback_policy_ as-is and store
// the new fallback policy in pending_fallback_policy_. Once the new
// policy transitions into state READY, we swap it into fallback_policy_,
// replacing the original policy. So pending_fallback_policy_ is
// non-null only between when we apply an update that changes the fallback
// policy name and when the new policy reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created fallback policy,
// even if the most recent one is still in pending_fallback_policy_. This
// is true both when applying the updates to an existing fallback policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing fallback policy (i.e., we have not yet entered
// fallback mode; in this case, both fallback_policy_ and
// pending_fallback_policy_ are null). In this case, we create a
// new fallback policy and store it in fallback_policy_.
//
// 2. We have an existing fallback policy and have no pending policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, fallback_policy_
// is non-null but pending_fallback_policy_ is null). In this case:
// a. If fallback_policy_->name() equals fallback_policy_name, then
// we update the existing fallback policy.
// b. If fallback_policy_->name() does not equal fallback_policy_name,
// we create a new policy. The policy will be stored in
// pending_fallback_policy_ and will later be swapped into
// fallback_policy_ by the helper when the new policy transitions
// into state READY.
//
// 3. We have an existing fallback policy and have a pending policy
// from a previous update (i.e., a previous update set
// pending_fallback_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// fallback_policy_; in this case, both fallback_policy_ and
// pending_fallback_policy_ are non-null). In this case:
// a. If pending_fallback_policy_->name() equals fallback_policy_name,
// then we update the existing pending policy.
// b. If pending_fallback_policy_->name() does not equal
// fallback_policy_name, then we create a new policy. The new
// policy is stored in pending_fallback_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into fallback_policy_ by the helper
// when the new policy transitions into state READY.
| const char* fallback_policy_name = fallback_policy_config_ == nullptr |
| ? "round_robin" |
| : fallback_policy_config_->name(); |
| const bool create_policy = |
| // case 1 |
| fallback_policy_ == nullptr || |
| // case 2b |
| (pending_fallback_policy_ == nullptr && |
| strcmp(fallback_policy_->name(), fallback_policy_name) != 0) || |
| // case 3b |
| (pending_fallback_policy_ != nullptr && |
| strcmp(pending_fallback_policy_->name(), fallback_policy_name) != 0); |
| LoadBalancingPolicy* policy_to_update = nullptr; |
| if (create_policy) { |
| // Cases 1, 2b, and 3b: create a new child policy. |
| // If child_policy_ is null, we set it (case 1), else we set |
| // pending_child_policy_ (cases 2b and 3b). |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Creating new %sfallback policy %s", this, |
| fallback_policy_ == nullptr ? "" : "pending ", |
| fallback_policy_name); |
| } |
| auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_ |
| : pending_fallback_policy_; |
| lb_policy = |
| CreateFallbackPolicyLocked(fallback_policy_name, update_args.args); |
| policy_to_update = lb_policy.get(); |
| } else { |
| // Cases 2a and 3a: update an existing policy. |
| // If we have a pending child policy, send the update to the pending |
| // policy (case 3a), else send it to the current policy (case 2a). |
| policy_to_update = pending_fallback_policy_ != nullptr |
| ? pending_fallback_policy_.get() |
| : fallback_policy_.get(); |
| } |
| GPR_ASSERT(policy_to_update != nullptr); |
| // Update the policy. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log( |
| GPR_INFO, "[xdslb %p] Updating %sfallback policy %p", this, |
| policy_to_update == pending_fallback_policy_.get() ? "pending " : "", |
| policy_to_update); |
| } |
| policy_to_update->UpdateLocked(std::move(update_args)); |
| } |
| |
| OrphanablePtr<LoadBalancingPolicy> XdsLb::CreateFallbackPolicyLocked( |
| const char* name, const grpc_channel_args* args) { |
| FallbackHelper* helper = |
| New<FallbackHelper>(Ref(DEBUG_LOCATION, "FallbackHelper")); |
| LoadBalancingPolicy::Args lb_policy_args; |
| lb_policy_args.combiner = combiner(); |
| lb_policy_args.args = args; |
| lb_policy_args.channel_control_helper = |
| UniquePtr<ChannelControlHelper>(helper); |
| OrphanablePtr<LoadBalancingPolicy> lb_policy = |
| LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
| name, std::move(lb_policy_args)); |
| if (GPR_UNLIKELY(lb_policy == nullptr)) { |
| gpr_log(GPR_ERROR, "[xdslb %p] Failure creating fallback policy %s", this, |
| name); |
| return nullptr; |
| } |
| helper->set_child(lb_policy.get()); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Created new fallback policy %s (%p)", this, |
| name, lb_policy.get()); |
| } |
// Add the xDS policy's interested_parties pollset_set to that of the newly
// created fallback policy. This will make the fallback policy progress upon
// activity on the xDS policy, which in turn is tied to the application's
// call.
| grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
| interested_parties()); |
| return lb_policy; |
| } |
| |
| void XdsLb::MaybeExitFallbackMode() { |
| if (fallback_policy_ == nullptr) return; |
| gpr_log(GPR_INFO, "[xdslb %p] Exiting fallback mode", this); |
| fallback_policy_.reset(); |
| pending_fallback_policy_.reset(); |
| } |
| |
| // |
| // XdsLb::PriorityList |
| // |
| |
| void XdsLb::PriorityList::UpdateLocked() { |
| const auto& priority_list_update = xds_policy_->priority_list_update_; |
| // 1. Remove from the priority list the priorities that are not in the update. |
| DeactivatePrioritiesLowerThan(priority_list_update.LowestPriority()); |
| // 2. Update all the existing priorities. |
| for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { |
| LocalityMap* locality_map = priorities_[priority].get(); |
| const auto* locality_map_update = priority_list_update.Find(priority); |
| // Propagate locality_map_update. |
| // TODO(juanlishen): Find a clean way to skip duplicate update for a |
| // priority. |
| locality_map->UpdateLocked(*locality_map_update); |
| } |
| // 3. Only create a new locality map if all the existing ones have failed. |
| if (priorities_.empty() || |
| !priorities_[priorities_.size() - 1]->failover_timer_callback_pending()) { |
| const uint32_t new_priority = static_cast<uint32_t>(priorities_.size()); |
// Create a new locality map. Note that in some rare cases (e.g., the
// locality map reports TRANSIENT_FAILURE synchronously due to subchannel
// sharing), the following invocation may result in multiple locality maps
// being created.
| MaybeCreateLocalityMapLocked(new_priority); |
| } |
| } |
| |
| void XdsLb::PriorityList::ResetBackoffLocked() { |
| for (size_t i = 0; i < priorities_.size(); ++i) { |
| priorities_[i]->ResetBackoffLocked(); |
| } |
| } |
| |
| void XdsLb::PriorityList::ShutdownLocked() { priorities_.clear(); } |
| |
| void XdsLb::PriorityList::UpdateXdsPickerLocked() { |
| // If we are in fallback mode, don't generate an xds picker from localities. |
| if (xds_policy_->fallback_policy_ != nullptr) return; |
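// current_priority() is UINT32_MAX when no priority is READY.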
| if (current_priority() == UINT32_MAX) { |
| grpc_error* error = grpc_error_set_int( |
| GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready locality map"), |
| GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
| xds_policy_->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, |
| UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error))); |
| return; |
| } |
| priorities_[current_priority_]->UpdateXdsPickerLocked(); |
| } |
| |
| void XdsLb::PriorityList::MaybeCreateLocalityMapLocked(uint32_t priority) { |
// Do nothing if we have exhausted all the priorities in the update.
| if (!priority_list_update().Contains(priority)) return; |
| auto new_locality_map = New<LocalityMap>( |
| xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+LocalityMap"), priority); |
| priorities_.emplace_back(OrphanablePtr<LocalityMap>(new_locality_map)); |
| new_locality_map->UpdateLocked(*priority_list_update().Find(priority)); |
| } |
| |
| void XdsLb::PriorityList::FailoverOnConnectionFailureLocked() { |
| const uint32_t failed_priority = LowestPriority(); |
| // If we're failing over from the lowest priority, report TRANSIENT_FAILURE. |
| if (failed_priority == priority_list_update().LowestPriority()) { |
| UpdateXdsPickerLocked(); |
| } |
| MaybeCreateLocalityMapLocked(failed_priority + 1); |
| } |
| |
| void XdsLb::PriorityList::FailoverOnDisconnectionLocked( |
| uint32_t failed_priority) { |
| current_priority_ = UINT32_MAX; |
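// Walk down the remaining priorities in order: create the first priority
// that doesn't exist yet, or switch to the first existing priority that is
// READY.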
| for (uint32_t next_priority = failed_priority + 1; |
| next_priority <= priority_list_update().LowestPriority(); |
| ++next_priority) { |
| if (!Contains(next_priority)) { |
| MaybeCreateLocalityMapLocked(next_priority); |
| return; |
| } |
| if (priorities_[next_priority]->MaybeReactivateLocked()) return; |
| } |
| } |
| |
| void XdsLb::PriorityList::SwitchToHigherPriorityLocked(uint32_t priority) { |
| current_priority_ = priority; |
| DeactivatePrioritiesLowerThan(current_priority_); |
| UpdateXdsPickerLocked(); |
| } |
| |
| void XdsLb::PriorityList::DeactivatePrioritiesLowerThan(uint32_t priority) { |
| if (priorities_.empty()) return; |
| // Deactivate the locality maps from the lowest priority. |
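// Note that a higher numeric value means a lower priority.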
| for (uint32_t p = LowestPriority(); p > priority; --p) { |
| if (xds_policy_->locality_retention_interval_ms_ == 0) { |
| priorities_.pop_back(); |
| } else { |
| priorities_[p]->DeactivateLocked(); |
| } |
| } |
| } |
| |
| OrphanablePtr<XdsLb::PriorityList::LocalityMap::Locality> |
| XdsLb::PriorityList::ExtractLocalityLocked( |
| const RefCountedPtr<XdsLocalityName>& name, uint32_t exclude_priority) { |
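// Search the other priorities for a locality with the given name; if found,
// remove it from its locality map and return it.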
| for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { |
| if (priority == exclude_priority) continue; |
| LocalityMap* locality_map = priorities_[priority].get(); |
| auto locality = locality_map->ExtractLocalityLocked(name); |
| if (locality != nullptr) return locality; |
| } |
| return nullptr; |
| } |
| |
| // |
| // XdsLb::PriorityList::LocalityMap |
| // |
| |
| XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy, |
| uint32_t priority) |
| : xds_policy_(std::move(xds_policy)), priority_(priority) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, |
| xds_policy_.get(), priority_); |
| } |
| GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, |
| this, grpc_combiner_scheduler(xds_policy_->combiner())); |
| GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimerLocked, this, |
| grpc_combiner_scheduler(xds_policy_->combiner())); |
| // Start the failover timer. |
| Ref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked").release(); |
| grpc_timer_init( |
| &failover_timer_, |
| ExecCtx::Get()->Now() + xds_policy_->locality_map_failover_timeout_ms_, |
| &on_failover_timer_); |
| failover_timer_callback_pending_ = true; |
// If this is the first locality map ever created, report CONNECTING.
| if (priority_ == 0) { |
| xds_policy_->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_CONNECTING, |
| UniquePtr<SubchannelPicker>( |
| New<QueuePicker>(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")))); |
| } |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::UpdateLocked( |
| const XdsPriorityListUpdate::LocalityMap& locality_map_update) { |
| if (xds_policy_->shutting_down_) return; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Start Updating priority %" PRIu32, |
| xds_policy(), priority_); |
| } |
| // Maybe reactivate the locality map in case all the active locality maps have |
| // failed. |
| MaybeReactivateLocked(); |
| // Remove (later) the localities not in locality_map_update. |
| for (auto iter = localities_.begin(); iter != localities_.end();) { |
| const auto& name = iter->first; |
| Locality* locality = iter->second.get(); |
| if (locality_map_update.Contains(name)) { |
| ++iter; |
| continue; |
| } |
| if (xds_policy()->locality_retention_interval_ms_ == 0) { |
| iter = localities_.erase(iter); |
| } else { |
| locality->DeactivateLocked(); |
| ++iter; |
| } |
| } |
| // Add or update the localities in locality_map_update. |
| for (const auto& p : locality_map_update.localities) { |
| const auto& name = p.first; |
| const auto& locality_update = p.second; |
| OrphanablePtr<Locality>& locality = localities_[name]; |
| if (locality == nullptr) { |
| // Move from another locality map if possible. |
| locality = priority_list()->ExtractLocalityLocked(name, priority_); |
| if (locality != nullptr) { |
| locality->set_locality_map( |
| Ref(DEBUG_LOCATION, "LocalityMap+Locality_move")); |
| } else { |
| locality = MakeOrphanable<Locality>( |
| Ref(DEBUG_LOCATION, "LocalityMap+Locality"), name); |
| } |
| } |
// Keep a copy of the serverlist in the update so that we can compare it
// with future ones.
| locality->UpdateLocked(locality_update.lb_weight, |
| locality_update.serverlist); |
| } |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::ResetBackoffLocked() { |
| for (auto& p : localities_) p.second->ResetBackoffLocked(); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { |
| // Construct a new xds picker which maintains a map of all locality pickers |
| // that are ready. Each locality is represented by a portion of the range |
| // proportional to its weight, such that the total range is the sum of the |
| // weights of all localities. |
| Picker::PickerList picker_list; |
| uint32_t end = 0; |
| for (const auto& p : localities_) { |
| const auto& locality_name = p.first; |
| const Locality* locality = p.second.get(); |
| // Skip the localities that are not in the latest locality map update. |
| if (!locality_map_update()->Contains(locality_name)) continue; |
| if (locality->connectivity_state() != GRPC_CHANNEL_READY) continue; |
| end += locality->weight(); |
| picker_list.push_back(MakePair(end, locality->picker_wrapper())); |
| } |
| xds_policy()->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_READY, UniquePtr<SubchannelPicker>(New<Picker>( |
| xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), |
| std::move(picker_list)))); |
| } |
| |
| OrphanablePtr<XdsLb::PriorityList::LocalityMap::Locality> |
| XdsLb::PriorityList::LocalityMap::ExtractLocalityLocked( |
| const RefCountedPtr<XdsLocalityName>& name) { |
| for (auto iter = localities_.begin(); iter != localities_.end(); ++iter) { |
| const auto& name_in_map = iter->first; |
| if (*name_in_map == *name) { |
| auto locality = std::move(iter->second); |
| localities_.erase(iter); |
| return locality; |
| } |
| } |
| return nullptr; |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { |
| // If already deactivated, don't do it again. |
| if (delayed_removal_timer_callback_pending_) return; |
| MaybeCancelFailoverTimerLocked(); |
| // Start a timer to delete the locality. |
| Ref(DEBUG_LOCATION, "LocalityMap+timer").release(); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Will remove priority %" PRIu32 " in %" PRId64 " ms.", |
| xds_policy(), priority_, |
| xds_policy()->locality_retention_interval_ms_); |
| } |
| grpc_timer_init( |
| &delayed_removal_timer_, |
| ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, |
| &on_delayed_removal_timer_); |
| delayed_removal_timer_callback_pending_ = true; |
| } |
| |
| bool XdsLb::PriorityList::LocalityMap::MaybeReactivateLocked() { |
| // Don't reactivate a priority that is not higher than the current one. |
| if (priority_ >= priority_list()->current_priority()) return false; |
| // Reactivate this priority by cancelling deletion timer. |
| if (delayed_removal_timer_callback_pending_) { |
| grpc_timer_cancel(&delayed_removal_timer_); |
| } |
| // Switch to this higher priority if it's READY. |
| if (connectivity_state_ != GRPC_CHANNEL_READY) return false; |
| priority_list()->SwitchToHigherPriorityLocked(priority_); |
| return true; |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::MaybeCancelFailoverTimerLocked() { |
| if (failover_timer_callback_pending_) grpc_timer_cancel(&failover_timer_); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Orphan() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Priority %" PRIu32 " orphaned.", xds_policy(), |
| priority_); |
| } |
| MaybeCancelFailoverTimerLocked(); |
| if (delayed_removal_timer_callback_pending_) { |
| grpc_timer_cancel(&delayed_removal_timer_); |
| } |
| localities_.clear(); |
| Unref(DEBUG_LOCATION, "LocalityMap+Orphan"); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() { |
| UpdateConnectivityStateLocked(); |
| // Ignore priorities not in priority_list_update. |
| if (!priority_list_update().Contains(priority_)) return; |
| const uint32_t current_priority = priority_list()->current_priority(); |
| // Ignore lower-than-current priorities. |
| if (priority_ > current_priority) return; |
| // Maybe update fallback state. |
| if (connectivity_state_ == GRPC_CHANNEL_READY) { |
| xds_policy_->MaybeCancelFallbackAtStartupChecks(); |
| xds_policy_->MaybeExitFallbackMode(); |
| } |
| // Update is for a higher-than-current priority. (Special case: update is for |
| // any active priority if there is no current priority.) |
| if (priority_ < current_priority) { |
| if (connectivity_state_ == GRPC_CHANNEL_READY) { |
| MaybeCancelFailoverTimerLocked(); |
| // If a higher-than-current priority becomes READY, switch to use it. |
| priority_list()->SwitchToHigherPriorityLocked(priority_); |
| } else if (connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| // If a higher-than-current priority becomes TRANSIENT_FAILURE, only |
| // handle it if it's the priority that is still in failover timeout. |
| if (failover_timer_callback_pending_) { |
| MaybeCancelFailoverTimerLocked(); |
| priority_list()->FailoverOnConnectionFailureLocked(); |
| } |
| } |
| return; |
| } |
| // Update is for current priority. |
| if (connectivity_state_ != GRPC_CHANNEL_READY) { |
| // Fail over if it's no longer READY. |
| priority_list()->FailoverOnDisconnectionLocked(priority_); |
| } |
| // At this point, one of the following things has happened to the current |
| // priority. |
| // 1. It remained the same (but received picker update from its localities). |
| // 2. It changed to a lower priority due to failover. |
| // 3. It became invalid because failover didn't yield a READY priority. |
| // In any case, update the xds picker. |
| priority_list()->UpdateXdsPickerLocked(); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { |
| size_t num_ready = 0; |
| size_t num_connecting = 0; |
| size_t num_idle = 0; |
| size_t num_transient_failures = 0; |
| for (const auto& p : localities_) { |
| const auto& locality_name = p.first; |
| const Locality* locality = p.second.get(); |
| // Skip the localities that are not in the latest locality map update. |
| if (!locality_map_update()->Contains(locality_name)) continue; |
| switch (locality->connectivity_state()) { |
| case GRPC_CHANNEL_READY: { |
| ++num_ready; |
| break; |
| } |
| case GRPC_CHANNEL_CONNECTING: { |
| ++num_connecting; |
| break; |
| } |
| case GRPC_CHANNEL_IDLE: { |
| ++num_idle; |
| break; |
| } |
| case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
| ++num_transient_failures; |
| break; |
| } |
| default: |
| GPR_UNREACHABLE_CODE(return ); |
| } |
| } |
| if (num_ready > 0) { |
| connectivity_state_ = GRPC_CHANNEL_READY; |
| } else if (num_connecting > 0) { |
| connectivity_state_ = GRPC_CHANNEL_CONNECTING; |
| } else if (num_idle > 0) { |
| connectivity_state_ = GRPC_CHANNEL_IDLE; |
| } else { |
| connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE; |
| } |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Priority %" PRIu32 " (%p) connectivity changed to %s", |
| xds_policy(), priority_, this, |
| grpc_connectivity_state_name(connectivity_state_)); |
| } |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( |
| void* arg, grpc_error* error) { |
| LocalityMap* self = static_cast<LocalityMap*>(arg); |
| self->delayed_removal_timer_callback_pending_ = false; |
| if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { |
| auto* priority_list = self->priority_list(); |
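// Keep this locality map if it is still in the update and its priority is
// at or above (numerically <=) the current priority.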
| const bool keep = self->priority_list_update().Contains(self->priority_) && |
| self->priority_ <= priority_list->current_priority(); |
| if (!keep) { |
| // This check is to make sure we always delete the locality maps from the |
| // lowest priority even if the closures of the back-to-back timers are not |
| // run in FIFO order. |
| // TODO(juanlishen): Eliminate unnecessary maintenance overhead for some |
| // deactivated locality maps when out-of-order closures are run. |
| // TODO(juanlishen): Check the timer implementation to see if this defense |
| // is necessary. |
| if (self->priority_ == priority_list->LowestPriority()) { |
| priority_list->priorities_.pop_back(); |
| } else { |
gpr_log(GPR_ERROR,
"[xdslb %p] Priority %" PRIu32
" is not the lowest priority (highest numeric value), but its "
"deletion was attempted.",
self->xds_policy(), self->priority_);
| } |
| } |
| } |
| self->Unref(DEBUG_LOCATION, "LocalityMap+timer"); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked( |
| void* arg, grpc_error* error) { |
| LocalityMap* self = static_cast<LocalityMap*>(arg); |
| self->failover_timer_callback_pending_ = false; |
| if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { |
| self->priority_list()->FailoverOnConnectionFailureLocked(); |
| } |
| self->Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); |
| } |
| |
| // |
| // XdsLb::PriorityList::LocalityMap::Locality |
| // |
| |
| XdsLb::PriorityList::LocalityMap::Locality::Locality( |
| RefCountedPtr<LocalityMap> locality_map, |
| RefCountedPtr<XdsLocalityName> name) |
| : locality_map_(std::move(locality_map)), name_(std::move(name)) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(), |
| this, name_->AsHumanReadableString()); |
| } |
| GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, |
| this, grpc_combiner_scheduler(xds_policy()->combiner())); |
| } |
| |
| XdsLb::PriorityList::LocalityMap::Locality::~Locality() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: destroying locality", |
| xds_policy(), this, name_->AsHumanReadableString()); |
| } |
| locality_map_.reset(DEBUG_LOCATION, "Locality"); |
| } |
| |
| grpc_channel_args* |
| XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyArgsLocked( |
| const grpc_channel_args* args_in) { |
| const grpc_arg args_to_add[] = { |
// A channel arg indicating if the target is a backend inferred from an
// xds load balancer.
| grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER), |
| 1), |
| // Inhibit client-side health checking, since the balancer does |
| // this for us. |
| grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1), |
| }; |
| return grpc_channel_args_copy_and_add(args_in, args_to_add, |
| GPR_ARRAY_SIZE(args_to_add)); |
| } |
| |
| OrphanablePtr<LoadBalancingPolicy> |
| XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked( |
| const char* name, const grpc_channel_args* args) { |
| Helper* helper = New<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
| LoadBalancingPolicy::Args lb_policy_args; |
| lb_policy_args.combiner = xds_policy()->combiner(); |
| lb_policy_args.args = args; |
| lb_policy_args.channel_control_helper = |
| UniquePtr<ChannelControlHelper>(helper); |
| OrphanablePtr<LoadBalancingPolicy> lb_policy = |
| LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
| name, std::move(lb_policy_args)); |
| if (GPR_UNLIKELY(lb_policy == nullptr)) { |
| gpr_log(GPR_ERROR, |
| "[xdslb %p] Locality %p %s: failure creating child policy %s", |
| locality_map_.get(), this, name_->AsHumanReadableString(), name); |
| return nullptr; |
| } |
| helper->set_child(lb_policy.get()); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Locality %p %s: Created new child policy %s (%p)", |
| locality_map_.get(), this, name_->AsHumanReadableString(), name, |
| lb_policy.get()); |
| } |
// Add the xDS policy's interested_parties pollset_set to that of the newly
// created child policy. This will make the child policy progress upon
// activity on the xDS policy, which in turn is tied to the application's
// call.
| grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
| xds_policy()->interested_parties()); |
| return lb_policy; |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( |
| uint32_t locality_weight, ServerAddressList serverlist) { |
| if (xds_policy()->shutting_down_) return; |
| // Update locality weight. |
| weight_ = locality_weight; |
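// Cancel any pending removal, since the locality is active again.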
| if (delayed_removal_timer_callback_pending_) { |
| grpc_timer_cancel(&delayed_removal_timer_); |
| } |
| // Construct update args. |
| UpdateArgs update_args; |
| update_args.addresses = std::move(serverlist); |
| update_args.config = xds_policy()->child_policy_config_ == nullptr |
| ? nullptr |
| : xds_policy()->child_policy_config_->Ref(); |
| update_args.args = CreateChildPolicyArgsLocked(xds_policy()->args_); |
| // If the child policy name changes, we need to create a new child |
| // policy. When this happens, we leave child_policy_ as-is and store |
| // the new child policy in pending_child_policy_. Once the new child |
| // policy transitions into state READY, we swap it into child_policy_, |
| // replacing the original child policy. So pending_child_policy_ is |
| // non-null only between when we apply an update that changes the child |
| // policy name and when the new child reports state READY. |
| // |
| // Updates can arrive at any point during this transition. We always |
| // apply updates relative to the most recently created child policy, |
| // even if the most recent one is still in pending_child_policy_. This |
| // is true both when applying the updates to an existing child policy |
| // and when determining whether we need to create a new policy. |
| // |
| // As a result of this, there are several cases to consider here: |
| // |
| // 1. We have no existing child policy (i.e., we have started up but |
| // have not yet received a serverlist from the balancer or gone |
| // into fallback mode; in this case, both child_policy_ and |
| // pending_child_policy_ are null). In this case, we create a |
| // new child policy and store it in child_policy_. |
| // |
| // 2. We have an existing child policy and have no pending child policy |
| // from a previous update (i.e., either there has not been a |
| // previous update that changed the policy name, or we have already |
| // finished swapping in the new policy; in this case, child_policy_ |
| // is non-null but pending_child_policy_ is null). In this case: |
| // a. If child_policy_->name() equals child_policy_name, then we |
| // update the existing child policy. |
| // b. If child_policy_->name() does not equal child_policy_name, |
| // we create a new policy. The policy will be stored in |
| // pending_child_policy_ and will later be swapped into |
| // child_policy_ by the helper when the new child transitions |
| // into state READY. |
| // |
| // 3. We have an existing child policy and have a pending child policy |
| // from a previous update (i.e., a previous update set |
| // pending_child_policy_ as per case 2b above and that policy has |
| // not yet transitioned into state READY and been swapped into |
| // child_policy_; in this case, both child_policy_ and |
| // pending_child_policy_ are non-null). In this case: |
| // a. If pending_child_policy_->name() equals child_policy_name, |
| // then we update the existing pending child policy. |
// b. If pending_child_policy_->name() does not equal
| // child_policy_name, then we create a new policy. The new |
| // policy is stored in pending_child_policy_ (replacing the one |
| // that was there before, which will be immediately shut down) |
| // and will later be swapped into child_policy_ by the helper |
| // when the new child transitions into state READY. |
| // TODO(juanlishen): If the child policy is not configured via service config, |
| // use whatever algorithm is specified by the balancer. |
| const char* child_policy_name = |
| xds_policy()->child_policy_config_ == nullptr |
| ? "round_robin" |
| : xds_policy()->child_policy_config_->name(); |
| const bool create_policy = |
| // case 1 |
| child_policy_ == nullptr || |
| // case 2b |
| (pending_child_policy_ == nullptr && |
| strcmp(child_policy_->name(), child_policy_name) != 0) || |
| // case 3b |
| (pending_child_policy_ != nullptr && |
| strcmp(pending_child_policy_->name(), child_policy_name) != 0); |
| LoadBalancingPolicy* policy_to_update = nullptr; |
| if (create_policy) { |
| // Cases 1, 2b, and 3b: create a new child policy. |
| // If child_policy_ is null, we set it (case 1), else we set |
| // pending_child_policy_ (cases 2b and 3b). |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Locality %p %s: Creating new %schild policy %s", |
| locality_map_.get(), this, name_->AsHumanReadableString(), |
| child_policy_ == nullptr ? "" : "pending ", child_policy_name); |
| } |
| auto& lb_policy = |
| child_policy_ == nullptr ? child_policy_ : pending_child_policy_; |
| lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args); |
| policy_to_update = lb_policy.get(); |
| } else { |
| // Cases 2a and 3a: update an existing policy. |
| // If we have a pending child policy, send the update to the pending |
| // policy (case 3a), else send it to the current policy (case 2a). |
| policy_to_update = pending_child_policy_ != nullptr |
| ? pending_child_policy_.get() |
| : child_policy_.get(); |
| } |
| GPR_ASSERT(policy_to_update != nullptr); |
| // Update the policy. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: Updating %schild policy %p", |
| locality_map_.get(), this, name_->AsHumanReadableString(), |
| policy_to_update == pending_child_policy_.get() ? "pending " : "", |
| policy_to_update); |
| } |
| policy_to_update->UpdateLocked(std::move(update_args)); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::ShutdownLocked() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: shutting down locality", |
| locality_map_.get(), this, name_->AsHumanReadableString()); |
| } |
| // Remove the child policy's interested_parties pollset_set from the |
| // xDS policy. |
| grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
| xds_policy()->interested_parties()); |
| child_policy_.reset(); |
| if (pending_child_policy_ != nullptr) { |
| grpc_pollset_set_del_pollset_set( |
| pending_child_policy_->interested_parties(), |
| xds_policy()->interested_parties()); |
| pending_child_policy_.reset(); |
| } |
| // Drop our ref to the child's picker, in case it's holding a ref to |
| // the child. |
| picker_wrapper_.reset(); |
| if (delayed_removal_timer_callback_pending_) { |
| grpc_timer_cancel(&delayed_removal_timer_); |
| } |
| shutdown_ = true; |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::ResetBackoffLocked() { |
| child_policy_->ResetBackoffLocked(); |
| if (pending_child_policy_ != nullptr) { |
| pending_child_policy_->ResetBackoffLocked(); |
| } |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::Orphan() { |
| ShutdownLocked(); |
| Unref(); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { |
// If already deactivated, don't do it again.
| if (weight_ == 0) return; |
// Set the locality weight to 0 so that future xds pickers won't contain
// this locality.
| weight_ = 0; |
| // Start a timer to delete the locality. |
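  // Take a ref for the timer; it is released in
  // OnDelayedRemovalTimerLocked() when the callback runs.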
| Ref(DEBUG_LOCATION, "Locality+timer").release(); |
| grpc_timer_init( |
| &delayed_removal_timer_, |
| ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, |
| &on_delayed_removal_timer_); |
| delayed_removal_timer_callback_pending_ = true; |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( |
| void* arg, grpc_error* error) { |
| Locality* self = static_cast<Locality*>(arg); |
| self->delayed_removal_timer_callback_pending_ = false; |
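  // Remove the locality only if the timer fired normally, we have not
  // been shut down, and the locality is still deactivated (weight 0).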
| if (error == GRPC_ERROR_NONE && !self->shutdown_ && self->weight_ == 0) { |
| self->locality_map_->localities_.erase(self->name_); |
| } |
| self->Unref(DEBUG_LOCATION, "Locality+timer"); |
| } |
| |
| // |
// XdsLb::PriorityList::LocalityMap::Locality::Helper
| // |
| |
| bool XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByPendingChild() |
| const { |
| GPR_ASSERT(child_ != nullptr); |
| return child_ == locality_->pending_child_policy_.get(); |
| } |
| |
| bool XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByCurrentChild() |
| const { |
| GPR_ASSERT(child_ != nullptr); |
| return child_ == locality_->child_policy_.get(); |
| } |
| |
| RefCountedPtr<SubchannelInterface> |
| XdsLb::PriorityList::LocalityMap::Locality::Helper::CreateSubchannel( |
| const grpc_channel_args& args) { |
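  // Swallow the request if we are shutting down or if it comes from an
  // outdated child (one that is neither current nor pending).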
| if (locality_->xds_policy()->shutting_down_ || |
| (!CalledByPendingChild() && !CalledByCurrentChild())) { |
| return nullptr; |
| } |
| return locality_->xds_policy()->channel_control_helper()->CreateSubchannel( |
| args); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( |
| grpc_connectivity_state state, UniquePtr<SubchannelPicker> picker) { |
| if (locality_->xds_policy()->shutting_down_) return; |
| // If this request is from the pending child policy, ignore it until |
| // it reports READY, at which point we swap it into place. |
| if (CalledByPendingChild()) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p helper %p] pending child policy %p reports state=%s", |
| locality_->xds_policy(), this, |
| locality_->pending_child_policy_.get(), |
| grpc_connectivity_state_name(state)); |
| } |
| if (state != GRPC_CHANNEL_READY) return; |
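    // The pending child is now READY: stop tracking the old child's
    // pollset_set and promote the pending child to current.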
| grpc_pollset_set_del_pollset_set( |
| locality_->child_policy_->interested_parties(), |
| locality_->xds_policy()->interested_parties()); |
| locality_->child_policy_ = std::move(locality_->pending_child_policy_); |
| } else if (!CalledByCurrentChild()) { |
| // This request is from an outdated child, so ignore it. |
| return; |
| } |
| GPR_ASSERT(locality_->xds_policy()->lb_chand_ != nullptr); |
| // Cache the picker and its state in the locality. |
| locality_->picker_wrapper_ = MakeRefCounted<PickerWrapper>( |
| std::move(picker), |
| locality_->xds_policy()->client_stats_.FindLocalityStats( |
| locality_->name_)); |
| locality_->connectivity_state_ = state; |
| // Notify the locality map. |
| locality_->locality_map_->OnLocalityStateUpdateLocked(); |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::Helper::RequestReresolution() { |
| if (locality_->xds_policy()->shutting_down_) return; |
| // If there is a pending child policy, ignore re-resolution requests |
| // from the current child policy (or any outdated child). |
| if (locality_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { |
| return; |
| } |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
| gpr_log(GPR_INFO, |
| "[xdslb %p] Re-resolution requested from the internal RR policy " |
| "(%p).", |
| locality_->xds_policy(), locality_->child_policy_.get()); |
| } |
| GPR_ASSERT(locality_->xds_policy()->lb_chand_ != nullptr); |
| // If we are talking to a balancer, we expect to get updated addresses |
| // from the balancer, so we can ignore the re-resolution request from |
| // the child policy. Otherwise, pass the re-resolution request up to the |
| // channel. |
| if (locality_->xds_policy()->lb_chand_->eds_calld() == nullptr || |
| !locality_->xds_policy()->lb_chand_->eds_calld()->seen_response()) { |
| locality_->xds_policy()->channel_control_helper()->RequestReresolution(); |
| } |
| } |
| |
| void XdsLb::PriorityList::LocalityMap::Locality::Helper::AddTraceEvent( |
| TraceSeverity severity, StringView message) { |
| if (locality_->xds_policy()->shutting_down_ || |
| (!CalledByPendingChild() && !CalledByCurrentChild())) { |
| return; |
| } |
| locality_->xds_policy()->channel_control_helper()->AddTraceEvent(severity, |
| message); |
| } |
| |
| // |
// XdsFactory
| // |
| |
| class XdsFactory : public LoadBalancingPolicyFactory { |
| public: |
| OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
| LoadBalancingPolicy::Args args) const override { |
| return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(std::move(args))); |
| } |
| |
| const char* name() const override { return kXds; } |
| |
| RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
| const grpc_json* json, grpc_error** error) const override { |
| GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
| if (json == nullptr) { |
| // xds was mentioned as a policy in the deprecated loadBalancingPolicy |
| // field or in the client API. |
| *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "field:loadBalancingPolicy error:Xds Parser has required field - " |
| "balancerName. Please use loadBalancingConfig field of service " |
| "config instead."); |
| return nullptr; |
| } |
| GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0); |
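    // For reference, an illustrative (non-normative) sketch of a config
    // this parser accepts, nested under this policy's name (kXds) in the
    // loadBalancingConfig list; the address and policies shown here are
    // examples only:
    //   { "balancerName": "dns:///balancer.example.com:443",
    //     "childPolicy": [ { "round_robin": {} } ],
    //     "fallbackPolicy": [ { "pick_first": {} } ] }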
| |
| InlinedVector<grpc_error*, 3> error_list; |
| const char* balancer_name = nullptr; |
| RefCountedPtr<LoadBalancingPolicy::Config> child_policy; |
| RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy; |
| for (const grpc_json* field = json->child; field != nullptr; |
| field = field->next) { |
| if (field->key == nullptr) continue; |
| if (strcmp(field->key, "balancerName") == 0) { |
| if (balancer_name != nullptr) { |
| error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "field:balancerName error:Duplicate entry")); |
| } |
| if (field->type != GRPC_JSON_STRING) { |
| error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "field:balancerName error:type should be string")); |
| continue; |
| } |
| balancer_name = field->value; |
| } else if (strcmp(field->key, "childPolicy") == 0) { |
| if (child_policy != nullptr) { |
| error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "field:childPolicy error:Duplicate entry")); |
| } |
| grpc_error* parse_error = GRPC_ERROR_NONE; |
| child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
| field, &parse_error); |
| if (child_policy == nullptr) { |
| GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
| error_list.push_back(parse_error); |
| } |
| } else if (strcmp(field->key, "fallbackPolicy") == 0) { |
| if (fallback_policy != nullptr) { |
| error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "field:fallbackPolicy error:Duplicate entry")); |
| } |
| grpc_error* parse_error = GRPC_ERROR_NONE; |
| fallback_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
| field, &parse_error); |
| if (fallback_policy == nullptr) { |
| GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
| error_list.push_back(parse_error); |
| } |
| } |
| } |
| if (error_list.empty()) { |
| return RefCountedPtr<LoadBalancingPolicy::Config>(New<ParsedXdsConfig>( |
| balancer_name, std::move(child_policy), std::move(fallback_policy))); |
| } else { |
| *error = GRPC_ERROR_CREATE_FROM_VECTOR("Xds Parser", &error_list); |
| return nullptr; |
| } |
| } |
| }; |
| |
| } // namespace |
| |
| } // namespace grpc_core |
| |
| // |
| // Plugin registration |
| // |
| |
| void grpc_lb_policy_xds_init() { |
| grpc_core::LoadBalancingPolicyRegistry::Builder:: |
| RegisterLoadBalancingPolicyFactory( |
| grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( |
| grpc_core::New<grpc_core::XdsFactory>())); |
| } |
| |
| void grpc_lb_policy_xds_shutdown() {} |