| /* |
| * |
| * Copyright 2015 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/ext/filters/client_channel/subchannel.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| |
| #include <algorithm> |
| #include <cstring> |
| |
| #include "absl/strings/str_format.h" |
| |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/string_util.h> |
| |
| #include "src/core/ext/filters/client_channel/client_channel.h" |
| #include "src/core/ext/filters/client_channel/health/health_check_client.h" |
| #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" |
| #include "src/core/ext/filters/client_channel/service_config.h" |
| #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" |
| #include "src/core/lib/backoff/backoff.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/connected_channel.h" |
| #include "src/core/lib/debug/stats.h" |
| #include "src/core/lib/gpr/alloc.h" |
| #include "src/core/lib/gprpp/debug_location.h" |
| #include "src/core/lib/gprpp/manual_constructor.h" |
| #include "src/core/lib/gprpp/ref_counted_ptr.h" |
| #include "src/core/lib/gprpp/sync.h" |
| #include "src/core/lib/iomgr/parse_address.h" |
| #include "src/core/lib/iomgr/sockaddr_utils.h" |
| #include "src/core/lib/profiling/timers.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/surface/channel.h" |
| #include "src/core/lib/surface/channel_init.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/status_metadata.h" |
| #include "src/core/lib/uri/uri_parser.h" |
| |
| // Strong and weak refs. |
| #define INTERNAL_REF_BITS 16 |
| #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) |
| |
| // Backoff parameters. |
| #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 |
| #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 |
| #define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 |
| #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 |
| #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 |
| |
| // Conversion between subchannel call and call stack. |
| #define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ |
| (grpc_call_stack*)((char*)(call) + \ |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) |
| #define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \ |
| (SubchannelCall*)(((char*)(call_stack)) - \ |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) |
| |
| namespace grpc_core { |
| |
| TraceFlag grpc_trace_subchannel(false, "subchannel"); |
| DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); |
| |
| // |
| // ConnectedSubchannel |
| // |
| |
| ConnectedSubchannel::ConnectedSubchannel( |
| grpc_channel_stack* channel_stack, const grpc_channel_args* args, |
| RefCountedPtr<channelz::SubchannelNode> channelz_subchannel) |
| : RefCounted<ConnectedSubchannel>( |
| GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) |
| ? "ConnectedSubchannel" |
| : nullptr), |
| channel_stack_(channel_stack), |
| args_(grpc_channel_args_copy(args)), |
| channelz_subchannel_(std::move(channelz_subchannel)) {} |
| |
| ConnectedSubchannel::~ConnectedSubchannel() { |
| grpc_channel_args_destroy(args_); |
| GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); |
| } |
| |
| void ConnectedSubchannel::StartWatch( |
| grpc_pollset_set* interested_parties, |
| OrphanablePtr<ConnectivityStateWatcherInterface> watcher) { |
| grpc_transport_op* op = grpc_make_transport_op(nullptr); |
| op->start_connectivity_watch = std::move(watcher); |
| op->start_connectivity_watch_state = GRPC_CHANNEL_READY; |
| op->bind_pollset_set = interested_parties; |
| grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0); |
| elem->filter->start_transport_op(elem, op); |
| } |
| |
| void ConnectedSubchannel::Ping(grpc_closure* on_initiate, |
| grpc_closure* on_ack) { |
| grpc_transport_op* op = grpc_make_transport_op(nullptr); |
| grpc_channel_element* elem; |
| op->send_ping.on_initiate = on_initiate; |
| op->send_ping.on_ack = on_ack; |
| elem = grpc_channel_stack_element(channel_stack_, 0); |
| elem->filter->start_transport_op(elem, op); |
| } |
| |
| size_t ConnectedSubchannel::GetInitialCallSizeEstimate( |
| size_t parent_data_size) const { |
| size_t allocation_size = |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)); |
| if (parent_data_size > 0) { |
| allocation_size += |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + |
| parent_data_size; |
| } else { |
| allocation_size += channel_stack_->call_stack_size; |
| } |
| return allocation_size; |
| } |
| |
| // |
| // SubchannelCall |
| // |
| |
| RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args, |
| grpc_error** error) { |
| const size_t allocation_size = |
| args.connected_subchannel->GetInitialCallSizeEstimate( |
| args.parent_data_size); |
| Arena* arena = args.arena; |
| return RefCountedPtr<SubchannelCall>(new ( |
| arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error)); |
| } |
| |
| SubchannelCall::SubchannelCall(Args args, grpc_error** error) |
| : connected_subchannel_(std::move(args.connected_subchannel)), |
| deadline_(args.deadline) { |
| grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this); |
| const grpc_call_element_args call_args = { |
| callstk, /* call_stack */ |
| nullptr, /* server_transport_data */ |
| args.context, /* context */ |
| args.path, /* path */ |
| args.start_time, /* start_time */ |
| args.deadline, /* deadline */ |
| args.arena, /* arena */ |
| args.call_combiner /* call_combiner */ |
| }; |
| *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1, |
| SubchannelCall::Destroy, this, &call_args); |
| if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) { |
| const char* error_string = grpc_error_string(*error); |
| gpr_log(GPR_ERROR, "error: %s", error_string); |
| return; |
| } |
| grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); |
| auto* channelz_node = connected_subchannel_->channelz_subchannel(); |
| if (channelz_node != nullptr) { |
| channelz_node->RecordCallStarted(); |
| } |
| } |
| |
| void SubchannelCall::StartTransportStreamOpBatch( |
| grpc_transport_stream_op_batch* batch) { |
| GPR_TIMER_SCOPE("subchannel_call_process_op", 0); |
| MaybeInterceptRecvTrailingMetadata(batch); |
| grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this); |
| grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); |
| GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); |
| top_elem->filter->start_transport_stream_op_batch(top_elem, batch); |
| } |
| |
| void* SubchannelCall::GetParentData() { |
| grpc_channel_stack* chanstk = connected_subchannel_->channel_stack(); |
| return reinterpret_cast<char*>(this) + |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + |
| GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size); |
| } |
| |
| grpc_call_stack* SubchannelCall::GetCallStack() { |
| return SUBCHANNEL_CALL_TO_CALL_STACK(this); |
| } |
| |
| void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) { |
| GPR_ASSERT(after_call_stack_destroy_ == nullptr); |
| GPR_ASSERT(closure != nullptr); |
| after_call_stack_destroy_ = closure; |
| } |
| |
| RefCountedPtr<SubchannelCall> SubchannelCall::Ref() { |
| IncrementRefCount(); |
| return RefCountedPtr<SubchannelCall>(this); |
| } |
| |
| RefCountedPtr<SubchannelCall> SubchannelCall::Ref( |
| const grpc_core::DebugLocation& location, const char* reason) { |
| IncrementRefCount(location, reason); |
| return RefCountedPtr<SubchannelCall>(this); |
| } |
| |
| void SubchannelCall::Unref() { |
| GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); |
| } |
| |
| void SubchannelCall::Unref(const DebugLocation& /*location*/, |
| const char* reason) { |
| GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); |
| } |
| |
| void SubchannelCall::Destroy(void* arg, grpc_error* /*error*/) { |
| GPR_TIMER_SCOPE("subchannel_call_destroy", 0); |
| SubchannelCall* self = static_cast<SubchannelCall*>(arg); |
| // Keep some members before destroying the subchannel call. |
| grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_; |
| RefCountedPtr<ConnectedSubchannel> connected_subchannel = |
| std::move(self->connected_subchannel_); |
| // Destroy the subchannel call. |
| self->~SubchannelCall(); |
| // Destroy the call stack. This should be after destroying the subchannel |
| // call, because call->after_call_stack_destroy(), if not null, will free the |
| // call arena. |
| grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr, |
| after_call_stack_destroy); |
| // Automatically reset connected_subchannel. This should be after destroying |
| // the call stack, because destroying call stack needs access to the channel |
| // stack. |
| } |
| |
| void SubchannelCall::MaybeInterceptRecvTrailingMetadata( |
| grpc_transport_stream_op_batch* batch) { |
| // only intercept payloads with recv trailing. |
| if (!batch->recv_trailing_metadata) { |
| return; |
| } |
| // only add interceptor is channelz is enabled. |
| if (connected_subchannel_->channelz_subchannel() == nullptr) { |
| return; |
| } |
| GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, |
| this, grpc_schedule_on_exec_ctx); |
| // save some state needed for the interception callback. |
| GPR_ASSERT(recv_trailing_metadata_ == nullptr); |
| recv_trailing_metadata_ = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata; |
| original_recv_trailing_metadata_ = |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
| batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
| &recv_trailing_metadata_ready_; |
| } |
| |
| namespace { |
| |
| // Sets *status based on the rest of the parameters. |
| void GetCallStatus(grpc_status_code* status, grpc_millis deadline, |
| grpc_metadata_batch* md_batch, grpc_error* error) { |
| if (error != GRPC_ERROR_NONE) { |
| grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); |
| } else { |
| if (md_batch->idx.named.grpc_status != nullptr) { |
| *status = grpc_get_status_code_from_metadata( |
| md_batch->idx.named.grpc_status->md); |
| } else { |
| *status = GRPC_STATUS_UNKNOWN; |
| } |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| } // namespace |
| |
| void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { |
| SubchannelCall* call = static_cast<SubchannelCall*>(arg); |
| GPR_ASSERT(call->recv_trailing_metadata_ != nullptr); |
| grpc_status_code status = GRPC_STATUS_OK; |
| GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, |
| GRPC_ERROR_REF(error)); |
| channelz::SubchannelNode* channelz_subchannel = |
| call->connected_subchannel_->channelz_subchannel(); |
| GPR_ASSERT(channelz_subchannel != nullptr); |
| if (status == GRPC_STATUS_OK) { |
| channelz_subchannel->RecordCallSucceeded(); |
| } else { |
| channelz_subchannel->RecordCallFailed(); |
| } |
| Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, |
| GRPC_ERROR_REF(error)); |
| } |
| |
| void SubchannelCall::IncrementRefCount() { |
| GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); |
| } |
| |
| void SubchannelCall::IncrementRefCount( |
| const grpc_core::DebugLocation& /*location*/, const char* reason) { |
| GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); |
| } |
| |
| // |
| // Subchannel::ConnectedSubchannelStateWatcher |
| // |
| |
| class Subchannel::ConnectedSubchannelStateWatcher |
| : public AsyncConnectivityStateWatcherInterface { |
| public: |
| // Must be instantiated while holding c->mu. |
| explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) { |
| // Steal subchannel ref for connecting. |
| GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); |
| GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); |
| } |
| |
| ~ConnectedSubchannelStateWatcher() override { |
| GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher"); |
| } |
| |
| private: |
| void OnConnectivityStateChange(grpc_connectivity_state new_state, |
| const absl::Status& status) override { |
| Subchannel* c = subchannel_; |
| MutexLock lock(&c->mu_); |
| switch (new_state) { |
| case GRPC_CHANNEL_TRANSIENT_FAILURE: |
| case GRPC_CHANNEL_SHUTDOWN: { |
| if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { |
| if (grpc_trace_subchannel.enabled()) { |
| gpr_log(GPR_INFO, |
| "Connected subchannel %p of subchannel %p has gone into " |
| "%s. Attempting to reconnect.", |
| c->connected_subchannel_.get(), c, |
| ConnectivityStateName(new_state)); |
| } |
| c->connected_subchannel_.reset(); |
| if (c->channelz_node() != nullptr) { |
| c->channelz_node()->SetChildSocket(nullptr); |
| } |
| // We need to construct our own status if the underlying state was |
| // shutdown since the accompanying status will be StatusCode::OK |
| // otherwise. |
| c->SetConnectivityStateLocked( |
| GRPC_CHANNEL_TRANSIENT_FAILURE, |
| new_state == GRPC_CHANNEL_SHUTDOWN |
| ? absl::Status(absl::StatusCode::kUnavailable, |
| "Subchannel has disconnected.") |
| : status); |
| c->backoff_begun_ = false; |
| c->backoff_.Reset(); |
| } |
| break; |
| } |
| default: { |
| // In principle, this should never happen. We should not get |
| // a callback for READY, because that was the state we started |
| // this watch from. And a connected subchannel should never go |
| // from READY to CONNECTING or IDLE. |
| c->SetConnectivityStateLocked(new_state, status); |
| } |
| } |
| } |
| |
| Subchannel* subchannel_; |
| }; |
| |
| // Asynchronously notifies the \a watcher of a change in the connectvity state |
| // of \a subchannel to the current \a state. Deletes itself when done. |
| class Subchannel::AsyncWatcherNotifierLocked { |
| public: |
| AsyncWatcherNotifierLocked( |
| RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher, |
| Subchannel* subchannel, grpc_connectivity_state state, |
| const absl::Status& status) |
| : watcher_(std::move(watcher)) { |
| RefCountedPtr<ConnectedSubchannel> connected_subchannel; |
| if (state == GRPC_CHANNEL_READY) { |
| connected_subchannel = subchannel->connected_subchannel_; |
| } |
| watcher_->PushConnectivityStateChange( |
| {state, status, std::move(connected_subchannel)}); |
| ExecCtx::Run(DEBUG_LOCATION, |
| GRPC_CLOSURE_INIT( |
| &closure_, |
| [](void* arg, grpc_error* /*error*/) { |
| auto* self = |
| static_cast<AsyncWatcherNotifierLocked*>(arg); |
| self->watcher_->OnConnectivityStateChange(); |
| delete self; |
| }, |
| this, nullptr), |
| GRPC_ERROR_NONE); |
| } |
| |
| private: |
| RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher_; |
| grpc_closure closure_; |
| }; |
| |
| // |
| // Subchannel::ConnectivityStateWatcherList |
| // |
| |
| void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( |
| RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { |
| watchers_.insert(std::make_pair(watcher.get(), std::move(watcher))); |
| } |
| |
| void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( |
| ConnectivityStateWatcherInterface* watcher) { |
| watchers_.erase(watcher); |
| } |
| |
| void Subchannel::ConnectivityStateWatcherList::NotifyLocked( |
| Subchannel* subchannel, grpc_connectivity_state state, |
| const absl::Status& status) { |
| for (const auto& p : watchers_) { |
| new AsyncWatcherNotifierLocked(p.second, subchannel, state, status); |
| } |
| } |
| |
| // |
| // Subchannel::HealthWatcherMap::HealthWatcher |
| // |
| |
| // State needed for tracking the connectivity state with a particular |
| // health check service name. |
| class Subchannel::HealthWatcherMap::HealthWatcher |
| : public AsyncConnectivityStateWatcherInterface { |
| public: |
| HealthWatcher(Subchannel* c, |
| grpc_core::UniquePtr<char> health_check_service_name, |
| grpc_connectivity_state subchannel_state) |
| : subchannel_(c), |
| health_check_service_name_(std::move(health_check_service_name)), |
| state_(subchannel_state == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING |
| : subchannel_state) { |
| GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher"); |
| // If the subchannel is already connected, start health checking. |
| if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked(); |
| } |
| |
| ~HealthWatcher() override { |
| GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher"); |
| } |
| |
| const char* health_check_service_name() const { |
| return health_check_service_name_.get(); |
| } |
| |
| grpc_connectivity_state state() const { return state_; } |
| |
| void AddWatcherLocked( |
| grpc_connectivity_state initial_state, |
| RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) { |
| if (state_ != initial_state) { |
| new AsyncWatcherNotifierLocked(watcher, subchannel_, state_, status_); |
| } |
| watcher_list_.AddWatcherLocked(std::move(watcher)); |
| } |
| |
| void RemoveWatcherLocked( |
| Subchannel::ConnectivityStateWatcherInterface* watcher) { |
| watcher_list_.RemoveWatcherLocked(watcher); |
| } |
| |
| bool HasWatchers() const { return !watcher_list_.empty(); } |
| |
| void NotifyLocked(grpc_connectivity_state state, const absl::Status& status) { |
| if (state == GRPC_CHANNEL_READY) { |
| // If we had not already notified for CONNECTING state, do so now. |
| // (We may have missed this earlier, because if the transition |
| // from IDLE to CONNECTING to READY was too quick, the connected |
| // subchannel may not have sent us a notification for CONNECTING.) |
| if (state_ != GRPC_CHANNEL_CONNECTING) { |
| state_ = GRPC_CHANNEL_CONNECTING; |
| status_ = status; |
| watcher_list_.NotifyLocked(subchannel_, state_, status); |
| } |
| // If we've become connected, start health checking. |
| StartHealthCheckingLocked(); |
| } else { |
| state_ = state; |
| status_ = status; |
| watcher_list_.NotifyLocked(subchannel_, state_, status); |
| // We're not connected, so stop health checking. |
| health_check_client_.reset(); |
| } |
| } |
| |
| void Orphan() override { |
| watcher_list_.Clear(); |
| health_check_client_.reset(); |
| Unref(); |
| } |
| |
| private: |
| void OnConnectivityStateChange(grpc_connectivity_state new_state, |
| const absl::Status& status) override { |
| MutexLock lock(&subchannel_->mu_); |
| if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) { |
| state_ = new_state; |
| status_ = status; |
| watcher_list_.NotifyLocked(subchannel_, new_state, status); |
| } |
| } |
| |
| void StartHealthCheckingLocked() { |
| GPR_ASSERT(health_check_client_ == nullptr); |
| health_check_client_ = MakeOrphanable<HealthCheckClient>( |
| health_check_service_name_.get(), subchannel_->connected_subchannel_, |
| subchannel_->pollset_set_, subchannel_->channelz_node_, Ref()); |
| } |
| |
| Subchannel* subchannel_; |
| grpc_core::UniquePtr<char> health_check_service_name_; |
| OrphanablePtr<HealthCheckClient> health_check_client_; |
| grpc_connectivity_state state_; |
| absl::Status status_; |
| ConnectivityStateWatcherList watcher_list_; |
| }; |
| |
| // |
| // Subchannel::HealthWatcherMap |
| // |
| |
| void Subchannel::HealthWatcherMap::AddWatcherLocked( |
| Subchannel* subchannel, grpc_connectivity_state initial_state, |
| grpc_core::UniquePtr<char> health_check_service_name, |
| RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { |
| // If the health check service name is not already present in the map, |
| // add it. |
| auto it = map_.find(health_check_service_name.get()); |
| HealthWatcher* health_watcher; |
| if (it == map_.end()) { |
| const char* key = health_check_service_name.get(); |
| auto w = MakeOrphanable<HealthWatcher>( |
| subchannel, std::move(health_check_service_name), subchannel->state_); |
| health_watcher = w.get(); |
| map_[key] = std::move(w); |
| } else { |
| health_watcher = it->second.get(); |
| } |
| // Add the watcher to the entry. |
| health_watcher->AddWatcherLocked(initial_state, std::move(watcher)); |
| } |
| |
| void Subchannel::HealthWatcherMap::RemoveWatcherLocked( |
| const char* health_check_service_name, |
| ConnectivityStateWatcherInterface* watcher) { |
| auto it = map_.find(health_check_service_name); |
| GPR_ASSERT(it != map_.end()); |
| it->second->RemoveWatcherLocked(watcher); |
| // If we just removed the last watcher for this service name, remove |
| // the map entry. |
| if (!it->second->HasWatchers()) map_.erase(it); |
| } |
| |
| void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state, |
| const absl::Status& status) { |
| for (const auto& p : map_) { |
| p.second->NotifyLocked(state, status); |
| } |
| } |
| |
| grpc_connectivity_state |
| Subchannel::HealthWatcherMap::CheckConnectivityStateLocked( |
| Subchannel* subchannel, const char* health_check_service_name) { |
| auto it = map_.find(health_check_service_name); |
| if (it == map_.end()) { |
| // If the health check service name is not found in the map, we're |
| // not currently doing a health check for that service name. If the |
| // subchannel's state without health checking is READY, report |
| // CONNECTING, since that's what we'd be in as soon as we do start a |
| // watch. Otherwise, report the channel's state without health checking. |
| return subchannel->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING |
| : subchannel->state_; |
| } |
| HealthWatcher* health_watcher = it->second.get(); |
| return health_watcher->state(); |
| } |
| |
| void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); } |
| |
| // |
| // Subchannel |
| // |
| |
| namespace { |
| |
| BackOff::Options ParseArgsForBackoffValues( |
| const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) { |
| grpc_millis initial_backoff_ms = |
| GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; |
| *min_connect_timeout_ms = |
| GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000; |
| grpc_millis max_backoff_ms = |
| GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; |
| bool fixed_reconnect_backoff = false; |
| if (args != nullptr) { |
| for (size_t i = 0; i < args->num_args; i++) { |
| if (0 == strcmp(args->args[i].key, |
| "grpc.testing.fixed_reconnect_backoff_ms")) { |
| fixed_reconnect_backoff = true; |
| initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms = |
| grpc_channel_arg_get_integer( |
| &args->args[i], |
| {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); |
| } else if (0 == |
| strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { |
| fixed_reconnect_backoff = false; |
| *min_connect_timeout_ms = grpc_channel_arg_get_integer( |
| &args->args[i], |
| {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX}); |
| } else if (0 == |
| strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { |
| fixed_reconnect_backoff = false; |
| max_backoff_ms = grpc_channel_arg_get_integer( |
| &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX}); |
| } else if (0 == strcmp(args->args[i].key, |
| GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { |
| fixed_reconnect_backoff = false; |
| initial_backoff_ms = grpc_channel_arg_get_integer( |
| &args->args[i], |
| {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); |
| } |
| } |
| } |
| return BackOff::Options() |
| .set_initial_backoff(initial_backoff_ms) |
| .set_multiplier(fixed_reconnect_backoff |
| ? 1.0 |
| : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) |
| .set_jitter(fixed_reconnect_backoff ? 0.0 |
| : GRPC_SUBCHANNEL_RECONNECT_JITTER) |
| .set_max_backoff(max_backoff_ms); |
| } |
| |
| } // namespace |
| |
| void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange( |
| ConnectivityStateChange state_change) { |
| MutexLock lock(&mu_); |
| connectivity_state_queue_.push_back(std::move(state_change)); |
| } |
| |
| Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange |
| Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() { |
| MutexLock lock(&mu_); |
| GPR_ASSERT(!connectivity_state_queue_.empty()); |
| ConnectivityStateChange state_change = connectivity_state_queue_.front(); |
| connectivity_state_queue_.pop_front(); |
| return state_change; |
| } |
| |
| Subchannel::Subchannel(SubchannelKey* key, |
| OrphanablePtr<SubchannelConnector> connector, |
| const grpc_channel_args* args) |
| : key_(key), |
| connector_(std::move(connector)), |
| backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) { |
| GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); |
| gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS); |
| pollset_set_ = grpc_pollset_set_create(); |
| grpc_resolved_address* addr = |
| static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr))); |
| GetAddressFromSubchannelAddressArg(args, addr); |
| grpc_resolved_address* new_address = nullptr; |
| grpc_channel_args* new_args = nullptr; |
| if (ProxyMapperRegistry::MapAddress(*addr, args, &new_address, &new_args)) { |
| GPR_ASSERT(new_address != nullptr); |
| gpr_free(addr); |
| addr = new_address; |
| } |
| static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; |
| grpc_arg new_arg = CreateSubchannelAddressArg(addr); |
| gpr_free(addr); |
| args_ = grpc_channel_args_copy_and_add_and_remove( |
| new_args != nullptr ? new_args : args, keys_to_remove, |
| GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); |
| gpr_free(new_arg.value.string); |
| if (new_args != nullptr) grpc_channel_args_destroy(new_args); |
| GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, |
| grpc_schedule_on_exec_ctx); |
| const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ); |
| const bool channelz_enabled = |
| grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT); |
| arg = grpc_channel_args_find( |
| args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE); |
| const grpc_integer_options options = { |
| GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}; |
| size_t channel_tracer_max_memory = |
| static_cast<size_t>(grpc_channel_arg_get_integer(arg, options)); |
| if (channelz_enabled) { |
| channelz_node_ = MakeRefCounted<channelz::SubchannelNode>( |
| GetTargetAddress(), channel_tracer_max_memory); |
| channelz_node_->AddTraceEvent( |
| channelz::ChannelTrace::Severity::Info, |
| grpc_slice_from_static_string("subchannel created")); |
| } |
| } |
| |
| Subchannel::~Subchannel() { |
| if (channelz_node_ != nullptr) { |
| channelz_node_->AddTraceEvent( |
| channelz::ChannelTrace::Severity::Info, |
| grpc_slice_from_static_string("Subchannel destroyed")); |
| channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN); |
| } |
| grpc_channel_args_destroy(args_); |
| connector_.reset(); |
| grpc_pollset_set_destroy(pollset_set_); |
| delete key_; |
| } |
| |
| Subchannel* Subchannel::Create(OrphanablePtr<SubchannelConnector> connector, |
| const grpc_channel_args* args) { |
| SubchannelKey* key = new SubchannelKey(args); |
| SubchannelPoolInterface* subchannel_pool = |
| SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args); |
| GPR_ASSERT(subchannel_pool != nullptr); |
| Subchannel* c = subchannel_pool->FindSubchannel(key); |
| if (c != nullptr) { |
| delete key; |
| return c; |
| } |
| c = new Subchannel(key, std::move(connector), args); |
| // Try to register the subchannel before setting the subchannel pool. |
| // Otherwise, in case of a registration race, unreffing c in |
| // RegisterSubchannel() will cause c to be tried to be unregistered, while |
| // its key maps to a different subchannel. |
| Subchannel* registered = subchannel_pool->RegisterSubchannel(key, c); |
| if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref(); |
| return registered; |
| } |
| |
| void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) { |
| MutexLock lock(&mu_); |
| // Only update the value if the new keepalive time is larger. |
| if (new_keepalive_time > keepalive_time_) { |
| keepalive_time_ = new_keepalive_time; |
| if (grpc_trace_subchannel.enabled()) { |
| gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this, |
| new_keepalive_time); |
| } |
| const grpc_arg arg_to_add = grpc_channel_arg_integer_create( |
| const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time); |
| const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS; |
| grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( |
| args_, &arg_to_remove, 1, &arg_to_add, 1); |
| grpc_channel_args_destroy(args_); |
| args_ = new_args; |
| } |
| } |
| |
| Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| gpr_atm old_refs; |
| old_refs = RefMutate((1 << INTERNAL_REF_BITS), |
| 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_REF")); |
| GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); |
| return this; |
| } |
| |
| void Subchannel::Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| gpr_atm old_refs; |
| // add a weak ref and subtract a strong ref (atomically) |
| old_refs = RefMutate( |
| static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS), |
| 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_UNREF")); |
| if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { |
| Disconnect(); |
| } |
| GRPC_SUBCHANNEL_WEAK_UNREF(this, "strong-unref"); |
| } |
| |
| Subchannel* Subchannel::WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| gpr_atm old_refs; |
| old_refs = RefMutate(1, 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_REF")); |
| GPR_ASSERT(old_refs != 0); |
| return this; |
| } |
| |
| namespace { |
| |
| void subchannel_destroy(void* arg, grpc_error* /*error*/) { |
| Subchannel* self = static_cast<Subchannel*>(arg); |
| delete self; |
| } |
| |
| } // namespace |
| |
| void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { |
| gpr_atm old_refs; |
| old_refs = RefMutate(-static_cast<gpr_atm>(1), |
| 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF")); |
| if (old_refs == 1) { |
| ExecCtx::Run(DEBUG_LOCATION, |
| GRPC_CLOSURE_CREATE(subchannel_destroy, this, |
| grpc_schedule_on_exec_ctx), |
| GRPC_ERROR_NONE); |
| } |
| } |
| |
| Subchannel* Subchannel::RefFromWeakRef() { |
| for (;;) { |
| gpr_atm old_refs = gpr_atm_acq_load(&ref_pair_); |
| if (old_refs >= (1 << INTERNAL_REF_BITS)) { |
| gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); |
| if (gpr_atm_rel_cas(&ref_pair_, old_refs, new_refs)) { |
| return this; |
| } |
| } else { |
| return nullptr; |
| } |
| } |
| } |
| |
| const char* Subchannel::GetTargetAddress() { |
| const grpc_arg* addr_arg = |
| grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS); |
| const char* addr_str = grpc_channel_arg_get_string(addr_arg); |
| GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. |
| return addr_str; |
| } |
| |
| channelz::SubchannelNode* Subchannel::channelz_node() { |
| return channelz_node_.get(); |
| } |
| |
| grpc_connectivity_state Subchannel::CheckConnectivityState( |
| const char* health_check_service_name, |
| RefCountedPtr<ConnectedSubchannel>* connected_subchannel) { |
| MutexLock lock(&mu_); |
| grpc_connectivity_state state; |
| if (health_check_service_name == nullptr) { |
| state = state_; |
| } else { |
| state = health_watcher_map_.CheckConnectivityStateLocked( |
| this, health_check_service_name); |
| } |
| if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) { |
| *connected_subchannel = connected_subchannel_; |
| } |
| return state; |
| } |
| |
| void Subchannel::WatchConnectivityState( |
| grpc_connectivity_state initial_state, |
| grpc_core::UniquePtr<char> health_check_service_name, |
| RefCountedPtr<ConnectivityStateWatcherInterface> watcher) { |
| MutexLock lock(&mu_); |
| grpc_pollset_set* interested_parties = watcher->interested_parties(); |
| if (interested_parties != nullptr) { |
| grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); |
| } |
| if (health_check_service_name == nullptr) { |
| if (state_ != initial_state) { |
| new AsyncWatcherNotifierLocked(watcher, this, state_, status_); |
| } |
| watcher_list_.AddWatcherLocked(std::move(watcher)); |
| } else { |
| health_watcher_map_.AddWatcherLocked(this, initial_state, |
| std::move(health_check_service_name), |
| std::move(watcher)); |
| } |
| } |
| |
| void Subchannel::CancelConnectivityStateWatch( |
| const char* health_check_service_name, |
| ConnectivityStateWatcherInterface* watcher) { |
| MutexLock lock(&mu_); |
| grpc_pollset_set* interested_parties = watcher->interested_parties(); |
| if (interested_parties != nullptr) { |
| grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties); |
| } |
| if (health_check_service_name == nullptr) { |
| watcher_list_.RemoveWatcherLocked(watcher); |
| } else { |
| health_watcher_map_.RemoveWatcherLocked(health_check_service_name, watcher); |
| } |
| } |
| |
| void Subchannel::AttemptToConnect() { |
| MutexLock lock(&mu_); |
| MaybeStartConnectingLocked(); |
| } |
| |
| void Subchannel::ResetBackoff() { |
| MutexLock lock(&mu_); |
| backoff_.Reset(); |
| if (have_retry_alarm_) { |
| retry_immediately_ = true; |
| grpc_timer_cancel(&retry_alarm_); |
| } else { |
| backoff_begun_ = false; |
| MaybeStartConnectingLocked(); |
| } |
| } |
| |
| grpc_arg Subchannel::CreateSubchannelAddressArg( |
| const grpc_resolved_address* addr) { |
| return grpc_channel_arg_string_create( |
| (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, |
| gpr_strdup(addr->len > 0 ? grpc_sockaddr_to_uri(addr).c_str() : "")); |
| } |
| |
| const char* Subchannel::GetUriFromSubchannelAddressArg( |
| const grpc_channel_args* args) { |
| const grpc_arg* addr_arg = |
| grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); |
| const char* addr_str = grpc_channel_arg_get_string(addr_arg); |
| GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. |
| return addr_str; |
| } |
| |
| namespace { |
| |
| void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) { |
| grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); |
| GPR_ASSERT(uri != nullptr); |
| if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); |
| grpc_uri_destroy(uri); |
| } |
| |
| } // namespace |
| |
| void Subchannel::GetAddressFromSubchannelAddressArg( |
| const grpc_channel_args* args, grpc_resolved_address* addr) { |
| const char* addr_uri_str = GetUriFromSubchannelAddressArg(args); |
| memset(addr, 0, sizeof(*addr)); |
| if (*addr_uri_str != '\0') { |
| UriToSockaddr(addr_uri_str, addr); |
| } |
| } |
| |
| namespace { |
| |
| // Returns a string indicating the subchannel's connectivity state change to |
| // \a state. |
| const char* SubchannelConnectivityStateChangeString( |
| grpc_connectivity_state state) { |
| switch (state) { |
| case GRPC_CHANNEL_IDLE: |
| return "Subchannel state change to IDLE"; |
| case GRPC_CHANNEL_CONNECTING: |
| return "Subchannel state change to CONNECTING"; |
| case GRPC_CHANNEL_READY: |
| return "Subchannel state change to READY"; |
| case GRPC_CHANNEL_TRANSIENT_FAILURE: |
| return "Subchannel state change to TRANSIENT_FAILURE"; |
| case GRPC_CHANNEL_SHUTDOWN: |
| return "Subchannel state change to SHUTDOWN"; |
| } |
| GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
| } |
| |
| } // namespace |
| |
| // Note: Must be called with a state that is different from the current state. |
| void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, |
| const absl::Status& status) { |
| state_ = state; |
| status_ = status; |
| if (channelz_node_ != nullptr) { |
| channelz_node_->UpdateConnectivityState(state); |
| channelz_node_->AddTraceEvent( |
| channelz::ChannelTrace::Severity::Info, |
| grpc_slice_from_static_string( |
| SubchannelConnectivityStateChangeString(state))); |
| } |
| // Notify non-health watchers. |
| watcher_list_.NotifyLocked(this, state, status); |
| // Notify health watchers. |
| health_watcher_map_.NotifyLocked(state, status); |
| } |
| |
| void Subchannel::MaybeStartConnectingLocked() { |
| if (disconnected_) { |
| // Don't try to connect if we're already disconnected. |
| return; |
| } |
| if (connecting_) { |
| // Already connecting: don't restart. |
| return; |
| } |
| if (connected_subchannel_ != nullptr) { |
| // Already connected: don't restart. |
| return; |
| } |
| connecting_ = true; |
| GRPC_SUBCHANNEL_WEAK_REF(this, "connecting"); |
| if (!backoff_begun_) { |
| backoff_begun_ = true; |
| ContinueConnectingLocked(); |
| } else { |
| GPR_ASSERT(!have_retry_alarm_); |
| have_retry_alarm_ = true; |
| const grpc_millis time_til_next = |
| next_attempt_deadline_ - ExecCtx::Get()->Now(); |
| if (time_til_next <= 0) { |
| gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this); |
| } else { |
| gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", |
| this, time_til_next); |
| } |
| GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this, |
| grpc_schedule_on_exec_ctx); |
| grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_); |
| } |
| } |
| |
| void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { |
| Subchannel* c = static_cast<Subchannel*>(arg); |
| // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use |
| // MutexLock instead of ReleasableMutexLock, here. |
| ReleasableMutexLock lock(&c->mu_); |
| c->have_retry_alarm_ = false; |
| if (c->disconnected_) { |
| error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", |
| &error, 1); |
| } else if (c->retry_immediately_) { |
| c->retry_immediately_ = false; |
| error = GRPC_ERROR_NONE; |
| } else { |
| GRPC_ERROR_REF(error); |
| } |
| if (error == GRPC_ERROR_NONE) { |
| gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); |
| c->ContinueConnectingLocked(); |
| lock.Unlock(); |
| } else { |
| lock.Unlock(); |
| GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); |
| } |
| GRPC_ERROR_UNREF(error); |
| } |
| |
| void Subchannel::ContinueConnectingLocked() { |
| SubchannelConnector::Args args; |
| args.interested_parties = pollset_set_; |
| const grpc_millis min_deadline = |
| min_connect_timeout_ms_ + ExecCtx::Get()->Now(); |
| next_attempt_deadline_ = backoff_.NextAttemptTime(); |
| args.deadline = std::max(next_attempt_deadline_, min_deadline); |
| args.channel_args = args_; |
| SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status()); |
| connector_->Connect(args, &connecting_result_, &on_connecting_finished_); |
| } |
| |
| void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) { |
| auto* c = static_cast<Subchannel*>(arg); |
| const grpc_channel_args* delete_channel_args = |
| c->connecting_result_.channel_args; |
| GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished"); |
| { |
| MutexLock lock(&c->mu_); |
| c->connecting_ = false; |
| if (c->connecting_result_.transport != nullptr && |
| c->PublishTransportLocked()) { |
| // Do nothing, transport was published. |
| } else if (c->disconnected_) { |
| GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); |
| } else { |
| gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error)); |
| c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, |
| grpc_error_to_absl_status(error)); |
| GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); |
| } |
| } |
| GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished"); |
| grpc_channel_args_destroy(delete_channel_args); |
| } |
| |
| namespace { |
| |
| void ConnectionDestroy(void* arg, grpc_error* /*error*/) { |
| grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg); |
| grpc_channel_stack_destroy(stk); |
| gpr_free(stk); |
| } |
| |
| } // namespace |
| |
| bool Subchannel::PublishTransportLocked() { |
| // Construct channel stack. |
| grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); |
| grpc_channel_stack_builder_set_channel_arguments( |
| builder, connecting_result_.channel_args); |
| grpc_channel_stack_builder_set_transport(builder, |
| connecting_result_.transport); |
| if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) { |
| grpc_channel_stack_builder_destroy(builder); |
| return false; |
| } |
| grpc_channel_stack* stk; |
| grpc_error* error = grpc_channel_stack_builder_finish( |
| builder, 0, 1, ConnectionDestroy, nullptr, |
| reinterpret_cast<void**>(&stk)); |
| if (error != GRPC_ERROR_NONE) { |
| grpc_transport_destroy(connecting_result_.transport); |
| gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", |
| grpc_error_string(error)); |
| GRPC_ERROR_UNREF(error); |
| return false; |
| } |
| RefCountedPtr<channelz::SocketNode> socket = |
| std::move(connecting_result_.socket_node); |
| connecting_result_.Reset(); |
| if (disconnected_) { |
| grpc_channel_stack_destroy(stk); |
| gpr_free(stk); |
| return false; |
| } |
| // Publish. |
| connected_subchannel_.reset( |
| new ConnectedSubchannel(stk, args_, channelz_node_)); |
| gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", |
| connected_subchannel_.get(), this); |
| if (channelz_node_ != nullptr) { |
| channelz_node_->SetChildSocket(std::move(socket)); |
| } |
| // Start watching connected subchannel. |
| connected_subchannel_->StartWatch( |
| pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(this)); |
| // Report initial state. |
| SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status()); |
| return true; |
| } |
| |
| void Subchannel::Disconnect() { |
| // The subchannel_pool is only used once here in this subchannel, so the |
| // access can be outside of the lock. |
| if (subchannel_pool_ != nullptr) { |
| subchannel_pool_->UnregisterSubchannel(key_); |
| subchannel_pool_.reset(); |
| } |
| MutexLock lock(&mu_); |
| GPR_ASSERT(!disconnected_); |
| disconnected_ = true; |
| connector_.reset(); |
| connected_subchannel_.reset(); |
| health_watcher_map_.ShutdownLocked(); |
| } |
| |
| gpr_atm Subchannel::RefMutate( |
| gpr_atm delta, int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS) { |
| gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&ref_pair_, delta) |
| : gpr_atm_no_barrier_fetch_add(&ref_pair_, delta); |
| #ifndef NDEBUG |
| if (grpc_trace_subchannel_refcount.enabled()) { |
| gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, |
| "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", this, |
| purpose, old_val, old_val + delta, reason); |
| } |
| #endif |
| return old_val; |
| } |
| |
| } // namespace grpc_core |