| /* |
| * |
| * Copyright 2015 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include <string.h> |
| |
| #include <grpc/support/alloc.h> |
| |
| #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
| #include "src/core/ext/filters/client_channel/server_address.h" |
| #include "src/core/ext/filters/client_channel/subchannel.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/iomgr/sockaddr_utils.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| |
| namespace grpc_core { |
| |
| TraceFlag grpc_lb_pick_first_trace(false, "pick_first"); |
| |
| namespace { |
| |
| // |
| // pick_first LB policy |
| // |
| |
// Name returned by name() on the policy, its config, and its factory.
constexpr char kPickFirst[] = "pick_first";
| |
| class PickFirst : public LoadBalancingPolicy { |
| public: |
| explicit PickFirst(Args args); |
| |
| const char* name() const override { return kPickFirst; } |
| |
| void UpdateLocked(UpdateArgs args) override; |
| void ExitIdleLocked() override; |
| void ResetBackoffLocked() override; |
| |
| private: |
| ~PickFirst(); |
| |
| class PickFirstSubchannelList; |
| |
| class PickFirstSubchannelData |
| : public SubchannelData<PickFirstSubchannelList, |
| PickFirstSubchannelData> { |
| public: |
| PickFirstSubchannelData( |
| SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>* |
| subchannel_list, |
| const ServerAddress& address, |
| RefCountedPtr<SubchannelInterface> subchannel) |
| : SubchannelData(subchannel_list, address, std::move(subchannel)) {} |
| |
| void ProcessConnectivityChangeLocked( |
| grpc_connectivity_state connectivity_state) override; |
| |
| // Processes the connectivity change to READY for an unselected subchannel. |
| void ProcessUnselectedReadyLocked(); |
| |
| void CheckConnectivityStateAndStartWatchingLocked(); |
| }; |
| |
| class PickFirstSubchannelList |
| : public SubchannelList<PickFirstSubchannelList, |
| PickFirstSubchannelData> { |
| public: |
| PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, |
| ServerAddressList addresses, |
| const grpc_channel_args& args) |
| : SubchannelList(policy, tracer, std::move(addresses), |
| policy->channel_control_helper(), args) { |
| // Need to maintain a ref to the LB policy as long as we maintain |
| // any references to subchannels, since the subchannels' |
| // pollset_sets will include the LB policy's pollset_set. |
| policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); |
| } |
| |
| ~PickFirstSubchannelList() { |
| PickFirst* p = static_cast<PickFirst*>(policy()); |
| p->Unref(DEBUG_LOCATION, "subchannel_list"); |
| } |
| |
| bool in_transient_failure() const { return in_transient_failure_; } |
| void set_in_transient_failure(bool in_transient_failure) { |
| in_transient_failure_ = in_transient_failure; |
| } |
| |
| private: |
| bool in_transient_failure_ = false; |
| }; |
| |
| class Picker : public SubchannelPicker { |
| public: |
| explicit Picker(RefCountedPtr<SubchannelInterface> subchannel) |
| : subchannel_(std::move(subchannel)) {} |
| |
| PickResult Pick(PickArgs /*args*/) override { |
| PickResult result; |
| result.type = PickResult::PICK_COMPLETE; |
| result.subchannel = subchannel_; |
| return result; |
| } |
| |
| private: |
| RefCountedPtr<SubchannelInterface> subchannel_; |
| }; |
| |
| void ShutdownLocked() override; |
| |
| void AttemptToConnectUsingLatestUpdateArgsLocked(); |
| |
| // Lateset update args. |
| UpdateArgs latest_update_args_; |
| // All our subchannels. |
| OrphanablePtr<PickFirstSubchannelList> subchannel_list_; |
| // Latest pending subchannel list. |
| OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_; |
| // Selected subchannel in \a subchannel_list_. |
| PickFirstSubchannelData* selected_ = nullptr; |
| // Are we in IDLE state? |
| bool idle_ = false; |
| // Are we shut down? |
| bool shutdown_ = false; |
| }; |
| |
| PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, "Pick First %p created.", this); |
| } |
| } |
| |
| PickFirst::~PickFirst() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, "Destroying Pick First %p", this); |
| } |
| GPR_ASSERT(subchannel_list_ == nullptr); |
| GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); |
| } |
| |
| void PickFirst::ShutdownLocked() { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, "Pick First %p Shutting down", this); |
| } |
| shutdown_ = true; |
| subchannel_list_.reset(); |
| latest_pending_subchannel_list_.reset(); |
| } |
| |
| void PickFirst::ExitIdleLocked() { |
| if (shutdown_) return; |
| if (idle_) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, "Pick First %p exiting idle", this); |
| } |
| idle_ = false; |
| AttemptToConnectUsingLatestUpdateArgsLocked(); |
| } |
| } |
| |
| void PickFirst::ResetBackoffLocked() { |
| if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked(); |
| if (latest_pending_subchannel_list_ != nullptr) { |
| latest_pending_subchannel_list_->ResetBackoffLocked(); |
| } |
| } |
| |
| void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { |
| // Create a subchannel list from the latest_update_args_. |
| auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>( |
| this, &grpc_lb_pick_first_trace, latest_update_args_.addresses, |
| *latest_update_args_.args); |
| // Empty update or no valid subchannels. |
| if (subchannel_list->num_subchannels() == 0) { |
| // Unsubscribe from all current subchannels. |
| subchannel_list_ = std::move(subchannel_list); // Empty list. |
| selected_ = nullptr; |
| // If not idle, put the channel in TRANSIENT_FAILURE. |
| // (If we are idle, then this will happen in ExitIdleLocked() if we |
| // haven't gotten a non-empty update by the time the application tries |
| // to start a new call.) |
| grpc_error* error = |
| grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), |
| GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
| channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), |
| absl::make_unique<TransientFailurePicker>(error)); |
| return; |
| } |
| // If one of the subchannels in the new list is already in state |
| // READY, then select it immediately. This can happen when the |
| // currently selected subchannel is also present in the update. It |
| // can also happen if one of the subchannels in the update is already |
| // in the global subchannel pool because it's in use by another channel. |
| for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { |
| PickFirstSubchannelData* sd = subchannel_list->subchannel(i); |
| grpc_connectivity_state state = sd->CheckConnectivityStateLocked(); |
| if (state == GRPC_CHANNEL_READY) { |
| subchannel_list_ = std::move(subchannel_list); |
| sd->StartConnectivityWatchLocked(); |
| sd->ProcessUnselectedReadyLocked(); |
| // If there was a previously pending update (which may or may |
| // not have contained the currently selected subchannel), drop |
| // it, so that it doesn't override what we've done here. |
| latest_pending_subchannel_list_.reset(); |
| return; |
| } |
| } |
| if (selected_ == nullptr) { |
| // We don't yet have a selected subchannel, so replace the current |
| // subchannel list immediately. |
| subchannel_list_ = std::move(subchannel_list); |
| // If we're not in IDLE state, start trying to connect to the first |
| // subchannel in the new list. |
| // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() |
| // here, since we've already checked the initial connectivity |
| // state of all subchannels above. |
| subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); |
| subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect(); |
| } else { |
| // We do have a selected subchannel (which means it's READY), so keep |
| // using it until one of the subchannels in the new list reports READY. |
| if (latest_pending_subchannel_list_ != nullptr) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, |
| "Pick First %p Shutting down latest pending subchannel list " |
| "%p, about to be replaced by newer latest %p", |
| this, latest_pending_subchannel_list_.get(), |
| subchannel_list.get()); |
| } |
| } |
| latest_pending_subchannel_list_ = std::move(subchannel_list); |
| // If we're not in IDLE state, start trying to connect to the first |
| // subchannel in the new list. |
| // Note: No need to use CheckConnectivityStateAndStartWatchingLocked() |
| // here, since we've already checked the initial connectivity |
| // state of all subchannels above. |
| latest_pending_subchannel_list_->subchannel(0) |
| ->StartConnectivityWatchLocked(); |
| latest_pending_subchannel_list_->subchannel(0) |
| ->subchannel() |
| ->AttemptToConnect(); |
| } |
| } |
| |
| void PickFirst::UpdateLocked(UpdateArgs args) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, |
| "Pick First %p received update with %" PRIuPTR " addresses", this, |
| args.addresses.size()); |
| } |
| // Update the latest_update_args_ |
| grpc_arg new_arg = grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1); |
| const grpc_channel_args* new_args = |
| grpc_channel_args_copy_and_add(args.args, &new_arg, 1); |
| GPR_SWAP(const grpc_channel_args*, new_args, args.args); |
| grpc_channel_args_destroy(new_args); |
| latest_update_args_ = std::move(args); |
| // If we are not in idle, start connection attempt immediately. |
| // Otherwise, we defer the attempt into ExitIdleLocked(). |
| if (!idle_) { |
| AttemptToConnectUsingLatestUpdateArgsLocked(); |
| } |
| } |
| |
| void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( |
| grpc_connectivity_state connectivity_state) { |
| PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); |
| // The notification must be for a subchannel in either the current or |
| // latest pending subchannel lists. |
| GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || |
| subchannel_list() == p->latest_pending_subchannel_list_.get()); |
| GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN); |
| // Handle updates for the currently selected subchannel. |
| if (p->selected_ == this) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, |
| "Pick First %p selected subchannel connectivity changed to %s", p, |
| ConnectivityStateName(connectivity_state)); |
| } |
| // If the new state is anything other than READY and there is a |
| // pending update, switch to the pending update. |
| if (connectivity_state != GRPC_CHANNEL_READY && |
| p->latest_pending_subchannel_list_ != nullptr) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, |
| "Pick First %p promoting pending subchannel list %p to " |
| "replace %p", |
| p, p->latest_pending_subchannel_list_.get(), |
| p->subchannel_list_.get()); |
| } |
| p->selected_ = nullptr; |
| CancelConnectivityWatchLocked( |
| "selected subchannel failed; switching to pending update"); |
| p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); |
| // Set our state to that of the pending subchannel list. |
| if (p->subchannel_list_->in_transient_failure()) { |
| grpc_error* error = grpc_error_set_int( |
| GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "selected subchannel failed; switching to pending update"), |
| GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), |
| absl::make_unique<TransientFailurePicker>(error)); |
| } else { |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_CONNECTING, absl::Status(), |
| absl::make_unique<QueuePicker>( |
| p->Ref(DEBUG_LOCATION, "QueuePicker"))); |
| } |
| } else { |
| if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
| // If the selected subchannel goes bad, request a re-resolution. We |
| // also set the channel state to IDLE. The reason is that if the new |
| // state is TRANSIENT_FAILURE due to a GOAWAY reception we don't want |
| // to connect to the re-resolved backends until we leave IDLE state. |
| // TODO(qianchengz): We may want to request re-resolution in |
| // ExitIdleLocked(). |
| p->idle_ = true; |
| p->channel_control_helper()->RequestReresolution(); |
| p->selected_ = nullptr; |
| p->subchannel_list_.reset(); |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_IDLE, absl::Status(), |
| absl::make_unique<QueuePicker>( |
| p->Ref(DEBUG_LOCATION, "QueuePicker"))); |
| } else { |
| // This is unlikely but can happen when a subchannel has been asked |
| // to reconnect by a different channel and this channel has dropped |
| // some connectivity state notifications. |
| if (connectivity_state == GRPC_CHANNEL_READY) { |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_READY, absl::Status(), |
| absl::make_unique<Picker>(subchannel()->Ref())); |
| } else { // CONNECTING |
| p->channel_control_helper()->UpdateState( |
| connectivity_state, absl::Status(), |
| absl::make_unique<QueuePicker>( |
| p->Ref(DEBUG_LOCATION, "QueuePicker"))); |
| } |
| } |
| } |
| return; |
| } |
| // If we get here, there are two possible cases: |
| // 1. We do not currently have a selected subchannel, and the update is |
| // for a subchannel in p->subchannel_list_ that we're trying to |
| // connect to. The goal here is to find a subchannel that we can |
| // select. |
| // 2. We do currently have a selected subchannel, and the update is |
| // for a subchannel in p->latest_pending_subchannel_list_. The |
| // goal here is to find a subchannel from the update that we can |
| // select in place of the current one. |
| subchannel_list()->set_in_transient_failure(false); |
| switch (connectivity_state) { |
| case GRPC_CHANNEL_READY: { |
| ProcessUnselectedReadyLocked(); |
| break; |
| } |
| case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
| CancelConnectivityWatchLocked("connection attempt failed"); |
| PickFirstSubchannelData* sd = this; |
| size_t next_index = |
| (sd->Index() + 1) % subchannel_list()->num_subchannels(); |
| sd = subchannel_list()->subchannel(next_index); |
| // If we're tried all subchannels, set state to TRANSIENT_FAILURE. |
| if (sd->Index() == 0) { |
| // Re-resolve if this is the most recent subchannel list. |
| if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr |
| ? p->latest_pending_subchannel_list_.get() |
| : p->subchannel_list_.get())) { |
| p->channel_control_helper()->RequestReresolution(); |
| } |
| subchannel_list()->set_in_transient_failure(true); |
| // Only report new state in case 1. |
| if (subchannel_list() == p->subchannel_list_.get()) { |
| grpc_error* error = grpc_error_set_int( |
| GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
| "failed to connect to all addresses"), |
| GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), |
| absl::make_unique<TransientFailurePicker>(error)); |
| } |
| } |
| sd->CheckConnectivityStateAndStartWatchingLocked(); |
| break; |
| } |
| case GRPC_CHANNEL_CONNECTING: |
| case GRPC_CHANNEL_IDLE: { |
| // Only update connectivity state in case 1. |
| if (subchannel_list() == p->subchannel_list_.get()) { |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_CONNECTING, absl::Status(), |
| absl::make_unique<QueuePicker>( |
| p->Ref(DEBUG_LOCATION, "QueuePicker"))); |
| } |
| break; |
| } |
| case GRPC_CHANNEL_SHUTDOWN: |
| GPR_UNREACHABLE_CODE(break); |
| } |
| } |
| |
| void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { |
| PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); |
| // If we get here, there are two possible cases: |
| // 1. We do not currently have a selected subchannel, and the update is |
| // for a subchannel in p->subchannel_list_ that we're trying to |
| // connect to. The goal here is to find a subchannel that we can |
| // select. |
| // 2. We do currently have a selected subchannel, and the update is |
| // for a subchannel in p->latest_pending_subchannel_list_. The |
| // goal here is to find a subchannel from the update that we can |
| // select in place of the current one. |
| GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || |
| subchannel_list() == p->latest_pending_subchannel_list_.get()); |
| // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_. |
| if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, |
| "Pick First %p promoting pending subchannel list %p to " |
| "replace %p", |
| p, p->latest_pending_subchannel_list_.get(), |
| p->subchannel_list_.get()); |
| } |
| p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); |
| } |
| // Cases 1 and 2. |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { |
| gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); |
| } |
| p->selected_ = this; |
| p->channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_READY, absl::Status(), |
| absl::make_unique<Picker>(subchannel()->Ref())); |
| for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) { |
| if (i != Index()) { |
| subchannel_list()->subchannel(i)->ShutdownLocked(); |
| } |
| } |
| } |
| |
| void PickFirst::PickFirstSubchannelData:: |
| CheckConnectivityStateAndStartWatchingLocked() { |
| PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy()); |
| // Check current state. |
| grpc_connectivity_state current_state = CheckConnectivityStateLocked(); |
| // Start watch. |
| StartConnectivityWatchLocked(); |
| // If current state is READY, select the subchannel now, since we started |
| // watching from this state and will not get a notification of it |
| // transitioning into this state. |
| // If the current state is not READY, attempt to connect. |
| if (current_state == GRPC_CHANNEL_READY) { |
| if (p->selected_ != this) ProcessUnselectedReadyLocked(); |
| } else { |
| subchannel()->AttemptToConnect(); |
| } |
| } |
| |
| class PickFirstConfig : public LoadBalancingPolicy::Config { |
| public: |
| const char* name() const override { return kPickFirst; } |
| }; |
| |
| // |
| // factory |
| // |
| |
| class PickFirstFactory : public LoadBalancingPolicyFactory { |
| public: |
| OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
| LoadBalancingPolicy::Args args) const override { |
| return MakeOrphanable<PickFirst>(std::move(args)); |
| } |
| |
| const char* name() const override { return kPickFirst; } |
| |
| RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
| const Json& json, grpc_error** /*error*/) const override { |
| return MakeRefCounted<PickFirstConfig>(); |
| } |
| }; |
| |
| } // namespace |
| |
| } // namespace grpc_core |
| |
| void grpc_lb_policy_pick_first_init() { |
| grpc_core::LoadBalancingPolicyRegistry::Builder:: |
| RegisterLoadBalancingPolicyFactory( |
| absl::make_unique<grpc_core::PickFirstFactory>()); |
| } |
| |
// Counterpart of grpc_lb_policy_pick_first_init(); intentionally a no-op.
void grpc_lb_policy_pick_first_shutdown() {
}