blob: b7dd1a19db428014d8c01c56560a5dcf22c7883b [file] [log] [blame]
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
#include <grpc/support/port_platform.h>
#include <map>
#include <memory>
#include <set>
#include <string>
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
//
// Client channel filter
//
// A client channel is a channel that begins disconnected, and can connect
// to some endpoint on demand. If that endpoint disconnects, it will be
// connected to again later.
//
// Calls on a disconnected client channel are queued until a connection is
// established.
// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"
// Channel arg containing a pointer to the ClientChannel object.
#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel"
// Channel arg containing a pointer to the ServiceConfig object.
#define GRPC_ARG_SERVICE_CONFIG_OBJ "grpc.internal.service_config_obj"
// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
// recv_initial_metadata
// send_initial_metadata
// recv_message
// send_message
// recv_trailing_metadata
// send_trailing_metadata
#define MAX_PENDING_BATCHES 6
namespace grpc_core {
class ClientChannel {
public:
static const grpc_channel_filter kFilterVtable;
class LoadBalancedCall;
// Returns the ClientChannel object from channel, or null if channel
// is not a client channel.
static ClientChannel* GetFromChannel(grpc_channel* channel);
grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
// Starts a one-time connectivity state watch. When the channel's state
// becomes different from *state, sets *state to the new state and
// schedules on_complete. The watcher_timer_init callback is invoked as
// soon as the watch is actually started (i.e., after hopping into the
// client channel combiner). I/O will be serviced via pollent.
//
// This is intended to be used when starting a watch from outside of C-core
// via grpc_channel_watch_connectivity_state(). It should not be used
// by other callers.
void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
grpc_connectivity_state* state,
grpc_closure* on_complete,
grpc_closure* watcher_timer_init) {
new ExternalConnectivityWatcher(this, pollent, state, on_complete,
watcher_timer_init);
}
// Cancels a pending external watcher previously added by
// AddExternalConnectivityWatcher().
void CancelExternalConnectivityWatcher(grpc_closure* on_complete) {
ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
this, on_complete, /*cancel=*/true);
}
int NumExternalConnectivityWatchers() const {
MutexLock lock(&external_watchers_mu_);
return static_cast<int>(external_watchers_.size());
}
// Starts and stops a connectivity watch. The watcher will be initially
// notified as soon as the state changes from initial_state and then on
// every subsequent state change until either the watch is stopped or
// it is notified that the state has changed to SHUTDOWN.
//
// This is intended to be used when starting watches from code inside of
// C-core (e.g., for a nested control plane channel for things like xds).
void AddConnectivityWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
void RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher);
RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete);
private:
class CallData;
class ResolverResultHandler;
class SubchannelWrapper;
class ClientChannelControlHelper;
class ConnectivityWatcherAdder;
class ConnectivityWatcherRemover;
// Represents a pending connectivity callback from an external caller
// via grpc_client_channel_watch_connectivity_state().
class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
public:
ExternalConnectivityWatcher(ClientChannel* chand,
grpc_polling_entity pollent,
grpc_connectivity_state* state,
grpc_closure* on_complete,
grpc_closure* watcher_timer_init);
~ExternalConnectivityWatcher() override;
// Removes the watcher from the external_watchers_ map.
static void RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
grpc_closure* on_complete,
bool cancel);
void Notify(grpc_connectivity_state state,
const absl::Status& /* status */) override;
void Cancel();
private:
// Adds the watcher to state_tracker_. Consumes the ref that is passed to it
// from Start().
void AddWatcherLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);
void RemoveWatcherLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);
ClientChannel* chand_;
grpc_polling_entity pollent_;
grpc_connectivity_state initial_state_;
grpc_connectivity_state* state_;
grpc_closure* on_complete_;
grpc_closure* watcher_timer_init_;
Atomic<bool> done_{false};
};
struct ResolverQueuedCall {
grpc_call_element* elem;
ResolverQueuedCall* next = nullptr;
};
struct LbQueuedCall {
LoadBalancedCall* lb_call;
LbQueuedCall* next = nullptr;
};
ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error);
~ClientChannel();
// Filter vtable functions.
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem);
static void StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op);
static void GetChannelInfo(grpc_channel_element* elem,
const grpc_channel_info* info);
// Note: Does NOT return a new ref.
grpc_error_handle disconnect_error() const {
return disconnect_error_.Load(MemoryOrder::ACQUIRE);
}
// Note: All methods with "Locked" suffix must be invoked from within
// work_serializer_.
void OnResolverResultChangedLocked(Resolver::Result result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void OnResolverErrorLocked(grpc_error_handle error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const grpc_channel_args& args)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void UpdateStateAndPickerLocked(
grpc_connectivity_state state, const absl::Status& status,
const char* reason,
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config,
RefCountedPtr<ConfigSelector> config_selector,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
const char* lb_policy_name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void UpdateServiceConfigInDataPlaneLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void DestroyResolverAndLbPolicyLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
grpc_error_handle DoPingLocked(grpc_transport_op* op)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void StartTransportOpLocked(grpc_transport_op* op)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
// These methods all require holding resolution_mu_.
void AddResolverQueuedCall(ResolverQueuedCall* call,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
// These methods all require holding data_plane_mu_.
void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
SubchannelInterface* subchannel) const
ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
//
// Fields set at construction and never modified.
//
const bool deadline_checking_enabled_;
const bool enable_retries_;
grpc_channel_stack* owning_stack_;
ClientChannelFactory* client_channel_factory_;
const grpc_channel_args* channel_args_;
RefCountedPtr<ServiceConfig> default_service_config_;
std::string server_name_;
UniquePtr<char> target_uri_;
channelz::ChannelNode* channelz_node_;
grpc_pollset_set* interested_parties_;
//
// Fields related to name resolution. Guarded by resolution_mu_.
//
mutable Mutex resolution_mu_;
// Linked list of calls queued waiting for resolver result.
ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
nullptr;
// Data from service config.
grpc_error_handle resolver_transient_failure_error_
ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE;
bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false;
RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_);
RefCountedPtr<ConfigSelector> config_selector_
ABSL_GUARDED_BY(resolution_mu_);
RefCountedPtr<DynamicFilters> dynamic_filters_
ABSL_GUARDED_BY(resolution_mu_);
//
// Fields used in the data plane. Guarded by data_plane_mu_.
//
mutable Mutex data_plane_mu_;
std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
ABSL_GUARDED_BY(data_plane_mu_);
// Linked list of calls queued waiting for LB pick.
LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr;
//
// Fields used in the control plane. Guarded by work_serializer.
//
std::shared_ptr<WorkSerializer> work_serializer_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(work_serializer_);
OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(work_serializer_);
bool previous_resolution_contained_addresses_
ABSL_GUARDED_BY(work_serializer_) = false;
RefCountedPtr<ServiceConfig> saved_service_config_
ABSL_GUARDED_BY(work_serializer_);
RefCountedPtr<ConfigSelector> saved_config_selector_
ABSL_GUARDED_BY(work_serializer_);
absl::optional<std::string> health_check_service_name_
ABSL_GUARDED_BY(work_serializer_);
OrphanablePtr<LoadBalancingPolicy> lb_policy_
ABSL_GUARDED_BY(work_serializer_);
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_
ABSL_GUARDED_BY(work_serializer_);
// The number of SubchannelWrapper instances referencing a given Subchannel.
std::map<Subchannel*, int> subchannel_refcount_map_
ABSL_GUARDED_BY(work_serializer_);
// The set of SubchannelWrappers that currently exist.
// No need to hold a ref, since the map is updated in the control-plane
// work_serializer when the SubchannelWrappers are created and destroyed.
std::set<SubchannelWrapper*> subchannel_wrappers_
ABSL_GUARDED_BY(work_serializer_);
// Pending ConnectedSubchannel updates for each SubchannelWrapper.
// Updates are queued here in the control plane work_serializer and then
// applied in the data plane mutex when the picker is updated.
std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
pending_subchannel_updates_ ABSL_GUARDED_BY(work_serializer_);
int keepalive_time_ ABSL_GUARDED_BY(work_serializer_) = -1;
//
// Fields accessed from both data plane mutex and control plane
// work_serializer.
//
Atomic<grpc_error_handle> disconnect_error_;
//
// Fields guarded by a mutex, since they need to be accessed
// synchronously via get_channel_info().
//
Mutex info_mu_;
UniquePtr<char> info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_);
UniquePtr<char> info_service_config_json_ ABSL_GUARDED_BY(info_mu_);
//
// Fields guarded by a mutex, since they need to be accessed
// synchronously via grpc_channel_num_external_connectivity_watchers().
//
mutable Mutex external_watchers_mu_;
std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_);
};
//
// ClientChannel::LoadBalancedCall
//
// This object is ref-counted, but it cannot inherit from RefCounted<>,
// because it is allocated on the arena and can't free its memory when
// its refcount goes to zero. So instead, it manually implements the
// same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
class ClientChannel::LoadBalancedCall
: public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> {
public:
// If on_call_destruction_complete is non-null, then it will be
// invoked once the LoadBalancedCall is completely destroyed.
// If it is null, then the caller is responsible for checking whether
// the LB call has a subchannel call and ensuring that the
// on_call_destruction_complete closure passed down from the surface
// is not invoked until after the subchannel call stack is destroyed.
LoadBalancedCall(ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete);
~LoadBalancedCall() override;
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Invoked by channel for queued LB picks when the picker is updated.
static void PickSubchannel(void* arg, grpc_error_handle error);
// Helper function for performing an LB pick while holding the data plane
// mutex. Returns true if the pick is complete, in which case the caller
// must invoke PickDone() or AsyncPickDone() with the returned error.
bool PickSubchannelLocked(grpc_error_handle* error)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Schedules a callback to process the completed pick. The callback
// will not run until after this method returns.
void AsyncPickDone(grpc_error_handle error);
RefCountedPtr<SubchannelCall> subchannel_call() const {
return subchannel_call_;
}
private:
class LbQueuedCallCanceller;
class Metadata;
class LbCallState;
// Returns the index into pending_batches_ to be used for batch.
static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
static void FailPendingBatchInCallCombiner(void* arg,
grpc_error_handle error);
// A predicate type and some useful implementations for PendingBatchesFail().
typedef bool (*YieldCallCombinerPredicate)(
const CallCombinerClosureList& closures);
static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
return true;
}
static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
return false;
}
static bool YieldCallCombinerIfPendingBatchesFound(
const CallCombinerClosureList& closures) {
return closures.size() > 0;
}
// Fails all pending batches.
// If yield_call_combiner_predicate returns true, assumes responsibility for
// yielding the call combiner.
void PendingBatchesFail(
grpc_error_handle error,
YieldCallCombinerPredicate yield_call_combiner_predicate);
static void ResumePendingBatchInCallCombiner(void* arg,
grpc_error_handle ignored);
// Resumes all pending batches on subchannel_call_.
void PendingBatchesResume();
static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
void* arg, grpc_error_handle error);
void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch);
void CreateSubchannelCall();
// Invoked when a pick is completed, on both success or failure.
static void PickDone(void* arg, grpc_error_handle error);
// Removes the call from the channel's list of queued picks if present.
void MaybeRemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Adds the call to the channel's list of queued picks if not already present.
void MaybeAddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
ClientChannel* chand_;
// TODO(roth): Instead of duplicating these fields in every filter
// that uses any one of them, we should store them in the call
// context. This will save per-call memory overhead.
grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
grpc_polling_entity* pollent_;
grpc_closure* on_call_destruction_complete_;
// Set when we get a cancel_stream op.
grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
// Set when we fail inside the LB call.
grpc_error_handle failure_error_ = GRPC_ERROR_NONE;
grpc_closure pick_closure_;
// Accessed while holding ClientChannel::data_plane_mu_.
ClientChannel::LbQueuedCall queued_call_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) =
false;
LbQueuedCallCanceller* lb_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
std::function<void(grpc_error_handle, LoadBalancingPolicy::MetadataInterface*,
LoadBalancingPolicy::CallState*)>
lb_recv_trailing_metadata_ready_;
RefCountedPtr<SubchannelCall> subchannel_call_;
// For intercepting recv_trailing_metadata_ready for the LB policy.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
// either we have invoked all of the batch's callbacks or we have
// passed the batch down to the subchannel call and are not
// intercepting any of its callbacks).
grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
};
} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H