blob: 01abaab780768093cbf91bdb19be1ade0734264c [file] [log] [blame]
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include <limits.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/http2_errors.h"
// TODO(juanlishen): The idle filter is disabled in client channel by default
// due to b/143502997. Try to fix the bug and enable the filter by default.
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX
// The user input idle timeout smaller than this would be capped to it.
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000)
namespace grpc_core {
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");
#define GRPC_IDLE_FILTER_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) { \
gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
} \
} while (0)
namespace {
/*
client_idle_filter maintains a state tracking if there are active calls in the
channel and its internal idle_timer_. The states are specified as following:
+--------------------------------------------+-------------+---------+
| ChannelState | idle_timer_ | channel |
+--------------------------------------------+-------------+---------+
| IDLE | unset | idle |
| CALLS_ACTIVE | unset | busy |
| TIMER_PENDING | set-valid | idle |
| TIMER_PENDING_CALLS_ACTIVE | set-invalid | busy |
| TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle |
+--------------------------------------------+-------------+---------+
IDLE: The initial state of the client_idle_filter, indicating the channel is
in IDLE.
CALLS_ACTIVE: The channel has 1 or 1+ active calls and the timer is not set.
TIMER_PENDING: The state after the timer is set and no calls have arrived
after the timer is set. The channel must have 0 active call in this state. If
the timer is fired in this state, the channel will go into IDLE state.
TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one
call has arrived after the timer is set. The channel must have 1 or 1+ active
calls in this state. If the timer is fired in this state, we won't reschedule
it.
TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set
and at least one call has arrived after the timer is set, BUT the channel
currently has 0 active call. If the timer is fired in this state, we will
reschedule it according to the finish time of the latest call.
PROCESSING: The state set to block other threads when the setting thread is
doing some work to keep state consistency.
idle_timer_ will not be cancelled (unless the channel is shutting down).
If the timer callback is called when the idle_timer_ is valid (i.e. idle_state
is TIMER_PENDING), the channel will enter IDLE, otherwise the channel won't be
changed.
State transitions:
IDLE
| ^
--------------------------------- *
| *
v *
CALLS_ACTIVE =================> TIMER_PENDING
^ | ^
* ------------------------------ *
* | *
* v *
TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
^ |
| |
---------------------------------
---> Triggered by IncreaseCallCount()
===> Triggered by DecreaseCallCount()
***> Triggered by IdleTimerCallback()
*/
enum ChannelState {
IDLE,
CALLS_ACTIVE,
TIMER_PENDING,
TIMER_PENDING_CALLS_ACTIVE,
TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
PROCESSING
};
grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
return GPR_MAX(
grpc_channel_arg_get_integer(
grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
{DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}),
MIN_IDLE_TIMEOUT_MS);
}
class ChannelData {
public:
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args);
static void Destroy(grpc_channel_element* elem);
static void StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op);
void IncreaseCallCount();
void DecreaseCallCount();
private:
ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args,
grpc_error_handle* error);
~ChannelData() = default;
static void IdleTimerCallback(void* arg, grpc_error_handle error);
static void IdleTransportOpCompleteCallback(void* arg,
grpc_error_handle error);
void StartIdleTimer();
void EnterIdle();
grpc_channel_element* elem_;
// The channel stack to which we take refs for pending callbacks.
grpc_channel_stack* channel_stack_;
// Timeout after the last RPC finishes on the client channel at which the
// channel goes back into IDLE state.
const grpc_millis client_idle_timeout_;
// Member data used to track the state of channel.
grpc_millis last_idle_time_;
Atomic<intptr_t> call_count_{0};
Atomic<ChannelState> state_{IDLE};
// Idle timer and its callback closure.
grpc_timer idle_timer_;
grpc_closure idle_timer_callback_;
// The transport op telling the client channel to enter IDLE.
grpc_transport_op idle_transport_op_;
grpc_closure idle_transport_op_complete_callback_;
};
grpc_error_handle ChannelData::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
grpc_error_handle error = GRPC_ERROR_NONE;
new (elem->channel_data) ChannelData(elem, args, &error);
return error;
}
void ChannelData::Destroy(grpc_channel_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->~ChannelData();
}
void ChannelData::StartTransportOp(grpc_channel_element* elem,
grpc_transport_op* op) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
// Catch the disconnect_with_error transport op.
if (op->disconnect_with_error != nullptr) {
// IncreaseCallCount() introduces a phony call and prevent the timer from
// being reset by other threads.
chand->IncreaseCallCount();
// If the timer has been set, cancel the timer.
// No synchronization issues here. grpc_timer_cancel() is valid as long as
// the timer has been init()ed before.
grpc_timer_cancel(&chand->idle_timer_);
}
// Pass the op to the next filter.
grpc_channel_next_op(elem, op);
}
void ChannelData::IncreaseCallCount() {
const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED);
GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR,
previous_value + 1);
if (previous_value == 0) {
// This call is the one that makes the channel busy.
// Loop here to make sure the previous decrease operation has finished.
ChannelState state = state_.Load(MemoryOrder::RELAXED);
while (true) {
switch (state) {
// Timer has not been set. Switch to CALLS_ACTIVE.
case IDLE:
// In this case, no other threads will modify the state, so we can
// just store the value.
state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED);
return;
// Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
case TIMER_PENDING:
case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
// At this point, the state may have been switched to IDLE by the
// idle timer callback. Therefore, use CAS operation to change the
// state atomically.
// Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
// been properly set in DecreaseCallCount().
if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE,
MemoryOrder::ACQUIRE,
MemoryOrder::RELAXED)) {
return;
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = state_.Load(MemoryOrder::RELAXED);
break;
}
}
}
}
void ChannelData::DecreaseCallCount() {
const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED);
GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR,
previous_value - 1);
if (previous_value == 1) {
// This call is the one that makes the channel idle.
// last_idle_time_ does not need to be Atomic<> because busy-loops in
// IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
// prevent multiple threads from simultaneously accessing this variable.
last_idle_time_ = ExecCtx::Get()->Now();
ChannelState state = state_.Load(MemoryOrder::RELAXED);
while (true) {
switch (state) {
// Timer has not been set. Set the timer and switch to TIMER_PENDING
case CALLS_ACTIVE:
// Release store here to make other threads see the updated value of
// last_idle_time_.
StartIdleTimer();
state_.Store(TIMER_PENDING, MemoryOrder::RELEASE);
return;
// Timer has been set. Switch to
// TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
case TIMER_PENDING_CALLS_ACTIVE:
// At this point, the state may have been switched to CALLS_ACTIVE by
// the idle timer callback. Therefore, use CAS operation to change the
// state atomically.
// Release store here to make the idle timer callback see the updated
// value of last_idle_time_ to properly reset the idle timer.
if (state_.CompareExchangeWeak(
&state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
MemoryOrder::RELEASE, MemoryOrder::RELAXED)) {
return;
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = state_.Load(MemoryOrder::RELAXED);
break;
}
}
}
}
ChannelData::ChannelData(grpc_channel_element* elem,
grpc_channel_element_args* args,
grpc_error_handle* /*error*/)
: elem_(elem),
channel_stack_(args->channel_stack),
client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) {
// If the idle filter is explicitly disabled in channel args, this ctor should
// not get called.
GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
GRPC_IDLE_FILTER_LOG("created with max_leisure_time = %" PRId64 " ms",
client_idle_timeout_);
// Initialize the idle timer without setting it.
grpc_timer_init_unset(&idle_timer_);
// Initialize the idle timer callback closure.
GRPC_CLOSURE_INIT(&idle_timer_callback_, IdleTimerCallback, this,
grpc_schedule_on_exec_ctx);
// Initialize the idle transport op complete callback.
GRPC_CLOSURE_INIT(&idle_transport_op_complete_callback_,
IdleTransportOpCompleteCallback, this,
grpc_schedule_on_exec_ctx);
}
void ChannelData::IdleTimerCallback(void* arg, grpc_error_handle error) {
GRPC_IDLE_FILTER_LOG("timer alarms");
ChannelData* chand = static_cast<ChannelData*>(arg);
if (error != GRPC_ERROR_NONE) {
GRPC_IDLE_FILTER_LOG("timer canceled");
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
return;
}
bool finished = false;
ChannelState state = chand->state_.Load(MemoryOrder::RELAXED);
while (!finished) {
switch (state) {
case TIMER_PENDING:
// Change the state to PROCESSING to block IncreaseCallCout() until the
// EnterIdle() operation finishes, preventing mistakenly entering IDLE
// when active RPC exists.
finished = chand->state_.CompareExchangeWeak(
&state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
if (finished) {
chand->EnterIdle();
chand->state_.Store(IDLE, MemoryOrder::RELAXED);
}
break;
case TIMER_PENDING_CALLS_ACTIVE:
finished = chand->state_.CompareExchangeWeak(
&state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
break;
case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
// Change the state to PROCESSING to block IncreaseCallCount() until the
// StartIdleTimer() operation finishes, preventing mistakenly restarting
// the timer after grpc_timer_cancel() when shutdown.
finished = chand->state_.CompareExchangeWeak(
&state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
if (finished) {
chand->StartIdleTimer();
chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED);
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = chand->state_.Load(MemoryOrder::RELAXED);
break;
}
}
GRPC_IDLE_FILTER_LOG("timer finishes");
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
}
void ChannelData::IdleTransportOpCompleteCallback(void* arg,
grpc_error_handle /*error*/) {
ChannelData* chand = static_cast<ChannelData*>(arg);
GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "idle transport op");
}
void ChannelData::StartIdleTimer() {
GRPC_IDLE_FILTER_LOG("timer has started");
// Hold a ref to the channel stack for the timer callback.
GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_,
&idle_timer_callback_);
}
void ChannelData::EnterIdle() {
GRPC_IDLE_FILTER_LOG("the channel will enter IDLE");
// Hold a ref to the channel stack for the transport op.
GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op");
// Initialize the transport op.
idle_transport_op_ = {};
idle_transport_op_.disconnect_with_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
idle_transport_op_.on_consumed = &idle_transport_op_complete_callback_;
// Pass the transport op down to the channel stack.
grpc_channel_next_op(elem_, &idle_transport_op_);
}
class CallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args);
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_schedule_closure);
};
grpc_error_handle CallData::Init(grpc_call_element* elem,
const grpc_call_element_args* /*args*/) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->IncreaseCallCount();
return GRPC_ERROR_NONE;
}
void CallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
chand->DecreaseCallCount();
}
const grpc_channel_filter grpc_client_idle_filter = {
grpc_call_next_op,
ChannelData::StartTransportOp,
sizeof(CallData),
CallData::Init,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
CallData::Destroy,
sizeof(ChannelData),
ChannelData::Init,
ChannelData::Destroy,
grpc_channel_next_get_info,
"client_idle"};
static bool MaybeAddClientIdleFilter(grpc_channel_stack_builder* builder,
void* /*arg*/) {
const grpc_channel_args* channel_args =
grpc_channel_stack_builder_get_channel_arguments(builder);
if (!grpc_channel_args_want_minimal_stack(channel_args) &&
GetClientIdleTimeout(channel_args) != INT_MAX) {
return grpc_channel_stack_builder_prepend_filter(
builder, &grpc_client_idle_filter, nullptr, nullptr);
} else {
return true;
}
}
} // namespace
} // namespace grpc_core
void grpc_client_idle_filter_init(void) {
grpc_channel_init_register_stage(
GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_core::MaybeAddClientIdleFilter, nullptr);
}
void grpc_client_idle_filter_shutdown(void) {}