//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/deadline/deadline_filter.h"

#include <stdbool.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/channel_init.h"

namespace grpc_core {

// A fire-and-forget class representing a pending deadline timer.
// Allocated on the call arena.
class TimerState {
 public:
  TimerState(grpc_call_element* elem, grpc_millis deadline) : elem_(elem) {
    grpc_deadline_state* deadline_state =
        static_cast<grpc_deadline_state*>(elem_->call_data);
    GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
    GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
    grpc_timer_init(&timer_, deadline, &closure_);
  }
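
  // Cancels the timer if it has not yet fired. If the timer is still
  // pending, cancellation causes TimerCallback to run with
  // GRPC_ERROR_CANCELLED, which releases the call stack ref without
  // sending a cancel_stream op.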
  void Cancel() { grpc_timer_cancel(&timer_); }

 private:
  // The on_complete callback used when sending a cancel_error batch down the
  // filter stack. Yields the call combiner when the batch returns.
  static void YieldCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
    TimerState* self = static_cast<TimerState*>(arg);
    grpc_deadline_state* deadline_state =
        static_cast<grpc_deadline_state*>(self->elem_->call_data);
    GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
                            "got on_complete from cancel_stream batch");
    GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
  }

  // This is called via the call combiner, so access to deadline_state is
  // synchronized.
  static void SendCancelOpInCallCombiner(void* arg, grpc_error_handle error) {
    TimerState* self = static_cast<TimerState*>(arg);
    grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
        GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
    batch->cancel_stream = true;
    batch->payload->cancel_stream.cancel_error = GRPC_ERROR_REF(error);
    self->elem_->filter->start_transport_stream_op_batch(self->elem_, batch);
  }
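
  // Note that closure_ is reused: first for TimerCallback, then for
  // SendCancelOpInCallCombiner, then for YieldCallCombiner. This is safe,
  // because each use strictly follows the previous one.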

  // Timer callback.
  static void TimerCallback(void* arg, grpc_error_handle error) {
    TimerState* self = static_cast<TimerState*>(arg);
    grpc_deadline_state* deadline_state =
        static_cast<grpc_deadline_state*>(self->elem_->call_data);
    if (error != GRPC_ERROR_CANCELLED) {
      error = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
      deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error));
      GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
                        nullptr);
      GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &self->closure_,
                               error,
                               "deadline exceeded -- sending cancel_stream op");
    } else {
      GRPC_CALL_STACK_UNREF(deadline_state->call_stack, "DeadlineTimerState");
    }
  }

  // NOTE: This object's dtor is never called, so do not add any data
  // members that require destruction!
  // TODO(roth): We should ideally call this object's dtor somewhere,
  // but that would require adding more synchronization, because we'd
  // need to call the dtor only after both (a) the timer callback
  // finishes and (b) the filter sees the call completion and attempts
  // to cancel the timer.
  grpc_call_element* elem_;
  grpc_timer timer_;
  grpc_closure closure_;
};

}  // namespace grpc_core

//
// grpc_deadline_state
//

// Starts the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void start_timer_if_needed(grpc_call_element* elem,
                                  grpc_millis deadline) {
  if (deadline == GRPC_MILLIS_INF_FUTURE) {
    return;
  }
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  GPR_ASSERT(deadline_state->timer_state == nullptr);
  deadline_state->timer_state =
      deadline_state->arena->New<grpc_core::TimerState>(elem, deadline);
}

// Cancels the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
  if (deadline_state->timer_state != nullptr) {
    deadline_state->timer_state->Cancel();
    deadline_state->timer_state = nullptr;
  }
}

// Callback run when we receive trailing metadata.
static void recv_trailing_metadata_ready(void* arg, grpc_error_handle error) {
  grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
  cancel_timer_if_needed(deadline_state);
  // Invoke the original callback.
  grpc_core::Closure::Run(
      DEBUG_LOCATION, deadline_state->original_recv_trailing_metadata_ready,
      GRPC_ERROR_REF(error));
}

// Inject our own recv_trailing_metadata_ready callback into op.
static void inject_recv_trailing_metadata_ready(
    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
  deadline_state->original_recv_trailing_metadata_ready =
      op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
                    recv_trailing_metadata_ready, deadline_state,
                    grpc_schedule_on_exec_ctx);
  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &deadline_state->recv_trailing_metadata_ready;
}

// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
  start_timer_after_init_state(grpc_call_element* elem, grpc_millis deadline)
      : elem(elem), deadline(deadline) {}
  ~start_timer_after_init_state() { start_timer_if_needed(elem, deadline); }

  bool in_call_combiner = false;
  grpc_call_element* elem;
  grpc_millis deadline;
  grpc_closure closure;
};

static void start_timer_after_init(void* arg, grpc_error_handle error) {
  struct start_timer_after_init_state* state =
      static_cast<struct start_timer_after_init_state*>(arg);
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(state->elem->call_data);
  if (!state->in_call_combiner) {
    // We are initially called without holding the call combiner, so we
    // need to bounce ourselves into it.
    state->in_call_combiner = true;
    GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
                             GRPC_ERROR_REF(error),
                             "scheduling deadline timer");
    return;
  }
  // Deleting the state runs its dtor, which starts the deadline timer now
  // that call stack initialization is complete.
  delete state;
  GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
                          "done scheduling deadline timer");
}

grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
                                         const grpc_call_element_args& args,
                                         grpc_millis deadline)
    : call_stack(args.call_stack),
      call_combiner(args.call_combiner),
      arena(args.arena) {
  // Deadline will always be infinite on servers, so the timer will only be
  // set on clients with a finite deadline.
  if (deadline != GRPC_MILLIS_INF_FUTURE) {
    // When the deadline passes, we indicate the failure by sending down
    // an op with cancel_error set. However, we can't send down any ops
    // until after the call stack is fully initialized. If we start the
    // timer here, we have no guarantee that the timer won't pop before
    // call stack initialization is finished. To avoid that problem, we
    // create a closure to start the timer, and we schedule that closure
    // to be run after call stack initialization is done.
    struct start_timer_after_init_state* state =
        new start_timer_after_init_state(elem, deadline);
    GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
                      grpc_schedule_on_exec_ctx);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, GRPC_ERROR_NONE);
  }
}

grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
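
// Resets the deadline: cancels any timer already in flight and, if the new
// deadline is finite, starts a new one.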
void grpc_deadline_state_reset(grpc_call_element* elem,
                               grpc_millis new_deadline) {
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  cancel_timer_if_needed(deadline_state);
  start_timer_if_needed(elem, new_deadline);
}
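
// Handles the deadline-related aspects of a client-side batch: cancels the
// timer when the call is cancelled, and hooks recv_trailing_metadata_ready
// so that the timer is cancelled when the call completes.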
void grpc_deadline_state_client_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  if (op->cancel_stream) {
    cancel_timer_if_needed(deadline_state);
  } else {
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    if (op->recv_trailing_metadata) {
      inject_recv_trailing_metadata_ready(deadline_state, op);
    }
  }
}

//
// filter code
//

// Constructor for channel_data. Used for both client and server filters.
static grpc_error_handle deadline_init_channel_elem(
    grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
  GPR_ASSERT(!args->is_last);
  return GRPC_ERROR_NONE;
}

// Destructor for channel_data. Used for both client and server filters.
static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {}

// Call data used for both client and server filters.
typedef struct base_call_data {
  grpc_deadline_state deadline_state;
} base_call_data;

// Additional call data used only for the server filter.
typedef struct server_call_data {
  base_call_data base;  // Must be first.
  // The closure for receiving initial metadata.
  grpc_closure recv_initial_metadata_ready;
  // Received initial metadata batch.
  grpc_metadata_batch* recv_initial_metadata;
  // The original recv_initial_metadata_ready closure, which we chain to
  // after our own closure is invoked.
  grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;

// Constructor for call_data. Used for both client and server filters.
static grpc_error_handle deadline_init_call_elem(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline);
  return GRPC_ERROR_NONE;
}

// Destructor for call_data. Used for both client and server filters.
static void deadline_destroy_call_elem(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* /*ignored*/) {
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  deadline_state->~grpc_deadline_state();
}

// Method for starting a call op for the client filter.
static void deadline_client_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  grpc_deadline_state_client_start_transport_stream_op_batch(elem, op);
  // Chain to next filter.
  grpc_call_next_op(elem, op);
}

// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
  start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
  // Invoke the next callback.
  grpc_core::Closure::Run(DEBUG_LOCATION,
                          calld->next_recv_initial_metadata_ready,
                          GRPC_ERROR_REF(error));
}

// Method for starting a call op for the server filter.
static void deadline_server_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
  if (op->cancel_stream) {
    cancel_timer_if_needed(&calld->base.deadline_state);
  } else {
    // If we're receiving initial metadata, we need to get the deadline
    // from the recv_initial_metadata_ready callback. So we inject our
    // own callback into that hook.
    if (op->recv_initial_metadata) {
      calld->next_recv_initial_metadata_ready =
          op->payload->recv_initial_metadata.recv_initial_metadata_ready;
      calld->recv_initial_metadata =
          op->payload->recv_initial_metadata.recv_initial_metadata;
      GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                        recv_initial_metadata_ready, elem,
                        grpc_schedule_on_exec_ctx);
      op->payload->recv_initial_metadata.recv_initial_metadata_ready =
          &calld->recv_initial_metadata_ready;
    }
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    // Note that we trigger this on recv_trailing_metadata, even though
    // the client never sends trailing metadata, because this is the
    // hook that tells us when the call is complete on the server side.
    if (op->recv_trailing_metadata) {
      inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
    }
  }
  // Chain to next filter.
  grpc_call_next_op(elem, op);
}
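
// Vtable for the client-side deadline filter.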
const grpc_channel_filter grpc_client_deadline_filter = {
    deadline_client_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(base_call_data),
    deadline_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    deadline_destroy_call_elem,
    0,  // sizeof(channel_data)
    deadline_init_channel_elem,
    deadline_destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};
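
// Vtable for the server-side deadline filter.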
const grpc_channel_filter grpc_server_deadline_filter = {
    deadline_server_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(server_call_data),
    deadline_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    deadline_destroy_call_elem,
    0,  // sizeof(channel_data)
    deadline_init_channel_elem,
    deadline_destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};
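
// Returns true if deadline checking is enabled for the given channel args:
// the value of GRPC_ARG_ENABLE_DEADLINE_CHECKS if set, otherwise enabled
// unless a minimal stack was requested.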
bool grpc_deadline_checking_enabled(const grpc_channel_args* channel_args) {
  return grpc_channel_arg_get_bool(
      grpc_channel_args_find(channel_args, GRPC_ARG_ENABLE_DEADLINE_CHECKS),
      !grpc_channel_args_want_minimal_stack(channel_args));
}
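
// Channel init hook: prepends the deadline filter (passed via arg) when
// deadline checking is enabled for the channel; otherwise leaves the stack
// unchanged and reports success.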
static bool maybe_add_deadline_filter(grpc_channel_stack_builder* builder,
                                      void* arg) {
  return grpc_deadline_checking_enabled(
             grpc_channel_stack_builder_get_channel_arguments(builder))
             ? grpc_channel_stack_builder_prepend_filter(
                   builder, static_cast<const grpc_channel_filter*>(arg),
                   nullptr, nullptr)
             : true;
}
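
// Registers the deadline filter for direct client channels and for server
// channels.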
void grpc_deadline_filter_init(void) {
  grpc_channel_init_register_stage(
      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_deadline_filter,
      const_cast<grpc_channel_filter*>(&grpc_client_deadline_filter));
  grpc_channel_init_register_stage(
      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_deadline_filter,
      const_cast<grpc_channel_filter*>(&grpc_server_deadline_filter));
}

void grpc_deadline_filter_shutdown(void) {}