| // |
| // Copyright 2015 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/client_channel/subchannel.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| |
| #include <algorithm> |
| #include <memory> |
| #include <new> |
| #include <utility> |
| |
| #include "absl/status/statusor.h" |
| #include "absl/strings/cord.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/optional.h" |
| |
| #include <grpc/impl/channel_arg_names.h> |
| #include <grpc/slice.h> |
| #include <grpc/status.h> |
| #include <grpc/support/log.h> |
| |
| #include "src/core/client_channel/subchannel_pool_interface.h" |
| #include "src/core/lib/address_utils/sockaddr_utils.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/channel_stack.h" |
| #include "src/core/lib/channel/channel_stack_builder_impl.h" |
| #include "src/core/lib/channel/channel_trace.h" |
| #include "src/core/lib/channel/channelz.h" |
| #include "src/core/lib/config/core_configuration.h" |
| #include "src/core/lib/debug/stats.h" |
| #include "src/core/lib/debug/stats_data.h" |
| #include "src/core/lib/debug/trace.h" |
| #include "src/core/lib/gpr/alloc.h" |
| #include "src/core/lib/gpr/useful.h" |
| #include "src/core/lib/gprpp/debug_location.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/status_helper.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/handshaker/proxy_mapper_registry.h" |
| #include "src/core/lib/iomgr/exec_ctx.h" |
| #include "src/core/lib/iomgr/pollset_set.h" |
| #include "src/core/lib/promise/cancel_callback.h" |
| #include "src/core/lib/promise/seq.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/surface/channel_init.h" |
| #include "src/core/lib/surface/channel_stack_type.h" |
| #include "src/core/lib/surface/init_internally.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/transport.h" |
| |
| // Backoff parameters. |
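// These are defaults; the initial, min, and max backoff can be overridden
// via the GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
// GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, and GRPC_ARG_MAX_RECONNECT_BACKOFF_MS
// channel args (see ParseArgsForBackoffValues() below).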
| #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 |
| #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 |
| #define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 |
| #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 |
| #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 |
| |
| // Conversion between subchannel call and call stack. |
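// A SubchannelCall and its call stack are allocated contiguously in the call
// arena: the call stack begins at the first aligned address after the
// SubchannelCall object (see GetInitialCallSizeEstimate() below). These
// macros convert between the two pointers.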
| #define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ |
| (grpc_call_stack*)((char*)(call) + \ |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) |
#define CALL_STACK_TO_SUBCHANNEL_CALL(call_stack) \
(SubchannelCall*)(((char*)(call_stack)) - \
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
| |
| namespace grpc_core { |
| |
| using ::grpc_event_engine::experimental::EventEngine; |
| |
| TraceFlag grpc_trace_subchannel(false, "subchannel"); |
| DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); |
| |
| // |
| // ConnectedSubchannel |
| // |
| |
| ConnectedSubchannel::ConnectedSubchannel( |
| grpc_channel_stack* channel_stack, const ChannelArgs& args, |
| RefCountedPtr<channelz::SubchannelNode> channelz_subchannel) |
| : RefCounted<ConnectedSubchannel>( |
| GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) |
| ? "ConnectedSubchannel" |
| : nullptr), |
| channel_stack_(channel_stack), |
| args_(args), |
| channelz_subchannel_(std::move(channelz_subchannel)) {} |
| |
| ConnectedSubchannel::~ConnectedSubchannel() { |
| GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); |
| } |
| |
| void ConnectedSubchannel::StartWatch( |
| grpc_pollset_set* interested_parties, |
| OrphanablePtr<ConnectivityStateWatcherInterface> watcher) { |
| grpc_transport_op* op = grpc_make_transport_op(nullptr); |
| op->start_connectivity_watch = std::move(watcher); |
| op->start_connectivity_watch_state = GRPC_CHANNEL_READY; |
| op->bind_pollset_set = interested_parties; |
| grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0); |
| elem->filter->start_transport_op(elem, op); |
| } |
| |
| void ConnectedSubchannel::Ping(grpc_closure* on_initiate, |
| grpc_closure* on_ack) { |
| grpc_transport_op* op = grpc_make_transport_op(nullptr); |
| grpc_channel_element* elem; |
| op->send_ping.on_initiate = on_initiate; |
| op->send_ping.on_ack = on_ack; |
| elem = grpc_channel_stack_element(channel_stack_, 0); |
| elem->filter->start_transport_op(elem, op); |
| } |
| |
| size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const { |
| return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + |
| channel_stack_->call_stack_size; |
| } |
| |
| ArenaPromise<ServerMetadataHandle> ConnectedSubchannel::MakeCallPromise( |
| CallArgs call_args) { |
| // If not using channelz, we just need to call the channel stack. |
| if (channelz_subchannel() == nullptr) { |
| return channel_stack_->MakeClientCallPromise(std::move(call_args)); |
| } |
| // Otherwise, we need to wrap the channel stack promise with code that |
| // handles the channelz updates. |
| return OnCancel( |
| Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)), |
| [self = Ref()](ServerMetadataHandle metadata) { |
| channelz::SubchannelNode* channelz_subchannel = |
| self->channelz_subchannel(); |
| GPR_ASSERT(channelz_subchannel != nullptr); |
| if (metadata->get(GrpcStatusMetadata()) |
| .value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) { |
| channelz_subchannel->RecordCallFailed(); |
| } else { |
| channelz_subchannel->RecordCallSucceeded(); |
| } |
| return metadata; |
| }), |
| [self = Ref()]() { |
| channelz::SubchannelNode* channelz_subchannel = |
| self->channelz_subchannel(); |
| GPR_ASSERT(channelz_subchannel != nullptr); |
| channelz_subchannel->RecordCallFailed(); |
| }); |
| } |
| |
| // |
| // SubchannelCall |
| // |
| |
| RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args, |
| grpc_error_handle* error) { |
| const size_t allocation_size = |
| args.connected_subchannel->GetInitialCallSizeEstimate(); |
| Arena* arena = args.arena; |
| return RefCountedPtr<SubchannelCall>(new ( |
| arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error)); |
| } |
| |
| SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error) |
| : connected_subchannel_(std::move(args.connected_subchannel)), |
| deadline_(args.deadline) { |
| grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this); |
| const grpc_call_element_args call_args = { |
| callstk, // call_stack |
| nullptr, // server_transport_data |
| args.context, // context |
| args.path.c_slice(), // path |
| args.start_time, // start_time |
| args.deadline, // deadline |
| args.arena, // arena |
| args.call_combiner // call_combiner |
| }; |
| *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1, |
| SubchannelCall::Destroy, this, &call_args); |
| if (GPR_UNLIKELY(!error->ok())) { |
| gpr_log(GPR_ERROR, "error: %s", StatusToString(*error).c_str()); |
| return; |
| } |
| grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); |
| auto* channelz_node = connected_subchannel_->channelz_subchannel(); |
| if (channelz_node != nullptr) { |
| channelz_node->RecordCallStarted(); |
| } |
| } |
| |
| void SubchannelCall::StartTransportStreamOpBatch( |
| grpc_transport_stream_op_batch* batch) { |
| MaybeInterceptRecvTrailingMetadata(batch); |
| grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this); |
| grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); |
| GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); |
| top_elem->filter->start_transport_stream_op_batch(top_elem, batch); |
| } |
| |
| grpc_call_stack* SubchannelCall::GetCallStack() { |
| return SUBCHANNEL_CALL_TO_CALL_STACK(this); |
| } |
| |
| void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) { |
| GPR_ASSERT(after_call_stack_destroy_ == nullptr); |
| GPR_ASSERT(closure != nullptr); |
| after_call_stack_destroy_ = closure; |
| } |
| |
| RefCountedPtr<SubchannelCall> SubchannelCall::Ref() { |
| IncrementRefCount(); |
| return RefCountedPtr<SubchannelCall>(this); |
| } |
| |
| RefCountedPtr<SubchannelCall> SubchannelCall::Ref(const DebugLocation& location, |
| const char* reason) { |
| IncrementRefCount(location, reason); |
| return RefCountedPtr<SubchannelCall>(this); |
| } |
| |
| void SubchannelCall::Unref() { |
| GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); |
| } |
| |
| void SubchannelCall::Unref(const DebugLocation& /*location*/, |
| const char* reason) { |
| GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); |
| } |
| |
| void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) { |
| SubchannelCall* self = static_cast<SubchannelCall*>(arg); |
| // Keep some members before destroying the subchannel call. |
| grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_; |
| RefCountedPtr<ConnectedSubchannel> connected_subchannel = |
| std::move(self->connected_subchannel_); |
| // Destroy the subchannel call. |
| self->~SubchannelCall(); |
| // Destroy the call stack. This should be after destroying the subchannel |
| // call, because call->after_call_stack_destroy(), if not null, will free the |
| // call arena. |
| grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr, |
| after_call_stack_destroy); |
// connected_subchannel is reset automatically when it goes out of scope.
// This must happen after destroying the call stack, because destroying the
// call stack requires access to the channel stack.
| } |
| |
| void SubchannelCall::MaybeInterceptRecvTrailingMetadata( |
| grpc_transport_stream_op_batch* batch) { |
// Only intercept batches that include recv_trailing_metadata.
| if (!batch->recv_trailing_metadata) { |
| return; |
| } |
// Only add the interceptor if channelz is enabled.
| if (connected_subchannel_->channelz_subchannel() == nullptr) { |
| return; |
| } |
| GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, |
| this, grpc_schedule_on_exec_ctx); |
// Save some state needed for the interception callback.
| GPR_ASSERT(recv_trailing_metadata_ == nullptr); |
| recv_trailing_metadata_ = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata; |
| original_recv_trailing_metadata_ = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| &recv_trailing_metadata_ready_; |
| } |
| |
| namespace { |
| |
// Sets *status from error if it is non-OK; otherwise, from the grpc-status
// value in md_batch.
| void GetCallStatus(grpc_status_code* status, Timestamp deadline, |
| grpc_metadata_batch* md_batch, grpc_error_handle error) { |
| if (!error.ok()) { |
| grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); |
| } else { |
| *status = md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); |
| } |
| } |
| |
| } // namespace |
| |
| void SubchannelCall::RecvTrailingMetadataReady(void* arg, |
| grpc_error_handle error) { |
| SubchannelCall* call = static_cast<SubchannelCall*>(arg); |
| GPR_ASSERT(call->recv_trailing_metadata_ != nullptr); |
| grpc_status_code status = GRPC_STATUS_OK; |
| GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error); |
| channelz::SubchannelNode* channelz_subchannel = |
| call->connected_subchannel_->channelz_subchannel(); |
| GPR_ASSERT(channelz_subchannel != nullptr); |
| if (status == GRPC_STATUS_OK) { |
| channelz_subchannel->RecordCallSucceeded(); |
| } else { |
| channelz_subchannel->RecordCallFailed(); |
| } |
| Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error); |
| } |
| |
| void SubchannelCall::IncrementRefCount() { |
| GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); |
| } |
| |
| void SubchannelCall::IncrementRefCount(const DebugLocation& /*location*/, |
| const char* reason) { |
| GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); |
| } |
| |
| // |
| // Subchannel::ConnectedSubchannelStateWatcher |
| // |
| |
| class Subchannel::ConnectedSubchannelStateWatcher final |
| : public AsyncConnectivityStateWatcherInterface { |
| public: |
// Must be instantiated while holding c->mu_.
| explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c) |
| : subchannel_(std::move(c)) {} |
| |
| ~ConnectedSubchannelStateWatcher() override { |
| subchannel_.reset(DEBUG_LOCATION, "state_watcher"); |
| } |
| |
| private: |
| void OnConnectivityStateChange(grpc_connectivity_state new_state, |
| const absl::Status& status) override { |
| Subchannel* c = subchannel_.get(); |
| { |
| MutexLock lock(&c->mu_); |
| // If we're either shutting down or have already seen this connection |
| // failure (i.e., c->connected_subchannel_ is null), do nothing. |
| // |
| // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN |
| // upon connection close. So if the server gracefully shuts down, |
| // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we |
| // will see only SHUTDOWN. Either way, we react to the first one we |
| // see, ignoring anything that happens after that. |
| if (c->connected_subchannel_ == nullptr) return; |
| if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE || |
| new_state == GRPC_CHANNEL_SHUTDOWN) { |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { |
| gpr_log(GPR_INFO, |
| "subchannel %p %s: Connected subchannel %p reports %s: %s", c, |
| c->key_.ToString().c_str(), c->connected_subchannel_.get(), |
| ConnectivityStateName(new_state), status.ToString().c_str()); |
| } |
| c->connected_subchannel_.reset(); |
| if (c->channelz_node() != nullptr) { |
| c->channelz_node()->SetChildSocket(nullptr); |
| } |
| // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here, |
| // pass along the status from the transport, since it may have |
| // keepalive info attached to it that the channel needs. |
| // TODO(roth): Consider whether there's a cleaner way to do this. |
| c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status); |
| c->backoff_.Reset(); |
| } |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| c->work_serializer_.DrainQueue(); |
| } |
| |
| WeakRefCountedPtr<Subchannel> subchannel_; |
| }; |
| |
| // |
| // Subchannel::ConnectivityStateWatcherList |
| // |
| |
| void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( |
| RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { |
| watchers_.insert(std::make_pair(watcher.get(), std::move(watcher))); |
| } |
| |
| void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( |
| ConnectivityStateWatcherInterface* watcher) { |
| watchers_.erase(watcher); |
| } |
| |
| void Subchannel::ConnectivityStateWatcherList::NotifyLocked( |
| grpc_connectivity_state state, const absl::Status& status) { |
| for (const auto& p : watchers_) { |
| subchannel_->work_serializer_.Schedule( |
| [watcher = p.second->Ref(), state, status]() mutable { |
| auto* watcher_ptr = watcher.get(); |
| watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, |
| status); |
| }, |
| DEBUG_LOCATION); |
| } |
| } |
| |
| // |
| // Subchannel |
| // |
| |
| namespace { |
| |
| BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args, |
| Duration* min_connect_timeout) { |
| const absl::optional<Duration> fixed_reconnect_backoff = |
| args.GetDurationFromIntMillis("grpc.testing.fixed_reconnect_backoff_ms"); |
| if (fixed_reconnect_backoff.has_value()) { |
| const Duration backoff = |
| std::max(Duration::Milliseconds(100), *fixed_reconnect_backoff); |
| *min_connect_timeout = backoff; |
| return BackOff::Options() |
| .set_initial_backoff(backoff) |
| .set_multiplier(1.0) |
| .set_jitter(0.0) |
| .set_max_backoff(backoff); |
| } |
| const Duration initial_backoff = std::max( |
| Duration::Milliseconds(100), |
| args.GetDurationFromIntMillis(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS) |
| .value_or(Duration::Seconds( |
| GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS))); |
| *min_connect_timeout = |
| std::max(Duration::Milliseconds(100), |
| args.GetDurationFromIntMillis(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS) |
| .value_or(Duration::Seconds( |
| GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS))); |
| const Duration max_backoff = |
| std::max(Duration::Milliseconds(100), |
| args.GetDurationFromIntMillis(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS) |
| .value_or(Duration::Seconds( |
| GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS))); |
| return BackOff::Options() |
| .set_initial_backoff(initial_backoff) |
| .set_multiplier(GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) |
| .set_jitter(GRPC_SUBCHANNEL_RECONNECT_JITTER) |
| .set_max_backoff(max_backoff); |
| } |
| |
| } // namespace |
| |
| Subchannel::Subchannel(SubchannelKey key, |
| OrphanablePtr<SubchannelConnector> connector, |
| const ChannelArgs& args) |
| : DualRefCounted<Subchannel>( |
| GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "Subchannel" |
| : nullptr), |
| key_(std::move(key)), |
| args_(args), |
| pollset_set_(grpc_pollset_set_create()), |
| connector_(std::move(connector)), |
| watcher_list_(this), |
| work_serializer_(args_.GetObjectRef<EventEngine>()), |
| backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)), |
| event_engine_(args_.GetObjectRef<EventEngine>()) { |
// A grpc_init is added here to ensure that grpc_shutdown does not happen
// until the subchannel is destroyed. Subchannels can outlive the channels
// that use them, because they may be reused/shared among multiple channels.
// As a result, subchannel destruction happens asynchronously with respect
// to channel destruction. If the last channel destruction triggers a
// grpc_shutdown before the last subchannel is destroyed, there may be race
// conditions that trigger segmentation faults. To prevent this, we call a
// grpc_init here and a grpc_shutdown in the subchannel destructor.
| InitInternally(); |
| global_stats().IncrementClientSubchannelsCreated(); |
| GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, |
| grpc_schedule_on_exec_ctx); |
| // Check proxy mapper to determine address to connect to and channel |
| // args to use. |
| address_for_connect_ = CoreConfiguration::Get() |
| .proxy_mapper_registry() |
| .MapAddress(key_.address(), &args_) |
| .value_or(key_.address()); |
| // Initialize channelz. |
| const bool channelz_enabled = args_.GetBool(GRPC_ARG_ENABLE_CHANNELZ) |
| .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT); |
| if (channelz_enabled) { |
| const size_t channel_tracer_max_memory = Clamp( |
| args_.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE) |
| .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT), |
| 0, INT_MAX); |
| channelz_node_ = MakeRefCounted<channelz::SubchannelNode>( |
| grpc_sockaddr_to_uri(&key_.address()) |
| .value_or("<unknown address type>"), |
| channel_tracer_max_memory); |
| channelz_node_->AddTraceEvent( |
| channelz::ChannelTrace::Severity::Info, |
| grpc_slice_from_static_string("subchannel created")); |
| } |
| } |
| |
| Subchannel::~Subchannel() { |
| if (channelz_node_ != nullptr) { |
| channelz_node_->AddTraceEvent( |
| channelz::ChannelTrace::Severity::Info, |
| grpc_slice_from_static_string("Subchannel destroyed")); |
| channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN); |
| } |
| connector_.reset(); |
| grpc_pollset_set_destroy(pollset_set_); |
| // grpc_shutdown is called here because grpc_init is called in the ctor. |
| ShutdownInternally(); |
| } |
| |
| RefCountedPtr<Subchannel> Subchannel::Create( |
| OrphanablePtr<SubchannelConnector> connector, |
| const grpc_resolved_address& address, const ChannelArgs& args) { |
| SubchannelKey key(address, args); |
| auto* subchannel_pool = args.GetObject<SubchannelPoolInterface>(); |
| GPR_ASSERT(subchannel_pool != nullptr); |
| RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key); |
| if (c != nullptr) { |
| return c; |
| } |
| c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args); |
// Try to register the subchannel before setting the subchannel pool.
// Otherwise, in case of a registration race, unreffing c in
// RegisterSubchannel() would trigger an attempt to unregister c while
// its key maps to a different subchannel.
| RefCountedPtr<Subchannel> registered = |
| subchannel_pool->RegisterSubchannel(c->key_, c); |
| if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref(); |
| return registered; |
| } |
| |
| void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { |
| MutexLock lock(&mu_); |
| // Only update the value if the new keepalive time is larger. |
| if (new_keepalive_time > keepalive_time_) { |
| keepalive_time_ = new_keepalive_time; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { |
| gpr_log(GPR_INFO, "subchannel %p %s: throttling keepalive time to %d", |
| this, key_.ToString().c_str(), new_keepalive_time); |
| } |
| args_ = args_.Set(GRPC_ARG_KEEPALIVE_TIME_MS, new_keepalive_time); |
| } |
| } |
| |
| channelz::SubchannelNode* Subchannel::channelz_node() { |
| return channelz_node_.get(); |
| } |
| |
| void Subchannel::WatchConnectivityState( |
| RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { |
| { |
| MutexLock lock(&mu_); |
| grpc_pollset_set* interested_parties = watcher->interested_parties(); |
| if (interested_parties != nullptr) { |
| grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); |
| } |
| work_serializer_.Schedule( |
| [watcher = watcher->Ref(), state = state_, status = status_]() mutable { |
| auto* watcher_ptr = watcher.get(); |
| watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, |
| status); |
| }, |
| DEBUG_LOCATION); |
| watcher_list_.AddWatcherLocked(std::move(watcher)); |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| work_serializer_.DrainQueue(); |
| } |
| |
| void Subchannel::CancelConnectivityStateWatch( |
| ConnectivityStateWatcherInterface* watcher) { |
| { |
| MutexLock lock(&mu_); |
| grpc_pollset_set* interested_parties = watcher->interested_parties(); |
| if (interested_parties != nullptr) { |
| grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties); |
| } |
| watcher_list_.RemoveWatcherLocked(watcher); |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| // (Shouldn't actually be necessary in this case, but better safe than sorry.) |
| work_serializer_.DrainQueue(); |
| } |
| |
| void Subchannel::RequestConnection() { |
| { |
| MutexLock lock(&mu_); |
| if (state_ == GRPC_CHANNEL_IDLE) { |
| StartConnectingLocked(); |
| } |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| work_serializer_.DrainQueue(); |
| } |
| |
| void Subchannel::ResetBackoff() { |
// Hold a ref to ensure that cancellation and the subsequent deletion of the
// closure do not eliminate the last ref and destroy the Subchannel before
// this method returns.
| auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff"); |
| { |
| MutexLock lock(&mu_); |
| backoff_.Reset(); |
| if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && |
| event_engine_->Cancel(retry_timer_handle_)) { |
| OnRetryTimerLocked(); |
| } else if (state_ == GRPC_CHANNEL_CONNECTING) { |
| next_attempt_time_ = Timestamp::Now(); |
| } |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| work_serializer_.DrainQueue(); |
| } |
| |
| void Subchannel::Orphaned() { |
// The subchannel_pool_ is accessed only here in this subchannel, so the
// access can happen outside of the lock.
| if (subchannel_pool_ != nullptr) { |
| subchannel_pool_->UnregisterSubchannel(key_, this); |
| subchannel_pool_.reset(); |
| } |
| { |
| MutexLock lock(&mu_); |
| GPR_ASSERT(!shutdown_); |
| shutdown_ = true; |
| connector_.reset(); |
| connected_subchannel_.reset(); |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| work_serializer_.DrainQueue(); |
| } |
| |
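// Looks up the data producer entry for the given type, inserting a null
// entry if one is not already present, then invokes get_or_add() with a
// pointer to the entry so that the caller can create or reuse a producer.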
| void Subchannel::GetOrAddDataProducer( |
| UniqueTypeName type, |
| std::function<void(DataProducerInterface**)> get_or_add) { |
| MutexLock lock(&mu_); |
| auto it = data_producer_map_.emplace(type, nullptr).first; |
| get_or_add(&it->second); |
| } |
| |
| void Subchannel::RemoveDataProducer(DataProducerInterface* data_producer) { |
| MutexLock lock(&mu_); |
| auto it = data_producer_map_.find(data_producer->type()); |
| if (it != data_producer_map_.end() && it->second == data_producer) { |
| data_producer_map_.erase(it); |
| } |
| } |
| |
| // Note: Must be called with a state that is different from the current state. |
| void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, |
| const absl::Status& status) { |
| state_ = state; |
| if (status.ok()) { |
| status_ = status; |
| } else { |
| // Augment status message to include IP address. |
| status_ = absl::Status(status.code(), |
| absl::StrCat(grpc_sockaddr_to_uri(&key_.address()) |
| .value_or("<unknown address type>"), |
| ": ", status.message())); |
| status.ForEachPayload( |
| [this](absl::string_view key, const absl::Cord& value) |
| // Want to use ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) here, |
| // but that won't work, because we can't pass the lock |
| // annotation through absl::Status::ForEachPayload(). |
| ABSL_NO_THREAD_SAFETY_ANALYSIS { status_.SetPayload(key, value); }); |
| } |
| if (channelz_node_ != nullptr) { |
| channelz_node_->UpdateConnectivityState(state); |
| channelz_node_->AddTraceEvent( |
| channelz::ChannelTrace::Severity::Info, |
| grpc_slice_from_cpp_string(absl::StrCat( |
| "Subchannel connectivity state changed to ", |
| ConnectivityStateName(state), |
| status.ok() ? "" : absl::StrCat(": ", status_.ToString())))); |
| } |
| // Notify watchers. |
| watcher_list_.NotifyLocked(state, status_); |
| } |
| |
| void Subchannel::OnRetryTimer() { |
| { |
| MutexLock lock(&mu_); |
| OnRetryTimerLocked(); |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| work_serializer_.DrainQueue(); |
| } |
| |
| void Subchannel::OnRetryTimerLocked() { |
| if (shutdown_) return; |
| gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE", |
| this, key_.ToString().c_str()); |
| SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus()); |
| } |
| |
| void Subchannel::StartConnectingLocked() { |
| // Set next attempt time. |
| const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now(); |
| next_attempt_time_ = backoff_.NextAttemptTime(); |
| // Report CONNECTING. |
| SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus()); |
| // Start connection attempt. |
| SubchannelConnector::Args args; |
| args.address = &address_for_connect_; |
| args.interested_parties = pollset_set_; |
| args.deadline = std::max(next_attempt_time_, min_deadline); |
| args.channel_args = args_; |
| WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback. |
| connector_->Connect(args, &connecting_result_, &on_connecting_finished_); |
| } |
| |
| void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) { |
| WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg)); |
| { |
| MutexLock lock(&c->mu_); |
| c->OnConnectingFinishedLocked(error); |
| } |
| // Drain any connectivity state notifications after releasing the mutex. |
| c->work_serializer_.DrainQueue(); |
| c.reset(DEBUG_LOCATION, "Connect"); |
| } |
| |
| void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { |
| if (shutdown_) { |
| connecting_result_.Reset(); |
| return; |
| } |
| // If we didn't get a transport or we fail to publish it, report |
| // TRANSIENT_FAILURE and start the retry timer. |
| // Note that if the connection attempt took longer than the backoff |
| // time, then the timer will fire immediately, and we will quickly |
| // transition back to IDLE. |
| if (connecting_result_.transport == nullptr || !PublishTransportLocked()) { |
| const Duration time_until_next_attempt = |
| next_attempt_time_ - Timestamp::Now(); |
| gpr_log(GPR_INFO, |
| "subchannel %p %s: connect failed (%s), backing off for %" PRId64 |
| " ms", |
| this, key_.ToString().c_str(), StatusToString(error).c_str(), |
| time_until_next_attempt.millis()); |
| SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, |
| grpc_error_to_absl_status(error)); |
| retry_timer_handle_ = event_engine_->RunAfter( |
| time_until_next_attempt, |
| [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { |
| { |
| ApplicationCallbackExecCtx callback_exec_ctx; |
| ExecCtx exec_ctx; |
| self->OnRetryTimer(); |
// Subchannel deletion might require an active ExecCtx, so self must be
// reset here, before the ExecCtx declared above is destroyed. Otherwise
// the WeakRefCountedPtr destructor (which may destroy the subchannel)
// could run without an active ExecCtx, leading to crashes.
| self.reset(); |
| } |
| }); |
| } |
| } |
| |
| bool Subchannel::PublishTransportLocked() { |
| // Construct channel stack. |
| // Builder takes ownership of transport. |
| ChannelStackBuilderImpl builder( |
| "subchannel", GRPC_CLIENT_SUBCHANNEL, |
| connecting_result_.channel_args.SetObject( |
| std::exchange(connecting_result_.transport, nullptr))); |
| if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) { |
| return false; |
| } |
| absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stk = builder.Build(); |
| if (!stk.ok()) { |
| auto error = absl_status_to_grpc_error(stk.status()); |
| connecting_result_.Reset(); |
| gpr_log(GPR_ERROR, |
| "subchannel %p %s: error initializing subchannel stack: %s", this, |
| key_.ToString().c_str(), StatusToString(error).c_str()); |
| return false; |
| } |
| RefCountedPtr<channelz::SocketNode> socket = |
| std::move(connecting_result_.socket_node); |
| connecting_result_.Reset(); |
| if (shutdown_) return false; |
| // Publish. |
| connected_subchannel_.reset( |
| new ConnectedSubchannel(stk->release(), args_, channelz_node_)); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) { |
| gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", this, |
| key_.ToString().c_str(), connected_subchannel_.get()); |
| } |
| if (channelz_node_ != nullptr) { |
| channelz_node_->SetChildSocket(std::move(socket)); |
| } |
| // Start watching connected subchannel. |
| connected_subchannel_->StartWatch( |
| pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>( |
| WeakRef(DEBUG_LOCATION, "state_watcher"))); |
| // Report initial state. |
| SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status()); |
| return true; |
| } |
| |
| } // namespace grpc_core |