blob: c2218ed5c467e02635044bbae62d362be44a8bb3 [file] [log] [blame]
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(ctiller): Add a unit test suite for these filters once it's practical to
// mock transport operations.
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/channel_idle/channel_idle_filter.h"
#include <stdlib.h>
#include <functional>
#include <utility>
#include "absl/types/optional.h"
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/http2_errors.h"
namespace grpc_core {
namespace {
// TODO(ctiller): The idle filter was disabled in client channel by default
// due to b/143502997. Now the bug is fixed enable the filter by default.
const auto kDefaultIdleTimeout = Duration::Infinity();
// If these settings change, make sure that we are not sending a GOAWAY for
// inproc transport, since a GOAWAY to inproc ends up destroying the transport.
const auto kDefaultMaxConnectionAge = Duration::Infinity();
const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity();
const auto kDefaultMaxConnectionIdle = Duration::Infinity();
const auto kMaxConnectionAgeJitter = 0.1;
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");
} // namespace
#define GRPC_IDLE_FILTER_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
} \
} while (0)
namespace {
Duration GetClientIdleTimeout(const ChannelArgs& args) {
return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS)
.value_or(kDefaultIdleTimeout);
}
} // namespace
struct MaxAgeFilter::Config {
Duration max_connection_age;
Duration max_connection_idle;
Duration max_connection_age_grace;
bool enable() const {
return max_connection_age != Duration::Infinity() ||
max_connection_idle != Duration::Infinity();
}
/* A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out
connection storms. Note that the MAX_CONNECTION_AGE option without jitter
would not create connection storms by itself, but if there happened to be a
connection storm it could cause it to repeat at a fixed period. */
static Config FromChannelArgs(const ChannelArgs& args) {
const Duration args_max_age =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS)
.value_or(kDefaultMaxConnectionAge);
const Duration args_max_idle =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
.value_or(kDefaultMaxConnectionIdle);
const Duration args_max_age_grace =
args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)
.value_or(kDefaultMaxConnectionAgeGrace);
/* generate a random number between 1 - kMaxConnectionAgeJitter and
1 + kMaxConnectionAgeJitter */
const double multiplier =
rand() * kMaxConnectionAgeJitter * 2.0 / RAND_MAX + 1.0 -
kMaxConnectionAgeJitter;
/* GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
will not be cast to int implicitly before the comparison. */
return Config{args_max_age * multiplier, args_max_idle, args_max_age_grace};
}
};
absl::StatusOr<ClientIdleFilter> ClientIdleFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
ClientIdleFilter filter(filter_args.channel_stack(),
GetClientIdleTimeout(args));
return absl::StatusOr<ClientIdleFilter>(std::move(filter));
}
absl::StatusOr<MaxAgeFilter> MaxAgeFilter::Create(
const ChannelArgs& args, ChannelFilter::Args filter_args) {
MaxAgeFilter filter(filter_args.channel_stack(),
Config::FromChannelArgs(args));
return absl::StatusOr<MaxAgeFilter>(std::move(filter));
}
void MaxAgeFilter::Shutdown() {
max_age_activity_.Reset();
ChannelIdleFilter::Shutdown();
}
void MaxAgeFilter::PostInit() {
struct StartupClosure {
RefCountedPtr<grpc_channel_stack> channel_stack;
MaxAgeFilter* filter;
grpc_closure closure;
};
auto run_startup = [](void* p, grpc_error_handle) {
auto* startup = static_cast<StartupClosure*>(p);
// Trigger idle timer
startup->filter->IncreaseCallCount();
startup->filter->DecreaseCallCount();
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch.reset(
new ConnectivityWatcher(startup->filter));
op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
grpc_channel_next_op(
grpc_channel_stack_element(startup->channel_stack.get(), 0), op);
delete startup;
};
auto* startup =
new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}};
GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &startup->closure, GRPC_ERROR_NONE);
auto channel_stack = this->channel_stack()->Ref();
// Start the max age timer
if (max_connection_age_ != Duration::Infinity()) {
max_age_activity_.Set(MakeActivity(
TrySeq(
// First sleep until the max connection age
Sleep(ExecCtx::Get()->Now() + max_connection_age_),
// Then send a goaway.
[this] {
GRPC_CHANNEL_STACK_REF(this->channel_stack(),
"max_age send_goaway");
// Jump out of the activity to send the goaway.
auto fn = [](void* arg, grpc_error_handle) {
auto* channel_stack = static_cast<grpc_channel_stack*>(arg);
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->goaway_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"),
GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
grpc_channel_element* elem =
grpc_channel_stack_element(channel_stack, 0);
elem->filter->start_transport_op(elem, op);
GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway");
};
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr),
GRPC_ERROR_NONE);
return Immediate(absl::OkStatus());
},
// Sleep for the grace period
[this] {
return Sleep(ExecCtx::Get()->Now() + max_connection_age_grace_);
}),
ExecCtxWakeupScheduler(), [channel_stack, this](absl::Status status) {
// OnDone -- close the connection if the promise completed
// successfully.
// (if it did not, it was cancelled)
if (status.ok()) CloseChannel();
}));
}
}
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> ChannelIdleFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
using Decrementer = std::unique_ptr<ChannelIdleFilter, CallCountDecreaser>;
IncreaseCallCount();
return ArenaPromise<ServerMetadataHandle>(
[decrementer = Decrementer(this),
next = next_promise_factory(std::move(call_args))]() mutable
-> Poll<ServerMetadataHandle> { return next(); });
}
bool ChannelIdleFilter::StartTransportOp(grpc_transport_op* op) {
// Catch the disconnect_with_error transport op.
if (!GRPC_ERROR_IS_NONE(op->disconnect_with_error)) Shutdown();
// Pass the op to the next filter.
return false;
}
void ChannelIdleFilter::Shutdown() {
// IncreaseCallCount() introduces a phony call and prevent the timer from
// being reset by other threads.
IncreaseCallCount();
activity_.Reset();
}
void ChannelIdleFilter::IncreaseCallCount() {
idle_filter_state_->IncreaseCallCount();
}
void ChannelIdleFilter::DecreaseCallCount() {
if (idle_filter_state_->DecreaseCallCount()) {
// If there are no more calls in progress, start the idle timer.
StartIdleTimer();
}
}
void ChannelIdleFilter::StartIdleTimer() {
GRPC_IDLE_FILTER_LOG("timer has started");
auto idle_filter_state = idle_filter_state_;
// Hold a ref to the channel stack for the timer callback.
auto channel_stack = channel_stack_->Ref();
auto timeout = client_idle_timeout_;
auto promise = Loop([timeout, idle_filter_state]() {
return TrySeq(Sleep(ExecCtx::Get()->Now() + timeout),
[idle_filter_state]() -> Poll<LoopCtl<absl::Status>> {
if (idle_filter_state->CheckTimer()) {
return Continue{};
} else {
return absl::OkStatus();
}
});
});
activity_.Set(MakeActivity(std::move(promise), ExecCtxWakeupScheduler{},
[channel_stack, this](absl::Status status) {
if (status.ok()) CloseChannel();
}));
}
void ChannelIdleFilter::CloseChannel() {
auto* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
// Pass the transport op down to the channel stack.
auto* elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
const grpc_channel_filter ClientIdleFilter::kFilter =
MakePromiseBasedFilter<ClientIdleFilter, FilterEndpoint::kClient>(
"client_idle");
const grpc_channel_filter MaxAgeFilter::kFilter =
MakePromiseBasedFilter<MaxAgeFilter, FilterEndpoint::kServer>("max_age");
void RegisterChannelIdleFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
GetClientIdleTimeout(channel_args) != Duration::Infinity()) {
builder->PrependFilter(&ClientIdleFilter::kFilter);
}
return true;
});
builder->channel_init()->RegisterStage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
auto channel_args = builder->channel_args();
if (!channel_args.WantMinimalStack() &&
MaxAgeFilter::Config::FromChannelArgs(channel_args).enable()) {
builder->PrependFilter(&MaxAgeFilter::kFilter);
}
return true;
});
}
MaxAgeFilter::MaxAgeFilter(grpc_channel_stack* channel_stack,
const Config& max_age_config)
: ChannelIdleFilter(channel_stack, max_age_config.max_connection_idle),
max_connection_age_(max_age_config.max_connection_age),
max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
} // namespace grpc_core