| /* |
| * |
| * Copyright 2015 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
/** Round Robin Policy.
 *
 * Each Picker snapshots the connected subchannels of the subchannel list it
 * was created from, respecting the relative order of the addresses provided
 * upon creation or updates. Before every pick, the picker advances its
 * \a last_picked_index_ to the next entry of that snapshot, wrapping around
 * at the end. Note however that a newly created picker (for example after an
 * update) starts picking from a randomly chosen position in its list rather
 * than continuing from where the previous picker left off. */
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <stdlib.h> |
| #include <string.h> |
| |
| #include <grpc/support/alloc.h> |
| |
| #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
| #include "src/core/ext/filters/client_channel/subchannel.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/debug/trace.h" |
| #include "src/core/lib/gprpp/mutex_lock.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/iomgr/combiner.h" |
| #include "src/core/lib/iomgr/sockaddr_utils.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/static_metadata.h" |
| |
| namespace grpc_core { |
| |
// Trace flag named "round_robin", constructed with `false` as its first
// argument (presumably: tracing disabled by default -- confirm against
// TraceFlag).  Every gpr_log() tracing call in this file is guarded by
// grpc_lb_round_robin_trace.enabled().
TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
| |
| namespace { |
| |
| // |
| // round_robin LB policy |
| // |
| |
// Name of this LB policy.  Returned by both RoundRobin::name() and
// RoundRobinFactory::name().
constexpr char kRoundRobin[] = "round_robin";
| |
| class RoundRobin : public LoadBalancingPolicy { |
| public: |
| explicit RoundRobin(Args args); |
| |
| const char* name() const override { return kRoundRobin; } |
| |
| void UpdateLocked(const grpc_channel_args& args, |
| grpc_json* lb_config) override; |
| void ResetBackoffLocked() override; |
| void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, |
| channelz::ChildRefsList* ignored) override; |
| |
| private: |
| ~RoundRobin(); |
| |
| // Forward declaration. |
| class RoundRobinSubchannelList; |
| |
| // Data for a particular subchannel in a subchannel list. |
| // This subclass adds the following functionality: |
| // - Tracks the previous connectivity state of the subchannel, so that |
| // we know how many subchannels are in each state. |
| class RoundRobinSubchannelData |
| : public SubchannelData<RoundRobinSubchannelList, |
| RoundRobinSubchannelData> { |
| public: |
| RoundRobinSubchannelData( |
| SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>* |
| subchannel_list, |
| const ServerAddress& address, Subchannel* subchannel, |
| grpc_combiner* combiner) |
| : SubchannelData(subchannel_list, address, subchannel, combiner) {} |
| |
| grpc_connectivity_state connectivity_state() const { |
| return last_connectivity_state_; |
| } |
| |
| void UpdateConnectivityStateLocked( |
| grpc_connectivity_state connectivity_state, grpc_error* error); |
| |
| private: |
| void ProcessConnectivityChangeLocked( |
| grpc_connectivity_state connectivity_state, grpc_error* error) override; |
| |
| grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; |
| }; |
| |
| // A list of subchannels. |
| class RoundRobinSubchannelList |
| : public SubchannelList<RoundRobinSubchannelList, |
| RoundRobinSubchannelData> { |
| public: |
| RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer, |
| const ServerAddressList& addresses, |
| grpc_combiner* combiner, |
| const grpc_channel_args& args) |
| : SubchannelList(policy, tracer, addresses, combiner, |
| policy->channel_control_helper(), args) { |
| // Need to maintain a ref to the LB policy as long as we maintain |
| // any references to subchannels, since the subchannels' |
| // pollset_sets will include the LB policy's pollset_set. |
| policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); |
| } |
| |
| ~RoundRobinSubchannelList() { |
| GRPC_ERROR_UNREF(last_transient_failure_error_); |
| RoundRobin* p = static_cast<RoundRobin*>(policy()); |
| p->Unref(DEBUG_LOCATION, "subchannel_list"); |
| } |
| |
| // Starts watching the subchannels in this list. |
| void StartWatchingLocked(); |
| |
| // Updates the counters of subchannels in each state when a |
| // subchannel transitions from old_state to new_state. |
| // transient_failure_error is the error that is reported when |
| // new_state is TRANSIENT_FAILURE. |
| void UpdateStateCountersLocked(grpc_connectivity_state old_state, |
| grpc_connectivity_state new_state, |
| grpc_error* transient_failure_error); |
| |
| // If this subchannel list is the RR policy's current subchannel |
| // list, updates the RR policy's connectivity state based on the |
| // subchannel list's state counters. |
| void MaybeUpdateRoundRobinConnectivityStateLocked(); |
| |
| // Updates the RR policy's overall state based on the counters of |
| // subchannels in each state. |
| void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); |
| |
| private: |
| size_t num_ready_ = 0; |
| size_t num_connecting_ = 0; |
| size_t num_transient_failure_ = 0; |
| grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE; |
| }; |
| |
| class Picker : public SubchannelPicker { |
| public: |
| Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list); |
| |
| PickResult Pick(PickState* pick, grpc_error** error) override; |
| |
| private: |
| // Using pointer value only, no ref held -- do not dereference! |
| RoundRobin* parent_; |
| |
| size_t last_picked_index_; |
| InlinedVector<RefCountedPtr<ConnectedSubchannel>, 10> subchannels_; |
| }; |
| |
| // Helper class to ensure that any function that modifies the child refs |
| // data structures will update the channelz snapshot data structures before |
| // returning. |
| class AutoChildRefsUpdater { |
| public: |
| explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {} |
| ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); } |
| |
| private: |
| RoundRobin* rr_; |
| }; |
| |
| void ShutdownLocked() override; |
| |
| void UpdateChildRefsLocked(); |
| |
| /** list of subchannels */ |
| OrphanablePtr<RoundRobinSubchannelList> subchannel_list_; |
| /** Latest version of the subchannel list. |
| * Subchannel connectivity callbacks will only promote updated subchannel |
| * lists if they equal \a latest_pending_subchannel_list. In other words, |
| * racing callbacks that reference outdated subchannel lists won't perform any |
| * update. */ |
| OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_; |
| /** are we shutting down? */ |
| bool shutdown_ = false; |
| /// Lock and data used to capture snapshots of this channel's child |
| /// channels and subchannels. This data is consumed by channelz. |
| gpr_mu child_refs_mu_; |
| channelz::ChildRefsList child_subchannels_; |
| channelz::ChildRefsList child_channels_; |
| }; |
| |
| // |
| // RoundRobin::Picker |
| // |
| |
| RoundRobin::Picker::Picker(RoundRobin* parent, |
| RoundRobinSubchannelList* subchannel_list) |
| : parent_(parent) { |
| for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { |
| auto* connected_subchannel = |
| subchannel_list->subchannel(i)->connected_subchannel(); |
| if (connected_subchannel != nullptr) { |
| subchannels_.push_back(connected_subchannel->Ref()); |
| } |
| } |
| // For discussion on why we generate a random starting index for |
| // the picker, see https://github.com/grpc/grpc-go/issues/2580. |
| // TODO(roth): rand(3) is not thread-safe. This should be replaced with |
| // something better as part of https://github.com/grpc/grpc/issues/17891. |
| last_picked_index_ = rand() % subchannels_.size(); |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[RR %p picker %p] created picker from subchannel_list=%p " |
| "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR, |
| parent_, this, subchannel_list, subchannels_.size(), |
| last_picked_index_); |
| } |
| } |
| |
| RoundRobin::Picker::PickResult RoundRobin::Picker::Pick(PickState* pick, |
| grpc_error** error) { |
| last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size(); |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[RR %p picker %p] returning index %" PRIuPTR |
| ", connected_subchannel=%p", |
| parent_, this, last_picked_index_, |
| subchannels_[last_picked_index_].get()); |
| } |
| pick->connected_subchannel = subchannels_[last_picked_index_]; |
| return PICK_COMPLETE; |
| } |
| |
| // |
| // RoundRobin |
| // |
| |
| RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) { |
| gpr_mu_init(&child_refs_mu_); |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, "[RR %p] Created", this); |
| } |
| } |
| |
| RoundRobin::~RoundRobin() { |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); |
| } |
| gpr_mu_destroy(&child_refs_mu_); |
| GPR_ASSERT(subchannel_list_ == nullptr); |
| GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); |
| } |
| |
| void RoundRobin::ShutdownLocked() { |
| AutoChildRefsUpdater guard(this); |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, "[RR %p] Shutting down", this); |
| } |
| shutdown_ = true; |
| subchannel_list_.reset(); |
| latest_pending_subchannel_list_.reset(); |
| } |
| |
| void RoundRobin::ResetBackoffLocked() { |
| subchannel_list_->ResetBackoffLocked(); |
| if (latest_pending_subchannel_list_ != nullptr) { |
| latest_pending_subchannel_list_->ResetBackoffLocked(); |
| } |
| } |
| |
| void RoundRobin::FillChildRefsForChannelz( |
| channelz::ChildRefsList* child_subchannels_to_fill, |
| channelz::ChildRefsList* ignored) { |
| MutexLock lock(&child_refs_mu_); |
| for (size_t i = 0; i < child_subchannels_.size(); ++i) { |
| // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might |
| // have to implement lightweight set. For now, we don't care about |
| // performance when channelz requests are made. |
| bool found = false; |
| for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) { |
| if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| child_subchannels_to_fill->push_back(child_subchannels_[i]); |
| } |
| } |
| } |
| |
| void RoundRobin::UpdateChildRefsLocked() { |
| channelz::ChildRefsList cs; |
| if (subchannel_list_ != nullptr) { |
| subchannel_list_->PopulateChildRefsList(&cs); |
| } |
| if (latest_pending_subchannel_list_ != nullptr) { |
| latest_pending_subchannel_list_->PopulateChildRefsList(&cs); |
| } |
| // atomically update the data that channelz will actually be looking at. |
| MutexLock lock(&child_refs_mu_); |
| child_subchannels_ = std::move(cs); |
| } |
| |
| void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { |
| if (num_subchannels() == 0) return; |
| // Check current state of each subchannel synchronously, since any |
| // subchannel already used by some other channel may have a non-IDLE |
| // state. |
| for (size_t i = 0; i < num_subchannels(); ++i) { |
| grpc_error* error = GRPC_ERROR_NONE; |
| grpc_connectivity_state state = |
| subchannel(i)->CheckConnectivityStateLocked(&error); |
| if (state != GRPC_CHANNEL_IDLE) { |
| subchannel(i)->UpdateConnectivityStateLocked(state, error); |
| } |
| } |
| // Start connectivity watch for each subchannel. |
| for (size_t i = 0; i < num_subchannels(); i++) { |
| if (subchannel(i)->subchannel() != nullptr) { |
| subchannel(i)->StartConnectivityWatchLocked(); |
| } |
| } |
| // Now set the LB policy's state based on the subchannels' states. |
| UpdateRoundRobinStateFromSubchannelStateCountsLocked(); |
| } |
| |
| void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( |
| grpc_connectivity_state old_state, grpc_connectivity_state new_state, |
| grpc_error* transient_failure_error) { |
| GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN); |
| GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); |
| if (old_state == GRPC_CHANNEL_READY) { |
| GPR_ASSERT(num_ready_ > 0); |
| --num_ready_; |
| } else if (old_state == GRPC_CHANNEL_CONNECTING) { |
| GPR_ASSERT(num_connecting_ > 0); |
| --num_connecting_; |
| } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| GPR_ASSERT(num_transient_failure_ > 0); |
| --num_transient_failure_; |
| } |
| if (new_state == GRPC_CHANNEL_READY) { |
| ++num_ready_; |
| } else if (new_state == GRPC_CHANNEL_CONNECTING) { |
| ++num_connecting_; |
| } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| ++num_transient_failure_; |
| } |
| GRPC_ERROR_UNREF(last_transient_failure_error_); |
| last_transient_failure_error_ = transient_failure_error; |
| } |
| |
| // Sets the RR policy's connectivity state and generates a new picker based |
| // on the current subchannel list. |
| void RoundRobin::RoundRobinSubchannelList:: |
| MaybeUpdateRoundRobinConnectivityStateLocked() { |
| RoundRobin* p = static_cast<RoundRobin*>(policy()); |
| // Only set connectivity state if this is the current subchannel list. |
| if (p->subchannel_list_.get() != this) return; |
| /* In priority order. The first rule to match terminates the search (ie, if we |
| * are on rule n, all previous rules were unfulfilled). |
| * |
| * 1) RULE: ANY subchannel is READY => policy is READY. |
| * CHECK: subchannel_list->num_ready > 0. |
| * |
| * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. |
| * CHECK: sd->curr_connectivity_state == CONNECTING. |
| * |
| * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is |
| * TRANSIENT_FAILURE. |
| * CHECK: subchannel_list->num_transient_failures == |
| * subchannel_list->num_subchannels. |
| */ |
| if (num_ready_ > 0) { |
| /* 1) READY */ |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_READY, GRPC_ERROR_NONE, |
| UniquePtr<SubchannelPicker>(New<Picker>(p, this))); |
| } else if (num_connecting_ > 0) { |
| /* 2) CONNECTING */ |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, |
| UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref()))); |
| } else if (num_transient_failure_ == num_subchannels()) { |
| /* 3) TRANSIENT_FAILURE */ |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, |
| GRPC_ERROR_REF(last_transient_failure_error_), |
| UniquePtr<SubchannelPicker>(New<TransientFailurePicker>( |
| GRPC_ERROR_REF(last_transient_failure_error_)))); |
| } |
| } |
| |
| void RoundRobin::RoundRobinSubchannelList:: |
| UpdateRoundRobinStateFromSubchannelStateCountsLocked() { |
| RoundRobin* p = static_cast<RoundRobin*>(policy()); |
| AutoChildRefsUpdater guard(p); |
| if (num_ready_ > 0) { |
| if (p->subchannel_list_.get() != this) { |
| // Promote this list to p->subchannel_list_. |
| // This list must be p->latest_pending_subchannel_list_, because |
| // any previous update would have been shut down already and |
| // therefore we would not be receiving a notification for them. |
| GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this); |
| GPR_ASSERT(!shutting_down()); |
| if (grpc_lb_round_robin_trace.enabled()) { |
| const size_t old_num_subchannels = |
| p->subchannel_list_ != nullptr |
| ? p->subchannel_list_->num_subchannels() |
| : 0; |
| gpr_log(GPR_INFO, |
| "[RR %p] phasing out subchannel list %p (size %" PRIuPTR |
| ") in favor of %p (size %" PRIuPTR ")", |
| p, p->subchannel_list_.get(), old_num_subchannels, this, |
| num_subchannels()); |
| } |
| p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); |
| } |
| } |
| // Update the RR policy's connectivity state if needed. |
| MaybeUpdateRoundRobinConnectivityStateLocked(); |
| } |
| |
| void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( |
| grpc_connectivity_state connectivity_state, grpc_error* error) { |
| RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log( |
| GPR_INFO, |
| "[RR %p] connectivity changed for subchannel %p, subchannel_list %p " |
| "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", |
| p, subchannel(), subchannel_list(), Index(), |
| subchannel_list()->num_subchannels(), |
| grpc_connectivity_state_name(last_connectivity_state_), |
| grpc_connectivity_state_name(connectivity_state)); |
| } |
| subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, |
| connectivity_state, error); |
| last_connectivity_state_ = connectivity_state; |
| } |
| |
| void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( |
| grpc_connectivity_state connectivity_state, grpc_error* error) { |
| RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy()); |
| GPR_ASSERT(subchannel() != nullptr); |
| // If the new state is TRANSIENT_FAILURE, re-resolve. |
| // Only do this if we've started watching, not at startup time. |
| // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE |
| // when the subchannel list was created, we'd wind up in a constant |
| // loop of re-resolution. |
| if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " |
| "Requesting re-resolution", |
| p, subchannel()); |
| } |
| p->channel_control_helper()->RequestReresolution(); |
| } |
| // Renew connectivity watch. |
| RenewConnectivityWatchLocked(); |
| // Update state counters. |
| UpdateConnectivityStateLocked(connectivity_state, error); |
| // Update overall state and renew notification. |
| subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); |
| } |
| |
| void RoundRobin::UpdateLocked(const grpc_channel_args& args, |
| grpc_json* lb_config) { |
| AutoChildRefsUpdater guard(this); |
| const ServerAddressList* addresses = FindServerAddressListChannelArg(&args); |
| if (addresses == nullptr) { |
| gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); |
| // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. |
| // Otherwise, keep using the current subchannel list (ignore this update). |
| if (subchannel_list_ == nullptr) { |
| grpc_error* error = |
| GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"); |
| channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), |
| UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error))); |
| } |
| return; |
| } |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", |
| this, addresses->size()); |
| } |
| // Replace latest_pending_subchannel_list_. |
| if (latest_pending_subchannel_list_ != nullptr) { |
| if (grpc_lb_round_robin_trace.enabled()) { |
| gpr_log(GPR_INFO, |
| "[RR %p] Shutting down previous pending subchannel list %p", this, |
| latest_pending_subchannel_list_.get()); |
| } |
| } |
| latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( |
| this, &grpc_lb_round_robin_trace, *addresses, combiner(), args); |
| if (latest_pending_subchannel_list_->num_subchannels() == 0) { |
| // If the new list is empty, immediately promote the new list to the |
| // current list and transition to TRANSIENT_FAILURE. |
| grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"); |
| channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), |
| UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error))); |
| subchannel_list_ = std::move(latest_pending_subchannel_list_); |
| } else if (subchannel_list_ == nullptr) { |
| // If there is no current list, immediately promote the new list to |
| // the current list and start watching it. |
| subchannel_list_ = std::move(latest_pending_subchannel_list_); |
| subchannel_list_->StartWatchingLocked(); |
| } else { |
| // Start watching the pending list. It will get swapped into the |
| // current list when it reports READY. |
| latest_pending_subchannel_list_->StartWatchingLocked(); |
| } |
| } |
| |
| // |
| // factory |
| // |
| |
| class RoundRobinFactory : public LoadBalancingPolicyFactory { |
| public: |
| OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
| LoadBalancingPolicy::Args args) const override { |
| return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(std::move(args))); |
| } |
| |
| const char* name() const override { return kRoundRobin; } |
| }; |
| |
| } // namespace |
| |
| } // namespace grpc_core |
| |
| void grpc_lb_policy_round_robin_init() { |
| grpc_core::LoadBalancingPolicyRegistry::Builder:: |
| RegisterLoadBalancingPolicyFactory( |
| grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( |
| grpc_core::New<grpc_core::RoundRobinFactory>())); |
| } |
| |
// Counterpart of grpc_lb_policy_round_robin_init().  Intentionally empty:
// this file has nothing to tear down here.
void grpc_lb_policy_round_robin_shutdown() {}