/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/resolving_lb_policy.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
using grpc_core::internal::ClientChannelMethodParsedObject;
using grpc_core::internal::ServerRetryThrottleData;
//
// Client channel filter
//
// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
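// This default can be overridden per channel via the
// GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg (read in
// GetMaxPerRpcRetryBufferSize() below).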
// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
// recv_initial_metadata
// send_initial_metadata
// recv_message
// send_message
// recv_trailing_metadata
// send_trailing_metadata
#define MAX_PENDING_BATCHES 6
namespace grpc_core {
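// Trace flags for per-call and routing-level activity in the client channel
// (enabled at runtime via the GRPC_TRACE environment variable).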
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
namespace {
//
// ChannelData definition
//
class ChannelData {
public:
struct QueuedPick {
LoadBalancingPolicy::PickArgs pick;
grpc_call_element* elem;
QueuedPick* next = nullptr;
};
static grpc_error* Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem);
static void StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op);
static void GetChannelInfo(grpc_channel_element* elem,
const grpc_channel_info* info);
void set_channelz_node(channelz::ClientChannelNode* node) {
channelz_node_ = node;
resolving_lb_policy_->set_channelz_node(node->Ref());
}
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* child_channels) {
if (resolving_lb_policy_ != nullptr) {
resolving_lb_policy_->FillChildRefsForChannelz(child_subchannels,
child_channels);
}
}
bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
bool enable_retries() const { return enable_retries_; }
size_t per_rpc_retry_buffer_size() const {
return per_rpc_retry_buffer_size_;
}
// Note: Does NOT return a new ref.
grpc_error* disconnect_error() const {
return disconnect_error_.Load(MemoryOrder::ACQUIRE);
}
grpc_combiner* data_plane_combiner() const { return data_plane_combiner_; }
LoadBalancingPolicy::SubchannelPicker* picker() const {
return picker_.get();
}
void AddQueuedPick(QueuedPick* pick, grpc_polling_entity* pollent);
void RemoveQueuedPick(QueuedPick* to_remove, grpc_polling_entity* pollent);
bool received_service_config_data() const {
return received_service_config_data_;
}
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
return retry_throttle_data_;
}
RefCountedPtr<ServiceConfig> service_config() const {
return service_config_;
}
grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
grpc_connectivity_state* state,
grpc_closure* on_complete,
grpc_closure* watcher_timer_init) {
// Will delete itself.
New<ExternalConnectivityWatcher>(this, pollent, state, on_complete,
watcher_timer_init);
}
int NumExternalConnectivityWatchers() const {
return external_connectivity_watcher_list_.size();
}
private:
class ConnectivityStateAndPickerSetter;
class ServiceConfigSetter;
class ClientChannelControlHelper;
class ExternalConnectivityWatcher {
public:
class WatcherList {
public:
WatcherList() { gpr_mu_init(&mu_); }
~WatcherList() { gpr_mu_destroy(&mu_); }
int size() const;
ExternalConnectivityWatcher* Lookup(grpc_closure* on_complete) const;
void Add(ExternalConnectivityWatcher* watcher);
void Remove(const ExternalConnectivityWatcher* watcher);
private:
// head_ is guarded by a mutex, since the size() method needs to
// iterate over the list, and it's called from the C-core API
// function grpc_channel_num_external_connectivity_watchers(), which
// is synchronous and therefore cannot run in the combiner.
mutable gpr_mu mu_;
ExternalConnectivityWatcher* head_ = nullptr;
};
ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
grpc_connectivity_state* state,
grpc_closure* on_complete,
grpc_closure* watcher_timer_init);
~ExternalConnectivityWatcher();
private:
static void OnWatchCompleteLocked(void* arg, grpc_error* error);
static void WatchConnectivityStateLocked(void* arg, grpc_error* ignored);
ChannelData* chand_;
grpc_polling_entity pollent_;
grpc_connectivity_state* state_;
grpc_closure* on_complete_;
grpc_closure* watcher_timer_init_;
grpc_closure my_closure_;
ExternalConnectivityWatcher* next_ = nullptr;
};
ChannelData(grpc_channel_element_args* args, grpc_error** error);
~ChannelData();
static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error);
grpc_error* DoPingLocked(grpc_transport_op* op);
static void StartTransportOpLocked(void* arg, grpc_error* ignored);
static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
void ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedObject* parsed_object,
UniquePtr<char>* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config);
//
// Fields set at construction and never modified.
//
const bool deadline_checking_enabled_;
const bool enable_retries_;
const size_t per_rpc_retry_buffer_size_;
grpc_channel_stack* owning_stack_;
ClientChannelFactory* client_channel_factory_;
UniquePtr<char> server_name_;
RefCountedPtr<ServiceConfig> default_service_config_;
// Initialized shortly after construction.
channelz::ClientChannelNode* channelz_node_ = nullptr;
//
// Fields used in the data plane. Guarded by data_plane_combiner.
//
grpc_combiner* data_plane_combiner_;
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
QueuedPick* queued_picks_ = nullptr; // Linked list of queued picks.
// Data from service config.
bool received_service_config_data_ = false;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
RefCountedPtr<ServiceConfig> service_config_;
//
// Fields used in the control plane. Guarded by combiner.
//
grpc_combiner* combiner_;
grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
bool set_service_config_ = false;
//
// Fields accessed from both data plane and control plane combiners.
//
Atomic<grpc_error*> disconnect_error_;
//
// Fields guarded by a mutex, since they need to be accessed
// synchronously via get_channel_info().
//
gpr_mu info_mu_;
UniquePtr<char> info_lb_policy_name_;
UniquePtr<char> info_service_config_json_;
};
//
// CallData definition
//
class CallData {
public:
static grpc_error* Init(grpc_call_element* elem,
const grpc_call_element_args* args);
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure);
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
RefCountedPtr<SubchannelCall> subchannel_call() { return subchannel_call_; }
// Invoked by channel for queued picks once resolver results are available.
void MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem);
// Invoked by channel for queued picks when the picker is updated.
static void StartPickLocked(void* arg, grpc_error* error);
private:
class QueuedPickCanceller;
// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
struct SubchannelCallBatchData {
// Creates a SubchannelCallBatchData object on the call's arena with the
// specified refcount. If set_on_complete is true, the batch's
// on_complete callback will be set to point to on_complete();
// otherwise, the batch's on_complete callback will be null.
static SubchannelCallBatchData* Create(grpc_call_element* elem,
int refcount, bool set_on_complete);
void Unref() {
if (gpr_unref(&refs)) Destroy();
}
SubchannelCallBatchData(grpc_call_element* elem, CallData* calld,
int refcount, bool set_on_complete);
// All dtor code must go in `Destroy()`. This is because closures held in
// `SubchannelCallBatchData` may be invoked after the object is unrefed by
// `Unref()`, and MSAN would complain about accessing this class after its
// dtor has been called. As a result, we cannot call the dtor in `Unref()`.
// TODO(soheil): We should try to call the dtor in `Unref()`.
~SubchannelCallBatchData() { Destroy(); }
void Destroy();
gpr_refcount refs;
grpc_call_element* elem;
RefCountedPtr<SubchannelCall> subchannel_call;
// The batch to use in the subchannel call.
// Its payload field points to SubchannelCallRetryState::batch_payload.
grpc_transport_stream_op_batch batch;
// For intercepting on_complete.
grpc_closure on_complete;
};
// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
struct SubchannelCallRetryState {
explicit SubchannelCallRetryState(grpc_call_context_element* context)
: batch_payload(context),
started_send_initial_metadata(false),
completed_send_initial_metadata(false),
started_send_trailing_metadata(false),
completed_send_trailing_metadata(false),
started_recv_initial_metadata(false),
completed_recv_initial_metadata(false),
started_recv_trailing_metadata(false),
completed_recv_trailing_metadata(false),
retry_dispatched(false) {}
// SubchannelCallBatchData.batch.payload points to this.
grpc_transport_stream_op_batch_payload batch_payload;
// For send_initial_metadata.
// Note that we need to make a copy of the initial metadata for each
// subchannel call instead of just referring to the copy in call_data,
// because filters in the subchannel stack will probably add entries,
// so we need to start in a pristine state for each attempt of the call.
grpc_linked_mdelem* send_initial_metadata_storage;
grpc_metadata_batch send_initial_metadata;
// For send_message.
// TODO(roth): Restructure this to eliminate use of ManualConstructor.
ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
// For send_trailing_metadata.
grpc_linked_mdelem* send_trailing_metadata_storage;
grpc_metadata_batch send_trailing_metadata;
// For intercepting recv_initial_metadata.
grpc_metadata_batch recv_initial_metadata;
grpc_closure recv_initial_metadata_ready;
bool trailing_metadata_available = false;
// For intercepting recv_message.
grpc_closure recv_message_ready;
OrphanablePtr<ByteStream> recv_message;
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
grpc_closure recv_trailing_metadata_ready;
// These fields indicate which ops have been started and completed on
// this subchannel call.
size_t started_send_message_count = 0;
size_t completed_send_message_count = 0;
size_t started_recv_message_count = 0;
size_t completed_recv_message_count = 0;
bool started_send_initial_metadata : 1;
bool completed_send_initial_metadata : 1;
bool started_send_trailing_metadata : 1;
bool completed_send_trailing_metadata : 1;
bool started_recv_initial_metadata : 1;
bool completed_recv_initial_metadata : 1;
bool started_recv_trailing_metadata : 1;
bool completed_recv_trailing_metadata : 1;
// State for callback processing.
SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
nullptr;
grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
grpc_error* recv_message_error = GRPC_ERROR_NONE;
SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
// NOTE: Do not move this next to the metadata bitfields above. That would
// save space but would also result in a data race, because the compiler
// will generate a 2-byte store that overwrites the metadata bitfields
// when this field is set.
bool retry_dispatched : 1;
};
// Pending batches stored in call data.
struct PendingBatch {
// The pending batch. If nullptr, this slot is empty.
grpc_transport_stream_op_batch* batch;
// Indicates whether payload for send ops has been cached in CallData.
bool send_ops_cached;
};
CallData(grpc_call_element* elem, const ChannelData& chand,
const grpc_call_element_args& args);
~CallData();
// Caches data for send ops so that it can be retried later, if not
// already cached.
void MaybeCacheSendOpsForBatch(PendingBatch* pending);
void FreeCachedSendInitialMetadata(ChannelData* chand);
// Frees cached send_message at index idx.
void FreeCachedSendMessage(ChannelData* chand, size_t idx);
void FreeCachedSendTrailingMetadata(ChannelData* chand);
// Frees cached send ops that have already been completed after
// committing the call.
void FreeCachedSendOpDataAfterCommit(grpc_call_element* elem,
SubchannelCallRetryState* retry_state);
// Frees cached send ops that were completed by the completed batch in
// batch_data. Used when batches are completed after the call is committed.
void FreeCachedSendOpDataForCompletedBatch(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state);
static void MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
const LoadBalancingPolicy::PickArgs& pick,
grpc_transport_stream_op_batch* batch);
// Returns the index into pending_batches_ to be used for batch.
static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
void PendingBatchesAdd(grpc_call_element* elem,
grpc_transport_stream_op_batch* batch);
void PendingBatchClear(PendingBatch* pending);
void MaybeClearPendingBatch(grpc_call_element* elem, PendingBatch* pending);
static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
// A predicate type and some useful implementations for PendingBatchesFail().
typedef bool (*YieldCallCombinerPredicate)(
const CallCombinerClosureList& closures);
static bool YieldCallCombiner(const CallCombinerClosureList& closures) {
return true;
}
static bool NoYieldCallCombiner(const CallCombinerClosureList& closures) {
return false;
}
static bool YieldCallCombinerIfPendingBatchesFound(
const CallCombinerClosureList& closures) {
return closures.size() > 0;
}
// Fails all pending batches.
// If yield_call_combiner_predicate returns true, assumes responsibility for
// yielding the call combiner.
void PendingBatchesFail(
grpc_call_element* elem, grpc_error* error,
YieldCallCombinerPredicate yield_call_combiner_predicate);
static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
// Resumes all pending batches on subchannel_call_.
void PendingBatchesResume(grpc_call_element* elem);
// Returns a pointer to the first pending batch for which predicate(batch)
// returns true, or null if not found.
template <typename Predicate>
PendingBatch* PendingBatchFind(grpc_call_element* elem,
const char* log_message, Predicate predicate);
// Commits the call so that no further retry attempts will be performed.
void RetryCommit(grpc_call_element* elem,
SubchannelCallRetryState* retry_state);
// Starts a retry after appropriate back-off.
void DoRetry(grpc_call_element* elem, SubchannelCallRetryState* retry_state,
grpc_millis server_pushback_ms);
// Returns true if the call is being retried.
bool MaybeRetry(grpc_call_element* elem, SubchannelCallBatchData* batch_data,
grpc_status_code status, grpc_mdelem* server_pushback_md);
// Invokes recv_initial_metadata_ready for a subchannel batch.
static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
static void RecvInitialMetadataReady(void* arg, grpc_error* error);
// Invokes recv_message_ready for a subchannel batch.
static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
// Intercepts recv_message_ready callback for retries.
// Commits the call and returns the message up the stack.
static void RecvMessageReady(void* arg, grpc_error* error);
// Sets *status and *server_pushback_md based on md_batch and error.
// Only sets *server_pushback_md if server_pushback_md != nullptr.
void GetCallStatus(grpc_call_element* elem, grpc_metadata_batch* md_batch,
grpc_error* error, grpc_status_code* status,
grpc_mdelem** server_pushback_md);
// Adds recv_trailing_metadata_ready closure to closures.
void AddClosureForRecvTrailingMetadataReady(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
grpc_error* error, CallCombinerClosureList* closures);
// Adds any necessary closures for deferred recv_initial_metadata and
// recv_message callbacks to closures.
static void AddClosuresForDeferredRecvCallbacks(
SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
// Returns true if any op in the batch was not yet started.
// Only looks at send ops, since recv ops are always started immediately.
bool PendingBatchIsUnstarted(PendingBatch* pending,
SubchannelCallRetryState* retry_state);
// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures.
void AddClosuresToFailUnstartedPendingBatches(
grpc_call_element* elem, SubchannelCallRetryState* retry_state,
grpc_error* error, CallCombinerClosureList* closures);
// Runs necessary closures upon completion of a call attempt.
void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
grpc_error* error);
// Intercepts recv_trailing_metadata_ready callback for retries.
// Commits the call and returns the trailing metadata up the stack.
static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.
void AddClosuresForCompletedPendingBatch(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state, grpc_error* error,
CallCombinerClosureList* closures);
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
// StartRetriableSubchannelBatches().
void AddClosuresForReplayOrPendingSendOps(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
// Callback used to intercept on_complete from subchannel calls.
// Called only when retries are enabled.
static void OnComplete(void* arg, grpc_error* error);
static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
// Adds a closure to closures that will execute batch in the call combiner.
void AddClosureForSubchannelBatch(grpc_call_element* elem,
grpc_transport_stream_op_batch* batch,
CallCombinerClosureList* closures);
// Adds retriable send_initial_metadata op to batch_data.
void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data);
// Adds retriable send_message op to batch_data.
void AddRetriableSendMessageOp(grpc_call_element* elem,
SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data);
// Adds retriable send_trailing_metadata op to batch_data.
void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data);
// Adds retriable recv_initial_metadata op to batch_data.
void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data);
// Adds retriable recv_message op to batch_data.
void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data);
// Adds retriable recv_trailing_metadata op to batch_data.
void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data);
// Helper function used to start a recv_trailing_metadata batch. This
// is used in the case where a recv_initial_metadata or recv_message
// op fails in a way that tells us the call is over, but the application
// has not yet started its own recv_trailing_metadata op.
void StartInternalRecvTrailingMetadata(grpc_call_element* elem);
// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops. Otherwise, returns nullptr.
SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
grpc_call_element* elem, SubchannelCallRetryState* retry_state);
// Adds subchannel batches for pending batches to closures.
void AddSubchannelBatchesForPendingBatches(
grpc_call_element* elem, SubchannelCallRetryState* retry_state,
CallCombinerClosureList* closures);
// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.
static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
void CreateSubchannelCall(grpc_call_element* elem);
// Invoked when a pick is completed, on either success or failure.
static void PickDone(void* arg, grpc_error* error);
// Removes the call from the channel's list of queued picks.
void RemoveCallFromQueuedPicksLocked(grpc_call_element* elem);
// Adds the call to the channel's list of queued picks.
void AddCallToQueuedPicksLocked(grpc_call_element* elem);
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
void ApplyServiceConfigToCallLocked(grpc_call_element* elem);
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
// and this struct both independently store pointers to the call stack
// and call combiner. If/when we have time, find a way to avoid this
// without breaking the grpc_deadline_state abstraction.
grpc_deadline_state deadline_state_;
grpc_slice path_; // Request path.
gpr_timespec call_start_time_;
grpc_millis deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
ServiceConfig::CallData service_config_call_data_;
const ClientChannelMethodParsedObject* method_params_ = nullptr;
RefCountedPtr<SubchannelCall> subchannel_call_;
// Set when we get a cancel_stream op.
grpc_error* cancel_error_ = GRPC_ERROR_NONE;
ChannelData::QueuedPick pick_;
bool pick_queued_ = false;
bool service_config_applied_ = false;
QueuedPickCanceller* pick_canceller_ = nullptr;
grpc_closure pick_closure_;
grpc_polling_entity* pollent_ = nullptr;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
// either we have invoked all of the batch's callbacks or we have
// passed the batch down to the subchannel call and are not
// intercepting any of its callbacks).
PendingBatch pending_batches_[MAX_PENDING_BATCHES] = {};
bool pending_send_initial_metadata_ : 1;
bool pending_send_message_ : 1;
bool pending_send_trailing_metadata_ : 1;
// Retry state.
bool enable_retries_ : 1;
bool retry_committed_ : 1;
bool last_attempt_got_server_pushback_ : 1;
int num_attempts_completed_ = 0;
size_t bytes_buffered_for_retry_ = 0;
// TODO(roth): Restructure this to eliminate use of ManualConstructor.
ManualConstructor<BackOff> retry_backoff_;
grpc_timer retry_timer_;
// The number of pending retriable subchannel batches containing send ops.
// We hold a ref to the call stack while this is non-zero, since replay
// batches may not complete until after all callbacks have been returned
// to the surface, and we need to make sure that the call is not destroyed
// until all of these batches have completed.
// Note that we actually only need to track replay batches, but it's
// easier to track all batches with send ops.
int num_pending_retriable_subchannel_send_batches_ = 0;
// Cached data for retrying send ops.
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
grpc_metadata_batch send_initial_metadata_;
uint32_t send_initial_metadata_flags_;
gpr_atm* peer_string_;
// send_message
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
// use in retries.
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
InlinedVector<ByteStreamCache*, 3> send_messages_;
// send_trailing_metadata
bool seen_send_trailing_metadata_ = false;
grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
grpc_metadata_batch send_trailing_metadata_;
};
//
// ChannelData::ConnectivityStateAndPickerSetter
//
// A fire-and-forget class that sets the channel's connectivity state
// and then hops into the data plane combiner to update the picker.
// Must be instantiated while holding the control plane combiner.
// Deletes itself when done.
class ChannelData::ConnectivityStateAndPickerSetter {
public:
ConnectivityStateAndPickerSetter(
ChannelData* chand, grpc_connectivity_state state, const char* reason,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker)
: chand_(chand), picker_(std::move(picker)) {
// Update connectivity state here, while holding control plane combiner.
grpc_connectivity_state_set(&chand->state_tracker_, state, reason);
if (chand->channelz_node_ != nullptr) {
chand->channelz_node_->AddTraceEvent(
channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string(
GetChannelConnectivityStateChangeString(state)));
}
// Bounce into the data plane combiner to reset the picker.
GRPC_CHANNEL_STACK_REF(chand->owning_stack_,
"ConnectivityStateAndPickerSetter");
GRPC_CLOSURE_INIT(&closure_, SetPicker, this,
grpc_combiner_scheduler(chand->data_plane_combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static const char* GetChannelConnectivityStateChangeString(
grpc_connectivity_state state) {
switch (state) {
case GRPC_CHANNEL_IDLE:
return "Channel state change to IDLE";
case GRPC_CHANNEL_CONNECTING:
return "Channel state change to CONNECTING";
case GRPC_CHANNEL_READY:
return "Channel state change to READY";
case GRPC_CHANNEL_TRANSIENT_FAILURE:
return "Channel state change to TRANSIENT_FAILURE";
case GRPC_CHANNEL_SHUTDOWN:
return "Channel state change to SHUTDOWN";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
static void SetPicker(void* arg, grpc_error* ignored) {
auto* self = static_cast<ConnectivityStateAndPickerSetter*>(arg);
// Update picker.
self->chand_->picker_ = std::move(self->picker_);
// Re-process queued picks.
for (QueuedPick* pick = self->chand_->queued_picks_; pick != nullptr;
pick = pick->next) {
CallData::StartPickLocked(pick->elem, GRPC_ERROR_NONE);
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ConnectivityStateAndPickerSetter");
Delete(self);
}
ChannelData* chand_;
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker_;
grpc_closure closure_;
};
//
// ChannelData::ServiceConfigSetter
//
// A fire-and-forget class that sets the channel's service config data
// in the data plane combiner. Deletes itself when done.
class ChannelData::ServiceConfigSetter {
public:
ServiceConfigSetter(
ChannelData* chand,
Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
retry_throttle_data,
RefCountedPtr<ServiceConfig> service_config)
: chand_(chand),
retry_throttle_data_(retry_throttle_data),
service_config_(std::move(service_config)) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ServiceConfigSetter");
GRPC_CLOSURE_INIT(&closure_, SetServiceConfigData, this,
grpc_combiner_scheduler(chand->data_plane_combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
private:
static void SetServiceConfigData(void* arg, grpc_error* ignored) {
ServiceConfigSetter* self = static_cast<ServiceConfigSetter*>(arg);
ChannelData* chand = self->chand_;
// Update channel state.
chand->received_service_config_data_ = true;
if (self->retry_throttle_data_.has_value()) {
chand->retry_throttle_data_ =
internal::ServerRetryThrottleMap::GetDataForServer(
chand->server_name_.get(),
self->retry_throttle_data_.value().max_milli_tokens,
self->retry_throttle_data_.value().milli_token_ratio);
}
chand->service_config_ = std::move(self->service_config_);
// Apply service config to queued picks.
for (QueuedPick* pick = chand->queued_picks_; pick != nullptr;
pick = pick->next) {
CallData* calld = static_cast<CallData*>(pick->elem->call_data);
calld->MaybeApplyServiceConfigToCallLocked(pick->elem);
}
// Clean up.
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ServiceConfigSetter");
Delete(self);
}
ChannelData* chand_;
Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
retry_throttle_data_;
RefCountedPtr<ServiceConfig> service_config_;
grpc_closure closure_;
};
//
// ChannelData::ExternalConnectivityWatcher::WatcherList
//
int ChannelData::ExternalConnectivityWatcher::WatcherList::size() const {
MutexLock lock(&mu_);
int count = 0;
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
++count;
}
return count;
}
ChannelData::ExternalConnectivityWatcher*
ChannelData::ExternalConnectivityWatcher::WatcherList::Lookup(
grpc_closure* on_complete) const {
MutexLock lock(&mu_);
ExternalConnectivityWatcher* w = head_;
while (w != nullptr && w->on_complete_ != on_complete) {
w = w->next_;
}
return w;
}
void ChannelData::ExternalConnectivityWatcher::WatcherList::Add(
ExternalConnectivityWatcher* watcher) {
GPR_ASSERT(Lookup(watcher->on_complete_) == nullptr);
MutexLock lock(&mu_);
GPR_ASSERT(watcher->next_ == nullptr);
watcher->next_ = head_;
head_ = watcher;
}
void ChannelData::ExternalConnectivityWatcher::WatcherList::Remove(
const ExternalConnectivityWatcher* watcher) {
MutexLock lock(&mu_);
if (watcher == head_) {
head_ = watcher->next_;
return;
}
for (ExternalConnectivityWatcher* w = head_; w != nullptr; w = w->next_) {
if (w->next_ == watcher) {
w->next_ = w->next_->next_;
return;
}
}
GPR_UNREACHABLE_CODE(return );
}
//
// ChannelData::ExternalConnectivityWatcher
//
ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
ChannelData* chand, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* on_complete,
grpc_closure* watcher_timer_init)
: chand_(chand),
pollent_(pollent),
state_(state),
on_complete_(on_complete),
watcher_timer_init_(watcher_timer_init) {
grpc_polling_entity_add_to_pollset_set(&pollent_,
chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&my_closure_, WatchConnectivityStateLocked, this,
grpc_combiner_scheduler(chand_->combiner_)),
GRPC_ERROR_NONE);
}
ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
grpc_polling_entity_del_from_pollset_set(&pollent_,
chand_->interested_parties_);
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
"ExternalConnectivityWatcher");
}
void ChannelData::ExternalConnectivityWatcher::OnWatchCompleteLocked(
void* arg, grpc_error* error) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
grpc_closure* on_complete = self->on_complete_;
self->chand_->external_connectivity_watcher_list_.Remove(self);
Delete(self);
GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
}
void ChannelData::ExternalConnectivityWatcher::WatchConnectivityStateLocked(
void* arg, grpc_error* ignored) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
if (self->state_ == nullptr) {
// Handle cancellation.
GPR_ASSERT(self->watcher_timer_init_ == nullptr);
ExternalConnectivityWatcher* found =
self->chand_->external_connectivity_watcher_list_.Lookup(
self->on_complete_);
if (found != nullptr) {
grpc_connectivity_state_notify_on_state_change(
&found->chand_->state_tracker_, nullptr, &found->my_closure_);
}
Delete(self);
return;
}
// New watcher.
self->chand_->external_connectivity_watcher_list_.Add(self);
// This assumes that the closure is scheduled on the ExecCtx scheduler
// and that GRPC_CLOSURE_RUN would run the closure immediately.
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&self->my_closure_, OnWatchCompleteLocked, self,
grpc_combiner_scheduler(self->chand_->combiner_));
grpc_connectivity_state_notify_on_state_change(
&self->chand_->state_tracker_, self->state_, &self->my_closure_);
}
//
// ChannelData::ClientChannelControlHelper
//
class ChannelData::ClientChannelControlHelper
: public LoadBalancingPolicy::ChannelControlHelper {
public:
explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
}
~ClientChannelControlHelper() override {
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
"ClientChannelControlHelper");
}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
grpc_arg args_to_add[2];
int num_args_to_add = 0;
if (chand_->health_check_service_name_ != nullptr) {
args_to_add[0] = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(chand_->health_check_service_name_.get()));
num_args_to_add++;
}
args_to_add[num_args_to_add++] = SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get());
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, args_to_add, num_args_to_add);
Subchannel* subchannel =
chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
return subchannel;
}
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override {
return chand_->client_channel_factory_->CreateChannel(target, &args);
}
void UpdateState(
grpc_connectivity_state state,
UniquePtr<LoadBalancingPolicy::SubchannelPicker> picker) override {
grpc_error* disconnect_error =
chand_->disconnect_error_.Load(MemoryOrder::ACQUIRE);
if (grpc_client_channel_routing_trace.enabled()) {
const char* extra = disconnect_error == GRPC_ERROR_NONE
? ""
: " (ignoring -- channel shutting down)";
gpr_log(GPR_INFO, "chand=%p: update: state=%s picker=%p%s", chand_,
grpc_connectivity_state_name(state), picker.get(), extra);
}
// Do update only if not shutting down.
if (disconnect_error == GRPC_ERROR_NONE) {
// Will delete itself.
New<ConnectivityStateAndPickerSetter>(chand_, state, "helper",
std::move(picker));
}
}
// No-op -- we should never get this from ResolvingLoadBalancingPolicy.
void RequestReresolution() override {}
private:
ChannelData* chand_;
};
//
// ChannelData implementation
//
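// Placement-constructs a ChannelData instance in the storage provided by
// the channel stack.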
grpc_error* ChannelData::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
grpc_error* error = GRPC_ERROR_NONE;
new (elem->channel_data) ChannelData(args, &error);
return error;
}
void ChannelData::Destroy(grpc_channel_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
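// Channel-arg helper: returns whether retries are enabled
// (GRPC_ARG_ENABLE_RETRIES, default true).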
bool GetEnableRetries(const grpc_channel_args* args) {
return grpc_channel_arg_get_bool(
grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
}
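// Channel-arg helper: returns the per-RPC retry buffer size
// (GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE, clamped to [0, INT_MAX]).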
size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
return static_cast<size_t>(grpc_channel_arg_get_integer(
grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
{DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
}
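// Channel-arg helper: returns a channel-local subchannel pool if
// GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL is set, otherwise the process-wide
// global pool.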
RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
const grpc_channel_args* args) {
const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
if (use_local_subchannel_pool) {
return MakeRefCounted<LocalSubchannelPool>();
}
return GlobalSubchannelPool::instance();
}
ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
: deadline_checking_enabled_(
grpc_deadline_checking_enabled(args->channel_args)),
enable_retries_(GetEnableRetries(args->channel_args)),
per_rpc_retry_buffer_size_(
GetMaxPerRpcRetryBufferSize(args->channel_args)),
owning_stack_(args->channel_stack),
client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
data_plane_combiner_(grpc_combiner_create()),
combiner_(grpc_combiner_create()),
interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)),
disconnect_error_(GRPC_ERROR_NONE) {
// Initialize data members.
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"client_channel");
gpr_mu_init(&info_mu_);
// Start backup polling.
grpc_client_channel_start_backup_polling(interested_parties_);
// Check client channel factory.
if (client_channel_factory_ == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Missing client channel factory in args for client channel filter");
return;
}
// Get server name to resolve, using proxy mapper if needed.
const char* server_uri = grpc_channel_arg_get_string(
grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
if (server_uri == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"server URI channel arg missing or wrong type in client channel "
"filter");
return;
}
// Get default service config
const char* service_config_json = grpc_channel_arg_get_string(
grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
// TODO(yashkt): Make sure we set the channel in TRANSIENT_FAILURE on an
// invalid default service config
if (service_config_json != nullptr) {
*error = GRPC_ERROR_NONE;
default_service_config_ = ServiceConfig::Create(service_config_json, error);
if (*error != GRPC_ERROR_NONE) {
default_service_config_.reset();
return;
}
}
grpc_uri* uri = grpc_uri_parse(server_uri, true);
if (uri != nullptr && uri->path[0] != '\0') {
server_name_.reset(
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path));
}
grpc_uri_destroy(uri);
char* proxy_name = nullptr;
grpc_channel_args* new_args = nullptr;
grpc_proxy_mappers_map_name(server_uri, args->channel_args, &proxy_name,
&new_args);
UniquePtr<char> target_uri(proxy_name != nullptr ? proxy_name
: gpr_strdup(server_uri));
// Instantiate resolving LB policy.
LoadBalancingPolicy::Args lb_args;
lb_args.combiner = combiner_;
lb_args.channel_control_helper =
UniquePtr<LoadBalancingPolicy::ChannelControlHelper>(
New<ClientChannelControlHelper>(this));
lb_args.args = new_args != nullptr ? new_args : args->channel_args;
resolving_lb_policy_.reset(New<ResolvingLoadBalancingPolicy>(
std::move(lb_args), &grpc_client_channel_routing_trace,
std::move(target_uri), ProcessResolverResultLocked, this, error));
grpc_channel_args_destroy(new_args);
if (*error != GRPC_ERROR_NONE) {
// Orphan the resolving LB policy and flush the exec_ctx to ensure
// that it finishes shutting down. This ensures that if we are
// failing, we destroy the ClientChannelControlHelper (and thus
// unref the channel stack) before we return.
// TODO(roth): This is not a complete solution, because it only
// catches the case where channel stack initialization fails in this
// particular filter. If there is a failure in a different filter, we
// will leave a dangling ref here, which can cause a crash. Fortunately,
// in practice, there are no other filters that can cause failures in
// channel stack initialization, so this works for now.
resolving_lb_policy_.reset();
ExecCtx::Get()->Flush();
} else {
grpc_pollset_set_add_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: created resolving_lb_policy=%p", this,
resolving_lb_policy_.get());
}
}
}
ChannelData::~ChannelData() {
if (resolving_lb_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(resolving_lb_policy_->interested_parties(),
interested_parties_);
resolving_lb_policy_.reset();
}
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(data_plane_combiner_, "client_channel");
GRPC_COMBINER_UNREF(combiner_, "client_channel");
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
grpc_connectivity_state_destroy(&state_tracker_);
gpr_mu_destroy(&info_mu_);
}
void ChannelData::ProcessLbPolicy(
const Resolver::Result& resolver_result,
const internal::ClientChannelGlobalParsedObject* parsed_object,
UniquePtr<char>* lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config) {
// Prefer the LB policy name found in the service config.
if (parsed_object != nullptr &&
parsed_object->parsed_lb_config() != nullptr) {
lb_policy_name->reset(
gpr_strdup(parsed_object->parsed_lb_config()->name()));
*lb_policy_config = parsed_object->parsed_lb_config();
} else {
const char* local_policy_name = nullptr;
if (parsed_object != nullptr &&
parsed_object->parsed_deprecated_lb_policy() != nullptr) {
local_policy_name = parsed_object->parsed_deprecated_lb_policy();
} else {
const grpc_arg* channel_arg =
grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
local_policy_name = grpc_channel_arg_get_string(channel_arg);
}
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver has returned.
bool found_balancer_address = false;
for (size_t i = 0; i < resolver_result.addresses.size(); ++i) {
const ServerAddress& address = resolver_result.addresses[i];
if (address.IsBalancer()) {
found_balancer_address = true;
break;
}
}
if (found_balancer_address) {
if (local_policy_name != nullptr &&
strcmp(local_policy_name, "grpclb") != 0) {
gpr_log(GPR_INFO,
"resolver requested LB policy %s but provided at least one "
"balancer address -- forcing use of grpclb LB policy",
local_policy_name);
}
local_policy_name = "grpclb";
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (local_policy_name == nullptr) {
local_policy_name = "pick_first";
}
lb_policy_name->reset(gpr_strdup(local_policy_name));
}
}
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config;
// If the resolver did not return a service config or returned an invalid
// one, we need a fallback service config.
if (result.service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, prefer the previously saved service
// config; otherwise, use the default service config provided by the
// client API.
if (chand->saved_service_config_ != nullptr) {
service_config = chand->saved_service_config_;
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. "
"Continuing to use previous service config.",
chand);
}
} else if (chand->default_service_config_ != nullptr) {
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned invalid service config. Using "
"default service config provided by client API.",
chand);
}
service_config = chand->default_service_config_;
}
} else if (result.service_config == nullptr) {
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p: resolver returned no service config. Using default "
"service config provided by client API.",
chand);
}
if (chand->default_service_config_ != nullptr) {
service_config = chand->default_service_config_;
}
} else {
service_config = result.service_config;
}
*service_config_error = GRPC_ERROR_REF(result.service_config_error);
if (service_config == nullptr &&
result.service_config_error != GRPC_ERROR_NONE) {
return false;
}
UniquePtr<char> service_config_json = nullptr;
// Process service config.
const internal::ClientChannelGlobalParsedObject* parsed_object = nullptr;
if (service_config != nullptr) {
parsed_object =
static_cast<const internal::ClientChannelGlobalParsedObject*>(
service_config->GetParsedGlobalServiceConfigObject(
internal::ClientChannelServiceConfigParser::ParserIndex()));
}
const bool service_config_changed =
((service_config == nullptr) !=
(chand->saved_service_config_ == nullptr)) ||
(service_config != nullptr &&
strcmp(service_config->service_config_json(),
chand->saved_service_config_->service_config_json()) != 0);
if (service_config_changed) {
if (service_config != nullptr) {
service_config_json.reset(
gpr_strdup(service_config->service_config_json()));
} else {
service_config_json.reset(gpr_strdup(""));
}
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
chand, service_config_json.get());
}
chand->saved_service_config_ = std::move(service_config);
if (parsed_object != nullptr) {
chand->health_check_service_name_.reset(
gpr_strdup(parsed_object->health_check_service_name()));
} else {
chand->health_check_service_name_.reset();
}
}
if (service_config_changed || !chand->set_service_config_) {
chand->set_service_config_ = true;
Optional<internal::ClientChannelGlobalParsedObject::RetryThrottling>
retry_throttle_data;
if (parsed_object != nullptr) {
retry_throttle_data = parsed_object->retry_throttling();
}
// Create service config setter to update channel state in the data
// plane combiner. Destroys itself when done.
New<ServiceConfigSetter>(chand, retry_throttle_data,
chand->saved_service_config_);
}
UniquePtr<char> processed_lb_policy_name;
chand->ProcessLbPolicy(result, parsed_object, &processed_lb_policy_name,
lb_policy_config);
// Swap out the data used by GetChannelInfo().
{
MutexLock lock(&chand->info_mu_);
chand->info_lb_policy_name_ = std::move(processed_lb_policy_name);
if (service_config_json != nullptr) {
chand->info_service_config_json_ = std::move(service_config_json);
}
}
// Return results.
*lb_policy_name = chand->info_lb_policy_name_.get();
return service_config_changed;
}
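// Sends a ping on the connection selected by the current picker. Fails if
// the channel is not READY or the picker does not return a connected
// subchannel.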
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
if (grpc_connectivity_state_check(&state_tracker_) != GRPC_CHANNEL_READY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
}
LoadBalancingPolicy::PickArgs pick;
grpc_error* error = GRPC_ERROR_NONE;
picker_->Pick(&pick, &error);
if (pick.connected_subchannel != nullptr) {
pick.connected_subchannel->Ping(op->send_ping.on_initiate,
op->send_ping.on_ack);
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"LB policy dropped call on ping");
}
}
return error;
}
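// Handles the remaining transport ops (connectivity watches, pings,
// backoff reset, and disconnect) from within the control plane combiner.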
void ChannelData::StartTransportOpLocked(void* arg, grpc_error* ignored) {
grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
grpc_channel_element* elem =
static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Connectivity watch.
if (op->on_connectivity_state_change != nullptr) {
grpc_connectivity_state_notify_on_state_change(
&chand->state_tracker_, op->connectivity_state,
op->on_connectivity_state_change);
op->on_connectivity_state_change = nullptr;
op->connectivity_state = nullptr;
}
// Ping.
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
grpc_error* error = chand->DoPingLocked(op);
if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
}
op->bind_pollset = nullptr;
op->send_ping.on_initiate = nullptr;
op->send_ping.on_ack = nullptr;
}
// Reset backoff.
if (op->reset_connect_backoff) {
if (chand->resolving_lb_policy_ != nullptr) {
chand->resolving_lb_policy_->ResetBackoffLocked();
}
}
// Disconnect.
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
grpc_error* error = GRPC_ERROR_NONE;
GPR_ASSERT(chand->disconnect_error_.CompareExchangeStrong(
&error, op->disconnect_with_error, MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE));
grpc_pollset_set_del_pollset_set(
chand->resolving_lb_policy_->interested_parties(),
chand->interested_parties_);
chand->resolving_lb_policy_.reset();
// Will delete itself.
New<ConnectivityStateAndPickerSetter>(
chand, GRPC_CHANNEL_SHUTDOWN, "shutdown from API",
UniquePtr<LoadBalancingPolicy::SubchannelPicker>(
New<LoadBalancingPolicy::TransientFailurePicker>(
GRPC_ERROR_REF(op->disconnect_with_error))));
}
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
void ChannelData::StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
GPR_ASSERT(op->set_accept_stream == false);
// Handle bind_pollset.
if (op->bind_pollset != nullptr) {
grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
}
// Hop into the control plane combiner for the remaining ops.
op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&op->handler_private.closure,
ChannelData::StartTransportOpLocked, op,
grpc_combiner_scheduler(chand->combiner_)),
GRPC_ERROR_NONE);
}
void ChannelData::GetChannelInfo(grpc_channel_element* elem,
const grpc_channel_info* info) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
MutexLock lock(&chand->info_mu_);
if (info->lb_policy_name != nullptr) {
*info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
}
if (info->service_config_json != nullptr) {
*info->service_config_json =
gpr_strdup(chand->info_service_config_json_.get());
}
}
void ChannelData::AddQueuedPick(QueuedPick* pick,
grpc_polling_entity* pollent) {
// Add call to queued picks list.
pick->next = queued_picks_;
queued_picks_ = pick;
// Add call's pollent to channel's interested_parties, so that I/O
// can be done under the call's CQ.
grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}
void ChannelData::RemoveQueuedPick(QueuedPick* to_remove,
grpc_polling_entity* pollent) {
// Remove call's pollent from channel's interested_parties.
grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
// Remove from queued picks list.
for (QueuedPick** pick = &queued_picks_; *pick != nullptr;
pick = &(*pick)->next) {
if (*pick == to_remove) {
*pick = to_remove->next;
return;
}
}
}
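// Asks the resolving LB policy to exit IDLE. Scheduled on the control
// plane combiner by CheckConnectivityState() when a connection attempt
// is requested.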
void ChannelData::TryToConnectLocked(void* arg, grpc_error* error_ignored) {
auto* chand = static_cast<ChannelData*>(arg);
if (chand->resolving_lb_policy_ != nullptr) {
chand->resolving_lb_policy_->ExitIdleLocked();
}
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "TryToConnect");
}
grpc_connectivity_state ChannelData::CheckConnectivityState(
bool try_to_connect) {
grpc_connectivity_state out = grpc_connectivity_state_check(&state_tracker_);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(TryToConnectLocked, this,
grpc_combiner_scheduler(combiner_)),
GRPC_ERROR_NONE);
}
return out;
}
//
// CallData implementation
//
// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call. When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly. If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
// ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
// already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
// (census filter is on top of this one)
// - add census stats for retries
CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
const grpc_call_element_args& args)
: deadline_state_(elem, args.call_stack, args.call_combiner,
GPR_LIKELY(chand.deadline_checking_enabled())
? args.deadline
: GRPC_MILLIS_INF_FUTURE),
path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
call_combiner_(args.call_combiner),
call_context_(args.context),
pending_send_initial_metadata_(false),
pending_send_message_(false),
pending_send_trailing_metadata_(false),
enable_retries_(chand.enable_retries()),
retry_committed_(false),
last_attempt_got_server_pushback_(false) {}
CallData::~CallData() {
grpc_slice_unref_internal(path_);
GRPC_ERROR_UNREF(cancel_error_);
// Make sure there are no remaining pending batches.
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
GPR_ASSERT(pending_batches_[i].batch == nullptr);
}
}
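// Filter vtable hooks: Init() constructs the CallData in place in the
// arena-allocated call_data slot, and Destroy() tears it down.  If a
// subchannel call exists, then_schedule_closure is deferred until the
// subchannel call's stack is destroyed.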
grpc_error* CallData::Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
new (elem->call_data) CallData(elem, *chand, *args);
return GRPC_ERROR_NONE;
}
void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure) {
CallData* calld = static_cast<CallData*>(elem->call_data);
if (GPR_LIKELY(calld->subchannel_call_ != nullptr)) {
calld->subchannel_call_->SetAfterCallStackDestroy(then_schedule_closure);
then_schedule_closure = nullptr;
}
calld->~CallData();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
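// Entry point for batches from the surface.  Fails the batch immediately
// if the call has already been cancelled, records new cancellations, and
// otherwise adds the batch to the pending list.  If we already have a
// subchannel call, pending batches are started on it directly; otherwise,
// a batch containing send_initial_metadata triggers an LB pick in the data
// plane combiner, and all other batches just yield the call combiner until
// the pick completes.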
void CallData::StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
CallData* calld = static_cast<CallData*>(elem->call_data);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (GPR_LIKELY(chand->deadline_checking_enabled())) {
grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
}
// If we've previously been cancelled, immediately fail any new batches.
if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
chand, calld, grpc_error_string(calld->cancel_error_));
}
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
return;
}
// Handle cancellation.
if (GPR_UNLIKELY(batch->cancel_stream)) {
// Stash a copy of cancel_error in our call data, so that we can use
// it for subsequent operations. This ensures that if the call is
// cancelled before any batches are passed down (e.g., if the deadline
// is in the past when the call starts), we can return the right
// error to the caller when the first batch does get passed down.
GRPC_ERROR_UNREF(calld->cancel_error_);
calld->cancel_error_ =
GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
calld, grpc_error_string(calld->cancel_error_));
}
// If we do not have a subchannel call (i.e., a pick has not yet
// been started), fail all pending batches. Otherwise, send the
// cancellation down to the subchannel call.
if (calld->subchannel_call_ == nullptr) {
// TODO(roth): If there is a pending retry callback, do we need to
// cancel it here?
calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
NoYieldCallCombiner);
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
} else {
// Note: This will release the call combiner.
calld->subchannel_call_->StartTransportStreamOpBatch(batch);
}
return;
}
// Add the batch to the pending list.
calld->PendingBatchesAdd(elem, batch);
// Check if we've already gotten a subchannel call.
// Note that once we have completed the pick, we do not need to enter
// the channel combiner, which is more efficient (especially for
// streaming calls).
if (calld->subchannel_call_ != nullptr) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
calld, calld->subchannel_call_.get());
}
calld->PendingBatchesResume(elem);
return;
}
// We do not yet have a subchannel call.
// For batches containing a send_initial_metadata op, enter the channel
// combiner to start a pick.
if (GPR_LIKELY(batch->send_initial_metadata)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
chand, calld);
}
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(
&batch->handler_private.closure, StartPickLocked, elem,
grpc_combiner_scheduler(chand->data_plane_combiner())),
GRPC_ERROR_NONE);
} else {
// For all other batches, release the call combiner.
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: saved batch, yielding call combiner", chand,
calld);
}
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"batch does not include send_initial_metadata");
}
}
void CallData::SetPollent(grpc_call_element* elem,
grpc_polling_entity* pollent) {
CallData* calld = static_cast<CallData*>(elem->call_data);
calld->pollent_ = pollent;
}
//
// send op data caching
//
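// Caches a copy of the batch's send ops (initial metadata, messages, and
// trailing metadata) in the call's arena, so that they can be replayed on
// a different subchannel call if we retry.  No-op if the batch's send ops
// have already been cached.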
void CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
if (pending->send_ops_cached) return;
pending->send_ops_cached = true;
grpc_transport_stream_op_batch* batch = pending->batch;
// Save a copy of metadata for send_initial_metadata ops.
if (batch->send_initial_metadata) {
seen_send_initial_metadata_ = true;
GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
grpc_metadata_batch* send_initial_metadata =
batch->payload->send_initial_metadata.send_initial_metadata;
send_initial_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
send_initial_metadata_storage_);
send_initial_metadata_flags_ =
batch->payload->send_initial_metadata.send_initial_metadata_flags;
peer_string_ = batch->payload->send_initial_metadata.peer_string;
}
// Set up cache for send_message ops.
if (batch->send_message) {
ByteStreamCache* cache = arena_->New<ByteStreamCache>(
std::move(batch->payload->send_message.send_message));
send_messages_.push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
if (batch->send_trailing_metadata) {
seen_send_trailing_metadata_ = true;
GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
grpc_metadata_batch* send_trailing_metadata =
batch->payload->send_trailing_metadata.send_trailing_metadata;
send_trailing_metadata_storage_ = (grpc_linked_mdelem*)arena_->Alloc(
sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
send_trailing_metadata_storage_);
}
}
void CallData::FreeCachedSendInitialMetadata(ChannelData* chand) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
this);
}
grpc_metadata_batch_destroy(&send_initial_metadata_);
}
void CallData::FreeCachedSendMessage(ChannelData* chand, size_t idx) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
chand, this, idx);
}
send_messages_[idx]->Destroy();
}
void CallData::FreeCachedSendTrailingMetadata(ChannelData* chand) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: destroying calld->send_trailing_metadata",
chand, this);
}
grpc_metadata_batch_destroy(&send_trailing_metadata_);
}
void CallData::FreeCachedSendOpDataAfterCommit(
grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (retry_state->completed_send_initial_metadata) {
FreeCachedSendInitialMetadata(chand);
}
for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
FreeCachedSendMessage(chand, i);
}
if (retry_state->completed_send_trailing_metadata) {
FreeCachedSendTrailingMetadata(chand);
}
}
void CallData::FreeCachedSendOpDataForCompletedBatch(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (batch_data->batch.send_initial_metadata) {
FreeCachedSendInitialMetadata(chand);
}
if (batch_data->batch.send_message) {
FreeCachedSendMessage(chand, retry_state->completed_send_message_count - 1);
}
if (batch_data->batch.send_trailing_metadata) {
FreeCachedSendTrailingMetadata(chand);
}
}
//
// LB recv_trailing_metadata_ready handling
//
void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
const LoadBalancingPolicy::PickArgs& pick,
grpc_transport_stream_op_batch* batch) {
if (pick.recv_trailing_metadata_ready != nullptr) {
*pick.original_recv_trailing_metadata_ready =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
pick.recv_trailing_metadata_ready;
if (pick.recv_trailing_metadata != nullptr) {
*pick.recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
}
}
}
//
// pending_batches management
//
size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code that starts the LB pick assumes it will be.
if (batch->send_initial_metadata) return 0;
if (batch->send_message) return 1;
if (batch->send_trailing_metadata) return 2;
if (batch->recv_initial_metadata) return 3;
if (batch->recv_message) return 4;
if (batch->recv_trailing_metadata) return 5;
GPR_UNREACHABLE_CODE(return (size_t)-1);
}
// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesAdd(grpc_call_element* elem,
grpc_transport_stream_op_batch* batch) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
const size_t idx = GetBatchIndex(batch);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
this, idx);
}
PendingBatch* pending = &pending_batches_[idx];
GPR_ASSERT(pending->batch == nullptr);
pending->batch = batch;
pending->send_ops_cached = false;
if (enable_retries_) {
// Update state in calld about pending batches.
// Also check if the batch takes us over the retry buffer limit.
// Note: We don't check the size of trailing metadata here, because
// gRPC clients do not send trailing metadata.
if (batch->send_initial_metadata) {
pending_send_initial_metadata_ = true;
bytes_buffered_for_retry_ += grpc_metadata_batch_size(
batch->payload->send_initial_metadata.send_initial_metadata);
}
if (batch->send_message) {
pending_send_message_ = true;
bytes_buffered_for_retry_ +=
batch->payload->send_message.send_message->length();
}
if (batch->send_trailing_metadata) {
pending_send_trailing_metadata_ = true;
}
if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
chand->per_rpc_retry_buffer_size())) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: exceeded retry buffer size, committing",
chand, this);
}
SubchannelCallRetryState* retry_state =
subchannel_call_ == nullptr ? nullptr
: static_cast<SubchannelCallRetryState*>(
subchannel_call_->GetParentData());
RetryCommit(elem, retry_state);
      // If we are not going to retry and have not yet started the first
      // attempt, pretend retries are disabled so that we don't bother with
      // retry overhead.
if (num_attempts_completed_ == 0) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: disabling retries before first attempt",
chand, this);
}
enable_retries_ = false;
}
}
}
}
void CallData::PendingBatchClear(PendingBatch* pending) {
if (enable_retries_) {
if (pending->batch->send_initial_metadata) {
pending_send_initial_metadata_ = false;
}
if (pending->batch->send_message) {
pending_send_message_ = false;
}
if (pending->batch->send_trailing_metadata) {
pending_send_trailing_metadata_ = false;
}
}
pending->batch = nullptr;
}
void CallData::MaybeClearPendingBatch(grpc_call_element* elem,
PendingBatch* pending) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch once all of its callbacks have been
  // scheduled and their pointers reset to nullptr.
if (batch->on_complete == nullptr &&
(!batch->recv_initial_metadata ||
batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
nullptr) &&
(!batch->recv_message ||
batch->payload->recv_message.recv_message_ready == nullptr) &&
(!batch->recv_trailing_metadata ||
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
nullptr)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
this);
}
PendingBatchClear(pending);
}
}
// This is called via the call combiner, so access to calld is synchronized.
void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(error), calld->call_combiner_);
}
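// Fails all pending batches with the given error.  Depending on
// yield_call_combiner_predicate, either yields the call combiner after
// running the resulting closures or runs them without yielding.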
// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesFail(
grpc_call_element* elem, grpc_error* error,
YieldCallCombinerPredicate yield_call_combiner_predicate) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
if (grpc_client_channel_call_trace.enabled()) {
size_t num_batches = 0;
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
if (pending_batches_[i].batch != nullptr) ++num_batches;
}
gpr_log(GPR_INFO,
"chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, this, num_batches, grpc_error_string(error));
}
CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata) {
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
batch);
}
batch->handler_private.extra_arg = this;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
FailPendingBatchInCallCombiner, batch,
grpc_schedule_on_exec_ctx);
closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
"PendingBatchesFail");
PendingBatchClear(pending);
}
}
if (yield_call_combiner_predicate(closures)) {
closures.RunClosures(call_combiner_);
} else {
closures.RunClosuresWithoutYielding(call_combiner_);
}
GRPC_ERROR_UNREF(error);
}
// This is called via the call combiner, so access to calld is synchronized.
void CallData::ResumePendingBatchInCallCombiner(void* arg,
grpc_error* ignored) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
SubchannelCall* subchannel_call =
static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
subchannel_call->StartTransportStreamOpBatch(batch);
}
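// Starts all pending batches on the subchannel call.  If retries are
// enabled, defers to StartRetriableSubchannelBatches(); otherwise, the
// batches are sent down as-is.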
// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesResume(grpc_call_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (enable_retries_) {
StartRetriableSubchannelBatches(elem, GRPC_ERROR_NONE);
return;
}
// Retries not enabled; send down batches as-is.
if (grpc_client_channel_call_trace.enabled()) {
size_t num_batches = 0;
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
if (pending_batches_[i].batch != nullptr) ++num_batches;
}
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" pending batches on subchannel_call=%p",
chand, this, num_batches, subchannel_call_.get());
}
CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
if (batch->recv_trailing_metadata) {
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(pick_.pick,
batch);
}
batch->handler_private.extra_arg = subchannel_call_.get();
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
ResumePendingBatchInCallCombiner, batch,
grpc_schedule_on_exec_ctx);
closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
"PendingBatchesResume");
PendingBatchClear(pending);
}
}
// Note: This will release the call combiner.
closures.RunClosures(call_combiner_);
}
template <typename Predicate>
CallData::PendingBatch* CallData::PendingBatchFind(grpc_call_element* elem,
const char* log_message,
Predicate predicate) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr && predicate(batch)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
this, log_message, i);
}
return pending;
}
}
return nullptr;
}
//
// retry code
//
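// Commits the call to the current attempt: after this, we will no longer
// retry, so any cached send-op data that the attempt has already completed
// can be freed.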
void CallData::RetryCommit(grpc_call_element* elem,
SubchannelCallRetryState* retry_state) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (retry_committed_) return;
retry_committed_ = true;
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, this);
}
if (retry_state != nullptr) {
FreeCachedSendOpDataAfterCommit(elem, retry_state);
}
}
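// Schedules a new pick (and thus a new attempt) after the appropriate
// delay: the server's push-back value if one was given, otherwise the
// exponential backoff computed from the method's retry policy.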
void CallData::DoRetry(grpc_call_element* elem,
SubchannelCallRetryState* retry_state,
grpc_millis server_pushback_ms) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
GPR_ASSERT(method_params_ != nullptr);
const auto* retry_policy = method_params_->retry_policy();
GPR_ASSERT(retry_policy != nullptr);
// Reset subchannel call and connected subchannel.
subchannel_call_.reset();
pick_.pick.connected_subchannel.reset();
// Compute backoff delay.
grpc_millis next_attempt_time;
if (server_pushback_ms >= 0) {
next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
last_attempt_got_server_pushback_ = true;
} else {
if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
retry_backoff_.Init(
BackOff::Options()
.set_initial_backoff(retry_policy->initial_backoff)
.set_multiplier(retry_policy->backoff_multiplier)
.set_jitter(RETRY_BACKOFF_JITTER)
.set_max_backoff(retry_policy->max_backoff));
last_attempt_got_server_pushback_ = false;
}
next_attempt_time = retry_backoff_->NextAttemptTime();
}
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
this, next_attempt_time - ExecCtx::Get()->Now());
}
// Schedule retry after computed delay.
GRPC_CLOSURE_INIT(&pick_closure_, StartPickLocked, elem,
grpc_combiner_scheduler(chand->data_plane_combiner()));
grpc_timer_init(&retry_timer_, next_attempt_time, &pick_closure_);
// Update bookkeeping.
if (retry_state != nullptr) retry_state->retry_dispatched = true;
}
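// Determines whether the completed attempt should be retried, based on the
// method's retry policy, throttling, commit status, attempt count,
// cancellation, and any server push-back.  If so, schedules the retry via
// DoRetry() and returns true; otherwise, returns false, meaning the result
// should be returned to the surface.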
bool CallData::MaybeRetry(grpc_call_element* elem,
SubchannelCallBatchData* batch_data,
grpc_status_code status,
grpc_mdelem* server_pushback_md) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Get retry policy.
if (method_params_ == nullptr) return false;
const auto* retry_policy = method_params_->retry_policy();
if (retry_policy == nullptr) return false;
// If we've already dispatched a retry from this call, return true.
// This catches the case where the batch has multiple callbacks
// (i.e., it includes either recv_message or recv_initial_metadata).
SubchannelCallRetryState* retry_state = nullptr;
if (batch_data != nullptr) {
retry_state = static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
if (retry_state->retry_dispatched) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
this);
}
return true;
}
}
// Check status.
if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
if (retry_throttle_data_ != nullptr) {
retry_throttle_data_->RecordSuccess();
}
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, this);
}
return false;
}
// Status is not OK. Check whether the status is retryable.
if (!retry_policy->retryable_status_codes.Contains(status)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: status %s not configured as retryable", chand,
this, grpc_status_code_to_string(status));
}
return false;
}
// Record the failure and check whether retries are throttled.
// Note that it's important for this check to come after the status
// code check above, since we should only record failures whose statuses
// match the configured retryable status codes, so that we don't count
// things like failures due to malformed requests (INVALID_ARGUMENT).
// Conversely, it's important for this to come before the remaining
// checks, so that we don't fail to record failures due to other factors.
if (retry_throttle_data_ != nullptr &&
!retry_throttle_data_->RecordFailure()) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, this);
}
return false;
}
// Check whether the call is committed.
if (retry_committed_) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
this);
}
return false;
}
// Check whether we have retries remaining.
++num_attempts_completed_;
if (num_attempts_completed_ >= retry_policy->max_attempts) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
this, retry_policy->max_attempts);
}
return false;
}
// If the call was cancelled from the surface, don't retry.
if (cancel_error_ != GRPC_ERROR_NONE) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: call cancelled from surface, not retrying",
chand, this);
}
return false;
}
// Check server push-back.
grpc_millis server_pushback_ms = -1;
if (server_pushback_md != nullptr) {
// If the value is "-1" or any other unparseable string, we do not retry.
uint32_t ms;
if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: not retrying due to server push-back",
chand, this);
}
return false;
} else {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
chand, this, ms);
}
server_pushback_ms = (grpc_millis)ms;
}
}
DoRetry(elem, retry_state, server_pushback_ms);
return true;
}
//
// CallData::SubchannelCallBatchData
//
CallData::SubchannelCallBatchData* CallData::SubchannelCallBatchData::Create(
grpc_call_element* elem, int refcount, bool set_on_complete) {
CallData* calld = static_cast<CallData*>(elem->call_data);
return calld->arena_->New<SubchannelCallBatchData>(elem, calld, refcount,
set_on_complete);
}
CallData::SubchannelCallBatchData::SubchannelCallBatchData(
grpc_call_element* elem, CallData* calld, int refcount,
bool set_on_complete)
: elem(elem), subchannel_call(calld->subchannel_call_) {
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
calld->subchannel_call_->GetParentData());
batch.payload = &retry_state->batch_payload;
gpr_ref_init(&refs, refcount);
if (set_on_complete) {
GRPC_CLOSURE_INIT(&on_complete, CallData::OnComplete, this,
grpc_schedule_on_exec_ctx);
batch.on_complete = &on_complete;
}
GRPC_CALL_STACK_REF(calld->owning_call_, "batch_data");
}
void CallData::SubchannelCallBatchData::Destroy() {
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(subchannel_call->GetParentData());
if (batch.send_initial_metadata) {
grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
}
if (batch.send_trailing_metadata) {
grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
}
if (batch.recv_initial_metadata) {
grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
}
if (batch.recv_trailing_metadata) {
grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
}
subchannel_call.reset();
CallData* calld = static_cast<CallData*>(elem->call_data);
GRPC_CALL_STACK_UNREF(calld->owning_call_, "batch_data");
}
//
// recv_initial_metadata callback handling
//
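// Moves the received initial metadata into the matching pending batch and
// invokes that batch's recv_initial_metadata_ready callback.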
void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
SubchannelCallBatchData* batch_data =
static_cast<SubchannelCallBatchData*>(arg);
CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
// Find pending batch.
PendingBatch* pending = calld->PendingBatchFind(
batch_data->elem, "invoking recv_initial_metadata_ready for",
[](grpc_transport_stream_op_batch* batch) {
return batch->recv_initial_metadata &&
batch->payload->recv_initial_metadata
.recv_initial_metadata_ready != nullptr;
});
GPR_ASSERT(pending != nullptr);
// Return metadata.
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
grpc_metadata_batch_move(
&retry_state->recv_initial_metadata,
pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
grpc_closure* recv_initial_metadata_ready =
pending->batch->payload->recv_initial_metadata
.recv_initial_metadata_ready;
pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
nullptr;
calld->MaybeClearPendingBatch(batch_data->elem, pending);
batch_data->Unref();
// Invoke callback.
GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
}
void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
SubchannelCallBatchData* batch_data =
static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
chand, calld, grpc_error_string(error));
}
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
retry_state->completed_recv_initial_metadata = true;
// If a retry was already dispatched, then we're not going to use the
// result of this recv_initial_metadata op, so do nothing.
if (retry_state->retry_dispatched) {
GRPC_CALL_COMBINER_STOP(
calld->call_combiner_,
"recv_initial_metadata_ready after retry dispatched");
return;
}
// If we got an error or a Trailers-Only response and have not yet gotten
// the recv_trailing_metadata_ready callback, then defer propagating this
// callback back to the surface. We can evaluate whether to retry when
// recv_trailing_metadata comes back.
if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: deferring recv_initial_metadata_ready "
"(Trailers-Only)",
chand, calld);
}
retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
// ourselves to get status.
calld->StartInternalRecvTrailingMetadata(elem);
} else {
GRPC_CALL_COMBINER_STOP(
calld->call_combiner_,
"recv_initial_metadata_ready trailers-only or error");
}
return;
}
// Received valid initial metadata, so commit the call.
calld->RetryCommit(elem, retry_state);
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
calld->InvokeRecvInitialMetadataCallback(batch_data, error);
}
//
// recv_message callback handling
//
void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
SubchannelCallBatchData* batch_data =
static_cast<SubchannelCallBatchData*>(arg);
CallData* calld = static_cast<CallData*>(batch_data->elem->call_data);
// Find pending op.
PendingBatch* pending = calld->PendingBatchFind(
batch_data->elem, "invoking recv_message_ready for",
[](grpc_transport_stream_op_batch* batch) {
return batch->recv_message &&
batch->payload->recv_message.recv_message_ready != nullptr;
});
GPR_ASSERT(pending != nullptr);
// Return payload.
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
*pending->batch->payload->recv_message.recv_message =
std::move(retry_state->recv_message);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
grpc_closure* recv_message_ready =
pending->batch->payload->recv_message.recv_message_ready;
pending->batch->payload->recv_message.recv_message_ready = nullptr;
calld->MaybeClearPendingBatch(batch_data->elem, pending);
batch_data->Unref();
// Invoke callback.
GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
}
void CallData::RecvMessageReady(void* arg, grpc_error* error) {
SubchannelCallBatchData* batch_data =
static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
chand, calld, grpc_error_string(error));
}
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
++retry_state->completed_recv_message_count;
// If a retry was already dispatched, then we're not going to use the
// result of this recv_message op, so do nothing.
if (retry_state->retry_dispatched) {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
"recv_message_ready after retry dispatched");
return;
}
// If we got an error or the payload was nullptr and we have not yet gotten
// the recv_trailing_metadata_ready callback, then defer propagating this
// callback back to the surface. We can evaluate whether to retry when
// recv_trailing_metadata comes back.
if (GPR_UNLIKELY(
(retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
!retry_state->completed_recv_trailing_metadata)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: deferring recv_message_ready (nullptr "
"message and recv_trailing_metadata pending)",
chand, calld);
}
retry_state->recv_message_ready_deferred_batch = batch_data;
retry_state->recv_message_error = GRPC_ERROR_REF(error);
if (!retry_state->started_recv_trailing_metadata) {
// recv_trailing_metadata not yet started by application; start it
// ourselves to get status.
calld->StartInternalRecvTrailingMetadata(elem);
} else {
GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "recv_message_ready null");
}
return;
}
// Received a valid message, so commit the call.
calld->RetryCommit(elem, retry_state);
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
calld->InvokeRecvMessageCallback(batch_data, error);
}
//
// recv_trailing_metadata handling
//
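// Computes the call's status, either from error (if set) or from the
// grpc-status entry in md_batch.  If server_pushback_md is non-null and
// the trailing metadata contains grpc-retry-pushback-ms, points
// *server_pushback_md at that element.  Takes ownership of error.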
void CallData::GetCallStatus(grpc_call_element* elem,
grpc_metadata_batch* md_batch, grpc_error* error,
grpc_status_code* status,
grpc_mdelem** server_pushback_md) {
if (error != GRPC_ERROR_NONE) {
grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
} else {
GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
*status =
grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
if (server_pushback_md != nullptr &&
md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
*server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
}
GRPC_ERROR_UNREF(error);
}
void CallData::AddClosureForRecvTrailingMetadataReady(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
grpc_error* error, CallCombinerClosureList* closures) {
// Find pending batch.
PendingBatch* pending = PendingBatchFind(
elem, "invoking recv_trailing_metadata for",
[](grpc_transport_stream_op_batch* batch) {
return batch->recv_trailing_metadata &&
batch->payload->recv_trailing_metadata
.recv_trailing_metadata_ready != nullptr;
});
// If we generated the recv_trailing_metadata op internally via
// StartInternalRecvTrailingMetadata(), then there will be no pending batch.
if (pending == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
// Return metadata.
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
grpc_metadata_batch_move(
&retry_state->recv_trailing_metadata,
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
// Add closure.
closures->Add(pending->batch->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
error, "recv_trailing_metadata_ready for pending batch");
// Update bookkeeping.
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
nullptr;
MaybeClearPendingBatch(elem, pending);
}
void CallData::AddClosuresForDeferredRecvCallbacks(
SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
CallCombinerClosureList* closures) {
if (batch_data->batch.recv_trailing_metadata) {
// Add closure for deferred recv_initial_metadata_ready.
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
nullptr)) {
GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
InvokeRecvInitialMetadataCallback,
retry_state->recv_initial_metadata_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
closures->Add(&retry_state->recv_initial_metadata_ready,
retry_state->recv_initial_metadata_error,
"resuming recv_initial_metadata_ready");
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
nullptr)) {
GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
InvokeRecvMessageCallback,
retry_state->recv_message_ready_deferred_batch,
grpc_schedule_on_exec_ctx);
closures->Add(&retry_state->recv_message_ready,
retry_state->recv_message_error,
"resuming recv_message_ready");
retry_state->recv_message_ready_deferred_batch = nullptr;
}
}
}
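// Returns true if the pending batch contains send ops that have not yet
// been started on the subchannel call described by retry_state.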
bool CallData::PendingBatchIsUnstarted(PendingBatch* pending,
SubchannelCallRetryState* retry_state) {
if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
return false;
}
if (pending->batch->send_initial_metadata &&
!retry_state->started_send_initial_metadata) {
return true;
}
if (pending->batch->send_message &&
retry_state->started_send_message_count < send_messages_.size()) {
return true;
}
if (pending->batch->send_trailing_metadata &&
!retry_state->started_send_trailing_metadata) {
return true;
}
return false;
}
void CallData::AddClosuresToFailUnstartedPendingBatches(
grpc_call_element* elem, SubchannelCallRetryState* retry_state,
grpc_error* error, CallCombinerClosureList* closures) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
PendingBatch* pending = &pending_batches_[i];
if (PendingBatchIsUnstarted(pending, retry_state)) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: failing unstarted pending batch at index "
"%" PRIuPTR,
chand, this, i);
}
closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
"failing on_complete for pending batch");
pending->batch->on_complete = nullptr;
MaybeClearPendingBatch(elem, pending);
}
}
GRPC_ERROR_UNREF(error);
}
void CallData::RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
grpc_error* error) {
grpc_call_element* elem = batch_data->elem;
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
// Construct list of closures to execute.
CallCombinerClosureList closures;
// First, add closure for recv_trailing_metadata_ready.
AddClosureForRecvTrailingMetadataReady(elem, batch_data,
GRPC_ERROR_REF(error), &closures);
// If there are deferred recv_initial_metadata_ready or recv_message_ready
// callbacks, add them to closures.
AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
// Add closures to fail any pending batches that have not yet been started.
AddClosuresToFailUnstartedPendingBatches(elem, retry_state,
GRPC_ERROR_REF(error), &closures);
// Don't need batch_data anymore.
batch_data->Unref();
// Schedule all of the closures identified above.
// Note: This will release the call combiner.
closures.RunClosures(call_combiner_);
GRPC_ERROR_UNREF(error);
}
void CallData::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
SubchannelCallBatchData* batch_data =
static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
chand, calld, grpc_error_string(error));
}
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
retry_state->completed_recv_trailing_metadata = true;
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
grpc_mdelem* server_pushback_md = nullptr;
grpc_metadata_batch* md_batch =
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
calld->GetCallStatus(elem, md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_md);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
calld, grpc_status_code_to_string(status));
}
// Check if we should retry.
if (calld->MaybeRetry(elem, batch_data, status, server_pushback_md)) {
// Unref batch_data for deferred recv_initial_metadata_ready or
// recv_message_ready callbacks, if any.
if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
batch_data->Unref();
GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
}
if (retry_state->recv_message_ready_deferred_batch != nullptr) {
batch_data->Unref();
GRPC_ERROR_UNREF(retry_state->recv_message_error);
}
batch_data->Unref();
return;
}
// Not retrying, so commit the call.
calld->RetryCommit(elem, retry_state);
// Run any necessary closures.
calld->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
}
//
// on_complete callback handling
//
void CallData::AddClosuresForCompletedPendingBatch(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state, grpc_error* error,
CallCombinerClosureList* closures) {
PendingBatch* pending = PendingBatchFind(
elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
// Match the pending batch with the same set of send ops as the
// subchannel batch we've just completed.
return batch->on_complete != nullptr &&
batch_data->batch.send_initial_metadata ==
batch->send_initial_metadata &&
batch_data->batch.send_message == batch->send_message &&
batch_data->batch.send_trailing_metadata ==
batch->send_trailing_metadata;
});
// If batch_data is a replay batch, then there will be no pending
// batch to complete.
if (pending == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
// Add closure.
closures->Add(pending->batch->on_complete, error,
"on_complete for pending batch");
pending->batch->on_complete = nullptr;
MaybeClearPendingBatch(elem, pending);
}
void CallData::AddClosuresForReplayOrPendingSendOps(
grpc_call_element* elem, SubchannelCallBatchData* batch_data,
SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
bool have_pending_send_message_ops =
retry_state->started_send_message_count < send_messages_.size();
bool have_pending_send_trailing_metadata_op =
seen_send_trailing_metadata_ &&
!retry_state->started_send_trailing_metadata;
if (!have_pending_send_message_ops &&
!have_pending_send_trailing_metadata_op) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch == nullptr || pending->send_ops_cached) continue;
if (batch->send_message) have_pending_send_message_ops = true;
if (batch->send_trailing_metadata) {
have_pending_send_trailing_metadata_op = true;
}
}
}
if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting next batch for pending send op(s)",
chand, this);
}
GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
StartRetriableSubchannelBatches, elem,
grpc_schedule_on_exec_ctx);
closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
"starting next batch for send_* op(s)");
}
}
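// Intercepts on_complete for subchannel batches that contain send ops.
// Updates bookkeeping in retry_state, returns the completion for the
// matching pending batch to the surface (unless a retry has already been
// dispatched), and starts any replay or pending send ops that are now
// ready.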
void CallData::OnComplete(void* arg, grpc_error* error) {
SubchannelCallBatchData* batch_data =
static_cast<SubchannelCallBatchData*>(arg);
grpc_call_element* elem = batch_data->elem;
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
if (grpc_client_channel_call_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
chand, calld, grpc_error_string(error), batch_str);
gpr_free(batch_str);
}
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
batch_data->subchannel_call->GetParentData());
// Update bookkeeping in retry_state.
if (batch_data->batch.send_initial_metadata) {
retry_state->completed_send_initial_metadata = true;
}
if (batch_data->batch.send_message) {
++retry_state->completed_send_message_count;
}
if (batch_data->batch.send_trailing_metadata) {
retry_state->completed_send_trailing_metadata = true;
}
// If the call is committed, free cached data for send ops that we've just
// completed.
if (calld->retry_committed_) {
calld->FreeCachedSendOpDataForCompletedBatch(elem, batch_data, retry_state);
}
// Construct list of closures to execute.
CallCombinerClosureList closures;
// If a retry was already dispatched, that means we saw
// recv_trailing_metadata before this, so we do nothing here.
// Otherwise, invoke the callback to return the result to the surface.
if (!retry_state->retry_dispatched) {
// Add closure for the completed pending batch, if any.
calld->AddClosuresForCompletedPendingBatch(
elem, batch_data, retry_state, GRPC_ERROR_REF(error), &closures);
// If needed, add a callback to start any replay or pending send ops on
// the subchannel call.
if (!retry_state->completed_recv_trailing_metadata) {
calld->AddClosuresForReplayOrPendingSendOps(elem, batch_data, retry_state,
&closures);
}
}
// Track number of pending subchannel send batches and determine if this
// was the last one.
--calld->num_pending_retriable_subchannel_send_batches_;
const bool last_send_batch_complete =
calld->num_pending_retriable_subchannel_send_batches_ == 0;
// Don't need batch_data anymore.
batch_data->Unref();
// Schedule all of the closures identified above.
  // Note: This yields the call combiner.
closures.RunClosures(calld->call_combiner_);
// If this was the last subchannel send batch, unref the call stack.
if (last_send_batch_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call_, "subchannel_send_batches");
}
}
//
// subchannel batch construction
//
void CallData::StartBatchInCallCombiner(void* arg, grpc_error* ignored) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
SubchannelCall* subchannel_call =
static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
subchannel_call->StartTransportStreamOpBatch(batch);
}
void CallData::AddClosureForSubchannelBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
CallCombinerClosureList* closures) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
batch->handler_private.extra_arg = subchannel_call_.get();
GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
batch, grpc_schedule_on_exec_ctx);
if (grpc_client_channel_call_trace.enabled()) {
char* batch_str = grpc_transport_stream_op_batch_string(batch);
gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
this, batch_str);
gpr_free(batch_str);
}
closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
"start_subchannel_batch");
}
void CallData::AddRetriableSendInitialMetadataOp(
SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data) {
// Maps the number of retries to the corresponding metadata value slice.
static const grpc_slice* retry_count_strings[] = {
&GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
// We need to make a copy of the metadata batch for each attempt, since
// the filters in the subchannel stack may modify this batch, and we don't
// want those modifications to be passed forward to subsequent attempts.
//
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
retry_state->send_initial_metadata_storage =
static_cast<grpc_linked_mdelem*>(arena_->Alloc(
sizeof(grpc_linked_mdelem) *
(send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
grpc_metadata_batch_copy(&send_initial_metadata_,
&retry_state->send_initial_metadata,
retry_state->send_initial_metadata_storage);
if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
.grpc_previous_rpc_attempts != nullptr)) {
grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
retry_state->send_initial_metadata.idx.named
.grpc_previous_rpc_attempts);
}
if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
grpc_mdelem retry_md = grpc_mdelem_create(
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
*retry_count_strings[num_attempts_completed_ - 1], nullptr);
grpc_error* error = grpc_metadata_batch_add_tail(
&retry_state->send_initial_metadata,
&retry_state
->send_initial_metadata_storage[send_initial_metadata_.list.count],
retry_md);
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
gpr_log(GPR_ERROR, "error adding retry metadata: %s",
grpc_error_string(error));
GPR_ASSERT(false);
}
}
retry_state->started_send_initial_metadata = true;
batch_data->batch.send_initial_metadata = true;
batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
&retry_state->send_initial_metadata;
batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
send_initial_metadata_flags_;
batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
}
void CallData::AddRetriableSendMessageOp(grpc_call_element* elem,
SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
chand, this, retry_state->started_send_message_count);
}
ByteStreamCache* cache =
send_messages_[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
retry_state->send_message.Init(cache);
batch_data->batch.send_message = true;
batch_data->batch.payload->send_message.send_message.reset(
retry_state->send_message.get());
}
void CallData::AddRetriableSendTrailingMetadataOp(
SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data) {
// We need to make a copy of the metadata batch for each attempt, since
// the filters in the subchannel stack may modify this batch, and we don't
// want those modifications to be passed forward to subsequent attempts.
retry_state->send_trailing_metadata_storage =
static_cast<grpc_linked_mdelem*>(arena_->Alloc(
sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
grpc_metadata_batch_copy(&send_trailing_metadata_,
&retry_state->send_trailing_metadata,
retry_state->send_trailing_metadata_storage);
retry_state->started_send_trailing_metadata = true;
batch_data->batch.send_trailing_metadata = true;
batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
&retry_state->send_trailing_metadata;
}
void CallData::AddRetriableRecvInitialMetadataOp(
SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data) {
retry_state->started_recv_initial_metadata = true;
batch_data->batch.recv_initial_metadata = true;
grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
&retry_state->recv_initial_metadata;
batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
&retry_state->trailing_metadata_available;
GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
RecvInitialMetadataReady, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
&retry_state->recv_initial_metadata_ready;
}
void CallData::AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data) {
++retry_state->started_recv_message_count;
batch_data->batch.recv_message = true;
batch_data->batch.payload->recv_message.recv_message =
&retry_state->recv_message;
GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
batch_data, grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_message.recv_message_ready =
&retry_state->recv_message_ready;
}
void CallData::AddRetriableRecvTrailingMetadataOp(
SubchannelCallRetryState* retry_state,
SubchannelCallBatchData* batch_data) {
retry_state->started_recv_trailing_metadata = true;
batch_data->batch.recv_trailing_metadata = true;
grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
&retry_state->recv_trailing_metadata;
batch_data->batch.payload->recv_trailing_metadata.collect_stats =
&retry_state->collect_stats;
GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
RecvTrailingMetadataReady, batch_data,
grpc_schedule_on_exec_ctx);
batch_data->batch.payload->recv_trailing_metadata
.recv_trailing_metadata_ready =
&retry_state->recv_trailing_metadata_ready;
MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
pick_.pick, &batch_data->batch);
}
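// Starts a recv_trailing_metadata op on the subchannel call internally, so
// that we can learn the attempt's status (and decide whether to retry)
// even though the surface has not yet sent down a recv_trailing_metadata
// op.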
void CallData::StartInternalRecvTrailingMetadata(grpc_call_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: call failed but recv_trailing_metadata not "
"started; starting it internally",
chand, this);
}
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(subchannel_call_->GetParentData());
// Create batch_data with 2 refs, since this batch will be unreffed twice:
// once for the recv_trailing_metadata_ready callback when the subchannel
// batch returns, and again when we actually get a recv_trailing_metadata
// op from the surface.
SubchannelCallBatchData* batch_data =
SubchannelCallBatchData::Create(elem, 2, false /* set_on_complete */);
AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
subchannel_call_->StartTransportStreamOpBatch(&batch_data->batch);
}
// If there are any cached send ops that need to be replayed on the
// current subchannel call, creates and returns a new subchannel batch
// to replay those ops. Otherwise, returns nullptr.
CallData::SubchannelCallBatchData*
CallData::MaybeCreateSubchannelBatchForReplay(
grpc_call_element* elem, SubchannelCallRetryState* retry_state) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
SubchannelCallBatchData* replay_batch_data = nullptr;
// send_initial_metadata.
if (seen_send_initial_metadata_ &&
!retry_state->started_send_initial_metadata &&
!pending_send_initial_metadata_) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_initial_metadata op",
chand, this);
}
replay_batch_data =
SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
}
// send_message.
// Note that we can only have one send_message op in flight at a time.
if (retry_state->started_send_message_count < send_messages_.size() &&
retry_state->started_send_message_count ==
retry_state->completed_send_message_count &&
!pending_send_message_) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_message op",
chand, this);
}
if (replay_batch_data == nullptr) {
replay_batch_data =
SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
}
AddRetriableSendMessageOp(elem, retry_state, replay_batch_data);
}
// send_trailing_metadata.
// Note that we only add this op if we have no more send_message ops
// to start, since we can't send down any more send_message ops after
// send_trailing_metadata.
if (seen_send_trailing_metadata_ &&
retry_state->started_send_message_count == send_messages_.size() &&
!retry_state->started_send_trailing_metadata &&
!pending_send_trailing_metadata_) {
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: replaying previously completed "
"send_trailing_metadata op",
chand, this);
}
if (replay_batch_data == nullptr) {
replay_batch_data =
SubchannelCallBatchData::Create(elem, 1, true /* set_on_complete */);
}
AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
}
return replay_batch_data;
}
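// For each pending batch that can be started on the current subchannel
// call, constructs a subchannel batch (with retry bookkeeping if retries
// are still possible) and adds a closure to start it to closures.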
void CallData::AddSubchannelBatchesForPendingBatches(
grpc_call_element* elem, SubchannelCallRetryState* retry_state,
CallCombinerClosureList* closures) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
PendingBatch* pending = &pending_batches_[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch == nullptr) continue;
// Skip any batch that either (a) has already been started on this
// subchannel call or (b) we can't start yet because we're still
// replaying send ops that need to be completed first.
// TODO(roth): Note that if any one op in the batch can't be sent
// yet due to ops that we're replaying, we don't start any of the ops
// in the batch. This is probably okay, but it could conceivably
// lead to increased latency in some cases -- e.g., we could delay
// starting a recv op due to it being in the same batch with a send
// op. If/when we revamp the callback protocol in
// transport_stream_op_batch, we may be able to fix this.
if (batch->send_initial_metadata &&
retry_state->started_send_initial_metadata) {
continue;
}
if (batch->send_message && retry_state->completed_send_message_count <
retry_state->started_send_message_count) {
continue;
}
// Note that we only start send_trailing_metadata if we have no more
// send_message ops to start, since we can't send down any more
// send_message ops after send_trailing_metadata.
if (batch->send_trailing_metadata &&
(retry_state->started_send_message_count + batch->send_message <
send_messages_.size() ||
retry_state->started_send_trailing_metadata)) {
continue;
}
if (batch->recv_initial_metadata &&
retry_state->started_recv_initial_metadata) {
continue;
}
if (batch->recv_message && retry_state->completed_recv_message_count <
retry_state->started_recv_message_count) {
continue;
}
if (batch->recv_trailing_metadata &&
retry_state->started_recv_trailing_metadata) {
// If we previously completed a recv_trailing_metadata op
// initiated by StartInternalRecvTrailingMetadata(), use the
// result of that instead of trying to re-start this op.
if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
nullptr))) {
// If the batch completed, then trigger the completion callback
// directly, so that we return the previously returned results to
// the application. Otherwise, just unref the internally
// started subchannel batch, since we'll propagate the
// completion when it completes.
if (retry_state->completed_recv_trailing_metadata) {
// Batches containing recv_trailing_metadata always succeed.
closures->Add(
&retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
"re-executing recv_trailing_metadata_ready to propagate "
"internally triggered result");
} else {
retry_state->recv_trailing_metadata_internal_batch->Unref();
}
retry_state->recv_trailing_metadata_internal_batch = nullptr;
}
continue;
}
// If we're not retrying, just send the batch as-is.
if (method_params_ == nullptr ||
method_params_->retry_policy() == nullptr || retry_committed_) {
AddClosureForSubchannelBatch(elem, batch, closures);
PendingBatchClear(pending);
continue;
}
// Create batch with the right number of callbacks.
const bool has_send_ops = batch->send_initial_metadata ||
batch->send_message ||
batch->send_trailing_metadata;
const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
batch->recv_message +
batch->recv_trailing_metadata;
SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
elem, num_callbacks, has_send_ops /* set_on_complete */);
// Cache send ops if needed.
MaybeCacheSendOpsForBatch(pending);
// send_initial_metadata.
if (batch->send_initial_metadata) {
AddRetriableSendInitialMetadataOp(retry_state, batch_data);
}
// send_message.
if (batch->send_message) {
AddRetriableSendMessageOp(elem, retry_state, batch_data);
}
// send_trailing_metadata.
if (batch->send_trailing_metadata) {
AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
}
// recv_initial_metadata.
if (batch->recv_initial_metadata) {
// recv_flags is only used on the server side.
GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
}
// recv_message.
if (batch->recv_message) {
AddRetriableRecvMessageOp(retry_state, batch_data);
}
// recv_trailing_metadata.
if (batch->recv_trailing_metadata) {
AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
}
AddClosureForSubchannelBatch(elem, &batch_data->batch, closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
batch->send_trailing_metadata) {
if (num_pending_retriable_subchannel_send_batches_ == 0) {
GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
}
++num_pending_retriable_subchannel_send_batches_;
}
}
}
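// Constructs the retriable batches to run on the current subchannel call:
// replays any previously completed send ops if needed, adds batches for
// the pending ops, and then starts them all, yielding the call combiner.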
void CallData::StartRetriableSubchannelBatches(void* arg, grpc_error* ignored) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
chand, calld);
}
SubchannelCallRetryState* retry_state =
static_cast<SubchannelCallRetryState*>(
calld->subchannel_call_->GetParentData());
// Construct list of closures to execute, one for each pending batch.
CallCombinerClosureList closures;
// Replay previously-returned send_* ops if needed.
SubchannelCallBatchData* replay_batch_data =
calld->MaybeCreateSubchannelBatchForReplay(elem, retry_state);
if (replay_batch_data != nullptr) {
calld->AddClosureForSubchannelBatch(elem, &replay_batch_data->batch,
&closures);
// Track number of pending subchannel send batches.
// If this is the first one, take a ref to the call stack.
if (calld->num_pending_retriable_subchannel_send_batches_ == 0) {
GRPC_CALL_STACK_REF(calld->owning_call_, "subchannel_send_batches");
}
++calld->num_pending_retriable_subchannel_send_batches_;
}
// Now add pending batches.
calld->AddSubchannelBatchesForPendingBatches(elem, retry_state, &closures);
// Start batches on subchannel call.
if (grpc_client_channel_call_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
chand, calld, closures.size(), calld->subchannel_call_.get());
}
// Note: This will yield the call combiner.
closures.RunClosures(calld->call_combiner_);
}
//
// LB pick
//
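// Creates a call on the connected subchannel returned by the LB pick.
// On success, resumes the call's pending batches on the new subchannel
// call; on failure, fails the pending batches with the creation error.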
void CallData::CreateSubchannelCall(grpc_call_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
const size_t parent_data_size =
enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
const ConnectedSubchannel::CallArgs call_args = {
pollent_, path_, call_start_time_, deadline_, arena_,
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
call_context_, call_combiner_, parent_data_size};
grpc_error* error = GRPC_ERROR_NONE;
subchannel_call_ =
pick_.pick.connected_subchannel->CreateCall(call_args, &error);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, this, subchannel_call_.get(), grpc_error_string(error));
}
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
PendingBatchesFail(elem, error, YieldCallCombiner);
} else {
if (parent_data_size > 0) {
new (subchannel_call_->GetParentData())
SubchannelCallRetryState(call_context_);
}
PendingBatchesResume(elem);
}
}
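// Invoked when the LB pick is complete, after leaving the data plane
// combiner. On success, creates the subchannel call; on error, fails the
// call's pending batches.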
void CallData::PickDone(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
CallData* calld = static_cast<CallData*>(elem->call_data);
if (error != GRPC_ERROR_NONE) {
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: failed to pick subchannel: error=%s", chand,
calld, grpc_error_string(error));
}
calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
return;
}
calld->CreateSubchannelCall(elem);
}
// A class to handle the call combiner cancellation callback for a
// queued pick.
class CallData::QueuedPickCanceller {
public:
explicit QueuedPickCanceller(grpc_call_element* elem) : elem_(elem) {
auto* calld = static_cast<CallData*>(elem->call_data);
auto* chand = static_cast<ChannelData*>(elem->channel_data);
GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
grpc_combiner_scheduler(chand->data_plane_combiner()));
calld->call_combiner_->SetNotifyOnCancel(&closure_);
}
private:
static void CancelLocked(void* arg, grpc_error* error) {
auto* self = static_cast<QueuedPickCanceller*>(arg);
auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
auto* calld = static_cast<CallData*>(self->elem_->call_data);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: cancelling queued pick: "
"error=%s self=%p calld->pick_canceller=%p",
chand, calld, grpc_error_string(error), self,
calld->pick_canceller_);
}
if (calld->pick_canceller_ == self && error != GRPC_ERROR_NONE) {
// Remove pick from list of queued picks.
calld->RemoveCallFromQueuedPicksLocked(self->elem_);
// Fail pending batches on the call.
calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
YieldCallCombinerIfPendingBatchesFound);
}
GRPC_CALL_STACK_UNREF(calld->owning_call_, "QueuedPickCanceller");
Delete(self);
}
grpc_call_element* elem_;
grpc_closure closure_;
};
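// Removes the call from the channel's list of queued picks and lames the
// queued pick canceller.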
void CallData::RemoveCallFromQueuedPicksLocked(grpc_call_element* elem) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: removing from queued picks list",
chand, this);
}
chand->RemoveQueuedPick(&pick_, pollent_);
pick_queued_ = false;
// Lame the call combiner canceller.
pick_canceller_ = nullptr;
}
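// Adds the call to the channel's list of queued picks and registers a
// QueuedPickCanceller so that the pick is cleaned up if the call is
// cancelled while queued.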
void CallData::AddCallToQueuedPicksLocked(grpc_call_element* elem) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: adding to queued picks list", chand,
this);
}
pick_queued_ = true;
pick_.elem = elem;
chand->AddQueuedPick(&pick_, pollent_);
// Register call combiner cancellation callback.
pick_canceller_ = New<QueuedPickCanceller>(elem);
}
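// Applies the channel's service config to this call: stores the
// per-method parsed config, shortens the deadline if the config specifies
// a smaller timeout, applies the wait_for_ready setting if the
// application did not set it explicitly, and disables retries if there is
// no retry policy.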
void CallData::ApplyServiceConfigToCallLocked(grpc_call_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
chand, this);
}
// Store a ref to the service_config in service_config_call_data_. Also
// save a pointer to service_config_call_data_ in the call_context, so
// that it can be accessed by all future filters in the call stack.
service_config_call_data_ =
ServiceConfig::CallData(chand->service_config(), path_);
if (service_config_call_data_.service_config() != nullptr) {
call_context_[GRPC_SERVICE_CONFIG_CALL_DATA].value =
&service_config_call_data_;
method_params_ = static_cast<ClientChannelMethodParsedObject*>(
service_config_call_data_.GetMethodParsedObject(
internal::ClientChannelServiceConfigParser::ParserIndex()));
}
retry_throttle_data_ = chand->retry_throttle_data();
if (method_params_ != nullptr) {
// If the deadline from the service config is shorter than the one
// from the client API, reset the deadline timer.
if (chand->deadline_checking_enabled() && method_params_->timeout() != 0) {
const grpc_millis per_method_deadline =
grpc_timespec_to_millis_round_up(call_start_time_) +
method_params_->timeout();
if (per_method_deadline < deadline_) {
deadline_ = per_method_deadline;
grpc_deadline_state_reset(elem, deadline_);
}
}
// If the service config set wait_for_ready and the application
// did not explicitly set it, use the value from the service config.
uint32_t* send_initial_metadata_flags =
&pending_batches_[0]
.batch->payload->send_initial_metadata.send_initial_metadata_flags;
if (method_params_->wait_for_ready().has_value() &&
!(*send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
if (method_params_->wait_for_ready().value()) {
*send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
} else {
*send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
}
}
}
// If no retry policy, disable retries.
// TODO(roth): Remove this when adding support for transparent retries.
if (method_params_ == nullptr || method_params_->retry_policy() == nullptr) {
enable_retries_ = false;
}
}
void CallData::MaybeApplyServiceConfigToCallLocked(grpc_call_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Apply service config data to the call only once, and only if the
// channel has the data available.
if (GPR_LIKELY(chand->received_service_config_data() &&
!service_config_applied_)) {
service_config_applied_ = true;
ApplyServiceConfigToCallLocked(elem);
}
}
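// Returns a human-readable name for an LB pick result, for tracing.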
const char* PickResultName(LoadBalancingPolicy::PickResult result) {
switch (result) {
case LoadBalancingPolicy::PICK_COMPLETE:
return "COMPLETE";
case LoadBalancingPolicy::PICK_QUEUE:
return "QUEUE";
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE:
return "TRANSIENT_FAILURE";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
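// Attempts an LB pick for the call, in the data plane combiner.
// Depending on the result, the call is either started on the picked
// subchannel, queued to wait for a new picker, or failed (subject to
// wait_for_ready and retry handling for transient failures).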
void CallData::StartPickLocked(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
CallData* calld = static_cast<CallData*>(elem->call_data);
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
GPR_ASSERT(calld->pick_.pick.connected_subchannel == nullptr);
GPR_ASSERT(calld->subchannel_call_ == nullptr);
// If this is a retry, use the send_initial_metadata payload that
// we've cached; otherwise, use the pending batch. The
// send_initial_metadata batch will be the first pending batch in the
// list, as set by GetBatchIndex() above.
// TODO(roth): What if the LB policy needs to add something to the
// call's initial metadata, and then there's a retry? We don't want
// the new metadata to be added twice. We might need to somehow
// allocate the subchannel batch earlier so that we can give the
// subchannel's copy of the metadata batch (which is copied for each
// attempt) to the LB policy instead of the one from the parent channel.
calld->pick_.pick.initial_metadata =
calld->seen_send_initial_metadata_
? &calld->send_initial_metadata_
: calld->pending_batches_[0]
.batch->payload->send_initial_metadata.send_initial_metadata;
uint32_t* send_initial_metadata_flags =
calld->seen_send_initial_metadata_
? &calld->send_initial_metadata_flags_
: &calld->pending_batches_[0]
.batch->payload->send_initial_metadata
.send_initial_metadata_flags;
// Apply service config to call if needed.
calld->MaybeApplyServiceConfigToCallLocked(elem);
// When done, we schedule this closure to leave the data plane combiner.
GRPC_CLOSURE_INIT(&calld->pick_closure_, PickDone, elem,
grpc_schedule_on_exec_ctx);
// Attempt pick.
error = GRPC_ERROR_NONE;
auto pick_result = chand->picker()->Pick(&calld->pick_.pick, &error);
if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: LB pick returned %s (connected_subchannel=%p, "
"error=%s)",
chand, calld, PickResultName(pick_result),
calld->pick_.pick.connected_subchannel.get(),
grpc_error_string(error));
}
switch (pick_result) {
case LoadBalancingPolicy::PICK_TRANSIENT_FAILURE: {
// If we're shutting down, fail all RPCs.
grpc_error* disconnect_error = chand->disconnect_error();
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
GRPC_CLOSURE_SCHED(&calld->pick_closure_,
GRPC_ERROR_REF(disconnect_error));
break;
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if ((*send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
// Retry if appropriate; otherwise, fail.
grpc_status_code status = GRPC_STATUS_OK;
grpc_error_get_status(error, calld->deadline_, &status, nullptr,
nullptr, nullptr);
if (!calld->enable_retries_ ||
!calld->MaybeRetry(elem, nullptr /* batch_data */, status,
nullptr /* server_pushback_md */)) {
grpc_error* new_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to pick subchannel", &error, 1);
GRPC_ERROR_UNREF(error);
GRPC_CLOSURE_SCHED(&calld->pick_closure_, new_error);
}
if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
break;
}
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
GRPC_ERROR_UNREF(error);
}
// Fallthrough
case LoadBalancingPolicy::PICK_QUEUE:
if (!calld->pick_queued_) calld->AddCallToQueuedPicksLocked(elem);
break;
default: // PICK_COMPLETE
// Handle drops.
if (GPR_UNLIKELY(calld->pick_.pick.connected_subchannel == nullptr)) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
}
GRPC_CLOSURE_SCHED(&calld->pick_closure_, error);
if (calld->pick_queued_) calld->RemoveCallFromQueuedPicksLocked(elem);
}
}
} // namespace
} // namespace grpc_core
/*************************************************************************
* EXPORTED SYMBOLS
*/
using grpc_core::CallData;
using grpc_core::ChannelData;
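// vtable for the client_channel filter, delegating to the ChannelData and
// CallData methods defined above.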
const grpc_channel_filter grpc_client_channel_filter = {
CallData::StartTransportStreamOpBatch,
ChannelData::StartTransportOp,
sizeof(CallData),
CallData::Init,
CallData::SetPollent,
CallData::Destroy,
sizeof(ChannelData),
ChannelData::Init,
ChannelData::Destroy,
ChannelData::GetChannelInfo,
"client-channel",
};
void grpc_client_channel_set_channelz_node(
grpc_channel_element* elem, grpc_core::channelz::ClientChannelNode* node) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->set_channelz_node(node);
}
void grpc_client_channel_populate_child_refs(
grpc_channel_element* elem,
grpc_core::channelz::ChildRefsList* child_subchannels,
grpc_core::channelz::ChildRefsList* child_channels) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->FillChildRefsForChannelz(child_subchannels, child_channels);
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element* elem, int try_to_connect) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
return chand->CheckConnectivityState(try_to_connect);
}
int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element* elem) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
return chand->NumExternalConnectivityWatchers();
}
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element* elem, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* closure,
grpc_closure* watcher_timer_init) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
return chand->AddExternalConnectivityWatcher(pollent, state, closure,
watcher_timer_init);
}
grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
auto* calld = static_cast<CallData*>(elem->call_data);
return calld->subchannel_call();
}