| /* |
| * |
| * Copyright 2015 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/ext/filters/client_channel/resolving_lb_policy.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| #include <stdbool.h> |
| #include <stdio.h> |
| #include <string.h> |
| |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/log.h> |
| #include <grpc/support/string_util.h> |
| #include <grpc/support/sync.h> |
| |
| #include "src/core/ext/filters/client_channel/backup_poller.h" |
| #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" |
| #include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
| #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" |
| #include "src/core/ext/filters/client_channel/resolver_registry.h" |
| #include "src/core/ext/filters/client_channel/retry_throttle.h" |
| #include "src/core/ext/filters/client_channel/server_address.h" |
| #include "src/core/ext/filters/client_channel/service_config.h" |
| #include "src/core/ext/filters/client_channel/subchannel.h" |
| #include "src/core/ext/filters/deadline/deadline_filter.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/connected_channel.h" |
| #include "src/core/lib/channel/status_util.h" |
| #include "src/core/lib/gpr/string.h" |
| #include "src/core/lib/gprpp/inlined_vector.h" |
| #include "src/core/lib/gprpp/manual_constructor.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/iomgr/combiner.h" |
| #include "src/core/lib/iomgr/iomgr.h" |
| #include "src/core/lib/iomgr/polling_entity.h" |
| #include "src/core/lib/profiling/timers.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/slice/slice_string_helpers.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/metadata.h" |
| #include "src/core/lib/transport/metadata_batch.h" |
| #include "src/core/lib/transport/static_metadata.h" |
| #include "src/core/lib/transport/status_metadata.h" |
| |
| namespace grpc_core { |
| |
| // |
| // ResolvingLoadBalancingPolicy::ResolverResultHandler |
| // |
| |
| class ResolvingLoadBalancingPolicy::ResolverResultHandler |
| : public Resolver::ResultHandler { |
| public: |
| explicit ResolverResultHandler( |
| RefCountedPtr<ResolvingLoadBalancingPolicy> parent) |
| : parent_(std::move(parent)) {} |
| |
| ~ResolverResultHandler() { |
| if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete", |
| parent_.get()); |
| } |
| } |
| |
| void ReturnResult(Resolver::Result result) override { |
| parent_->OnResolverResultChangedLocked(std::move(result)); |
| } |
| |
| void ReturnError(grpc_error* error) override { |
| parent_->OnResolverError(error); |
| } |
| |
| private: |
| RefCountedPtr<ResolvingLoadBalancingPolicy> parent_; |
| }; |
| |
| // |
| // ResolvingLoadBalancingPolicy::ResolvingControlHelper |
| // |
| |
| class ResolvingLoadBalancingPolicy::ResolvingControlHelper |
| : public LoadBalancingPolicy::ChannelControlHelper { |
| public: |
| explicit ResolvingControlHelper( |
| RefCountedPtr<ResolvingLoadBalancingPolicy> parent) |
| : parent_(std::move(parent)) {} |
| |
| RefCountedPtr<SubchannelInterface> CreateSubchannel( |
| const grpc_channel_args& args) override { |
| if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. |
| if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; |
| return parent_->channel_control_helper()->CreateSubchannel(args); |
| } |
| |
| void UpdateState(grpc_connectivity_state state, |
| UniquePtr<SubchannelPicker> picker) override { |
| if (parent_->resolver_ == nullptr) return; // Shutting down. |
| // If this request is from the pending child policy, ignore it until |
| // it reports READY, at which point we swap it into place. |
| if (CalledByPendingChild()) { |
| if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { |
| gpr_log(GPR_INFO, |
| "resolving_lb=%p helper=%p: pending child policy %p reports " |
| "state=%s", |
| parent_.get(), this, child_, |
| grpc_connectivity_state_name(state)); |
| } |
| if (state != GRPC_CHANNEL_READY) return; |
| grpc_pollset_set_del_pollset_set( |
| parent_->lb_policy_->interested_parties(), |
| parent_->interested_parties()); |
| parent_->lb_policy_ = std::move(parent_->pending_lb_policy_); |
| } else if (!CalledByCurrentChild()) { |
| // This request is from an outdated child, so ignore it. |
| return; |
| } |
| parent_->channel_control_helper()->UpdateState(state, std::move(picker)); |
| } |
| |
| void RequestReresolution() override { |
| // If there is a pending child policy, ignore re-resolution requests |
| // from the current child policy (or any outdated child). |
| if (parent_->pending_lb_policy_ != nullptr && !CalledByPendingChild()) { |
| return; |
| } |
| if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: started name re-resolving", |
| parent_.get()); |
| } |
| if (parent_->resolver_ != nullptr) { |
| parent_->resolver_->RequestReresolutionLocked(); |
| } |
| } |
| |
| void AddTraceEvent(TraceSeverity severity, StringView message) override {} |
| |
| void set_child(LoadBalancingPolicy* child) { child_ = child; } |
| |
| private: |
| bool CalledByPendingChild() const { |
| GPR_ASSERT(child_ != nullptr); |
| return child_ == parent_->pending_lb_policy_.get(); |
| } |
| |
| bool CalledByCurrentChild() const { |
| GPR_ASSERT(child_ != nullptr); |
| return child_ == parent_->lb_policy_.get(); |
| }; |
| |
| RefCountedPtr<ResolvingLoadBalancingPolicy> parent_; |
| LoadBalancingPolicy* child_ = nullptr; |
| }; |
| |
| // |
| // ResolvingLoadBalancingPolicy |
| // |
| |
| ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy( |
| Args args, TraceFlag* tracer, UniquePtr<char> target_uri, |
| ProcessResolverResultCallback process_resolver_result, |
| void* process_resolver_result_user_data) |
| : LoadBalancingPolicy(std::move(args)), |
| tracer_(tracer), |
| target_uri_(std::move(target_uri)), |
| process_resolver_result_(process_resolver_result), |
| process_resolver_result_user_data_(process_resolver_result_user_data) { |
| GPR_ASSERT(process_resolver_result != nullptr); |
| resolver_ = ResolverRegistry::CreateResolver( |
| target_uri_.get(), args.args, interested_parties(), combiner(), |
| UniquePtr<Resolver::ResultHandler>(New<ResolverResultHandler>(Ref()))); |
| // Since the validity of args has been checked when create the channel, |
| // CreateResolver() must return a non-null result. |
| GPR_ASSERT(resolver_ != nullptr); |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this); |
| } |
| channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_CONNECTING, |
| UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref()))); |
| resolver_->StartLocked(); |
| } |
| |
| ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() { |
| GPR_ASSERT(resolver_ == nullptr); |
| GPR_ASSERT(lb_policy_ == nullptr); |
| } |
| |
| void ResolvingLoadBalancingPolicy::ShutdownLocked() { |
| if (resolver_ != nullptr) { |
| resolver_.reset(); |
| if (lb_policy_ != nullptr) { |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: shutting down lb_policy=%p", this, |
| lb_policy_.get()); |
| } |
| grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(), |
| interested_parties()); |
| lb_policy_.reset(); |
| } |
| if (pending_lb_policy_ != nullptr) { |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: shutting down pending lb_policy=%p", |
| this, pending_lb_policy_.get()); |
| } |
| grpc_pollset_set_del_pollset_set(pending_lb_policy_->interested_parties(), |
| interested_parties()); |
| pending_lb_policy_.reset(); |
| } |
| } |
| } |
| |
| void ResolvingLoadBalancingPolicy::ExitIdleLocked() { |
| if (lb_policy_ != nullptr) { |
| lb_policy_->ExitIdleLocked(); |
| if (pending_lb_policy_ != nullptr) pending_lb_policy_->ExitIdleLocked(); |
| } |
| } |
| |
| void ResolvingLoadBalancingPolicy::ResetBackoffLocked() { |
| if (resolver_ != nullptr) { |
| resolver_->ResetBackoffLocked(); |
| resolver_->RequestReresolutionLocked(); |
| } |
| if (lb_policy_ != nullptr) lb_policy_->ResetBackoffLocked(); |
| if (pending_lb_policy_ != nullptr) pending_lb_policy_->ResetBackoffLocked(); |
| } |
| |
| void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) { |
| if (resolver_ == nullptr) { |
| GRPC_ERROR_UNREF(error); |
| return; |
| } |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: resolver transient failure: %s", this, |
| grpc_error_string(error)); |
| } |
| // If we already have an LB policy from a previous resolution |
| // result, then we continue to let it set the connectivity state. |
| // Otherwise, we go into TRANSIENT_FAILURE. |
| if (lb_policy_ == nullptr) { |
| grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
| "Resolver transient failure", &error, 1); |
| channel_control_helper()->UpdateState( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, |
| UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(state_error))); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked( |
| const char* lb_policy_name, |
| RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, |
| Resolver::Result result, TraceStringVector* trace_strings) { |
| // If the child policy name changes, we need to create a new child |
| // policy. When this happens, we leave child_policy_ as-is and store |
| // the new child policy in pending_child_policy_. Once the new child |
| // policy transitions into state READY, we swap it into child_policy_, |
| // replacing the original child policy. So pending_child_policy_ is |
| // non-null only between when we apply an update that changes the child |
| // policy name and when the new child reports state READY. |
| // |
| // Updates can arrive at any point during this transition. We always |
| // apply updates relative to the most recently created child policy, |
| // even if the most recent one is still in pending_child_policy_. This |
| // is true both when applying the updates to an existing child policy |
| // and when determining whether we need to create a new policy. |
| // |
| // As a result of this, there are several cases to consider here: |
| // |
| // 1. We have no existing child policy (i.e., we have started up but |
| // have not yet received a serverlist from the balancer or gone |
| // into fallback mode; in this case, both child_policy_ and |
| // pending_child_policy_ are null). In this case, we create a |
| // new child policy and store it in child_policy_. |
| // |
| // 2. We have an existing child policy and have no pending child policy |
| // from a previous update (i.e., either there has not been a |
| // previous update that changed the policy name, or we have already |
| // finished swapping in the new policy; in this case, child_policy_ |
| // is non-null but pending_child_policy_ is null). In this case: |
| // a. If child_policy_->name() equals child_policy_name, then we |
| // update the existing child policy. |
| // b. If child_policy_->name() does not equal child_policy_name, |
| // we create a new policy. The policy will be stored in |
| // pending_child_policy_ and will later be swapped into |
| // child_policy_ by the helper when the new child transitions |
| // into state READY. |
| // |
| // 3. We have an existing child policy and have a pending child policy |
| // from a previous update (i.e., a previous update set |
| // pending_child_policy_ as per case 2b above and that policy has |
| // not yet transitioned into state READY and been swapped into |
| // child_policy_; in this case, both child_policy_ and |
| // pending_child_policy_ are non-null). In this case: |
| // a. If pending_child_policy_->name() equals child_policy_name, |
| // then we update the existing pending child policy. |
| // b. If pending_child_policy->name() does not equal |
| // child_policy_name, then we create a new policy. The new |
| // policy is stored in pending_child_policy_ (replacing the one |
| // that was there before, which will be immediately shut down) |
| // and will later be swapped into child_policy_ by the helper |
| // when the new child transitions into state READY. |
| const bool create_policy = |
| // case 1 |
| lb_policy_ == nullptr || |
| // case 2b |
| (pending_lb_policy_ == nullptr && |
| strcmp(lb_policy_->name(), lb_policy_name) != 0) || |
| // case 3b |
| (pending_lb_policy_ != nullptr && |
| strcmp(pending_lb_policy_->name(), lb_policy_name) != 0); |
| LoadBalancingPolicy* policy_to_update = nullptr; |
| if (create_policy) { |
| // Cases 1, 2b, and 3b: create a new child policy. |
| // If lb_policy_ is null, we set it (case 1), else we set |
| // pending_lb_policy_ (cases 2b and 3b). |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: Creating new %schild policy %s", this, |
| lb_policy_ == nullptr ? "" : "pending ", lb_policy_name); |
| } |
| auto& lb_policy = lb_policy_ == nullptr ? lb_policy_ : pending_lb_policy_; |
| lb_policy = |
| CreateLbPolicyLocked(lb_policy_name, *result.args, trace_strings); |
| policy_to_update = lb_policy.get(); |
| } else { |
| // Cases 2a and 3a: update an existing policy. |
| // If we have a pending child policy, send the update to the pending |
| // policy (case 3a), else send it to the current policy (case 2a). |
| policy_to_update = pending_lb_policy_ != nullptr ? pending_lb_policy_.get() |
| : lb_policy_.get(); |
| } |
| GPR_ASSERT(policy_to_update != nullptr); |
| // Update the policy. |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: Updating %schild policy %p", this, |
| policy_to_update == pending_lb_policy_.get() ? "pending " : "", |
| policy_to_update); |
| } |
| UpdateArgs update_args; |
| update_args.addresses = std::move(result.addresses); |
| update_args.config = std::move(lb_policy_config); |
| // TODO(roth): Once channel args is converted to C++, use std::move() here. |
| update_args.args = result.args; |
| result.args = nullptr; |
| policy_to_update->UpdateLocked(std::move(update_args)); |
| } |
| |
| // Creates a new LB policy. |
| // Updates trace_strings to indicate what was done. |
| OrphanablePtr<LoadBalancingPolicy> |
| ResolvingLoadBalancingPolicy::CreateLbPolicyLocked( |
| const char* lb_policy_name, const grpc_channel_args& args, |
| TraceStringVector* trace_strings) { |
| ResolvingControlHelper* helper = New<ResolvingControlHelper>(Ref()); |
| LoadBalancingPolicy::Args lb_policy_args; |
| lb_policy_args.combiner = combiner(); |
| lb_policy_args.channel_control_helper = |
| UniquePtr<ChannelControlHelper>(helper); |
| lb_policy_args.args = &args; |
| OrphanablePtr<LoadBalancingPolicy> lb_policy = |
| LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( |
| lb_policy_name, std::move(lb_policy_args)); |
| if (GPR_UNLIKELY(lb_policy == nullptr)) { |
| gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); |
| char* str; |
| gpr_asprintf(&str, "Could not create LB policy \"%s\"", lb_policy_name); |
| trace_strings->push_back(str); |
| return nullptr; |
| } |
| helper->set_child(lb_policy.get()); |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: created new LB policy \"%s\" (%p)", |
| this, lb_policy_name, lb_policy.get()); |
| } |
| char* str; |
| gpr_asprintf(&str, "Created new LB policy \"%s\"", lb_policy_name); |
| trace_strings->push_back(str); |
| grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
| interested_parties()); |
| return lb_policy; |
| } |
| |
| void ResolvingLoadBalancingPolicy::MaybeAddTraceMessagesForAddressChangesLocked( |
| bool resolution_contains_addresses, TraceStringVector* trace_strings) { |
| if (!resolution_contains_addresses && |
| previous_resolution_contained_addresses_) { |
| trace_strings->push_back(gpr_strdup("Address list became empty")); |
| } else if (resolution_contains_addresses && |
| !previous_resolution_contained_addresses_) { |
| trace_strings->push_back(gpr_strdup("Address list became non-empty")); |
| } |
| previous_resolution_contained_addresses_ = resolution_contains_addresses; |
| } |
| |
| void ResolvingLoadBalancingPolicy::ConcatenateAndAddChannelTraceLocked( |
| TraceStringVector* trace_strings) const { |
| if (!trace_strings->empty()) { |
| gpr_strvec v; |
| gpr_strvec_init(&v); |
| gpr_strvec_add(&v, gpr_strdup("Resolution event: ")); |
| bool is_first = 1; |
| for (size_t i = 0; i < trace_strings->size(); ++i) { |
| if (!is_first) gpr_strvec_add(&v, gpr_strdup(", ")); |
| is_first = false; |
| gpr_strvec_add(&v, (*trace_strings)[i]); |
| } |
| size_t len = 0; |
| UniquePtr<char> message(gpr_strvec_flatten(&v, &len)); |
| channel_control_helper()->AddTraceEvent(ChannelControlHelper::TRACE_INFO, |
| StringView(message.get())); |
| gpr_strvec_destroy(&v); |
| } |
| } |
| |
| void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( |
| Resolver::Result result) { |
| // Handle race conditions. |
| if (resolver_ == nullptr) return; |
| if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { |
| gpr_log(GPR_INFO, "resolving_lb=%p: got resolver result", this); |
| } |
| // We only want to trace the address resolution in the follow cases: |
| // (a) Address resolution resulted in service config change. |
| // (b) Address resolution that causes number of backends to go from |
| // zero to non-zero. |
| // (c) Address resolution that causes number of backends to go from |
| // non-zero to zero. |
| // (d) Address resolution that causes a new LB policy to be created. |
| // |
| // We track a list of strings to eventually be concatenated and traced. |
| TraceStringVector trace_strings; |
| const bool resolution_contains_addresses = result.addresses.size() > 0; |
| // Process the resolver result. |
| const char* lb_policy_name = nullptr; |
| RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config; |
| bool service_config_changed = false; |
| char* service_config_error_string = nullptr; |
| if (process_resolver_result_ != nullptr) { |
| grpc_error* service_config_error = GRPC_ERROR_NONE; |
| service_config_changed = process_resolver_result_( |
| process_resolver_result_user_data_, result, &lb_policy_name, |
| &lb_policy_config, &service_config_error); |
| if (service_config_error != GRPC_ERROR_NONE) { |
| service_config_error_string = |
| gpr_strdup(grpc_error_string(service_config_error)); |
| if (lb_policy_name == nullptr) { |
| // Use an empty lb_policy_name as an indicator that we received an |
| // invalid service config and we don't have a fallback service config. |
| OnResolverError(service_config_error); |
| } else { |
| GRPC_ERROR_UNREF(service_config_error); |
| } |
| } |
| } else { |
| lb_policy_name = child_policy_name_.get(); |
| lb_policy_config = child_lb_config_; |
| } |
| if (lb_policy_name != nullptr) { |
| // Create or update LB policy, as needed. |
| CreateOrUpdateLbPolicyLocked(lb_policy_name, lb_policy_config, |
| std::move(result), &trace_strings); |
| } |
| // Add channel trace event. |
| if (service_config_changed) { |
| // TODO(ncteisen): might be worth somehow including a snippet of the |
| // config in the trace, at the risk of bloating the trace logs. |
| trace_strings.push_back(gpr_strdup("Service config changed")); |
| } |
| if (service_config_error_string != nullptr) { |
| trace_strings.push_back(service_config_error_string); |
| service_config_error_string = nullptr; |
| } |
| MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses, |
| &trace_strings); |
| ConcatenateAndAddChannelTraceLocked(&trace_strings); |
| } |
| |
| } // namespace grpc_core |