// blob: 876bd872655d8b399807faceefced2f75f6d467a [file] [log] [blame]
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_FILTER_LEGACY_CALL_DATA_H
#define GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_FILTER_LEGACY_CALL_DATA_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include "src/core/client_channel/client_channel_filter.h"
#include "src/core/client_channel/retry_filter.h"
#include "src/core/client_channel/retry_service_config.h"
#include "src/core/client_channel/retry_throttle.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
class RetryFilter::LegacyCallData final {
public:
// Static per-call entry points; each receives the grpc_call_element for
// the call being operated on.
// NOTE(review): presumably these are the hooks installed in the retry
// filter's grpc_channel_filter vtable -- confirm in retry_filter.cc.
//
// Init() receives the element's construction args and returns a
// grpc_error_handle.
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args);
// Destroy(): the final_info parameter name is commented out, which suggests
// it is unused.
// NOTE(review): then_schedule_closure is presumably scheduled once teardown
// has finished -- confirm against the definition.
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure);
// Static entry point for a batch arriving on this element.
// NOTE(review): presumably forwards to the non-static
// StartTransportStreamOpBatch() member declared in the private section.
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
// Receives the polling entity for the call.
// NOTE(review): presumably stored in pollent_ -- confirm in the .cc file.
static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
private:
// Forward declaration only; the definition is not part of this header.
// Referenced below through RefCountedPtr<CallStackDestructionBarrier>.
class CallStackDestructionBarrier;
// One slot of LegacyCallData::pending_batches_: a batch stored in call data
// together with bookkeeping about whether its send ops have been cached.
// Both fields have in-class defaults, so a default-constructed slot is empty.
struct PendingBatch {
// The pending batch. If nullptr, this slot is empty.
grpc_transport_stream_op_batch* batch = nullptr;
// Indicates whether payload for send ops has been cached in
// LegacyCallData.
bool send_ops_cached = false;
};
// State associated with each call attempt.
class CallAttempt final : public RefCounted<CallAttempt> {
public:
// Creates an attempt belonging to calld.
// NOTE(review): is_transparent_retry presumably marks attempts started via
// LegacyCallData::StartTransparentRetry() -- confirm in the .cc file.
CallAttempt(LegacyCallData* calld, bool is_transparent_retry);
~CallAttempt() override;
// Accessor for lb_call_committed_.
bool lb_call_committed() const { return lb_call_committed_; }
// Constructs and starts whatever batches are needed on this call
// attempt.
void StartRetriableBatches();
// Frees cached send ops that have already been completed after
// committing the call.
void FreeCachedSendOpDataAfterCommit();
// Cancels the call attempt, given the cancel batch received from the
// surface.
void CancelFromSurface(grpc_transport_stream_op_batch* cancel_batch);
private:
// State used for starting a retryable batch on the call attempt's LB call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given LB call.
// NOTE(review): the UnrefCallDtor policy presumably runs only the destructor
// on the last unref (no deallocation), which would match the arena
// allocation described above -- confirm in ref_counted.h.
class BatchData final
: public RefCounted<BatchData, PolymorphicRefCount, UnrefCallDtor> {
public:
// call_attempt: ref to the owning attempt (stored in call_attempt_, see the
// note on that member about why it is kept as a raw pointer).
// refcount: initial refcount requested by the caller.
// set_on_complete: whether to install an on_complete callback on the batch
// (see the comment on CallAttempt::CreateBatch()).
BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
bool set_on_complete);
~BatchData() override;
// Returns a pointer to the embedded batch_.
grpc_transport_stream_op_batch* batch() { return &batch_; }
// Adds retriable send_initial_metadata op.
void AddRetriableSendInitialMetadataOp();
// Adds retriable send_message op.
void AddRetriableSendMessageOp();
// Adds retriable send_trailing_metadata op.
void AddRetriableSendTrailingMetadataOp();
// Adds retriable recv_initial_metadata op.
void AddRetriableRecvInitialMetadataOp();
// Adds retriable recv_message op.
void AddRetriableRecvMessageOp();
// Adds retriable recv_trailing_metadata op.
void AddRetriableRecvTrailingMetadataOp();
// Adds cancel_stream op.
void AddCancelStreamOp(grpc_error_handle error);
private:
// Frees cached send ops that were completed by the completed batch in
// batch_data. Used when batches are completed after the call is
// committed.
void FreeCachedSendOpDataForCompletedBatch();
// If there is a pending recv_initial_metadata op, adds a closure
// to closures for recv_initial_metadata_ready.
void MaybeAddClosureForRecvInitialMetadataCallback(
grpc_error_handle error, CallCombinerClosureList* closures);
// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
// Static with a void* arg so it can be used as a grpc_closure callback.
static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
// If there is a pending recv_message op, adds a closure to closures
// for recv_message_ready.
void MaybeAddClosureForRecvMessageCallback(
grpc_error_handle error, CallCombinerClosureList* closures);
// Intercepts recv_message_ready callback for retries.
// Commits the call and returns the message up the stack.
static void RecvMessageReady(void* arg, grpc_error_handle error);
// If there is a pending recv_trailing_metadata op, adds a closure to
// closures for recv_trailing_metadata_ready.
void MaybeAddClosureForRecvTrailingMetadataReady(
grpc_error_handle error, CallCombinerClosureList* closures);
// Adds any necessary closures for deferred batch completion
// callbacks to closures.
void AddClosuresForDeferredCompletionCallbacks(
CallCombinerClosureList* closures);
// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures.
void AddClosuresToFailUnstartedPendingBatches(
grpc_error_handle error, CallCombinerClosureList* closures);
// Runs necessary closures upon completion of a call attempt.
void RunClosuresForCompletedCall(grpc_error_handle error);
// Intercepts recv_trailing_metadata_ready callback for retries.
// Commits the call and returns the trailing metadata up the stack.
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.
void AddClosuresForCompletedPendingBatch(
grpc_error_handle error, CallCombinerClosureList* closures);
// If there are any cached ops to replay or pending ops to start on the
// LB call, adds them to closures.
void AddClosuresForReplayOrPendingSendOps(
CallCombinerClosureList* closures);
// Callback used to intercept on_complete from LB calls.
static void OnComplete(void* arg, grpc_error_handle error);
// Callback used to handle on_complete for internally generated
// cancel_stream op.
static void OnCompleteForCancelOp(void* arg, grpc_error_handle error);
// This DOES hold a ref, but it cannot be a RefCountedPtr<>, because
// our dtor unrefs the owning call, which may delete the arena in
// which we are allocated, which means that running the dtor of any
// data members after that would cause a crash.
CallAttempt* call_attempt_;
// The batch to use in the LB call.
// Its payload field points to CallAttempt::batch_payload_.
grpc_transport_stream_op_batch batch_;
// For intercepting on_complete.
grpc_closure on_complete_;
};
// Arena-allocates a new BatchData for this attempt.  The BatchData is
// handed a fresh ref to this CallAttempt (tagged "CreateBatch") and starts
// out with the requested refcount.  When set_on_complete is true the
// batch's on_complete callback is hooked up; otherwise it is left null.
BatchData* CreateBatch(int refcount, bool set_on_complete) {
  RefCountedPtr<CallAttempt> self_ref = Ref(DEBUG_LOCATION, "CreateBatch");
  Arena* const arena = calld_->arena_;
  return arena->New<BatchData>(std::move(self_ref), refcount,
                               set_on_complete);
}
// If there are any cached send ops that need to be replayed on this
// call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
BatchData* MaybeCreateBatchForReplay();
// Adds a closure to closures that will execute batch in the call combiner.
// reason is a descriptive string attached to the closure.
void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
const char* reason,
CallCombinerClosureList* closures);
// Helper function used to start a recv_trailing_metadata batch. This
// is used in the case where a recv_initial_metadata or recv_message
// op fails in a way that we know the call is over but when the application
// has not yet started its own recv_trailing_metadata op.
void AddBatchForInternalRecvTrailingMetadata(
CallCombinerClosureList* closures);
// Adds a batch to closures to cancel this call attempt, if
// cancellation has not already been sent on the LB call.
void MaybeAddBatchForCancelOp(grpc_error_handle error,
CallCombinerClosureList* closures);
// Adds batches for pending batches to closures.
void AddBatchesForPendingBatches(CallCombinerClosureList* closures);
// Adds whatever batches are needed on this attempt to closures.
void AddRetriableBatches(CallCombinerClosureList* closures);
// Returns true if any send op in the batch was not yet started on this
// attempt.
bool PendingBatchContainsUnstartedSendOps(PendingBatch* pending);
// Returns true if there are cached send ops to replay.
bool HaveSendOpsToReplay();
// If our retry state is no longer needed, switch to fast path by moving
// our LB call into calld_->committed_call_ and having calld_ drop
// its ref to us.
void MaybeSwitchToFastPath();
// Returns true if the call should be retried.
// Both inputs are optional.  Note that despite its _ms suffix,
// server_pushback_ms is a Duration, not a raw millisecond count.
bool ShouldRetry(absl::optional<grpc_status_code> status,
absl::optional<Duration> server_pushback_ms);
// Abandons the call attempt. Unrefs any deferred batches.
void Abandon();
// Per-attempt receive timer plumbing.
// NOTE(review): presumably OnPerAttemptRecvTimer() is the timer callback,
// OnPerAttemptRecvTimerLocked() is its closure-style counterpart (run via
// on_per_attempt_recv_timer_), and MaybeCancelPerAttemptRecvTimer() cancels
// per_attempt_recv_timer_handle_ if it is set -- confirm in the .cc file.
void OnPerAttemptRecvTimer();
static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
void MaybeCancelPerAttemptRecvTimer();
// The call this attempt belongs to (raw pointer, set from the constructor
// argument of the same name -- NOTE(review): confirm in the .cc file).
LegacyCallData* calld_;
// The LB call for this attempt.  MaybeSwitchToFastPath() is documented as
// moving it into calld_->committed_call_.
OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall> lb_call_;
// Exposed through lb_call_committed().
bool lb_call_committed_ = false;
// Closure and (optional) EventEngine task handle for the per-attempt
// receive timer; the handle is empty when no timer task is recorded.
grpc_closure on_per_attempt_recv_timer_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
per_attempt_recv_timer_handle_;
// BatchData.batch.payload points to this.
grpc_transport_stream_op_batch_payload batch_payload_;
// For send_initial_metadata.
grpc_metadata_batch send_initial_metadata_;
// For send_trailing_metadata.
grpc_metadata_batch send_trailing_metadata_;
// For intercepting recv_initial_metadata.
grpc_metadata_batch recv_initial_metadata_;
grpc_closure recv_initial_metadata_ready_;
bool trailing_metadata_available_ = false;
// For intercepting recv_message.
grpc_closure recv_message_ready_;
absl::optional<SliceBuffer> recv_message_;
// Zero-initialized so the flags never hold an indeterminate value before a
// recv_message op has written them.
uint32_t recv_message_flags_ = 0;
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata_;
grpc_transport_stream_stats collect_stats_;
grpc_closure recv_trailing_metadata_ready_;
// These fields indicate which ops have been started and completed on
// this call attempt.
size_t started_send_message_count_ = 0;
size_t completed_send_message_count_ = 0;
size_t started_recv_message_count_ = 0;
size_t completed_recv_message_count_ = 0;
// One-bit flags.  Bit-fields cannot take default member initializers before
// C++20, so these must be initialized by the constructor.
bool started_send_initial_metadata_ : 1;
bool completed_send_initial_metadata_ : 1;
bool started_send_trailing_metadata_ : 1;
bool completed_send_trailing_metadata_ : 1;
bool started_recv_initial_metadata_ : 1;
bool completed_recv_initial_metadata_ : 1;
bool started_recv_trailing_metadata_ : 1;
bool completed_recv_trailing_metadata_ : 1;
bool sent_cancel_stream_ : 1;
// State for callback processing.
// Each deferred batch is stored next to the error recorded for it.
RefCountedPtr<BatchData> recv_initial_metadata_ready_deferred_batch_;
grpc_error_handle recv_initial_metadata_error_;
RefCountedPtr<BatchData> recv_message_ready_deferred_batch_;
grpc_error_handle recv_message_error_;
// Pairs a BatchData whose on_complete handling has been deferred with the
// error recorded for it.  Elements of on_complete_deferred_batches_.
struct OnCompleteDeferredBatch {
  // Both parameters are sinks: they are taken by value and moved into the
  // members, so neither the ref-counted pointer nor the error handle is
  // copied a second time.
  OnCompleteDeferredBatch(RefCountedPtr<BatchData> batch,
                          grpc_error_handle error)
      : batch(std::move(batch)), error(std::move(error)) {}
  RefCountedPtr<BatchData> batch;
  grpc_error_handle error;
};
// There cannot be more than 3 pending send op batches at a time.
// (Hence the inline capacity of 3.)
absl::InlinedVector<OnCompleteDeferredBatch, 3>
on_complete_deferred_batches_;
// Batch created internally for recv_trailing_metadata (see
// AddBatchForInternalRecvTrailingMetadata()) and the error recorded for
// recv_trailing_metadata.
RefCountedPtr<BatchData> recv_trailing_metadata_internal_batch_;
grpc_error_handle recv_trailing_metadata_error_;
bool seen_recv_trailing_metadata_from_surface_ : 1;
// NOTE: Do not move this next to the metadata bitfields above. That would
// save space but will also result in a data race because compiler
// will generate a 2 byte store which overwrites the meta-data
// fields upon setting this field.
bool abandoned_ : 1;
};
// Private constructor/destructor.
// NOTE(review): presumably invoked only from the static Init()/Destroy()
// entry points -- confirm in the .cc file.
LegacyCallData(RetryFilter* chand, const grpc_call_element_args& args);
~LegacyCallData();
// Instance-level counterpart of the static StartTransportStreamOpBatch().
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Returns the index into pending_batches_ to be used for batch.
static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
// Pending-batch slot management.
// NOTE(review): presumably PendingBatchesAdd() stores batch in its slot and
// returns that slot, PendingBatchClear() empties a slot, and
// MaybeClearPendingBatch() does so only when the batch has no outstanding
// callbacks -- confirm in the .cc file.
PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
void PendingBatchClear(PendingBatch* pending);
void MaybeClearPendingBatch(PendingBatch* pending);
// Closure-style callback (void* arg + error) used when failing a pending
// batch from within the call combiner.
static void FailPendingBatchInCallCombiner(void* arg,
grpc_error_handle error);
// Fails all pending batches. Does NOT yield call combiner.
void PendingBatchesFail(grpc_error_handle error);
// Returns a pointer to the first pending batch for which predicate(batch)
// returns true, or null if not found.
// log_message is a descriptive string (presumably used for trace logging).
template <typename Predicate>
PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);
// Caches data for send ops so that it can be retried later, if not
// already cached.
void MaybeCacheSendOpsForBatch(PendingBatch* pending);
// Releases the cached copy of send_initial_metadata.
void FreeCachedSendInitialMetadata();
// Frees cached send_message at index idx.
void FreeCachedSendMessage(size_t idx);
// Releases the cached copy of send_trailing_metadata.
void FreeCachedSendTrailingMetadata();
// Releases all cached send op data.
void FreeAllCachedSendOpData();
// Commits the call so that no further retry attempts will be performed.
void RetryCommit(CallAttempt* call_attempt);
// Starts a timer to retry after appropriate back-off.
// If server_pushback is nullopt, retry_backoff_ is used.
void StartRetryTimer(absl::optional<Duration> server_pushback);
// Retry timer callbacks.  The error argument of the Locked variant is unused
// (its name is commented out).
void OnRetryTimer();
static void OnRetryTimerLocked(void* arg, grpc_error_handle /*error*/);
// Adds a closure to closures to start a transparent retry.
void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
// Closure-style callback counterpart of the above.
static void StartTransparentRetry(void* arg, grpc_error_handle error);
// Creates a load-balanced call.  on_commit is a move-only callable taking
// no arguments; is_transparent_retry is passed through from the attempt.
OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
CreateLoadBalancedCall(absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry);
// Creates a new CallAttempt.
// NOTE(review): presumably stores it in call_attempt_ and starts it --
// confirm in the .cc file.
void CreateCallAttempt(bool is_transparent_retry);
// The owning filter instance (constructor argument).
RetryFilter* chand_;
// Polling entity for the call.  Initialized to null so the pointer is never
// indeterminate before one is supplied.
// NOTE(review): SetPollent() is the only visible entry point that receives a
// polling entity; confirm it is the sole writer.
grpc_polling_entity* pollent_ = nullptr;
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
// Retry policy for the call; null by default.
const internal::RetryMethodConfig* retry_policy_ = nullptr;
// Back-off state used by StartRetryTimer() when no server pushback is given.
BackOff retry_backoff_;
grpc_slice path_;  // Request path.
Timestamp deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
// NOTE(review): presumably the error recorded when the surface cancels the
// call -- confirm in the .cc file.
grpc_error_handle cancelled_from_surface_;
RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;
// TODO(roth): As part of implementing hedging, we will need to maintain a
// list of all pending attempts, so that we can cancel them all if the call
// gets cancelled.
RefCountedPtr<CallAttempt> call_attempt_;
// LB call used when we've committed to a call attempt and the retry
// state for that attempt is no longer needed. This provides a fast
// path for long-running streaming calls that minimizes overhead.
OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
committed_call_;
// Byte counter for data buffered for retries.
// NOTE(review): presumably compared against a per-call buffer limit when
// deciding whether to commit -- confirm in the .cc file.
size_t bytes_buffered_for_retry_ = 0;
// When we are not yet fully committed to a particular call (i.e.,
// either we might still retry or we have committed to the call but
// there are still some cached ops to be replayed on the call),
// batches received from above will be added to this list, and they
// will not be removed until we have invoked their completion callbacks.
// Slots are indexed by GetBatchIndex().
PendingBatch pending_batches_[MAX_PENDING_BATCHES];
// One-bit flags.  Bit-fields cannot take default member initializers before
// C++20, so these must be initialized by the constructor.
bool pending_send_initial_metadata_ : 1;
bool pending_send_message_ : 1;
bool pending_send_trailing_metadata_ : 1;
// Retry state.
bool retry_committed_ : 1;
bool retry_codepath_started_ : 1;
bool sent_transparent_retry_not_seen_by_server_ : 1;
int num_attempts_completed_ = 0;
// Task handle for the retry timer (empty when none is recorded) and the
// closure associated with retrying.
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
retry_timer_handle_;
grpc_closure retry_closure_;
// Cached data for retrying send ops.
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_metadata_batch send_initial_metadata_;
// send_message
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
// use in retries.
// NOTE(review): the cache entries below hold a SliceBuffer pointer rather
// than a byte stream; the CachingByteStream wording may be stale -- confirm.
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
// One cached send_message: the message slices plus the flags it was sent
// with.  Neither field has a default initializer, so both must be set on
// construction.
struct CachedSendMessage {
SliceBuffer* slices;
uint32_t flags;
};
absl::InlinedVector<CachedSendMessage, 3> send_messages_;
// send_trailing_metadata
bool seen_send_trailing_metadata_ = false;
grpc_metadata_batch send_trailing_metadata_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_FILTER_LEGACY_CALL_DATA_H