| // |
| // Copyright 2018 gRPC authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| |
| #include <grpc/support/port_platform.h> |
| |
| #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" |
| |
| #include <inttypes.h> |
| #include <limits.h> |
| #include <stdio.h> |
| #include <string.h> |
| |
| #include <algorithm> |
| #include <initializer_list> |
| #include <memory> |
| #include <new> |
| #include <string> |
| #include <utility> |
| |
| #include "absl/base/attributes.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/cord.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/str_format.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/optional.h" |
| |
| #include <grpc/event_engine/event_engine.h> |
| #include <grpc/grpc.h> |
| #include <grpc/impl/connectivity_state.h> |
| #include <grpc/slice_buffer.h> |
| #include <grpc/status.h> |
| #include <grpc/support/alloc.h> |
| #include <grpc/support/log.h> |
| #include <grpc/support/time.h> |
| |
| #include "src/core/ext/transport/chttp2/transport/context_list.h" |
| #include "src/core/ext/transport/chttp2/transport/flow_control.h" |
| #include "src/core/ext/transport/chttp2/transport/frame.h" |
| #include "src/core/ext/transport/chttp2/transport/frame_data.h" |
| #include "src/core/ext/transport/chttp2/transport/frame_goaway.h" |
| #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h" |
| #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" |
| #include "src/core/ext/transport/chttp2/transport/http2_settings.h" |
| #include "src/core/ext/transport/chttp2/transport/http_trace.h" |
| #include "src/core/ext/transport/chttp2/transport/internal.h" |
| #include "src/core/ext/transport/chttp2/transport/stream_map.h" |
| #include "src/core/ext/transport/chttp2/transport/varint.h" |
| #include "src/core/lib/channel/call_tracer.h" |
| #include "src/core/lib/channel/channel_args.h" |
| #include "src/core/lib/channel/context.h" |
| #include "src/core/lib/debug/stats.h" |
| #include "src/core/lib/debug/stats_data.h" |
| #include "src/core/lib/experiments/experiments.h" |
| #include "src/core/lib/gpr/useful.h" |
| #include "src/core/lib/gprpp/bitset.h" |
| #include "src/core/lib/gprpp/crash.h" |
| #include "src/core/lib/gprpp/debug_location.h" |
| #include "src/core/lib/gprpp/ref_counted.h" |
| #include "src/core/lib/gprpp/status_helper.h" |
| #include "src/core/lib/gprpp/time.h" |
| #include "src/core/lib/http/parser.h" |
| #include "src/core/lib/iomgr/combiner.h" |
| #include "src/core/lib/iomgr/error.h" |
| #include "src/core/lib/iomgr/exec_ctx.h" |
| #include "src/core/lib/iomgr/iomgr_fwd.h" |
| #include "src/core/lib/iomgr/port.h" |
| #include "src/core/lib/iomgr/timer.h" |
| #include "src/core/lib/promise/poll.h" |
| #include "src/core/lib/resource_quota/arena.h" |
| #include "src/core/lib/resource_quota/memory_quota.h" |
| #include "src/core/lib/resource_quota/resource_quota.h" |
| #include "src/core/lib/resource_quota/trace.h" |
| #include "src/core/lib/slice/slice.h" |
| #include "src/core/lib/slice/slice_buffer.h" |
| #include "src/core/lib/slice/slice_internal.h" |
| #include "src/core/lib/transport/bdp_estimator.h" |
| #include "src/core/lib/transport/connectivity_state.h" |
| #include "src/core/lib/transport/error_utils.h" |
| #include "src/core/lib/transport/http2_errors.h" |
| #include "src/core/lib/transport/metadata_batch.h" |
| #include "src/core/lib/transport/status_conversion.h" |
| #include "src/core/lib/transport/transport.h" |
| #include "src/core/lib/transport/transport_impl.h" |
| |
| #ifdef GRPC_POSIX_SOCKET_TCP |
| #include "src/core/lib/iomgr/ev_posix.h" |
| #endif |
| |
| // Window / buffer sizing constants (bytes). |
| #define DEFAULT_CONNECTION_WINDOW_TARGET (1 << 20) |
| #define MAX_WINDOW 0x7fffffffu |
| #define MAX_WRITE_BUFFER_SIZE (64 << 20) |
| #define DEFAULT_MAX_HEADER_LIST_SIZE (8 << 10) |
| |
| // Compiled-in keepalive defaults, in milliseconds. INT_MAX is treated as |
| // "infinite" by init_transport_keepalive_settings(). |
| #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX |
| #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS (20 * 1000) // 20 seconds |
| #define DEFAULT_SERVER_KEEPALIVE_TIME_MS (2 * 60 * 60 * 1000) // 2 hours |
| #define DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS (20 * 1000) // 20 seconds |
| #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false |
| #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2 |
| |
| // Compiled-in ping policy defaults. |
| #define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS (5 * 60 * 1000) // 5 minutes |
| #define DEFAULT_MAX_PINGS_BETWEEN_DATA 2 |
| #define DEFAULT_MAX_PING_STRIKES 2 |
| |
| #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000 |
| |
| // Process-wide keepalive defaults for client transports; start out at the |
| // compiled-in DEFAULT_* values. |
| static int g_default_client_keepalive_time_ms = DEFAULT_CLIENT_KEEPALIVE_TIME_MS; |
| static int g_default_client_keepalive_timeout_ms = DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS; |
| static bool g_default_client_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; |
| |
| // Process-wide keepalive defaults for server transports. |
| static int g_default_server_keepalive_time_ms = DEFAULT_SERVER_KEEPALIVE_TIME_MS; |
| static int g_default_server_keepalive_timeout_ms = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS; |
| static bool g_default_server_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; |
| |
| // Process-wide ping policy defaults (see configure_transport_ping_policy()). |
| static int g_default_min_recv_ping_interval_without_data_ms = DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS; |
| static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA; |
| static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES; |
| |
| // Upper bound used for stream ids (31 bits set). |
| #define MAX_CLIENT_STREAM_ID 0x7fffffffu |
| // Trace flags, both disabled by default: "http_keepalive" and the debug-only |
| // "chttp2_refcount". |
| grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive"); |
| grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false, "chttp2_refcount"); |
| |
| // forward declarations of various callbacks that we'll build closures around |
| static void write_action_begin_locked(void* t, grpc_error_handle error); |
| static void write_action(void* t, grpc_error_handle error); |
| static void write_action_end(void* t, grpc_error_handle error); |
| static void write_action_end_locked(void* t, grpc_error_handle error); |
| |
| static void read_action(void* t, grpc_error_handle error); |
| static void read_action_locked(void* t, grpc_error_handle error); |
| static void continue_read_action_locked(grpc_chttp2_transport* t); |
| |
| // Set a transport level setting, and push it to our peer |
| static void queue_setting_update(grpc_chttp2_transport* t, |
| grpc_chttp2_setting_id id, uint32_t value); |
| |
| static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, |
| grpc_error_handle error); |
| |
| // Start new streams that have been created if we can |
| static void maybe_start_some_streams(grpc_chttp2_transport* t); |
| |
| static void connectivity_state_set(grpc_chttp2_transport* t, |
| grpc_connectivity_state state, |
| const absl::Status& status, |
| const char* reason); |
| |
| static void benign_reclaimer_locked(void* arg, grpc_error_handle error); |
| static void destructive_reclaimer_locked(void* arg, grpc_error_handle error); |
| |
| static void post_benign_reclaimer(grpc_chttp2_transport* t); |
| static void post_destructive_reclaimer(grpc_chttp2_transport* t); |
| |
| static void close_transport_locked(grpc_chttp2_transport* t, |
| grpc_error_handle error); |
| static void end_all_the_calls(grpc_chttp2_transport* t, |
| grpc_error_handle error); |
| |
| static void start_bdp_ping(void* tp, grpc_error_handle error); |
| static void finish_bdp_ping(void* tp, grpc_error_handle error); |
| static void start_bdp_ping_locked(void* tp, grpc_error_handle error); |
| static void finish_bdp_ping_locked(void* tp, grpc_error_handle error); |
| static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t); |
| static void next_bdp_ping_timer_expired_locked( |
| void* tp, GRPC_UNUSED grpc_error_handle error); |
| |
| static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error); |
| static void send_ping_locked(grpc_chttp2_transport* t, |
| grpc_closure* on_initiate, grpc_closure* on_ack); |
| static void retry_initiate_ping_locked(void* tp, |
| GRPC_UNUSED grpc_error_handle error); |
| |
| // keepalive-relevant functions |
| static void init_keepalive_ping(grpc_chttp2_transport* t); |
| static void init_keepalive_ping_locked(void* arg, |
| GRPC_UNUSED grpc_error_handle error); |
| static void start_keepalive_ping(void* arg, grpc_error_handle error); |
| static void finish_keepalive_ping(void* arg, grpc_error_handle error); |
| static void start_keepalive_ping_locked(void* arg, grpc_error_handle error); |
| static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error); |
| static void keepalive_watchdog_fired(grpc_chttp2_transport* t); |
| static void keepalive_watchdog_fired_locked( |
| void* arg, GRPC_UNUSED grpc_error_handle error); |
| static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t); |
| |
| namespace { |
| // Forwards `annotation` to the call tracer stored in the stream's call context |
| // (slot GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE). Does nothing when |
| // call-op trace recording is disabled or when no tracer is installed. The |
| // stream must have a context. |
| void MaybeRecordTransportAnnotation(grpc_chttp2_stream* s, |
| absl::string_view annotation) { |
| GPR_ASSERT(s->context); |
| if (grpc_core::IsTraceRecordCallopsEnabled()) { |
| auto* context_elements = static_cast<grpc_call_context_element*>(s->context); |
| auto* tracer = static_cast<grpc_core::CallTracerInterface*>( |
| context_elements[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); |
| if (tracer != nullptr) { |
| tracer->RecordAnnotation(annotation); |
| } |
| } |
| } |
| } // namespace |
| |
| namespace grpc_core { |
| |
| namespace { |
| // Test-only hooks. The init/destruct callbacks are invoked (when non-null) at |
| // the end of the transport constructor and destructor respectively. |
| TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr; |
| TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback = nullptr; |
| bool test_only_disable_transient_failure_state_notification = false; |
| } // namespace |
| |
| // Installs (or clears, with nullptr) the callback run when a transport is |
| // constructed. |
| void TestOnlySetGlobalHttp2TransportInitCallback( |
| TestOnlyGlobalHttp2TransportInitCallback callback) { |
| test_only_init_callback = callback; |
| } |
| |
| // Installs (or clears, with nullptr) the callback run when a transport is |
| // destroyed. |
| void TestOnlySetGlobalHttp2TransportDestructCallback( |
| TestOnlyGlobalHttp2TransportDestructCallback callback) { |
| test_only_destruct_callback = callback; |
| } |
| |
| // Records whether transient-failure state notifications are disabled for |
| // tests. |
| void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification( |
| bool disable) { |
| test_only_disable_transient_failure_state_notification = disable; |
| } |
| |
| } // namespace grpc_core |
| |
| // |
| // CONSTRUCTION/DESTRUCTION/REFCOUNTING |
| // |
| |
| // Tears the transport down: releases the event engine, channelz node and |
| // endpoint, destroys the slice buffers, fails any pending context-list |
| // entries, checks that no stream is still tracked, drops the combiner ref, |
| // cancels outstanding pings and frees the write-callback pool. |
| grpc_chttp2_transport::~grpc_chttp2_transport() { |
| event_engine.reset(); |
| |
| // reset() on an empty ref is a no-op, so no null check is needed. |
| channelz_socket.reset(); |
| |
| grpc_endpoint_destroy(ep); |
| |
| grpc_slice_buffer_destroy(&qbuf); |
| grpc_slice_buffer_destroy(&outbuf); |
| |
| // ContextList::Execute follows semantics of a callback function and does not |
| // take a ref on error |
| grpc_error_handle destroyed_error = GRPC_ERROR_CREATE("Transport destroyed"); |
| grpc_core::ContextList::Execute(cl, nullptr, destroyed_error); |
| cl = nullptr; |
| |
| grpc_slice_buffer_destroy(&read_buffer); |
| grpc_chttp2_goaway_parser_destroy(&goaway_parser); |
| |
| // Every stream list must already be empty. |
| for (size_t list_idx = 0; list_idx < STREAM_LIST_COUNT; ++list_idx) { |
| GPR_ASSERT(lists[list_idx].head == nullptr); |
| GPR_ASSERT(lists[list_idx].tail == nullptr); |
| } |
| |
| GPR_ASSERT(grpc_chttp2_stream_map_size(&stream_map) == 0); |
| grpc_chttp2_stream_map_destroy(&stream_map); |
| |
| GRPC_COMBINER_UNREF(combiner, "chttp2_transport"); |
| |
| cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed")); |
| |
| // Free the singly linked pool of write callbacks. |
| for (grpc_chttp2_write_cb* cb = write_cb_pool; cb != nullptr;) { |
| grpc_chttp2_write_cb* following = cb->next; |
| gpr_free(cb); |
| cb = following; |
| } |
| write_cb_pool = nullptr; |
| |
| gpr_free(ping_acks); |
| if (grpc_core::test_only_destruct_callback != nullptr) { |
| grpc_core::test_only_destruct_callback(); |
| } |
| } |
| |
| // Returns the transport vtable; the definition is not in this part of the file. |
| static const grpc_transport_vtable* get_vtable(); |
| |
| // Applies channel-arg overrides to transport `t`. |
| // |
| // Covers: the initial stream id, HPACK encoder table size, ping policy, write |
| // buffer size, keepalive parameters, the optional channelz socket node, and |
| // the HTTP/2 settings queued for the peer. |
| // |
| // `channel_args` is the source of overrides; `is_client` selects which |
| // settings are available for this side. When a keepalive arg is absent the |
| // process-wide g_default_* value for the transport's role is used, with the |
| // same INT_MAX-means-infinite convention as |
| // init_transport_keepalive_settings(). |
| static void read_channel_args(grpc_chttp2_transport* t, |
| const grpc_core::ChannelArgs& channel_args, |
| bool is_client) { |
| const int initial_sequence_number = |
| channel_args.GetInt(GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER).value_or(-1); |
| if (initial_sequence_number > 0) { |
| // The parity of the stream id must match the one already chosen for this |
| // side; otherwise the override is rejected and only logged. |
| if ((t->next_stream_id & 1) != (initial_sequence_number & 1)) { |
| gpr_log(GPR_ERROR, "%s: low bit must be %d on %s", |
| GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1, |
| is_client ? "client" : "server"); |
| } else { |
| t->next_stream_id = static_cast<uint32_t>(initial_sequence_number); |
| } |
| } |
| |
| const int max_hpack_table_size = |
| channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER).value_or(-1); |
| if (max_hpack_table_size >= 0) { |
| t->hpack_compressor.SetMaxUsableSize(max_hpack_table_size); |
| } |
| |
| // Ping policy: negative values are floored at zero. |
| t->ping_policy.max_pings_without_data = |
| std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA) |
| .value_or(g_default_max_pings_without_data)); |
| t->ping_policy.max_ping_strikes = |
| std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES) |
| .value_or(g_default_max_ping_strikes)); |
| t->ping_policy.min_recv_ping_interval_without_data = |
| std::max(grpc_core::Duration::Zero(), |
| channel_args |
| .GetDurationFromIntMillis( |
| GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS) |
| .value_or(grpc_core::Duration::Milliseconds( |
| g_default_min_recv_ping_interval_without_data_ms))); |
| t->write_buffer_size = |
| std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE) |
| .value_or(grpc_core::chttp2::kDefaultWindow)); |
| |
| // Converts a process-wide default expressed in milliseconds to a Duration, |
| // treating INT_MAX as "infinite" exactly like |
| // init_transport_keepalive_settings() does. Without this, an INT_MAX default |
| // would become a finite duration and keepalive would not be recognised as |
| // disabled by init_keepalive_pings_if_enabled_locked(). |
| const auto default_duration_from_ms = [](int ms) { |
| return ms == INT_MAX ? grpc_core::Duration::Infinity() |
| : grpc_core::Duration::Milliseconds(ms); |
| }; |
| t->keepalive_time = |
| std::max(grpc_core::Duration::Milliseconds(1), |
| channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS) |
| .value_or(default_duration_from_ms( |
| t->is_client ? g_default_client_keepalive_time_ms |
| : g_default_server_keepalive_time_ms))); |
| t->keepalive_timeout = std::max( |
| grpc_core::Duration::Zero(), |
| channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS) |
| .value_or(default_duration_from_ms( |
| t->is_client ? g_default_client_keepalive_timeout_ms |
| : g_default_server_keepalive_timeout_ms))); |
| // Fall back to the process-wide default for this role rather than a |
| // hard-coded false, so a configured default is not silently discarded. |
| t->keepalive_permit_without_calls = |
| channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS) |
| .value_or(t->is_client |
| ? g_default_client_keepalive_permit_without_calls |
| : g_default_server_keepalive_permit_without_calls); |
| // Only send the preferred rx frame size http2 setting if we are instructed |
| // to auto size the buffers allocated at tcp level and we also can adjust |
| // sending frame size. |
| t->enable_preferred_rx_crypto_frame_advertisement = |
| channel_args |
| .GetBool(GRPC_ARG_EXPERIMENTAL_HTTP2_PREFERRED_CRYPTO_FRAME_SIZE) |
| .value_or(false); |
| |
| if (channel_args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) |
| .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { |
| t->channelz_socket = |
| grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>( |
| std::string(grpc_endpoint_get_local_address(t->ep)), |
| std::string(t->peer_string.as_string_view()), |
| absl::StrFormat("%s %s", get_vtable()->name, |
| t->peer_string.as_string_view()), |
| channel_args |
| .GetObjectRef<grpc_core::channelz::SocketNode::Security>()); |
| } |
| |
| // Channel arg -> HTTP/2 setting table. A default_value of -1 means the |
| // setting is only queued when the arg is present (negative values are |
| // skipped below). availability is indexed by is_client: [0] server, |
| // [1] client. |
| static const struct { |
| absl::string_view channel_arg_name; |
| grpc_chttp2_setting_id setting_id; |
| int default_value; |
| int min; |
| int max; |
| bool availability[2] /* server, client */; |
| } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS, |
| GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, |
| -1, |
| 0, |
| INT32_MAX, |
| {true, false}}, |
| {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER, |
| GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, |
| -1, |
| 0, |
| INT32_MAX, |
| {true, true}}, |
| {GRPC_ARG_MAX_METADATA_SIZE, |
| GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, |
| -1, |
| 0, |
| INT32_MAX, |
| {true, true}}, |
| {GRPC_ARG_HTTP2_MAX_FRAME_SIZE, |
| GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, |
| -1, |
| 16384, |
| 16777215, |
| {true, true}}, |
| {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY, |
| GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, |
| 1, |
| 0, |
| 1, |
| {true, true}}, |
| {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, |
| GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, |
| -1, |
| 5, |
| INT32_MAX, |
| {true, true}}}; |
| |
| for (size_t i = 0; i < GPR_ARRAY_SIZE(settings_map); i++) { |
| const auto& setting = settings_map[i]; |
| if (setting.availability[is_client]) { |
| const int value = channel_args.GetInt(setting.channel_arg_name) |
| .value_or(setting.default_value); |
| if (value >= 0) { |
| queue_setting_update(t, setting.setting_id, |
| grpc_core::Clamp(value, setting.min, setting.max)); |
| } |
| } else if (channel_args.Contains(setting.channel_arg_name)) { |
| // The arg was supplied but does not apply to this side; just log it. |
| gpr_log(GPR_DEBUG, "%s is not available on %s", |
| std::string(setting.channel_arg_name).c_str(), |
| is_client ? "clients" : "servers"); |
| } |
| } |
| |
| if (t->enable_preferred_rx_crypto_frame_advertisement) { |
| // Advertise INT_MAX clamped into the range the settings table allows. |
| const grpc_chttp2_setting_parameters* sp = |
| &grpc_chttp2_settings_parameters |
| [GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE]; |
| queue_setting_update( |
| t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE, |
| grpc_core::Clamp(INT_MAX, static_cast<int>(sp->min_value), |
| static_cast<int>(sp->max_value))); |
| } |
| } |
| |
| // Seeds the transport's keepalive time, timeout and permit-without-calls flag |
| // from the process-wide defaults for its role (client or server). A default of |
| // INT_MAX milliseconds is mapped to an infinite duration. |
| static void init_transport_keepalive_settings(grpc_chttp2_transport* t) { |
| const auto to_duration = [](int millis) { |
| return millis == INT_MAX ? grpc_core::Duration::Infinity() |
| : grpc_core::Duration::Milliseconds(millis); |
| }; |
| const bool client = t->is_client; |
| t->keepalive_time = to_duration(client ? g_default_client_keepalive_time_ms |
| : g_default_server_keepalive_time_ms); |
| t->keepalive_timeout = |
| to_duration(client ? g_default_client_keepalive_timeout_ms |
| : g_default_server_keepalive_timeout_ms); |
| t->keepalive_permit_without_calls = |
| client ? g_default_client_keepalive_permit_without_calls |
| : g_default_server_keepalive_permit_without_calls; |
| } |
| |
| // Seeds the transport's ping policy from the process-wide defaults. |
| static void configure_transport_ping_policy(grpc_chttp2_transport* t) { |
| auto& policy = t->ping_policy; |
| policy.max_pings_without_data = g_default_max_pings_without_data; |
| policy.max_ping_strikes = g_default_max_ping_strikes; |
| policy.min_recv_ping_interval_without_data = |
| grpc_core::Duration::Milliseconds( |
| g_default_min_recv_ping_interval_without_data_ms); |
| } |
| |
| // Combiner callback: arms the first keepalive timer unless the keepalive time |
| // is infinite. `arg` is the transport; `error` is expected to be OK. |
| static void init_keepalive_pings_if_enabled_locked( |
| void* arg, GRPC_UNUSED grpc_error_handle error) { |
| GPR_DEBUG_ASSERT(error.ok()); |
| auto* t = static_cast<grpc_chttp2_transport*>(arg); |
| if (t->keepalive_time == grpc_core::Duration::Infinity()) { |
| // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no |
| // inflight keepalive timers |
| t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; |
| return; |
| } |
| t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; |
| // This ref is held on behalf of the scheduled timer callback. |
| GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); |
| t->keepalive_ping_timer_handle = |
| t->event_engine->RunAfter(t->keepalive_time, [t] { |
| grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
| grpc_core::ExecCtx exec_ctx; |
| init_keepalive_ping(t); |
| }); |
| } |
| |
| // Constructs a transport over endpoint `ep`. |
| // |
| // `channel_args` supplies the resource quota, event engine and tunables; |
| // `is_client` selects the client or server role: odd vs even initial stream |
| // id, whether the client connect string is queued, and the initial deframer |
| // state. |
| // |
| // The body initialises buffers and settings, applies defaults followed by |
| // channel-arg overrides, schedules keepalive initialisation on the combiner, |
| // and kicks off the initial write. |
| grpc_chttp2_transport::grpc_chttp2_transport( |
| const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep, |
| bool is_client) |
| : refs(1, GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount) |
| ? "chttp2_refcount" |
| : nullptr), |
| ep(ep), |
| peer_string( |
| grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))), |
| // Label the memory owner with the actual role so that it agrees with the |
| // state tracker name below (previously always ":client_transport"). |
| memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>() |
| ->memory_quota() |
| ->CreateMemoryOwner(absl::StrCat( |
| grpc_endpoint_get_peer(ep), |
| is_client ? ":client_transport" : ":server_transport"))), |
| self_reservation( |
| memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))), |
| combiner(grpc_combiner_create()), |
| state_tracker(is_client ? "client_transport" : "server_transport", |
| GRPC_CHANNEL_READY), |
| is_client(is_client), |
| next_stream_id(is_client ? 1 : 2), |
| flow_control( |
| peer_string.as_string_view(), |
| channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true), |
| &memory_owner), |
| deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0), |
| event_engine( |
| channel_args |
| .GetObjectRef<grpc_event_engine::experimental::EventEngine>()) { |
| GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == |
| GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); |
| base.vtable = get_vtable(); |
| // 8 is a random stab in the dark as to a good initial size: it's small enough |
| // that it shouldn't waste memory for infrequently used connections, yet |
| // large enough that the exponential growth should happen nicely when it's |
| // needed. |
| // TODO(ctiller): tune this |
| grpc_chttp2_stream_map_init(&stream_map, 8); |
| |
| grpc_slice_buffer_init(&read_buffer); |
| grpc_slice_buffer_init(&outbuf); |
| if (is_client) { |
| // Clients queue the connect string as the first bytes of the output buffer. |
| grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string( |
| GRPC_CHTTP2_CLIENT_CONNECT_STRING)); |
| } |
| grpc_slice_buffer_init(&qbuf); |
| // copy in initial settings to all setting sets |
| for (size_t i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) { |
| for (int j = 0; j < GRPC_NUM_SETTING_SETS; j++) { |
| settings[j][i] = grpc_chttp2_settings_parameters[i].default_value; |
| } |
| } |
| grpc_chttp2_goaway_parser_init(&goaway_parser); |
| |
| // configure http2 the way we like it |
| if (is_client) { |
| queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); |
| queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); |
| } |
| queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, |
| DEFAULT_MAX_HEADER_LIST_SIZE); |
| queue_setting_update(this, |
| GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); |
| |
| // Process-wide defaults first, then per-channel overrides. |
| configure_transport_ping_policy(this); |
| init_transport_keepalive_settings(this); |
| |
| read_channel_args(this, channel_args, is_client); |
| |
| // No pings allowed before receiving a header or data frame. |
| ping_state.pings_before_data_required = 0; |
| ping_state.last_ping_sent_time = grpc_core::Timestamp::InfPast(); |
| |
| ping_recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast(); |
| ping_recv_state.ping_strikes = 0; |
| |
| // Keepalive initialisation is run on the combiner. |
| grpc_core::ExecCtx exec_ctx; |
| combiner->Run( |
| GRPC_CLOSURE_INIT(&init_keepalive_ping_locked, |
| init_keepalive_pings_if_enabled_locked, this, nullptr), |
| absl::OkStatus()); |
| |
| if (flow_control.bdp_probe()) { |
| bdp_ping_blocked = true; |
| grpc_chttp2_act_on_flowctl_action(flow_control.PeriodicUpdate(), this, |
| nullptr); |
| } |
| |
| grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); |
| post_benign_reclaimer(this); |
| if (grpc_core::test_only_init_callback != nullptr) { |
| grpc_core::test_only_init_callback(); |
| } |
| |
| #ifdef GRPC_POSIX_SOCKET_TCP |
| // Parses as (a && b) ? 0 : CLOSURE_BARRIER_MAY_COVER_WRITE. |
| closure_barrier_may_cover_write = |
| grpc_event_engine_run_in_background() && |
| grpc_core::IsScheduleCancellationOverWriteEnabled() |
| ? 0 |
| : CLOSURE_BARRIER_MAY_COVER_WRITE; |
| #endif |
| } |
| |
| // Combiner callback for destroy_transport(): marks the transport as being |
| // destroyed, closes it with a "Transport destroyed" error tagged with the |
| // current write state, resets the memory owner and drops the "destroy" ref. |
| static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) { |
| auto* transport = static_cast<grpc_chttp2_transport*>(tp); |
| transport->destroying = 1; |
| grpc_error_handle destroyed = grpc_error_set_int( |
| GRPC_ERROR_CREATE("Transport destroyed"), |
| grpc_core::StatusIntProperty::kOccurredDuringWrite, |
| transport->write_state); |
| close_transport_locked(transport, destroyed); |
| transport->memory_owner.Reset(); |
| // Must be the last line. |
| GRPC_CHTTP2_UNREF_TRANSPORT(transport, "destroy"); |
| } |
| |
| // Schedules destroy_transport_locked() for this transport on its combiner. |
| static void destroy_transport(grpc_transport* gt) { |
| auto* transport = reinterpret_cast<grpc_chttp2_transport*>(gt); |
| grpc_closure* destroy_closure = |
| GRPC_CLOSURE_CREATE(destroy_transport_locked, transport, nullptr); |
| transport->combiner->Run(destroy_closure, absl::OkStatus()); |
| } |
| |
| static void close_transport_locked(grpc_chttp2_transport* t, |
| grpc_error_handle error) { |
| end_all_the_calls(t, error); |
| cancel_pings(t, error); |
| if (t->closed_with_error.ok()) { |
| if (!grpc_error_has_clear_grpc_status(error)) { |
| error = |
| grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus, |
| GRPC_STATUS_UNAVAILABLE); |
| } |
| if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) { |
| if (t->close_transport_on_writes_finished.ok()) { |
| t->close_transport_on_writes_finished = |
| GRPC_ERROR_CREATE("Delayed close due to in-progress write"); |
| } |
| t->close_transport_on_writes_finished = |
| grpc_error_add_child(t->close_transport_on_writes_finished, error); |
| return; |
| } |
| GPR_ASSERT(!error.ok()); |
| t->closed_with_error = error; |
| connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(), |
| "close_transport"); |
| if (t->ping_state.delayed_ping_timer_handle.has_value()) { |
| if (t->event_engine->Cancel(*t->ping_state.delayed_ping_timer_handle)) { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); |
| t->ping_state.delayed_ping_timer_handle.reset(); |
| } |
| } |
| if (t->next_bdp_ping_timer_handle.has_value()) { |
| if (t->event_engine->Cancel(*t->next_bdp_ping_timer_handle)) { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); |
| t->next_bdp_ping_timer_handle.reset(); |
| } |
| } |
| switch (t->keepalive_state) { |
| case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: |
| if (t->keepalive_ping_timer_handle.has_value()) { |
| if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); |
| t->keepalive_ping_timer_handle.reset(); |
| } |
| } |
| break; |
| case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: |
| if (t->keepalive_ping_timer_handle.has_value()) { |
| if (t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); |
| t->keepalive_ping_timer_handle.reset(); |
| } |
| } |
| if (t->keepalive_watchdog_timer_handle.has_value()) { |
| if (t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); |
| t->keepalive_watchdog_timer_handle.reset(); |
| } |
| } |
| break; |
| case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: |
| case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED: |
| // keepalive timers are not set in these two states |
| break; |
| } |
| |
| // flush writable stream list to avoid dangling references |
| grpc_chttp2_stream* s; |
| while (grpc_chttp2_list_pop_writable_stream(t, &s)) { |
| GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close"); |
| } |
| GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE); |
| grpc_endpoint_shutdown(t->ep, error); |
| } |
| if (t->notify_on_receive_settings != nullptr) { |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings, |
| error); |
| t->notify_on_receive_settings = nullptr; |
| } |
| if (t->notify_on_close != nullptr) { |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, error); |
| t->notify_on_close = nullptr; |
| } |
| } |
| |
| // Thin wrappers over grpc_stream_ref/grpc_stream_unref for a chttp2 stream. |
| // Builds without NDEBUG additionally pass a `reason` string through. |
| #ifdef NDEBUG |
| void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) { |
| grpc_stream_ref(s->refcount); |
| } |
| void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) { |
| grpc_stream_unref(s->refcount); |
| } |
| #else |
| void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) { |
| grpc_stream_ref(s->refcount, reason); |
| } |
| void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) { |
| grpc_stream_unref(s->refcount, reason); |
| } |
| #endif |
| |
| // Takes the stream's "chttp2" ref and a "stream" ref on its transport. |
| grpc_chttp2_stream::Reffer::Reffer(grpc_chttp2_stream* s) { |
| // We reserve one 'active stream' that's dropped when the stream is |
| // read-closed. The others are for Chttp2IncomingByteStreams that are |
| // actively reading |
| GRPC_CHTTP2_STREAM_REF(s, "chttp2"); |
| grpc_chttp2_transport* owning_transport = s->t; |
| GRPC_CHTTP2_REF_TRANSPORT(owning_transport, "stream"); |
| } |
| |
| // Constructs a stream on transport `t`. When `server_data` is non-null it |
| // carries the stream id encoded in the pointer value (accepted stream): the |
| // stream is then published through t->accepting_stream, registered in the |
| // transport's stream map, and a destructive reclaimer is posted. |
| grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t, |
| grpc_stream_refcount* refcount, |
| const void* server_data, |
| grpc_core::Arena* arena) |
| : t(t), |
| refcount(refcount), |
| reffer(this), |
| initial_metadata_buffer(arena), |
| trailing_metadata_buffer(arena), |
| flow_control(&t->flow_control) { |
| if (server_data != nullptr) { |
| const auto encoded_id = reinterpret_cast<uintptr_t>(server_data); |
| id = static_cast<uint32_t>(encoded_id); |
| if (grpc_http_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "HTTP:%p/%p creating accept stream %d [from %p]", t, |
| this, id, server_data); |
| } |
| *t->accepting_stream = this; |
| grpc_chttp2_stream_map_add(&t->stream_map, id, this); |
| post_destructive_reclaimer(t); |
| } |
| |
| grpc_slice_buffer_init(&frame_storage); |
| grpc_slice_buffer_init(&flow_controlled_buffer); |
| } |
| |
| // Destroys the stream: removes it from the stalled lists, records the outcome |
| // with channelz (if enabled), checks that the stream is fully closed and |
| // untracked, releases its buffers and transport ref, then schedules |
| // destroy_stream_arg. |
| grpc_chttp2_stream::~grpc_chttp2_stream() { |
| grpc_chttp2_list_remove_stalled_by_stream(t, this); |
| grpc_chttp2_list_remove_stalled_by_transport(t, this); |
| |
| if (t->channelz_socket != nullptr) { |
| // Success means end-of-stream was received (client) or sent (server). |
| const bool stream_succeeded = t->is_client ? eos_received : eos_sent; |
| if (stream_succeeded) { |
| t->channelz_socket->RecordStreamSucceeded(); |
| } else { |
| t->channelz_socket->RecordStreamFailed(); |
| } |
| } |
| |
| GPR_ASSERT((write_closed && read_closed) || id == 0); |
| if (id != 0) { |
| GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, id) == nullptr); |
| } |
| |
| grpc_slice_buffer_destroy(&frame_storage); |
| |
| // The stream must not still be a member of any transport list. |
| for (int list_id = 0; list_id < STREAM_LIST_COUNT; ++list_id) { |
| if (GPR_UNLIKELY(included.is_set(list_id))) { |
| grpc_core::Crash(absl::StrFormat("%s stream %d still included in list %d", |
| t->is_client ? "client" : "server", id, |
| list_id)); |
| } |
| } |
| |
| // No callbacks may still be pending. |
| GPR_ASSERT(send_initial_metadata_finished == nullptr); |
| GPR_ASSERT(send_trailing_metadata_finished == nullptr); |
| GPR_ASSERT(recv_initial_metadata_ready == nullptr); |
| GPR_ASSERT(recv_message_ready == nullptr); |
| GPR_ASSERT(recv_trailing_metadata_finished == nullptr); |
| grpc_slice_buffer_destroy(&flow_controlled_buffer); |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus()); |
| } |
| |
| // Placement-constructs a grpc_chttp2_stream in the storage at `gs`. Always |
| // returns 0. |
| static int init_stream(grpc_transport* gt, grpc_stream* gs, |
| grpc_stream_refcount* refcount, const void* server_data, |
| grpc_core::Arena* arena) { |
| auto* transport = reinterpret_cast<grpc_chttp2_transport*>(gt); |
| new (gs) grpc_chttp2_stream(transport, refcount, server_data, arena); |
| return 0; |
| } |
| |
| // Combiner callback for destroy_stream(): runs the stream's destructor in |
| // place, without freeing the storage. |
| static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) { |
| auto* stream = static_cast<grpc_chttp2_stream*>(sp); |
| stream->~grpc_chttp2_stream(); |
| } |
| |
| // Remembers `then_schedule_closure` on the stream (it is run by the stream |
| // destructor) and schedules destroy_stream_locked() on the combiner. |
| static void destroy_stream(grpc_transport* gt, grpc_stream* gs, |
| grpc_closure* then_schedule_closure) { |
| auto* transport = reinterpret_cast<grpc_chttp2_transport*>(gt); |
| auto* stream = reinterpret_cast<grpc_chttp2_stream*>(gs); |
| |
| stream->destroy_stream_arg = then_schedule_closure; |
| grpc_closure* destroy_closure = GRPC_CLOSURE_INIT( |
| &stream->destroy_stream, destroy_stream_locked, stream, nullptr); |
| transport->combiner->Run(destroy_closure, absl::OkStatus()); |
| } |
| |
| // Asks the registered accept callback to create a stream for incoming stream |
| // `id`, passing the id encoded as a pointer value. Returns the stream that the |
| // stream constructor published through t->accepting_stream, or nullptr when |
| // there is no accept callback or no stream was created. |
| grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t, |
| uint32_t id) { |
| if (t->accept_stream_cb != nullptr) { |
| grpc_chttp2_stream* accepted = nullptr; |
| GPR_ASSERT(t->accepting_stream == nullptr); |
| // Expose the output slot only for the duration of the callback. |
| t->accepting_stream = &accepted; |
| t->accept_stream_cb(t->accept_stream_cb_user_data, &t->base, |
| reinterpret_cast<void*>(id)); |
| t->accepting_stream = nullptr; |
| return accepted; |
| } |
| return nullptr; |
| } |
| |
| // |
| // OUTPUT PROCESSING |
| // |
| |
| // Maps a write state to the label used in trace output. |
| static const char* write_state_name(grpc_chttp2_write_state st) { |
| if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { |
| return "IDLE"; |
| } |
| if (st == GRPC_CHTTP2_WRITE_STATE_WRITING) { |
| return "WRITING"; |
| } |
| if (st == GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE) { |
| return "WRITING+MORE"; |
| } |
| GPR_UNREACHABLE_CODE(return "UNKNOWN"); |
| } |
| |
| static void set_write_state(grpc_chttp2_transport* t, |
| grpc_chttp2_write_state st, const char* reason) { |
| GRPC_CHTTP2_IF_TRACING( |
| gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t, |
| t->is_client ? "CLIENT" : "SERVER", |
| std::string(t->peer_string.as_string_view()).c_str(), |
| write_state_name(t->write_state), write_state_name(st), reason)); |
| t->write_state = st; |
| // If the state is being reset back to idle, it means a write was just |
| // finished. Make sure all the run_after_write closures are scheduled. |
| // |
| // This is also our chance to close the transport if the transport was marked |
| // to be closed after all writes finish (for example, if we received a go-away |
| // from peer while we had some pending writes) |
| if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) { |
| grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write); |
| if (!t->close_transport_on_writes_finished.ok()) { |
| grpc_error_handle err = t->close_transport_on_writes_finished; |
| t->close_transport_on_writes_finished = absl::OkStatus(); |
| close_transport_locked(t, err); |
| } |
| } |
| } |
| |
| void grpc_chttp2_initiate_write(grpc_chttp2_transport* t, |
| grpc_chttp2_initiate_write_reason reason) { |
| switch (t->write_state) { |
| case GRPC_CHTTP2_WRITE_STATE_IDLE: |
| set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, |
| grpc_chttp2_initiate_write_reason_string(reason)); |
| GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); |
| // Note that the 'write_action_begin_locked' closure is being scheduled |
| // on the 'finally_scheduler' of t->combiner. This means that |
| // 'write_action_begin_locked' is called only *after* all the other |
| // closures (some of which are potentially initiating more writes on the |
| // transport) are executed on the t->combiner. |
| // |
| // The reason for scheduling on finally_scheduler is to make sure we batch |
| // as many writes as possible. 'write_action_begin_locked' is the function |
| // that gathers all the relevant bytes (which are at various places in the |
| // grpc_chttp2_transport structure) and append them to 'outbuf' field in |
| // grpc_chttp2_transport thereby batching what would have been potentially |
| // multiple write operations. |
| // |
| // Also, 'write_action_begin_locked' only gathers the bytes into outbuf. |
| // It does not call the endpoint to write the bytes. That is done by the |
| // 'write_action' (which is scheduled by 'write_action_begin_locked') |
| t->combiner->FinallyRun( |
| GRPC_CLOSURE_INIT(&t->write_action_begin_locked, |
| write_action_begin_locked, t, nullptr), |
| absl::OkStatus()); |
| break; |
| case GRPC_CHTTP2_WRITE_STATE_WRITING: |
| set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, |
| grpc_chttp2_initiate_write_reason_string(reason)); |
| break; |
| case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: |
| break; |
| } |
| } |
| |
| void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t, |
| grpc_chttp2_stream* s) { |
| if (t->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t, s)) { |
| GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); |
| } |
| } |
| |
// Returns the trace description used when a write begins; `partial` selects
// the wording for a partial write.
static const char* begin_writing_desc(bool partial) {
  return partial ? "begin partial write in background"
                 : "begin write in current thread";
}
| |
| static void write_action_begin_locked(void* gt, |
| grpc_error_handle /*error_ignored*/) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt); |
| GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE); |
| grpc_chttp2_begin_write_result r; |
| if (!t->closed_with_error.ok()) { |
| r.writing = false; |
| } else { |
| r = grpc_chttp2_begin_write(t); |
| } |
| if (r.writing) { |
| set_write_state(t, |
| r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE |
| : GRPC_CHTTP2_WRITE_STATE_WRITING, |
| begin_writing_desc(r.partial)); |
| write_action(t, absl::OkStatus()); |
| if (t->reading_paused_on_pending_induced_frames) { |
| GPR_ASSERT(t->num_pending_induced_frames == 0); |
| // We had paused reading, because we had many induced frames (SETTINGS |
| // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have |
| // been able to flush qbuf, we can resume reading. |
| GRPC_CHTTP2_IF_TRACING(gpr_log( |
| GPR_INFO, |
| "transport %p : Resuming reading after being paused due to too " |
| "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames", |
| t)); |
| t->reading_paused_on_pending_induced_frames = false; |
| continue_read_action_locked(t); |
| } |
| } else { |
| set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing"); |
| } |
| } |
| |
| static void write_action(void* gt, grpc_error_handle /*error*/) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(gt); |
| void* cl = t->cl; |
| t->cl = nullptr; |
| // Choose max_frame_size as the prefered rx crypto frame size indicated by the |
| // peer. |
| int max_frame_size = |
| t->settings |
| [GRPC_PEER_SETTINGS] |
| [GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE]; |
| // Note: max frame size is 0 if the remote peer does not support adjusting the |
| // sending frame size. |
| if (max_frame_size == 0) { |
| max_frame_size = INT_MAX; |
| } |
| grpc_endpoint_write( |
| t->ep, &t->outbuf, |
| GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end, t, |
| grpc_schedule_on_exec_ctx), |
| cl, max_frame_size); |
| } |
| |
| static void write_action_end(void* tp, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->write_action_end_locked, |
| write_action_end_locked, t, nullptr), |
| error); |
| } |
| |
// Callback from the grpc_endpoint after bytes have been written by calling
// sendmsg.
//
// Scheduled on t->combiner by write_action_end. `tp` is the
// grpc_chttp2_transport and `error` is the result of the endpoint write.
// Closes the transport on a failed write (or once the final GOAWAY has been
// sent and no streams remain), advances the write state machine, finishes
// the write via grpc_chttp2_end_write and releases one "writing" ref.
static void write_action_end_locked(void* tp, grpc_error_handle error) {
  grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);

  // `closed` suppresses the early run_after_write flush further down.
  bool closed = false;
  if (!error.ok()) {
    close_transport_locked(t, error);
    closed = true;
  }

  // This write carried the final GOAWAY: record that it is now sent, and
  // close the transport right away if there are no streams left.
  if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
    t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
    closed = true;
    if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
      close_transport_locked(t, GRPC_ERROR_CREATE("goaway sent"));
    }
  }

  switch (t->write_state) {
    case GRPC_CHTTP2_WRITE_STATE_IDLE:
      // A write just completed, so the state cannot be IDLE here.
      GPR_UNREACHABLE_CODE(break);
    case GRPC_CHTTP2_WRITE_STATE_WRITING:
      // Nothing further queued: going IDLE also flushes run_after_write
      // (see set_write_state).
      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
      break;
    case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
      // More data was queued while writing: start another write cycle, with
      // its own "writing" ref.
      set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING, "continue writing");
      GRPC_CHTTP2_REF_TRANSPORT(t, "writing");
      // If the transport is closed, we will retry writing on the endpoint
      // and next write may contain part of the currently serialized frames.
      // So, we should only call the run_after_write callbacks when the next
      // write finishes, or the callbacks will be invoked when the stream is
      // closed.
      if (!closed) {
        grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
      }
      t->combiner->FinallyRun(
          GRPC_CLOSURE_INIT(&t->write_action_begin_locked,
                            write_action_begin_locked, t, nullptr),
          absl::OkStatus());
      break;
  }

  grpc_chttp2_end_write(t, error);
  // Release the "writing" ref belonging to the write that just finished.
  GRPC_CHTTP2_UNREF_TRANSPORT(t, "writing");
}
| |
| // Dirties an HTTP2 setting to be sent out next time a writing path occurs. |
| // If the change needs to occur immediately, manually initiate a write. |
| static void queue_setting_update(grpc_chttp2_transport* t, |
| grpc_chttp2_setting_id id, uint32_t value) { |
| const grpc_chttp2_setting_parameters* sp = |
| &grpc_chttp2_settings_parameters[id]; |
| uint32_t use_value = grpc_core::Clamp(value, sp->min_value, sp->max_value); |
| if (use_value != value) { |
| gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name, |
| value, use_value); |
| } |
| if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) { |
| t->settings[GRPC_LOCAL_SETTINGS][id] = use_value; |
| t->dirtied_local_settings = true; |
| } |
| } |
| |
| // Cancel out streams that haven't yet started if we have received a GOAWAY |
| static void cancel_unstarted_streams(grpc_chttp2_transport* t, |
| grpc_error_handle error) { |
| grpc_chttp2_stream* s; |
| while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { |
| s->trailing_metadata_buffer.Set( |
| grpc_core::GrpcStreamNetworkState(), |
| grpc_core::GrpcStreamNetworkState::kNotSentOnWire); |
| grpc_chttp2_cancel_stream(t, s, error); |
| } |
| } |
| |
| void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t, |
| uint32_t goaway_error, |
| uint32_t last_stream_id, |
| absl::string_view goaway_text) { |
| t->goaway_error = grpc_error_set_str( |
| grpc_error_set_int( |
| grpc_error_set_int( |
| grpc_core::StatusCreate( |
| absl::StatusCode::kUnavailable, |
| absl::StrFormat( |
| "GOAWAY received; Error code: %u; Debug Text: %s", |
| goaway_error, goaway_text), |
| DEBUG_LOCATION, {}), |
| grpc_core::StatusIntProperty::kHttp2Error, |
| static_cast<intptr_t>(goaway_error)), |
| grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE), |
| grpc_core::StatusStrProperty::kRawBytes, goaway_text); |
| |
| GRPC_CHTTP2_IF_TRACING( |
| gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t, |
| last_stream_id)); |
| // We want to log this irrespective of whether http tracing is enabled if we |
| // received a GOAWAY with a non NO_ERROR code. |
| if (goaway_error != GRPC_HTTP2_NO_ERROR) { |
| gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s", |
| std::string(t->peer_string.as_string_view()).c_str(), goaway_error, |
| grpc_core::StatusToString(t->goaway_error).c_str()); |
| } |
| if (t->is_client) { |
| cancel_unstarted_streams(t, t->goaway_error); |
| // Cancel all unseen streams |
| grpc_chttp2_stream_map_for_each( |
| &t->stream_map, |
| [](void* user_data, uint32_t /* key */, void* stream) { |
| uint32_t last_stream_id = *(static_cast<uint32_t*>(user_data)); |
| grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream); |
| if (s->id > last_stream_id) { |
| s->trailing_metadata_buffer.Set( |
| grpc_core::GrpcStreamNetworkState(), |
| grpc_core::GrpcStreamNetworkState::kNotSeenByServer); |
| grpc_chttp2_cancel_stream(s->t, s, s->t->goaway_error); |
| } |
| }, |
| &last_stream_id); |
| } |
| absl::Status status = grpc_error_to_absl_status(t->goaway_error); |
| // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug |
| // data equal to "too_many_pings", it should log the occurrence at a log level |
| // that is enabled by default and double the configured KEEPALIVE_TIME used |
| // for new connections on that channel. |
| if (GPR_UNLIKELY(t->is_client && |
| goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM && |
| goaway_text == "too_many_pings")) { |
| gpr_log(GPR_ERROR, |
| "%s: Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug " |
| "data equal to \"too_many_pings\". Current keepalive time (before " |
| "throttling): %s", |
| std::string(t->peer_string.as_string_view()).c_str(), |
| t->keepalive_time.ToString().c_str()); |
| constexpr int max_keepalive_time_millis = |
| INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER; |
| int64_t throttled_keepalive_time = |
| t->keepalive_time.millis() > max_keepalive_time_millis |
| ? INT_MAX |
| : t->keepalive_time.millis() * KEEPALIVE_TIME_BACKOFF_MULTIPLIER; |
| status.SetPayload(grpc_core::kKeepaliveThrottlingKey, |
| absl::Cord(std::to_string(throttled_keepalive_time))); |
| } |
| // lie: use transient failure from the transport to indicate goaway has been |
| // received. |
| if (!grpc_core::test_only_disable_transient_failure_state_notification) { |
| connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status, |
| "got_goaway"); |
| } |
| } |
| |
| static void maybe_start_some_streams(grpc_chttp2_transport* t) { |
| grpc_chttp2_stream* s; |
| // maybe cancel out streams that haven't yet started if we have received a |
| // GOAWAY |
| if (!t->goaway_error.ok()) { |
| cancel_unstarted_streams(t, t->goaway_error); |
| return; |
| } |
| // start streams where we have free grpc_chttp2_stream ids and free |
| // * concurrency |
| while (t->next_stream_id <= MAX_CLIENT_STREAM_ID && |
| grpc_chttp2_stream_map_size(&t->stream_map) < |
| t->settings[GRPC_PEER_SETTINGS] |
| [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] && |
| grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { |
| // safe since we can't (legally) be parsing this stream yet |
| GRPC_CHTTP2_IF_TRACING(gpr_log( |
| GPR_INFO, |
| "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d", |
| t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id)); |
| |
| GPR_ASSERT(s->id == 0); |
| s->id = t->next_stream_id; |
| t->next_stream_id += 2; |
| |
| if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) { |
| connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, |
| absl::Status(absl::StatusCode::kUnavailable, |
| "Transport Stream IDs exhausted"), |
| "no_more_stream_ids"); |
| } |
| |
| grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); |
| post_destructive_reclaimer(t); |
| grpc_chttp2_mark_stream_writable(t, s); |
| grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); |
| } |
| // cancel out streams that will never be started |
| if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) { |
| while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) { |
| s->trailing_metadata_buffer.Set( |
| grpc_core::GrpcStreamNetworkState(), |
| grpc_core::GrpcStreamNetworkState::kNotSentOnWire); |
| grpc_chttp2_cancel_stream( |
| t, s, |
| grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"), |
| grpc_core::StatusIntProperty::kRpcStatus, |
| GRPC_STATUS_UNAVAILABLE)); |
| } |
| } |
| } |
| |
| static grpc_closure* add_closure_barrier(grpc_closure* closure) { |
| closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT; |
| return closure; |
| } |
| |
| static void null_then_sched_closure(grpc_closure** closure) { |
| grpc_closure* c = *closure; |
| *closure = nullptr; |
| // null_then_schedule_closure might be run during a start_batch which might |
| // subsequently examine the batch for more operations contained within. |
| // However, the closure run might make it back to the call object, push a |
| // completion, have the application see it, and make a new operation on the |
| // call which recycles the batch BEFORE the call to start_batch completes, |
| // forcing a race. |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, absl::OkStatus()); |
| } |
| |
| void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, |
| grpc_chttp2_stream* s, |
| grpc_closure** pclosure, |
| grpc_error_handle error, |
| const char* desc) { |
| grpc_closure* closure = *pclosure; |
| *pclosure = nullptr; |
| if (closure == nullptr) { |
| return; |
| } |
| closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT; |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { |
| gpr_log( |
| GPR_INFO, |
| "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s " |
| "write_state=%s", |
| t, closure, |
| static_cast<int>(closure->next_data.scratch / |
| CLOSURE_BARRIER_FIRST_REF_BIT), |
| static_cast<int>(closure->next_data.scratch % |
| CLOSURE_BARRIER_FIRST_REF_BIT), |
| desc, grpc_core::StatusToString(error).c_str(), |
| write_state_name(t->write_state)); |
| } |
| |
| if (s->context != nullptr) { |
| MaybeRecordTransportAnnotation( |
| s, absl::StrFormat("on_complete: s=%p %p desc=%s err=%s", s, closure, |
| desc, grpc_core::StatusToString(error).c_str())); |
| } |
| |
| if (!error.ok()) { |
| grpc_error_handle cl_err = |
| grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error); |
| if (cl_err.ok()) { |
| cl_err = GRPC_ERROR_CREATE(absl::StrCat( |
| "Error in HTTP transport completing operation: ", desc, |
| " write_state=", write_state_name(t->write_state), " refs=", |
| closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT, " flags=", |
| closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT)); |
| cl_err = grpc_error_set_str(cl_err, |
| grpc_core::StatusStrProperty::kTargetAddress, |
| std::string(t->peer_string.as_string_view())); |
| } |
| cl_err = grpc_error_add_child(cl_err, error); |
| closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err); |
| } |
| if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { |
| if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || |
| !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { |
| // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running |
| // closures earlier than when it is safe to do so. |
| grpc_error_handle run_error = |
| grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error); |
| closure->error_data.error = 0; |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error); |
| } else { |
| grpc_closure_list_append(&t->run_after_write, closure); |
| } |
| } |
| } |
| |
| static bool contains_non_ok_status(grpc_metadata_batch* batch) { |
| return batch->get(grpc_core::GrpcStatusMetadata()).value_or(GRPC_STATUS_OK) != |
| GRPC_STATUS_OK; |
| } |
| |
| static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, |
| bool is_client, bool is_initial) { |
| gpr_log(GPR_INFO, "--metadata--"); |
| const std::string prefix = absl::StrCat( |
| "HTTP:", id, is_initial ? ":HDR" : ":TRL", is_client ? ":CLI:" : ":SVR:"); |
| md_batch->Log([&prefix](absl::string_view key, absl::string_view value) { |
| gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str()); |
| }); |
| } |
| |
// Executes a stream op batch. `stream_op` is the
// grpc_transport_stream_op_batch; the error argument is ignored.
//
// Scheduled on the transport combiner by perform_stream_op, which stores the
// stream in op->handler_private.extra_arg and takes the "perform_stream_op"
// stream ref that is released at the end of this function.
//
// Every send op in the batch adds one barrier ref to op->on_complete (see
// add_closure_barrier). The initial ref installed near the top is dropped at
// the end of the function, so on_complete only fires once every send op has
// gone through grpc_chttp2_complete_closure_step.
static void perform_stream_op_locked(void* stream_op,
                                     grpc_error_handle /*error_ignored*/) {
  grpc_transport_stream_op_batch* op =
      static_cast<grpc_transport_stream_op_batch*>(stream_op);
  grpc_chttp2_stream* s =
      static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
  grpc_transport_stream_op_batch_payload* op_payload = op->payload;
  grpc_chttp2_transport* t = s->t;

  s->context = op->payload->context;
  s->traced = op->is_traced;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
    gpr_log(GPR_INFO,
            "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s,
            op, grpc_transport_stream_op_batch_string(op, false).c_str(),
            op->on_complete);
    if (op->send_initial_metadata) {
      log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
                   s->id, t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
                   s->id, t->is_client, false);
    }
  }

  if (s->context != nullptr) {
    MaybeRecordTransportAnnotation(
        s, absl::StrFormat(
               "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s,
               op, grpc_transport_stream_op_batch_string(op, true).c_str(),
               op->on_complete));
  }

  grpc_closure* on_complete = op->on_complete;
  // on_complete will be null if and only if there are no send ops in the batch.
  if (on_complete != nullptr) {
    // This batch has send ops. Use next_data.scratch as a barrier until
    // enqueue time; the initial counter is dropped at the end of this
    // function.
    on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
    on_complete->error_data.error = 0;
  }

  if (op->cancel_stream) {
    grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
  }

  // ---- send_initial_metadata ----
  if (op->send_initial_metadata) {
    if (t->is_client && t->channelz_socket != nullptr) {
      t->channelz_socket->RecordStreamStartedFromLocal();
    }
    GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
    // Flag consulted by grpc_chttp2_complete_closure_step to decide whether
    // on_complete must wait for the current write to finish.
    on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;

    s->send_initial_metadata_finished = add_closure_barrier(on_complete);
    s->send_initial_metadata =
        op_payload->send_initial_metadata.send_initial_metadata;
    if (t->is_client) {
      // Tighten the stream deadline with the timeout carried in the
      // metadata, if any.
      s->deadline = std::min(
          s->deadline,
          s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
              .value_or(grpc_core::Timestamp::InfFuture()));
    }
    if (contains_non_ok_status(s->send_initial_metadata)) {
      s->seen_error = true;
    }
    if (!s->write_closed) {
      if (t->is_client) {
        if (t->closed_with_error.ok()) {
          // Client streams get their id assigned in maybe_start_some_streams.
          GPR_ASSERT(s->id == 0);
          grpc_chttp2_list_add_waiting_for_concurrency(t, s);
          maybe_start_some_streams(t);
        } else {
          // Transport already closed: fail the stream without sending.
          s->trailing_metadata_buffer.Set(
              grpc_core::GrpcStreamNetworkState(),
              grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
          grpc_chttp2_cancel_stream(
              t, s,
              grpc_error_set_int(
                  GRPC_ERROR_CREATE_REFERENCING("Transport closed",
                                                &t->closed_with_error, 1),
                  grpc_core::StatusIntProperty::kRpcStatus,
                  GRPC_STATUS_UNAVAILABLE));
        }
      } else {
        GPR_ASSERT(s->id != 0);
        grpc_chttp2_mark_stream_writable(t, s);
        // Skip the write kick when a buffered message comes in the same
        // batch.
        if (!(op->send_message &&
              (op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
          grpc_chttp2_initiate_write(
              t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
        }
      }
    } else {
      s->send_initial_metadata = nullptr;
      grpc_chttp2_complete_closure_step(
          t, s, &s->send_initial_metadata_finished,
          GRPC_ERROR_CREATE_REFERENCING(
              "Attempt to send initial metadata after stream was closed",
              &s->write_closed_error, 1),
          "send_initial_metadata_finished");
    }
  }

  // ---- send_message ----
  if (op->send_message) {
    t->num_messages_in_next_write++;
    grpc_core::global_stats().IncrementHttp2SendMessageSize(
        op->payload->send_message.send_message->Length());
    on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
    s->send_message_finished = add_closure_barrier(op->on_complete);
    const uint32_t flags = op_payload->send_message.flags;
    if (s->write_closed) {
      op->payload->send_message.stream_write_closed = true;
      // We should NOT return an error here, so as to avoid a cancel OP being
      // started. The surface layer will notice that the stream has been closed
      // for writes and fail the send message op.
      grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
                                        absl::OkStatus(),
                                        "fetching_send_message_finished");
    } else {
      // Message header: byte 0 is the compressed flag, bytes 1-4 hold the
      // message length, most significant byte first.
      uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
          &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
      frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
      size_t len = op_payload->send_message.send_message->Length();
      frame_hdr[1] = static_cast<uint8_t>(len >> 24);
      frame_hdr[2] = static_cast<uint8_t>(len >> 16);
      frame_hdr[3] = static_cast<uint8_t>(len >> 8);
      frame_hdr[4] = static_cast<uint8_t>(len);

      // Byte offset in the stream at which this message will be fully
      // written (the buffer length already includes the header added above).
      s->next_message_end_offset =
          s->flow_controlled_bytes_written +
          static_cast<int64_t>(s->flow_controlled_buffer.length) +
          static_cast<int64_t>(len);
      if (flags & GRPC_WRITE_BUFFER_HINT) {
        // Buffered writes are reported complete write_buffer_size bytes
        // early.
        s->next_message_end_offset -= t->write_buffer_size;
        s->write_buffering = true;
      } else {
        s->write_buffering = false;
      }

      // Append a ref to each message slice to the flow-controlled buffer.
      grpc_slice* const slices =
          op_payload->send_message.send_message->c_slice_buffer()->slices;
      grpc_slice* const end =
          slices + op_payload->send_message.send_message->Count();
      for (grpc_slice* slice = slices; slice != end; slice++) {
        grpc_slice_buffer_add(&s->flow_controlled_buffer,
                              grpc_core::CSliceRef(*slice));
      }

      int64_t notify_offset = s->next_message_end_offset;
      if (notify_offset <= s->flow_controlled_bytes_written) {
        // Already past the notification point: complete this step now.
        grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished,
                                          absl::OkStatus(),
                                          "fetching_send_message_finished");
      } else {
        // Register a write callback to fire at notify_offset, reusing a
        // node from the transport's pool when one is available.
        grpc_chttp2_write_cb* cb = t->write_cb_pool;
        if (cb == nullptr) {
          cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
        } else {
          t->write_cb_pool = cb->next;
        }
        cb->call_at_byte = notify_offset;
        cb->closure = s->send_message_finished;
        s->send_message_finished = nullptr;
        grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
                                          ? &s->on_write_finished_cbs
                                          : &s->on_flow_controlled_cbs;
        // Push onto the head of the selected list.
        cb->next = *list;
        *list = cb;
      }

      // Only kick a write for a started stream, and when buffering either
      // is off or has exceeded the write buffer size.
      if (s->id != 0 &&
          (!s->write_buffering ||
           s->flow_controlled_buffer.length > t->write_buffer_size)) {
        grpc_chttp2_mark_stream_writable(t, s);
        grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
      }
    }
  }

  // ---- send_trailing_metadata ----
  if (op->send_trailing_metadata) {
    GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
    on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
    s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
    s->send_trailing_metadata =
        op_payload->send_trailing_metadata.send_trailing_metadata;
    s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
    s->write_buffering = false;
    if (contains_non_ok_status(s->send_trailing_metadata)) {
      s->seen_error = true;
    }
    if (s->write_closed) {
      s->send_trailing_metadata = nullptr;
      s->sent_trailing_metadata_op = nullptr;
      // Empty trailing metadata on a write-closed stream is not reported as
      // an error.
      grpc_chttp2_complete_closure_step(
          t, s, &s->send_trailing_metadata_finished,
          op->payload->send_trailing_metadata.send_trailing_metadata->empty()
              ? absl::OkStatus()
              : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
                                  "stream was closed"),
          "send_trailing_metadata_finished");
    } else if (s->id != 0) {
      // TODO(ctiller): check if there's flow control for any outstanding
      // bytes before going writable
      grpc_chttp2_mark_stream_writable(t, s);
      grpc_chttp2_initiate_write(
          t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
    }
  }

  // ---- recv_initial_metadata ----
  if (op->recv_initial_metadata) {
    GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
    s->recv_initial_metadata_ready =
        op_payload->recv_initial_metadata.recv_initial_metadata_ready;
    s->recv_initial_metadata =
        op_payload->recv_initial_metadata.recv_initial_metadata;
    s->trailing_metadata_available =
        op_payload->recv_initial_metadata.trailing_metadata_available;
    if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
      *s->trailing_metadata_available = true;
    }
    grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
  }

  // ---- recv_message ----
  if (op->recv_message) {
    GPR_ASSERT(s->recv_message_ready == nullptr);
    s->recv_message_ready = op_payload->recv_message.recv_message_ready;
    s->recv_message = op_payload->recv_message.recv_message;
    s->recv_message->emplace();
    s->recv_message_flags = op_payload->recv_message.flags;
    s->call_failed_before_recv_message =
        op_payload->recv_message.call_failed_before_recv_message;
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
  }

  // ---- recv_trailing_metadata ----
  if (op->recv_trailing_metadata) {
    GPR_ASSERT(s->collecting_stats == nullptr);
    s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
    GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
    s->recv_trailing_metadata_finished =
        op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    s->recv_trailing_metadata =
        op_payload->recv_trailing_metadata.recv_trailing_metadata;
    s->final_metadata_requested = true;
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
  }

  // Drop the initial barrier ref installed at the top of this function.
  if (on_complete != nullptr) {
    grpc_chttp2_complete_closure_step(t, s, &on_complete, absl::OkStatus(),
                                      "op->on_complete");
  }

  GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
}
| |
| static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, |
| grpc_transport_stream_op_batch* op) { |
| grpc_chttp2_transport* t = reinterpret_cast<grpc_chttp2_transport*>(gt); |
| grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs); |
| |
| if (!t->is_client) { |
| if (op->send_initial_metadata) { |
| GPR_ASSERT(!op->payload->send_initial_metadata.send_initial_metadata |
| ->get(grpc_core::GrpcTimeoutMetadata()) |
| .has_value()); |
| } |
| if (op->send_trailing_metadata) { |
| GPR_ASSERT(!op->payload->send_trailing_metadata.send_trailing_metadata |
| ->get(grpc_core::GrpcTimeoutMetadata()) |
| .has_value()); |
| } |
| } |
| |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { |
| gpr_log(GPR_INFO, "perform_stream_op[s=%p; op=%p]: %s", s, op, |
| grpc_transport_stream_op_batch_string(op, false).c_str()); |
| } |
| |
| GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); |
| op->handler_private.extra_arg = gs; |
| t->combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure, |
| perform_stream_op_locked, op, nullptr), |
| absl::OkStatus()); |
| } |
| |
| static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) { |
| // callback remaining pings: they're not allowed to call into the transport, |
| // and maybe they hold resources that need to be freed |
| grpc_chttp2_ping_queue* pq = &t->ping_queue; |
| GPR_ASSERT(!error.ok()); |
| for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { |
| grpc_closure_list_fail_all(&pq->lists[j], error); |
| grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &pq->lists[j]); |
| } |
| } |
| |
| static void send_ping_locked(grpc_chttp2_transport* t, |
| grpc_closure* on_initiate, grpc_closure* on_ack) { |
| if (!t->closed_with_error.ok()) { |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, t->closed_with_error); |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error); |
| return; |
| } |
| grpc_chttp2_ping_queue* pq = &t->ping_queue; |
| grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, |
| absl::OkStatus()); |
| grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, |
| absl::OkStatus()); |
| } |
| |
| // Specialized form of send_ping_locked for keepalive ping. If there is already |
| // a ping in progress, the keepalive ping would piggyback onto that ping, |
| // instead of waiting for that ping to complete and then starting a new ping. |
| static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { |
| if (!t->closed_with_error.ok()) { |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, |
| start_keepalive_ping_locked, t, nullptr), |
| t->closed_with_error); |
| t->combiner->Run( |
| GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, |
| finish_keepalive_ping_locked, t, nullptr), |
| t->closed_with_error); |
| return; |
| } |
| grpc_chttp2_ping_queue* pq = &t->ping_queue; |
| if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { |
| // There is a ping in flight. Add yourself to the inflight closure list. |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, |
| start_keepalive_ping_locked, t, nullptr), |
| t->closed_with_error); |
| grpc_closure_list_append( |
| &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], |
| GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, |
| finish_keepalive_ping, t, grpc_schedule_on_exec_ctx), |
| absl::OkStatus()); |
| return; |
| } |
| grpc_closure_list_append( |
| &pq->lists[GRPC_CHTTP2_PCL_INITIATE], |
| GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, start_keepalive_ping, |
| t, grpc_schedule_on_exec_ctx), |
| absl::OkStatus()); |
| grpc_closure_list_append( |
| &pq->lists[GRPC_CHTTP2_PCL_NEXT], |
| GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, finish_keepalive_ping, |
| t, grpc_schedule_on_exec_ctx), |
| absl::OkStatus()); |
| } |
| |
| void grpc_chttp2_retry_initiate_ping(grpc_chttp2_transport* t) { |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->retry_initiate_ping_locked, |
| retry_initiate_ping_locked, t, nullptr), |
| absl::OkStatus()); |
| } |
| |
| // Combiner callback for a delayed ping retry. Clears the delayed-ping timer |
| // handle (which must be set), initiates a write with reason RETRY_SEND_PING, |
| // and drops the "retry_initiate_ping_locked" transport ref. |
| static void retry_initiate_ping_locked(void* tp, |
|                                        GRPC_UNUSED grpc_error_handle error) { |
|   GPR_DEBUG_ASSERT(error.ok()); |
|   auto* t = static_cast<grpc_chttp2_transport*>(tp); |
|   auto& timer_handle = t->ping_state.delayed_ping_timer_handle; |
|   GPR_ASSERT(timer_handle.has_value()); |
|   timer_handle.reset(); |
|   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); |
|   GRPC_CHTTP2_UNREF_TRANSPORT(t, "retry_initiate_ping_locked"); |
| } |
| |
| // Handles a ping ack carrying `id`. An ack whose id does not match the |
| // in-flight ping is logged and ignored; otherwise the inflight closures are |
| // run and, if more pings are queued on the NEXT list, a write is initiated. |
| void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { |
|   grpc_chttp2_ping_queue& ping_queue = t->ping_queue; |
|   if (ping_queue.inflight_id != id) { |
|     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, |
|             std::string(t->peer_string.as_string_view()).c_str(), id); |
|     return; |
|   } |
|   grpc_core::ExecCtx::RunList(DEBUG_LOCATION, |
|                               &ping_queue.lists[GRPC_CHTTP2_PCL_INFLIGHT]); |
|   const bool more_pings_queued = |
|       !grpc_closure_list_empty(ping_queue.lists[GRPC_CHTTP2_PCL_NEXT]); |
|   if (more_pings_queued) { |
|     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); |
|   } |
| } |
| |
| namespace { |
| |
| // Fire and forget (deletes itself on completion). Does a graceful shutdown by |
| // sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping |
| // and waiting for an ack (effectively waiting for an RTT) and then sending a |
| // final GOAWAY frame with an updated last stream identifier. This helps ensure |
| // that a connection can be cleanly shut down without losing requests. |
| // In the event that the client does not respond to the ping for some reason, |
| // we add a 20 second deadline, after which we send the second goaway. |
| // Ref-counted helper that queues a first GOAWAY plus a ping on construction |
| // and queues the final GOAWAY from whichever of the ping-ack callback or the |
| // 20 second timer callback reaches MaybeSendFinalGoawayLocked() first. The |
| // ping-ack path and the timer path each drop one ref when they finish. |
| class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> { |
| public: |
| // Heap-allocates a GracefulGoaway for `t`. The pointer is intentionally not |
| // kept: the object is released through Unref() in its callbacks. |
| static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); } |
| |
| // Drops the transport ref taken in the constructor. |
| ~GracefulGoaway() override { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t_, "graceful goaway"); |
| } |
| |
| private: |
| // Marks the transport as being in graceful GOAWAY, queues the first GOAWAY |
| // and a ping, and arms the fallback timer. |
| explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t) { |
| t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY; |
| GRPC_CHTTP2_REF_TRANSPORT(t_, "graceful goaway"); |
| // First GOAWAY: last stream id 2^31-1, error code 0, empty debug data. |
| grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf); |
| // No on-initiate closure; OnPingAck is the on-ack closure. |
| send_ping_locked( |
| t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr)); |
| grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); |
| Ref().release(); // Ref for the timer |
| // Fallback: OnTimer is scheduled for 20 seconds from now. |
| grpc_timer_init( |
| &timer_, grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(20), |
| GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr)); |
| } |
| |
| // Queues the final GOAWAY (last stream id = t_->last_new_stream_id) unless |
| // the transport has already left the GRACEFUL_GOAWAY state or is being |
| // destroyed/closed. Called from both the ping-ack and timer paths; the state |
| // check makes the second call a no-op. |
| void MaybeSendFinalGoawayLocked() { |
| if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) { |
| // We already sent the final GOAWAY. |
| return; |
| } |
| if (t_->destroying || !t_->closed_with_error.ok()) { |
| GRPC_CHTTP2_IF_TRACING( |
| gpr_log(GPR_INFO, |
| "transport:%p %s peer:%s Transport already shutting down. " |
| "Graceful GOAWAY abandoned.", |
| t_, t_->is_client ? "CLIENT" : "SERVER", |
| std::string(t_->peer_string.as_string_view()).c_str())); |
| return; |
| } |
| // Ping completed. Send final goaway. |
| GRPC_CHTTP2_IF_TRACING( |
| gpr_log(GPR_INFO, |
| "transport:%p %s peer:%s Graceful shutdown: Ping received. " |
| "Sending final GOAWAY with stream_id:%d", |
| t_, t_->is_client ? "CLIENT" : "SERVER", |
| std::string(t_->peer_string.as_string_view()).c_str(), |
| t_->last_new_stream_id)); |
| t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED; |
| grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(), |
| &t_->qbuf); |
| grpc_chttp2_initiate_write(t_, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); |
| } |
| |
| // Ping-ack closure: re-initializes on_ping_ack_ to OnPingAckLocked and runs |
| // it on the transport's combiner. |
| static void OnPingAck(void* arg, grpc_error_handle /* error */) { |
| auto* self = static_cast<GracefulGoaway*>(arg); |
| self->t_->combiner->Run( |
| GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr), |
| absl::OkStatus()); |
| } |
| |
| // Ping-ack path on the combiner: cancels the fallback timer, attempts the |
| // final GOAWAY, and drops one ref. |
| static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) { |
| auto* self = static_cast<GracefulGoaway*>(arg); |
| grpc_timer_cancel(&self->timer_); |
| self->MaybeSendFinalGoawayLocked(); |
| self->Unref(); |
| } |
| |
| // Timer closure. With a non-OK error it only drops the timer's ref |
| // (NOTE(review): presumably this is the cancellation case - confirm); |
| // otherwise it re-initializes on_timer_ to OnTimerLocked and runs it on the |
| // combiner. |
| static void OnTimer(void* arg, grpc_error_handle error) { |
| auto* self = static_cast<GracefulGoaway*>(arg); |
| if (!error.ok()) { |
| self->Unref(); |
| return; |
| } |
| self->t_->combiner->Run( |
| GRPC_CLOSURE_INIT(&self->on_timer_, OnTimerLocked, self, nullptr), |
| absl::OkStatus()); |
| } |
| |
| // Timer path on the combiner: attempts the final GOAWAY and drops the |
| // timer's ref. |
| static void OnTimerLocked(void* arg, grpc_error_handle /* error */) { |
| auto* self = static_cast<GracefulGoaway*>(arg); |
| self->MaybeSendFinalGoawayLocked(); |
| self->Unref(); |
| } |
| |
| // Transport being shut down; a ref is held from construction to destruction. |
| grpc_chttp2_transport* t_; |
| // Closure storage used first for OnPingAck, then for OnPingAckLocked. |
| grpc_closure on_ping_ack_; |
| // Fallback deadline timer armed in the constructor. |
| grpc_timer timer_; |
| // Closure storage used first for OnTimer, then for OnTimerLocked. |
| grpc_closure on_timer_; |
| }; |
| |
| } // namespace |
| |
| static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error, |
| bool immediate_disconnect_hint) { |
| grpc_http2_error_code http_error; |
| std::string message; |
| grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr, |
| &message, &http_error, nullptr); |
| if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR && |
| !immediate_disconnect_hint) { |
| // Do a graceful shutdown. |
| if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND) { |
| GracefulGoaway::Start(t); |
| } else { |
| // Graceful GOAWAY is already in progress. |
| } |
| } else if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND || |
| t->sent_goaway_state == GRPC_CHTTP2_GRACEFUL_GOAWAY) { |
| // We want to log this irrespective of whether http tracing is enabled |
| gpr_log(GPR_DEBUG, "%s: Sending goaway err=%s", |
| std::string(t->peer_string.as_string_view()).c_str(), |
| grpc_core::StatusToString(error).c_str()); |
| t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED; |
| grpc_chttp2_goaway_append( |
| t->last_new_stream_id, static_cast<uint32_t>(http_error), |
| grpc_slice_from_cpp_string(std::move(message)), &t->qbuf); |
| } else { |
| // Final GOAWAY has already been sent. |
| } |
| grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); |
| } |
| |
| // Records one ping strike. When the strike count exceeds a non-zero |
| // max_ping_strikes, sends a GOAWAY with ENHANCE_YOUR_CALM and closes the |
| // transport with UNAVAILABLE. |
| void grpc_chttp2_add_ping_strike(grpc_chttp2_transport* t) { |
|   // The counter is incremented unconditionally, even when the limit is |
|   // disabled (max_ping_strikes == 0). |
|   ++t->ping_recv_state.ping_strikes; |
|   if (t->ping_policy.max_ping_strikes == 0 || |
|       t->ping_recv_state.ping_strikes <= t->ping_policy.max_ping_strikes) { |
|     return; |
|   } |
|   send_goaway(t, |
|               grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"), |
|                                  grpc_core::StatusIntProperty::kHttp2Error, |
|                                  GRPC_HTTP2_ENHANCE_YOUR_CALM), |
|               /*immediate_disconnect_hint=*/true); |
|   // The transport will be closed after the write is done |
|   close_transport_locked( |
|       t, grpc_error_set_int(GRPC_ERROR_CREATE("Too many pings"), |
|                             grpc_core::StatusIntProperty::kRpcStatus, |
|                             GRPC_STATUS_UNAVAILABLE)); |
| } |
| |
| // Resets ping accounting: restores the pings-without-data budget from the |
| // ping policy and, on servers only, clears the last received ping time and |
| // the strike counter. |
| void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) { |
|   if (!t->is_client) { |
|     auto& recv_state = t->ping_recv_state; |
|     recv_state.last_ping_recv_time = grpc_core::Timestamp::InfPast(); |
|     recv_state.ping_strikes = 0; |
|   } |
|   t->ping_state.pings_before_data_required = |
|       t->ping_policy.max_pings_without_data; |
| } |
| |
| // Combiner callback that applies a grpc_transport_op to the transport stored |
| // in op->handler_private.extra_arg. Each requested field of the op is handled |
| // in turn, then op->on_consumed is scheduled and the "transport_op" transport |
| // ref is dropped. |
| // |
| // stream_op: the grpc_transport_op to apply (passed as void*). |
| // The error argument is ignored. |
| static void perform_transport_op_locked(void* stream_op, |
| grpc_error_handle /*error_ignored*/) { |
| grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op); |
| grpc_chttp2_transport* t = |
| static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg); |
| |
| // GOAWAY request without the immediate-disconnect hint. |
| if (!op->goaway_error.ok()) { |
| send_goaway(t, op->goaway_error, /*immediate_disconnect_hint=*/false); |
| } |
| |
| // Install the accept-stream callback and its user data. |
| if (op->set_accept_stream) { |
| t->accept_stream_cb = op->set_accept_stream_fn; |
| t->accept_stream_cb_user_data = op->set_accept_stream_user_data; |
| } |
| |
| if (op->bind_pollset) { |
| grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset); |
| } |
| |
| if (op->bind_pollset_set) { |
| grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set); |
| } |
| |
| // A ping is sent if either callback is provided; a write is initiated so |
| // the ping goes out. |
| if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { |
| send_ping_locked(t, op->send_ping.on_initiate, op->send_ping.on_ack); |
| grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); |
| } |
| |
| // Connectivity watchers: the new watcher is moved into the state tracker. |
| if (op->start_connectivity_watch != nullptr) { |
| t->state_tracker.AddWatcher(op->start_connectivity_watch_state, |
| std::move(op->start_connectivity_watch)); |
| } |
| if (op->stop_connectivity_watch != nullptr) { |
| t->state_tracker.RemoveWatcher(op->stop_connectivity_watch); |
| } |
| |
| // Disconnect: queue a GOAWAY with the immediate-disconnect hint, then close |
| // the transport with the same error. |
| if (!op->disconnect_with_error.ok()) { |
| send_goaway(t, op->disconnect_with_error, |
| /*immediate_disconnect_hint=*/true); |
| close_transport_locked(t, op->disconnect_with_error); |
| } |
| |
| grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus()); |
| |
| // Balances the "transport_op" ref taken in perform_transport_op(). |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); |
| } |
| |
| // Entry point for transport ops: optionally traces the op, stashes the |
| // transport pointer in the op, takes a "transport_op" transport ref, and runs |
| // perform_transport_op_locked() on the combiner. |
| static void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { |
|   auto* t = reinterpret_cast<grpc_chttp2_transport*>(gt); |
|   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { |
|     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", t, |
|             grpc_transport_op_string(op).c_str()); |
|   } |
|   op->handler_private.extra_arg = gt; |
|   GRPC_CHTTP2_REF_TRANSPORT(t, "transport_op"); |
|   grpc_closure* locked_op = GRPC_CLOSURE_INIT( |
|       &op->handler_private.closure, perform_transport_op_locked, op, nullptr); |
|   t->combiner->Run(locked_op, absl::OkStatus()); |
| } |
| |
| // |
| // INPUT PROCESSING - GENERAL |
| // |
| |
| // Completes a pending recv_initial_metadata op on stream `s` once initial |
| // metadata has been published: moves the buffered metadata to the caller, |
| // attaches the peer string, and schedules recv_initial_metadata_ready. |
| void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t, |
|                                                       grpc_chttp2_stream* s) { |
|   // Nothing to do without a pending closure or before metadata is published. |
|   if (s->recv_initial_metadata_ready == nullptr || |
|       s->published_metadata[0] == GRPC_METADATA_NOT_PUBLISHED) { |
|     return; |
|   } |
|   if (s->seen_error) { |
|     grpc_slice_buffer_reset_and_unref(&s->frame_storage); |
|   } |
|   *s->recv_initial_metadata = std::move(s->initial_metadata_buffer); |
|   s->recv_initial_metadata->Set(grpc_core::PeerString(), t->peer_string.Ref()); |
|   // If we didn't receive initial metadata from the wire and instead faked a |
|   // status (due to stream cancellations for example), let upper layers know |
|   // that trailing metadata is immediately available. |
|   const bool initial_not_from_wire = |
|       s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE; |
|   const bool trailing_is_fake = |
|       s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE; |
|   if (s->trailing_metadata_available != nullptr && initial_not_from_wire && |
|       trailing_is_fake) { |
|     *s->trailing_metadata_available = true; |
|     s->trailing_metadata_available = nullptr; |
|   } |
|   null_then_sched_closure(&s->recv_initial_metadata_ready); |
| } |
| |
| // If a recv_message op is pending on stream `s`, tries to deframe the next |
| // message from s->frame_storage and schedules recv_message_ready once a |
| // result is available. Whenever a recv_message op is pending, the stream's |
| // flow-control state is updated afterwards with the remaining buffered bytes |
| // and the resulting action is applied. |
| void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t, |
| grpc_chttp2_stream* s) { |
| // No pending recv_message op: nothing to do (flow control is not touched). |
| if (s->recv_message_ready == nullptr) return; |
| |
| grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd( |
| &s->flow_control); |
| grpc_error_handle error; |
| |
| // Lambda is immediately invoked as a big scoped section that can be |
| // exited out of at any point by returning. |
| [&]() { |
| if (grpc_http_trace.enabled()) { |
| gpr_log(GPR_DEBUG, |
| "maybe_complete_recv_message %p final_metadata_requested=%d " |
| "seen_error=%d", |
| s, s->final_metadata_requested, s->seen_error); |
| } |
| if (s->final_metadata_requested && s->seen_error) { |
| // Errored stream whose final metadata was requested: drop buffered bytes |
| // and clear the message. |
| grpc_slice_buffer_reset_and_unref(&s->frame_storage); |
| s->recv_message->reset(); |
| } else { |
| if (s->frame_storage.length != 0) { |
| // Every path through the loop body breaks or returns, so it runs once. |
| while (true) { |
| GPR_ASSERT(s->frame_storage.length > 0); |
| int64_t min_progress_size; |
| auto r = grpc_deframe_unprocessed_incoming_frames( |
| s, &min_progress_size, &**s->recv_message, s->recv_message_flags); |
| if (grpc_http_trace.enabled()) { |
| gpr_log(GPR_DEBUG, "Deframe data frame: %s", |
| grpc_core::PollToString(r, [](absl::Status r) { |
| return r.ToString(); |
| }).c_str()); |
| } |
| if (r.pending()) { |
| if (s->read_closed) { |
| // Deframing is still pending but reads are closed: discard the |
| // buffered bytes and clear the message. |
| grpc_slice_buffer_reset_and_unref(&s->frame_storage); |
| s->recv_message->reset(); |
| break; |
| } else { |
| // Record the minimum progress size reported by the deframer and |
| // leave the closure pending. |
| upd.SetMinProgressSize(min_progress_size); |
| return; // Out of lambda to enclosing function |
| } |
| } else { |
| error = std::move(r.value()); |
| if (!error.ok()) { |
| s->seen_error = true; |
| grpc_slice_buffer_reset_and_unref(&s->frame_storage); |
| break; |
| } else { |
| if (t->channelz_socket != nullptr) { |
| t->channelz_socket->RecordMessageReceived(); |
| } |
| break; |
| } |
| } |
| } |
| } else if (s->read_closed) { |
| // Nothing buffered and reads are closed: clear the message. |
| s->recv_message->reset(); |
| } else { |
| // Nothing buffered yet: record a header's worth of bytes as the minimum |
| // progress size and leave the closure pending. |
| upd.SetMinProgressSize(GRPC_HEADER_SIZE_IN_BYTES); |
| return; // Out of lambda to enclosing function |
| } |
| } |
| // save the length of the buffer before handing control back to application |
| // threads. Needed to support correct flow control bookkeeping |
| if (error.ok() && s->recv_message->has_value()) { |
| null_then_sched_closure(&s->recv_message_ready); |
| } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { |
| // No message to deliver but trailing metadata has been published: |
| // complete the op, flagging whether the trailing metadata was published |
| // by something other than stream close. |
| if (s->call_failed_before_recv_message != nullptr) { |
| *s->call_failed_before_recv_message = |
| (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE); |
| } |
| null_then_sched_closure(&s->recv_message_ready); |
| } |
| }(); |
| |
| // Runs on every exit from the lambda, including the early returns. |
| upd.SetPendingSize(s->frame_storage.length); |
| grpc_chttp2_act_on_flowctl_action(upd.MakeAction(), t, s); |
| } |
| |
| // Gives any pending recv_message op a chance to complete, then completes a |
| // pending recv_trailing_metadata op if the stream is closed in both |
| // directions and no buffered message bytes remain. |
| void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t, |
|                                                        grpc_chttp2_stream* s) { |
|   grpc_chttp2_maybe_complete_recv_message(t, s); |
|   if (grpc_http_trace.enabled()) { |
|     gpr_log(GPR_DEBUG, |
|             "maybe_complete_recv_trailing_metadata cli=%d s=%p closure=%p " |
|             "read_closed=%d " |
|             "write_closed=%d %" PRIdPTR, |
|             t->is_client, s, s->recv_trailing_metadata_finished, s->read_closed, |
|             s->write_closed, s->frame_storage.length); |
|   } |
|   const bool fully_closed_with_waiter = |
|       s->recv_trailing_metadata_finished != nullptr && s->read_closed && |
|       s->write_closed; |
|   if (!fully_closed_with_waiter) return; |
|   // Buffered bytes are discarded on error and always on servers. |
|   if (s->seen_error || !t->is_client) { |
|     grpc_slice_buffer_reset_and_unref(&s->frame_storage); |
|   } |
|   // Wait until buffered message bytes have been drained. |
|   if (!s->read_closed || s->frame_storage.length != 0 || |
|       s->recv_trailing_metadata_finished == nullptr) { |
|     return; |
|   } |
|   grpc_transport_move_stats(&s->stats, s->collecting_stats); |
|   s->collecting_stats = nullptr; |
|   *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer); |
|   null_then_sched_closure(&s->recv_trailing_metadata_finished); |
| } |
| |
| // Removes stream `id` from the transport's stream map and from the writable |
| // and stalled lists. If it was the last stream, posts the benign reclaimer |
| // and, when the final GOAWAY has been sent, closes the transport. Finally |
| // tries to start more streams. |
| static void remove_stream(grpc_chttp2_transport* t, uint32_t id, |
|                           grpc_error_handle error) { |
|   auto* s = static_cast<grpc_chttp2_stream*>( |
|       grpc_chttp2_stream_map_delete(&t->stream_map, id)); |
|   GPR_DEBUG_ASSERT(s); |
|   // If this stream was being parsed, skip the remainder of its frame. |
|   if (t->incoming_stream == s) { |
|     t->incoming_stream = nullptr; |
|     grpc_chttp2_parsing_become_skip_parser(t); |
|   } |
|  |
|   const bool last_stream_gone = |
|       grpc_chttp2_stream_map_size(&t->stream_map) == 0; |
|   if (last_stream_gone) { |
|     post_benign_reclaimer(t); |
|     if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) { |
|       close_transport_locked( |
|           t, GRPC_ERROR_CREATE_REFERENCING( |
|                  "Last stream closed after sending GOAWAY", &error, 1)); |
|     } |
|   } |
|   const bool was_writable = grpc_chttp2_list_remove_writable_stream(t, s); |
|   if (was_writable) { |
|     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream"); |
|   } |
|   grpc_chttp2_list_remove_stalled_by_stream(t, s); |
|   grpc_chttp2_list_remove_stalled_by_transport(t, s); |
|  |
|   maybe_start_some_streams(t); |
| } |
| |
| // Cancels stream `s` because of `due_to_error`. On a server that has not yet |
| // sent trailing metadata and has a clear gRPC status, the stream is closed |
| // via close_from_api(). Otherwise a RST_STREAM is queued (if the stream has |
| // an id and is not fully closed) and the stream is marked closed both ways. |
| void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s, |
|                                grpc_error_handle due_to_error) { |
|   const bool server_can_send_status = |
|       !t->is_client && !s->sent_trailing_metadata && |
|       grpc_error_has_clear_grpc_status(due_to_error); |
|   if (server_can_send_status) { |
|     close_from_api(t, s, due_to_error); |
|     return; |
|   } |
|  |
|   const bool still_open = !s->read_closed || !s->write_closed; |
|   if (still_open && s->id != 0) { |
|     grpc_http2_error_code http_error; |
|     grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr, |
|                           &http_error, nullptr); |
|     grpc_chttp2_add_rst_stream_to_next_write( |
|         t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing); |
|     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); |
|   } |
|   if (!s->seen_error && !due_to_error.ok()) { |
|     s->seen_error = true; |
|   } |
|   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error); |
| } |
| |
| // Synthesizes trailing metadata (grpc-status and, if non-empty, grpc-message) |
| // for stream `s` from `error`, marks the stream as errored when the status is |
| // not OK, and then tries to complete recv_trailing_metadata. |
| void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s, |
|                              grpc_error_handle error) { |
|   grpc_status_code status; |
|   std::string message; |
|   grpc_error_get_status(error, s->deadline, &status, &message, nullptr, |
|                         nullptr); |
|   if (status != GRPC_STATUS_OK) { |
|     s->seen_error = true; |
|   } |
|   // stream_global->recv_trailing_metadata_finished gives us a |
|   // last chance replacement: we've received trailing metadata, |
|   // but something more important has become available to signal |
|   // to the upper layers - drop what we've got, and then publish |
|   // what we want - which is safe because we haven't told anyone |
|   // about the metadata yet |
|   const bool may_publish = |
|       s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED || |
|       s->recv_trailing_metadata_finished != nullptr || |
|       !s->final_metadata_requested; |
|   if (!may_publish) return; |
|  |
|   s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status); |
|   if (!message.empty()) { |
|     s->trailing_metadata_buffer.Set( |
|         grpc_core::GrpcMessageMetadata(), |
|         grpc_core::Slice::FromCopiedBuffer(message)); |
|   } |
|   s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; |
|   grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); |
| } |
| |
| static void add_error(grpc_error_handle error, grpc_error_handle* refs, |
| size_t* nrefs) { |
| if (error.ok()) return; |
| for (size_t i = 0; i < *nrefs; i++) { |
| if (error == refs[i]) { |
| return; |
| } |
| } |
| refs[*nrefs] = error; |
| ++*nrefs; |
| } |
| |
| static grpc_error_handle removal_error(grpc_error_handle extra_error, |
| grpc_chttp2_stream* s, |
| const char* main_error_msg) { |
| grpc_error_handle refs[3]; |
| size_t nrefs = 0; |
| add_error(s->read_closed_error, refs, &nrefs); |
| add_error(s->write_closed_error, refs, &nrefs); |
| add_error(extra_error, refs, &nrefs); |
| grpc_error_handle error; |
| if (nrefs > 0) { |
| error = GRPC_ERROR_CREATE_REFERENCING(main_error_msg, refs, nrefs); |
| } |
| return error; |
| } |
| |
| // Drains the write-callback list at `*list`: each entry is unlinked, its |
| // closure step is completed with `error`, and the node is pushed onto the |
| // transport's write_cb_pool free list. |
| static void flush_write_list(grpc_chttp2_transport* t, grpc_chttp2_stream* s, |
|                              grpc_chttp2_write_cb** list, |
|                              grpc_error_handle error) { |
|   // The head is re-read from *list on every iteration. |
|   for (grpc_chttp2_write_cb* node = *list; node != nullptr; node = *list) { |
|     *list = node->next; |
|     grpc_chttp2_complete_closure_step(t, s, &node->closure, error, |
|                                       "on_write_finished_cb"); |
|     node->next = t->write_cb_pool; |
|     t->write_cb_pool = node; |
|   } |
| } |
| |
| // Fails every pending send-side operation on stream `s`: the initial and |
| // trailing metadata sends, the message send, and all queued write callbacks |
| // are completed with a removal error derived from `error` and the stream's |
| // recorded close errors. |
| void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, |
|                                      grpc_chttp2_stream* s, |
|                                      grpc_error_handle error) { |
|   const grpc_error_handle failure = |
|       removal_error(error, s, "Pending writes failed due to stream closure"); |
|  |
|   s->send_initial_metadata = nullptr; |
|   grpc_chttp2_complete_closure_step(t, s, &s->send_initial_metadata_finished, |
|                                     failure, "send_initial_metadata_finished"); |
|  |
|   s->send_trailing_metadata = nullptr; |
|   s->sent_trailing_metadata_op = nullptr; |
|   grpc_chttp2_complete_closure_step(t, s, &s->send_trailing_metadata_finished, |
|                                     failure, "send_trailing_metadata_finished"); |
|  |
|   grpc_chttp2_complete_closure_step(t, s, &s->send_message_finished, failure, |
|                                     "fetching_send_message_finished"); |
|  |
|   flush_write_list(t, s, &s->on_write_finished_cbs, failure); |
|   flush_write_list(t, s, &s->on_flow_controlled_cbs, failure); |
| } |
| |
| // Closes the read side and/or the write side of stream `s` with `error`. |
| // |
| // close_reads / close_writes: non-zero to close that direction. |
| // error: recorded as the close error for each direction that gets closed by |
| // this call. |
| // |
| // When the stream becomes closed in both directions it is removed from the |
| // transport (or from the waiting-for-concurrency list if it has no id yet), a |
| // status is faked from the combined removal error if there is one, latency is |
| // recorded, and the "chttp2" stream ref is dropped. |
| void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t, |
| grpc_chttp2_stream* s, int close_reads, |
| int close_writes, grpc_error_handle error) { |
| if (grpc_http_trace.enabled()) { |
| gpr_log( |
| GPR_DEBUG, "MARK_STREAM_CLOSED: t=%p s=%p(id=%d) %s [%s]", t, s, s->id, |
| (close_reads && close_writes) |
| ? "read+write" |
| : (close_reads ? "read" : (close_writes ? "write" : "nothing??")), |
| error.ToString().c_str()); |
| } |
| if (s->read_closed && s->write_closed) { |
| // already closed, but we should still fake the status if needed. |
| grpc_error_handle overall_error = removal_error(error, s, "Stream removed"); |
| if (!overall_error.ok()) { |
| grpc_chttp2_fake_status(t, s, overall_error); |
| } |
| grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); |
| return; |
| } |
| // closed_read: this call closed the read side. |
| // became_closed: this call made the stream closed in both directions. |
| bool closed_read = false; |
| bool became_closed = false; |
| if (close_reads && !s->read_closed) { |
| s->read_closed_error = error; |
| s->read_closed = true; |
| closed_read = true; |
| } |
| if (close_writes && !s->write_closed) { |
| s->write_closed_error = error; |
| s->write_closed = true; |
| // Pending send-side operations can no longer complete; fail them. |
| grpc_chttp2_fail_pending_writes(t, s, error); |
| } |
| if (s->read_closed && s->write_closed) { |
| became_closed = true; |
| grpc_error_handle overall_error = removal_error(error, s, "Stream removed"); |
| if (s->id != 0) { |
| remove_stream(t, s->id, overall_error); |
| } else { |
| // Purge streams waiting on concurrency still waiting for id assignment |
| grpc_chttp2_list_remove_waiting_for_concurrency(t, s); |
| } |
| if (!overall_error.ok()) { |
| grpc_chttp2_fake_status(t, s, overall_error); |
| } |
| } |
| if (closed_read) { |
| // Any metadata (initial [0], trailing [1]) not yet published is marked as |
| // published at close so pending receive ops can complete. |
| for (int i = 0; i < 2; i++) { |
| if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) { |
| s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE; |
| } |
| } |
| grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); |
| grpc_chttp2_maybe_complete_recv_message(t, s); |
| } |
| if (became_closed) { |
| // Latency is measured on the monotonic clock from the stream's creation. |
| s->stats.latency = |
| gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), s->creation_time); |
| grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); |
| GRPC_CHTTP2_STREAM_UNREF(s, "chttp2"); |
| } |
| } |
| |
| // Closes stream `s` from the API side by writing a hand-rolled, uncompressed |
| // HEADERS frame (END_STREAM | END_HEADERS) straight into t->qbuf. The frame |
| // carries :status and content-type (only if initial metadata was not sent |
| // yet), grpc-status and grpc-message taken from `error`. A RST_STREAM with |
| // NO_ERROR is queued afterwards and the stream is marked closed both ways. |
| static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s, |
|                            grpc_error_handle error) { |
|   grpc_status_code grpc_status; |
|   std::string message; |
|   grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr, |
|                         nullptr); |
|  |
|   GPR_ASSERT(grpc_status >= 0 && static_cast<int>(grpc_status) < 100); |
|  |
|   // Writes one length-prefixed string (a single length byte followed by the |
|   // raw characters) at `out` and returns the position just past it. |
|   const auto put_string = [](uint8_t* out, const char* text) { |
|     const size_t text_len = strlen(text); |
|     *out++ = static_cast<uint8_t>(text_len); |
|     memcpy(out, text, text_len); |
|     return out + text_len; |
|   }; |
|  |
|   // Hand roll a header block. |
|   // This is unnecessarily ugly - at some point we should find a more |
|   // elegant solution. |
|   // It's complicated by the fact that our send machinery would be dead by |
|   // the time we got around to sending this, so instead we ignore HPACK |
|   // compression and just write the uncompressed bytes onto the wire. |
|   uint32_t len = 0;  // total payload length of the frame |
|   uint8_t* p; |
|   grpc_slice http_status_hdr; |
|   grpc_slice content_type_hdr; |
|   if (!s->sent_initial_metadata) { |
|     // ":status: 200" => 1 + (1 + 7) + (1 + 3) = 13 bytes. |
|     http_status_hdr = GRPC_SLICE_MALLOC(13); |
|     p = GRPC_SLICE_START_PTR(http_status_hdr); |
|     *p++ = 0x00;  // literal header, not indexed |
|     p = put_string(p, ":status"); |
|     p = put_string(p, "200"); |
|     GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr)); |
|     len += static_cast<uint32_t>(GRPC_SLICE_LENGTH(http_status_hdr)); |
|  |
|     // "content-type: application/grpc" => 1 + (1 + 12) + (1 + 16) = 31 bytes. |
|     content_type_hdr = GRPC_SLICE_MALLOC(31); |
|     p = GRPC_SLICE_START_PTR(content_type_hdr); |
|     *p++ = 0x00;  // literal header, not indexed |
|     p = put_string(p, "content-type"); |
|     p = put_string(p, "application/grpc"); |
|     GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr)); |
|     len += static_cast<uint32_t>(GRPC_SLICE_LENGTH(content_type_hdr)); |
|   } |
|  |
|   // "grpc-status: N" => 1 + (1 + 11) + 1 + (one or two decimal digits). |
|   grpc_slice status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10)); |
|   p = GRPC_SLICE_START_PTR(status_hdr); |
|   *p++ = 0x00;  // literal header, not indexed |
|   p = put_string(p, "grpc-status"); |
|   if (grpc_status < 10) { |
|     *p++ = 1; |
|     *p++ = static_cast<uint8_t>('0' + grpc_status); |
|   } else { |
|     *p++ = 2; |
|     *p++ = static_cast<uint8_t>('0' + (grpc_status / 10)); |
|     *p++ = static_cast<uint8_t>('0' + (grpc_status % 10)); |
|   } |
|   GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr)); |
|   len += static_cast<uint32_t>(GRPC_SLICE_LENGTH(status_hdr)); |
|  |
|   // "grpc-message" name plus the varint-encoded value length; the message |
|   // bytes themselves are appended to qbuf as a separate slice below. |
|   const size_t msg_len = message.length(); |
|   GPR_ASSERT(msg_len <= UINT32_MAX); |
|   grpc_core::VarintWriter<1> msg_len_writer(static_cast<uint32_t>(msg_len)); |
|   grpc_slice message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length()); |
|   p = GRPC_SLICE_START_PTR(message_pfx); |
|   *p++ = 0x00;  // literal header, not indexed |
|   p = put_string(p, "grpc-message"); |
|   msg_len_writer.Write(0, p); |
|   p += msg_len_writer.length(); |
|   GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx)); |
|   len += static_cast<uint32_t>(GRPC_SLICE_LENGTH(message_pfx)); |
|   len += static_cast<uint32_t>(msg_len); |
|  |
|   // 9-byte frame header: 24-bit length, type, flags, 32-bit stream id. |
|   grpc_slice hdr = GRPC_SLICE_MALLOC(9); |
|   p = GRPC_SLICE_START_PTR(hdr); |
|   *p++ = static_cast<uint8_t>(len >> 16); |
|   *p++ = static_cast<uint8_t>(len >> 8); |
|   *p++ = static_cast<uint8_t>(len); |
|   *p++ = GRPC_CHTTP2_FRAME_HEADER; |
|   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS; |
|   *p++ = static_cast<uint8_t>(s->id >> 24); |
|   *p++ = static_cast<uint8_t>(s->id >> 16); |
|   *p++ = static_cast<uint8_t>(s->id >> 8); |
|   *p++ = static_cast<uint8_t>(s->id); |
|   GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr)); |
|  |
|   // Queue the frame: header first, then the header fields in wire order. |
|   grpc_slice_buffer_add(&t->qbuf, hdr); |
|   if (!s->sent_initial_metadata) { |
|     grpc_slice_buffer_add(&t->qbuf, http_status_hdr); |
|     grpc_slice_buffer_add(&t->qbuf, content_type_hdr); |
|   } |
|   grpc_slice_buffer_add(&t->qbuf, status_hdr); |
|   grpc_slice_buffer_add(&t->qbuf, message_pfx); |
|   grpc_slice_buffer_add(&t->qbuf, |
|                         grpc_slice_from_cpp_string(std::move(message))); |
|   grpc_chttp2_reset_ping_clock(t); |
|   grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR, |
|                                            &s->stats.outgoing); |
|  |
|   grpc_chttp2_mark_stream_closed(t, s, 1, 1, error); |
|   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); |
| } |
| |
| // User-data bundle passed through grpc_chttp2_stream_map_for_each() to |
| // cancel_stream_cb(). |
| struct cancel_stream_cb_args { |
| // Error each stream is cancelled with. |
| grpc_error_handle error; |
| // Transport that owns the streams being cancelled. |
| grpc_chttp2_transport* t; |
| }; |
| |
| static void cancel_stream_cb(void* user_data, uint32_t /*key*/, void* stream) { |
| cancel_stream_cb_args* args = static_cast<cancel_stream_cb_args*>(user_data); |
| grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(stream); |
| grpc_chttp2_cancel_stream(args->t, s, args->error); |
| } |
| |
| static void end_all_the_calls(grpc_chttp2_transport* t, |
| grpc_error_handle error) { |
| intptr_t http2_error; |
| // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server. |
| if (!t->is_client && !grpc_error_has_clear_grpc_status(error) && |
| !grpc_error_get_int(error, grpc_core::StatusIntProperty::kHttp2Error, |
| &http2_error)) { |
| error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus, |
| GRPC_STATUS_UNAVAILABLE); |
| } |
| cancel_unstarted_streams(t, error); |
| cancel_stream_cb_args args = {error, t}; |
| grpc_chttp2_stream_map_for_each(&t->stream_map, cancel_stream_cb, &args); |
| } |
| |
| // |
| // INPUT PROCESSING - PARSING |
| // |
| |
| // Runs `action` according to `urgency`: never for NO_ACTION_NEEDED, as-is for |
| // QUEUE_UPDATE, and preceded by initiating a write with `reason` for |
| // UPDATE_IMMEDIATELY. |
| template <class F> |
| static void WithUrgency(grpc_chttp2_transport* t, |
|                         grpc_core::chttp2::FlowControlAction::Urgency urgency, |
|                         grpc_chttp2_initiate_write_reason reason, F action) { |
|   using Urgency = grpc_core::chttp2::FlowControlAction::Urgency; |
|   const bool immediate = urgency == Urgency::UPDATE_IMMEDIATELY; |
|   if (immediate) { |
|     grpc_chttp2_initiate_write(t, reason); |
|   } |
|   if (immediate || urgency == Urgency::QUEUE_UPDATE) { |
|     action(); |
|   } |
| } |
| |
| // Applies a flow-control action: for each kind of update requested by |
| // `action`, performs the matching step (marking the stream writable or |
| // queueing a settings update) with the urgency the action specifies. |
| void grpc_chttp2_act_on_flowctl_action( |
|     const grpc_core::chttp2::FlowControlAction& action, |
|     grpc_chttp2_transport* t, grpc_chttp2_stream* s) { |
|   // Stream window update: only for streams with an id whose reads are open. |
|   const auto mark_stream_writable = [t, s]() { |
|     if (s->id != 0 && !s->read_closed) { |
|       grpc_chttp2_mark_stream_writable(t, s); |
|     } |
|   }; |
|   WithUrgency(t, action.send_stream_update(), |
|               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, |
|               mark_stream_writable); |
|  |
|   // Transport window update: nothing to do beyond the optional write. |
|   const auto nothing = []() {}; |
|   WithUrgency(t, action.send_transport_update(), |
|               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, nothing); |
|  |
|   const auto queue_initial_window = [t, &action]() { |
|     queue_setting_update(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, |
|                          action.initial_window_size()); |
|   }; |
|   WithUrgency(t, action.send_initial_window_update(), |
|               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, queue_initial_window); |
|  |
|   const auto queue_max_frame_size = [t, &action]() { |
|     queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, |
|                          action.max_frame_size()); |
|   }; |
|   WithUrgency(t, action.send_max_frame_size_update(), |
|               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, queue_max_frame_size); |
|  |
|   if (t->enable_preferred_rx_crypto_frame_advertisement) { |
|     const auto queue_preferred_crypto_frame_size = [t, &action]() { |
|       queue_setting_update( |
|           t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE, |
|           action.preferred_rx_crypto_frame_size()); |
|     }; |
|     WithUrgency(t, action.preferred_rx_crypto_frame_size_update(), |
|                 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, |
|                 queue_preferred_crypto_frame_size); |
|   } |
| } |
| |
| // Tries to parse the transport's read buffer as an HTTP/1.x response. If the |
| // whole buffer parses cleanly, returns an error "Trying to connect an |
| // http1.x server" annotated with the HTTP status and the mapped gRPC status; |
| // otherwise returns an OK status. |
| static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) { |
|   grpc_http_parser parser; |
|   grpc_http_response response; |
|   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response); |
|  |
|   // Feed slices to the parser until one fails or the buffer is exhausted. |
|   grpc_error_handle parse_error; |
|   for (size_t idx = 0; idx < t->read_buffer.count; ++idx) { |
|     parse_error = |
|         grpc_http_parser_parse(&parser, t->read_buffer.slices[idx], nullptr); |
|     if (!parse_error.ok()) break; |
|   } |
|   if (parse_error.ok()) { |
|     parse_error = grpc_http_parser_eof(&parser); |
|   } |
|  |
|   grpc_error_handle error; |
|   if (parse_error.ok()) { |
|     grpc_error_handle with_http_status = grpc_error_set_int( |
|         GRPC_ERROR_CREATE("Trying to connect an http1.x server"), |
|         grpc_core::StatusIntProperty::kHttpStatus, response.status); |
|     error = grpc_error_set_int( |
|         with_http_status, grpc_core::StatusIntProperty::kRpcStatus, |
|         grpc_http2_status_to_grpc_status(response.status)); |
|   } |
|  |
|   grpc_http_parser_destroy(&parser); |
|   grpc_http_response_destroy(&response); |
|   return error; |
| } |
| |
| static void read_action(void* tp, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
| t->combiner->Run( |
| GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, nullptr), |
| error); |
| } |
| |
| // Combiner callback run after an endpoint read completes. Parses the slices |
| // in t->read_buffer as HTTP/2, closes the transport on any error, and |
| // otherwise either continues reading or pauses when too many induced frames |
| // are pending. When reading does not continue, the "reading_action" transport |
| // ref is dropped. |
| // |
| // tp: the grpc_chttp2_transport (passed as void*). |
| // error: result of the endpoint read. |
| static void read_action_locked(void* tp, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
| |
| // Wrap a read failure in "Endpoint read failed", annotated with the current |
| // write state. After the swap, `error` holds the wrapped error (or OK). |
| grpc_error_handle err = error; |
| if (!err.ok()) { |
| err = grpc_error_set_int( |
| GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1), |
| grpc_core::StatusIntProperty::kOccurredDuringWrite, t->write_state); |
| } |
| std::swap(err, error); |
| if (t->closed_with_error.ok()) { |
| size_t i = 0; |
| // errors[0]: read error, errors[1]: HTTP/2 parse error, errors[2]: result |
| // of trying to interpret the bytes as HTTP/1.x. |
| grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()}; |
| for (; i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) { |
| errors[1] = grpc_chttp2_perform_read(t, t->read_buffer.slices[i]); |
| } |
| if (errors[1] != absl::OkStatus()) { |
| errors[2] = try_http_parsing(t); |
| error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, |
| GPR_ARRAY_SIZE(errors)); |
| } |
| |
| // A positive initial-window update makes every stream stalled by stream |
| // flow control writable again; the pending update is then cleared. |
| if (t->initial_window_update != 0) { |
| if (t->initial_window_update > 0) { |
| grpc_chttp2_stream* s; |
| while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { |
| grpc_chttp2_mark_stream_writable(t, s); |
| grpc_chttp2_initiate_write( |
| t, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); |
| } |
| } |
| t->initial_window_update = 0; |
| } |
| } |
| |
| bool keep_reading = false; |
| // The transport is closed but this read had no error of its own: report |
| // the closure as the error. |
| if (error.ok() && !t->closed_with_error.ok()) { |
| error = GRPC_ERROR_CREATE_REFERENCING("Transport closed", |
| &t->closed_with_error, 1); |
| } |
| if (!error.ok()) { |
| // If a goaway frame was received, this might be the reason why the read |
| // failed. Add this info to the error |
| if (!t->goaway_error.ok()) { |
| error = grpc_error_add_child(error, t->goaway_error); |
| } |
| |
| close_transport_locked(t, error); |
| t->endpoint_reading = 0; |
| } else if (t->closed_with_error.ok()) { |
| keep_reading = true; |
| // Since we have read a byte, reset the keepalive timer |
| if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { |
| maybe_reset_keepalive_ping_timer_locked(t); |
| } |
| } |
| // The read buffer is emptied on every path before the next read is issued. |
| grpc_slice_buffer_reset_and_unref(&t->read_buffer); |
| |
| if (keep_reading) { |
| if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) { |
| // Too many induced frames are pending: record that reading is paused |
| // instead of issuing another read. |
| t->reading_paused_on_pending_induced_frames = true; |
| GRPC_CHTTP2_IF_TRACING( |
| gpr_log(GPR_INFO, |
| "transport %p : Pausing reading due to too " |
| "many unwritten SETTINGS ACK and RST_STREAM frames", |
| t)); |
| } else { |
| continue_read_action_locked(t); |
| } |
| } else { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); |
| } |
| } |
| |
| // Issues the next endpoint read into t->read_buffer, with read_action as the |
| // completion callback. The read is flagged urgent when a GOAWAY error has |
| // been recorded on the transport. |
| static void continue_read_action_locked(grpc_chttp2_transport* t) { |
|   GRPC_CLOSURE_INIT(&t->read_action_locked, read_action, t, |
|                     grpc_schedule_on_exec_ctx); |
|   const bool goaway_recorded = !t->goaway_error.ok(); |
|   grpc_endpoint_read(t->ep, &t->read_buffer, &t->read_action_locked, |
|                      /*urgent=*/goaway_recorded, |
|                      grpc_chttp2_min_read_progress_size(t)); |
| } |
| |
| // Schedules a BDP ping: notifies the BDP estimator, queues a ping whose |
| // start/finish callbacks are start_bdp_ping and finish_bdp_ping, and requests |
| // a write. The transport is reffed before the first call; the ref is dropped |
| // once the callback chain started here finishes. |
| void schedule_bdp_ping_locked(grpc_chttp2_transport* t) { |
|   t->flow_control.bdp_estimator()->SchedulePing(); |
|   grpc_closure* on_ping_start = |
|       GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, start_bdp_ping, t, |
|                         grpc_schedule_on_exec_ctx); |
|   grpc_closure* on_ping_finish = |
|       GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, finish_bdp_ping, t, |
|                         grpc_schedule_on_exec_ctx); |
|   send_ping_locked(t, on_ping_start, on_ping_finish); |
|   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING); |
| } |
| |
| static void start_bdp_ping(void* tp, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_bdp_ping_locked, |
| start_bdp_ping_locked, t, nullptr), |
| error); |
| } |
| |
| // Combiner-side handler for the start of a BDP ping. Does nothing beyond |
| // tracing when `error` is set or the transport is closed; otherwise restarts |
| // a waiting keepalive timer, tells the BDP estimator the ping has started, |
| // and sets bdp_ping_started. |
| static void start_bdp_ping_locked(void* tp, grpc_error_handle error) { |
|   auto* t = static_cast<grpc_chttp2_transport*>(tp); |
|   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { |
|     const std::string peer(t->peer_string.as_string_view()); |
|     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s", peer.c_str(), |
|             grpc_core::StatusToString(error).c_str()); |
|   } |
|   const bool can_proceed = error.ok() && t->closed_with_error.ok(); |
|   if (!can_proceed) return; |
|   // The ping counts as activity, so push out a waiting keepalive timer. |
|   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { |
|     maybe_reset_keepalive_ping_timer_locked(t); |
|   } |
|   t->flow_control.bdp_estimator()->StartPing(); |
|   t->bdp_ping_started = true; |
| } |
| |
| static void finish_bdp_ping(void* tp, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, |
| finish_bdp_ping_locked, t, nullptr), |
| error); |
| } |
| |
| static void finish_bdp_ping_locked(void* tp, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { |
| gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s", |
| std::string(t->peer_string.as_string_view()).c_str(), |
| grpc_core::StatusToString(error).c_str()); |
| } |
| if (!error.ok() || !t->closed_with_error.ok()) { |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); |
| return; |
| } |
| if (!t->bdp_ping_started) { |
| // start_bdp_ping_locked has not been run yet. Schedule |
| // finish_bdp_ping_locked to be run later. |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_bdp_ping_locked, |
| finish_bdp_ping_locked, t, nullptr), |
| error); |
| return; |
| } |
| t->bdp_ping_started = false; |
| grpc_core::Timestamp next_ping = |
| t->flow_control.bdp_estimator()->CompletePing(); |
| grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t, |
| nullptr); |
| GPR_ASSERT(!t->next_bdp_ping_timer_handle.has_value()); |
| t->next_bdp_ping_timer_handle = |
| t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] { |
| grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
| grpc_core::ExecCtx exec_ctx; |
| next_bdp_ping_timer_expired(t); |
| }); |
| } |
| |
| // Timer callback for the next BDP ping: runs |
| // next_bdp_ping_timer_expired_locked on the combiner with an OK status. |
| static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) { |
|   grpc_closure* locked_step = |
|       GRPC_CLOSURE_INIT(&t->next_bdp_ping_timer_expired_locked, |
|                         next_bdp_ping_timer_expired_locked, t, nullptr); |
|   t->combiner->Run(locked_step, absl::OkStatus()); |
| } |
| |
| // Combiner-side handler for the BDP ping timer. Clears the timer handle, |
| // then schedules another BDP ping if the estimator's accumulator is |
| // non-zero. Otherwise it marks BDP pings as blocked and drops the |
| // "bdp_ping" ref. |
| static void next_bdp_ping_timer_expired_locked( |
|     void* tp, GRPC_UNUSED grpc_error_handle error) { |
|   GPR_DEBUG_ASSERT(error.ok()); |
|   auto* t = static_cast<grpc_chttp2_transport*>(tp); |
|   GPR_ASSERT(t->next_bdp_ping_timer_handle.has_value()); |
|   t->next_bdp_ping_timer_handle.reset(); |
|   if (t->flow_control.bdp_estimator()->accumulator() != 0) { |
|     schedule_bdp_ping_locked(t); |
|     return; |
|   } |
|   // Nothing accumulated: hold off on BDP pings until more data is received. |
|   t->bdp_ping_blocked = true; |
|   GRPC_CHTTP2_UNREF_TRANSPORT(t, "bdp_ping"); |
| } |
| |
| // Overwrites the file-level keepalive/ping defaults (g_default_*) from the |
| // matching entries of `args`. |
| // |
| // args:      channel args to scan; a null pointer leaves everything as is. |
| // is_client: selects the client or server variant of the keepalive time, |
| //            keepalive timeout and permit-without-calls defaults. The ping |
| //            strike/interval defaults are shared by both sides. |
| // |
| // Each lookup passes the current default to grpc_channel_arg_get_integer as |
| // its fallback. Presumably that fallback is returned when the arg is not a |
| // valid in-range integer; confirm against grpc_channel_arg_get_integer. |
| void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, |
|                                                bool is_client) { |
|   if (args == nullptr) return; |
|   for (size_t i = 0; i < args->num_args; i++) { |
|     const auto* arg = &args->args[i]; |
|     const char* key = arg->key; |
|     if (strcmp(key, GRPC_ARG_KEEPALIVE_TIME_MS) == 0) { |
|       const int value = grpc_channel_arg_get_integer( |
|           arg, {is_client ? g_default_client_keepalive_time_ms |
|                           : g_default_server_keepalive_time_ms, |
|                 1, INT_MAX}); |
|       if (is_client) { |
|         g_default_client_keepalive_time_ms = value; |
|       } else { |
|         g_default_server_keepalive_time_ms = value; |
|       } |
|     } else if (strcmp(key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS) == 0) { |
|       const int value = grpc_channel_arg_get_integer( |
|           arg, {is_client ? g_default_client_keepalive_timeout_ms |
|                           : g_default_server_keepalive_timeout_ms, |
|                 0, INT_MAX}); |
|       if (is_client) { |
|         g_default_client_keepalive_timeout_ms = value; |
|       } else { |
|         g_default_server_keepalive_timeout_ms = value; |
|       } |
|     } else if (strcmp(key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS) == 0) { |
|       // The server-side fallback must be the server permit-without-calls |
|       // default. It used to be the server keepalive timeout, so a fallback |
|       // turned a millisecond count into `true`. |
|       const bool value = |
|           grpc_channel_arg_get_integer( |
|               arg, |
|               {is_client ? g_default_client_keepalive_permit_without_calls |
|                          : g_default_server_keepalive_permit_without_calls, |
|                0, 1}) != 0; |
|       if (is_client) { |
|         g_default_client_keepalive_permit_without_calls = value; |
|       } else { |
|         g_default_server_keepalive_permit_without_calls = value; |
|       } |
|     } else if (strcmp(key, GRPC_ARG_HTTP2_MAX_PING_STRIKES) == 0) { |
|       g_default_max_ping_strikes = grpc_channel_arg_get_integer( |
|           arg, {g_default_max_ping_strikes, 0, INT_MAX}); |
|     } else if (strcmp(key, GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA) == 0) { |
|       g_default_max_pings_without_data = grpc_channel_arg_get_integer( |
|           arg, {g_default_max_pings_without_data, 0, INT_MAX}); |
|     } else if (strcmp( |
|                    key, |
|                    GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS) == |
|                0) { |
|       g_default_min_recv_ping_interval_without_data_ms = |
|           grpc_channel_arg_get_integer( |
|               arg, |
|               {g_default_min_recv_ping_interval_without_data_ms, 0, INT_MAX}); |
|     } |
|   } |
| } |
| |
| // Keepalive timer callback: runs init_keepalive_ping_locked on the combiner |
| // with an OK status. |
| static void init_keepalive_ping(grpc_chttp2_transport* t) { |
|   grpc_closure* locked_step = |
|       GRPC_CLOSURE_INIT(&t->init_keepalive_ping_locked, |
|                         init_keepalive_ping_locked, t, nullptr); |
|   t->combiner->Run(locked_step, absl::OkStatus()); |
| } |
| |
| // Combiner-side handler for the keepalive timer. It clears the timer handle |
| // and then takes one of three paths: |
| //  - transport destroying or closed: move the keepalive state to DYING; |
| //  - pings permitted without calls, or streams exist: send a keepalive ping; |
| //  - otherwise: re-arm the keepalive timer. |
| // The "init keepalive ping" ref is dropped on exit in all cases. |
| static void init_keepalive_ping_locked(void* arg, |
|                                        GRPC_UNUSED grpc_error_handle error) { |
|   GPR_DEBUG_ASSERT(error.ok()); |
|   auto* t = static_cast<grpc_chttp2_transport*>(arg); |
|   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); |
|   GPR_ASSERT(t->keepalive_ping_timer_handle.has_value()); |
|   t->keepalive_ping_timer_handle.reset(); |
|   if (t->destroying || !t->closed_with_error.ok()) { |
|     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; |
|   } else if (t->keepalive_permit_without_calls || |
|              grpc_chttp2_stream_map_size(&t->stream_map) > 0) { |
|     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; |
|     // Matched by the unref at the end of finish_keepalive_ping_locked. |
|     GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); |
|     send_keepalive_ping_locked(t); |
|     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); |
|   } else { |
|     // Take a fresh ref for the re-armed timer before releasing ours below. |
|     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); |
|     t->keepalive_ping_timer_handle = |
|         t->event_engine->RunAfter(t->keepalive_time, [t] { |
|           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|           grpc_core::ExecCtx exec_ctx; |
|           init_keepalive_ping(t); |
|         }); |
|   } |
|   GRPC_CHTTP2_UNREF_TRANSPORT(t, "init keepalive ping"); |
| } |
| |
| static void start_keepalive_ping(void* arg, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->start_keepalive_ping_locked, |
| start_keepalive_ping_locked, t, nullptr), |
| error); |
| } |
| |
| // Combiner-side handler for the start of a keepalive ping. Returns |
| // immediately on error. Otherwise it records the keepalive with channelz |
| // (when a socket node exists), arms the watchdog timer for |
| // keepalive_timeout, and sets keepalive_ping_started. |
| static void start_keepalive_ping_locked(void* arg, grpc_error_handle error) { |
|   auto* t = static_cast<grpc_chttp2_transport*>(arg); |
|   if (!error.ok()) return; |
|   if (t->channelz_socket != nullptr) { |
|     t->channelz_socket->RecordKeepaliveSent(); |
|   } |
|   const bool tracing = GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) || |
|                        GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace); |
|   if (tracing) { |
|     const std::string peer(t->peer_string.as_string_view()); |
|     gpr_log(GPR_INFO, "%s: Start keepalive ping", peer.c_str()); |
|   } |
|   // This ref is dropped either by the watchdog handler or by |
|   // finish_keepalive_ping_locked when it cancels the watchdog. |
|   GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); |
|   t->keepalive_watchdog_timer_handle = |
|       t->event_engine->RunAfter(t->keepalive_timeout, [t] { |
|         grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|         grpc_core::ExecCtx exec_ctx; |
|         keepalive_watchdog_fired(t); |
|       }); |
|   t->keepalive_ping_started = true; |
| } |
| |
| static void finish_keepalive_ping(void* arg, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); |
| t->combiner->Run(GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, |
| finish_keepalive_ping_locked, t, nullptr), |
| error); |
| } |
| |
| // Combiner-side handler for the completion of a keepalive ping. When the |
| // state is PINGING and `error` is OK it returns the state to WAITING, |
| // cancels the watchdog, and re-arms the keepalive timer. If the start |
| // handler has not run yet it re-queues itself and returns without touching |
| // the ref. In every other case the "keepalive ping end" ref is dropped on |
| // exit. |
| static void finish_keepalive_ping_locked(void* arg, grpc_error_handle error) { |
|   auto* t = static_cast<grpc_chttp2_transport*>(arg); |
|   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING && |
|       error.ok()) { |
|     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) || |
|         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) { |
|       const std::string peer(t->peer_string.as_string_view()); |
|       gpr_log(GPR_INFO, "%s: Finish keepalive ping", peer.c_str()); |
|     } |
|     if (!t->keepalive_ping_started) { |
|       // start_keepalive_ping_locked has not executed yet; queue this |
|       // handler again so that it runs afterwards. |
|       grpc_closure* retry = |
|           GRPC_CLOSURE_INIT(&t->finish_keepalive_ping_locked, |
|                             finish_keepalive_ping_locked, t, nullptr); |
|       t->combiner->Run(retry, error); |
|       return; |
|     } |
|     t->keepalive_ping_started = false; |
|     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; |
|     // Release the watchdog's ref and handle only if the cancel succeeds. |
|     if (t->keepalive_watchdog_timer_handle.has_value() && |
|         t->event_engine->Cancel(*t->keepalive_watchdog_timer_handle)) { |
|       GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); |
|       t->keepalive_watchdog_timer_handle.reset(); |
|     } |
|     GPR_ASSERT(!t->keepalive_ping_timer_handle.has_value()); |
|     GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); |
|     t->keepalive_ping_timer_handle = |
|         t->event_engine->RunAfter(t->keepalive_time, [t] { |
|           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|           grpc_core::ExecCtx exec_ctx; |
|           init_keepalive_ping(t); |
|         }); |
|   } |
|   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive ping end"); |
| } |
| |
| // Watchdog timer callback: runs keepalive_watchdog_fired_locked on the |
| // combiner with an OK status. |
| static void keepalive_watchdog_fired(grpc_chttp2_transport* t) { |
|   grpc_closure* locked_step = |
|       GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked, |
|                         keepalive_watchdog_fired_locked, t, nullptr); |
|   t->combiner->Run(locked_step, absl::OkStatus()); |
| } |
| |
| // Combiner-side handler for the keepalive watchdog. Clears the watchdog |
| // handle. If a ping is still outstanding (state PINGING) it closes the |
| // transport with an UNAVAILABLE "keepalive watchdog timeout" error; any |
| // other state is only logged as an error. Always drops the |
| // "keepalive watchdog" ref. |
| static void keepalive_watchdog_fired_locked( |
|     void* arg, GRPC_UNUSED grpc_error_handle error) { |
|   GPR_DEBUG_ASSERT(error.ok()); |
|   auto* t = static_cast<grpc_chttp2_transport*>(arg); |
|   GPR_ASSERT(t->keepalive_watchdog_timer_handle.has_value()); |
|   t->keepalive_watchdog_timer_handle.reset(); |
|   if (t->keepalive_state != GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { |
|     // Firing outside PINGING is treated as an error. One possible cause is |
|     // a failed cancellation in finish_keepalive_ping_locked. Other states |
|     // have been reported: https://github.com/grpc/grpc/issues/32085. |
|     gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", |
|             t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); |
|   } else { |
|     const std::string peer(t->peer_string.as_string_view()); |
|     gpr_log(GPR_INFO, "%s: Keepalive watchdog fired. Closing transport.", |
|             peer.c_str()); |
|     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; |
|     close_transport_locked( |
|         t, grpc_error_set_int(GRPC_ERROR_CREATE("keepalive watchdog timeout"), |
|                               grpc_core::StatusIntProperty::kRpcStatus, |
|                               GRPC_STATUS_UNAVAILABLE)); |
|   } |
|   GRPC_CHTTP2_UNREF_TRANSPORT(t, "keepalive watchdog"); |
| } |
| |
| // Restarts the keepalive ping timer if one is pending and can be cancelled. |
| // Does nothing when no timer handle is stored or when the cancel fails. |
| static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) { |
|   if (!t->keepalive_ping_timer_handle.has_value()) return; |
|   if (!t->event_engine->Cancel(*t->keepalive_ping_timer_handle)) return; |
|   // The cancel succeeded, so the ref held for the old timer carries over to |
|   // the new one; no Ref/Unref is needed here. |
|   const bool tracing = GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) || |
|                        GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace); |
|   if (tracing) { |
|     const std::string peer(t->peer_string.as_string_view()); |
|     gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.", |
|             peer.c_str()); |
|   } |
|   t->keepalive_ping_timer_handle = |
|       t->event_engine->RunAfter(t->keepalive_time, [t] { |
|         grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|         grpc_core::ExecCtx exec_ctx; |
|         init_keepalive_ping(t); |
|       }); |
| } |
| |
| // |
| // CALLBACK LOOP |
| // |
| |
| // Traces the new connectivity state and then stores it, together with |
| // `status` and `reason`, in the transport's state tracker. |
| static void connectivity_state_set(grpc_chttp2_transport* t, |
|                                    grpc_connectivity_state state, |
|                                    const absl::Status& status, |
|                                    const char* reason) { |
|   GRPC_CHTTP2_IF_TRACING(gpr_log( |
|       GPR_INFO, "transport %p set connectivity_state=%d", t, state)); |
|   auto& tracker = t->state_tracker; |
|   tracker.SetState(state, status, reason); |
| } |
| |
| // |
| // POLLSET STUFF |
| // |
| |
| // Adds the transport's endpoint to `pollset`. The stream argument is unused. |
| static void set_pollset(grpc_transport* gt, grpc_stream* /*gs*/, |
|                         grpc_pollset* pollset) { |
|   auto* transport = reinterpret_cast<grpc_chttp2_transport*>(gt); |
|   grpc_endpoint_add_to_pollset(transport->ep, pollset); |
| } |
| |
| // Adds the transport's endpoint to `pollset_set`. The stream argument is |
| // unused. |
| static void set_pollset_set(grpc_transport* gt, grpc_stream* /*gs*/, |
|                             grpc_pollset_set* pollset_set) { |
|   auto* transport = reinterpret_cast<grpc_chttp2_transport*>(gt); |
|   grpc_endpoint_add_to_pollset_set(transport->ep, pollset_set); |
| } |
| |
| // |
| // RESOURCE QUOTAS |
| // |
| |
| // Registers a benign-pass reclaimer with the memory owner unless one is |
| // already registered. Takes a "benign_reclaimer" ref. The reclaimer callback |
| // drops that ref itself when it is invoked without a sweep; otherwise it |
| // hands the sweep to benign_reclaimer_locked on the combiner. |
| static void post_benign_reclaimer(grpc_chttp2_transport* t) { |
|   if (t->benign_reclaimer_registered) return; |
|   t->benign_reclaimer_registered = true; |
|   GRPC_CHTTP2_REF_TRANSPORT(t, "benign_reclaimer"); |
|   t->memory_owner.PostReclaimer( |
|       grpc_core::ReclamationPass::kBenign, |
|       [t](absl::optional<grpc_core::ReclamationSweep> sweep) { |
|         if (!sweep.has_value()) { |
|           GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); |
|           return; |
|         } |
|         GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, |
|                           benign_reclaimer_locked, t, |
|                           grpc_schedule_on_exec_ctx); |
|         t->active_reclamation = std::move(*sweep); |
|         t->combiner->Run(&t->benign_reclaimer_locked, absl::OkStatus()); |
|       }); |
| } |
| |
| // Registers a destructive-pass reclaimer with the memory owner unless one |
| // is already registered. Takes a "destructive_reclaimer" ref. The reclaimer |
| // callback drops that ref itself when it is invoked without a sweep; |
| // otherwise it hands the sweep to destructive_reclaimer_locked on the |
| // combiner. |
| static void post_destructive_reclaimer(grpc_chttp2_transport* t) { |
|   if (t->destructive_reclaimer_registered) return; |
|   t->destructive_reclaimer_registered = true; |
|   GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); |
|   t->memory_owner.PostReclaimer( |
|       grpc_core::ReclamationPass::kDestructive, |
|       [t](absl::optional<grpc_core::ReclamationSweep> sweep) { |
|         if (!sweep.has_value()) { |
|           GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); |
|           return; |
|         } |
|         GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, |
|                           destructive_reclaimer_locked, t, |
|                           grpc_schedule_on_exec_ctx); |
|         t->active_reclamation = std::move(*sweep); |
|         t->combiner->Run(&t->destructive_reclaimer_locked, |
|                          absl::OkStatus()); |
|       }); |
| } |
| |
| static void benign_reclaimer_locked(void* arg, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); |
| if (error.ok() && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { |
| // Channel with no active streams: send a goaway to try and make it |
| // disconnect cleanly |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
| gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory", |
| std::string(t->peer_string.as_string_view()).c_str()); |
| } |
| send_goaway(t, |
| grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), |
| grpc_core::StatusIntProperty::kHttp2Error, |
| GRPC_HTTP2_ENHANCE_YOUR_CALM), |
| /*immediate_disconnect_hint=*/true); |
| } else if (error.ok() && GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
| gpr_log(GPR_INFO, |
| "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR |
| " streams", |
| std::string(t->peer_string.as_string_view()).c_str(), |
| grpc_chttp2_stream_map_size(&t->stream_map)); |
| } |
| t->benign_reclaimer_registered = false; |
| if (error != absl::CancelledError()) { |
| t->active_reclamation.Finish(); |
| } |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "benign_reclaimer"); |
| } |
| |
| static void destructive_reclaimer_locked(void* arg, grpc_error_handle error) { |
| grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(arg); |
| size_t n = grpc_chttp2_stream_map_size(&t->stream_map); |
| t->destructive_reclaimer_registered = false; |
| if (error.ok() && n > 0) { |
| grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>( |
| grpc_chttp2_stream_map_rand(&t->stream_map)); |
| if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) { |
| gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d", |
| std::string(t->peer_string.as_string_view()).c_str(), s->id); |
| } |
| grpc_chttp2_cancel_stream( |
| t, s, |
| grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"), |
| grpc_core::StatusIntProperty::kHttp2Error, |
| GRPC_HTTP2_ENHANCE_YOUR_CALM)); |
| if (n > 1) { |
| // Since we cancel one stream per destructive reclamation, if |
| // there are more streams left, we can immediately post a new |
| // reclaimer in case the resource quota needs to free more |
| // memory |
| post_destructive_reclaimer(t); |
| } |
| } |
| if (error != absl::CancelledError()) { |
| t->active_reclamation.Finish(); |
| } |
| GRPC_CHTTP2_UNREF_TRANSPORT(t, "destructive_reclaimer"); |
| } |
| |
| // |
| // MONITORING |
| // |
| |
| // Maps a write-initiation reason to its name without the |
| // GRPC_CHTTP2_INITIATE_WRITE_ prefix. Falling out of the switch is marked |
| // unreachable and yields "unknown". |
| const char* grpc_chttp2_initiate_write_reason_string( |
|     grpc_chttp2_initiate_write_reason reason) { |
|   // clang-format off |
|   switch (reason) { |
|     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: return "INITIAL_WRITE"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: return "START_NEW_STREAM"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: return "SEND_MESSAGE"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: return "SEND_INITIAL_METADATA"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: return "SEND_TRAILING_METADATA"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: return "RETRY_SEND_PING"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: return "CONTINUE_PINGS"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: return "GOAWAY_SENT"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: return "RST_STREAM"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: return "CLOSE_FROM_API"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: return "STREAM_FLOW_CONTROL"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: return "TRANSPORT_FLOW_CONTROL"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: return "SEND_SETTINGS"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK: return "SETTINGS_ACK"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: return "FLOW_CONTROL_UNSTALLED_BY_SETTING"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: return "FLOW_CONTROL_UNSTALLED_BY_UPDATE"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: return "APPLICATION_PING"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING: return "BDP_PING"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: return "KEEPALIVE_PING"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: return "TRANSPORT_FLOW_CONTROL_UNSTALLED"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: return "PING_RESPONSE"; |
|     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: return "FORCE_RST_STREAM"; |
|   } |
|   // clang-format on |
|   GPR_UNREACHABLE_CODE(return "unknown"); |
| } |
| |
| // Returns the endpoint stored in the chttp2 transport behind `t`. |
| static grpc_endpoint* chttp2_get_endpoint(grpc_transport* t) { |
|   auto* transport = reinterpret_cast<grpc_chttp2_transport*>(t); |
|   return transport->ep; |
| } |
| |
| // Transport vtable for chttp2. Entries are positional and must stay in the |
| // order of grpc_transport_vtable's members (declared outside this file |
| // section). The fourth entry is deliberately left null. |
| static const grpc_transport_vtable vtable = { |
|     sizeof(grpc_chttp2_stream), |
|     "chttp2", |
|     init_stream, |
|     nullptr, |
|     set_pollset, |
|     set_pollset_set, |
|     perform_stream_op, |
|     perform_transport_op, |
|     destroy_stream, |
|     destroy_transport, |
|     chttp2_get_endpoint, |
| }; |
| |
| // Returns the address of the file-local chttp2 vtable. |
| static const grpc_transport_vtable* get_vtable() { return &vtable; } |
| |
| // Returns the channelz socket node held by the chttp2 transport behind |
| // `transport`. |
| grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> |
| grpc_chttp2_transport_get_socket_node(grpc_transport* transport) { |
|   auto* chttp2 = reinterpret_cast<grpc_chttp2_transport*>(transport); |
|   return chttp2->channelz_socket; |
| } |
| |
| // Heap-allocates a chttp2 transport over `ep` and returns a pointer to its |
| // embedded grpc_transport base. |
| grpc_transport* grpc_create_chttp2_transport( |
|     const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep, |
|     bool is_client) { |
|   grpc_chttp2_transport* created = |
|       new grpc_chttp2_transport(channel_args, ep, is_client); |
|   return &created->base; |
| } |
| |
| // Starts the transport's read loop. |
| // |
| // transport:                  the chttp2 transport, as its grpc_transport. |
| // read_buffer:                optional slices that were already read. They |
| //                             are moved into the transport's read buffer, |
| //                             and the buffer object is released with |
| //                             gpr_free. |
| // notify_on_receive_settings: optional closure stored on the transport, or |
| //                             run immediately with the close error if the |
| //                             transport is already closed. |
| // notify_on_close:            optional closure handled the same way. |
| void grpc_chttp2_transport_start_reading( |
|     grpc_transport* transport, grpc_slice_buffer* read_buffer, |
|     grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) { |
|   auto* t = reinterpret_cast<grpc_chttp2_transport*>(transport); |
|   // Released by the early exit below, or by read_action_locked once |
|   // reading stops. |
|   GRPC_CHTTP2_REF_TRANSPORT(t, "reading_action"); |
|   if (read_buffer != nullptr) { |
|     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer); |
|     gpr_free(read_buffer); |
|   } |
|   auto begin_reading = [t, notify_on_receive_settings, |
|                         notify_on_close](grpc_error_handle) { |
|     if (t->closed_with_error.ok()) { |
|       t->notify_on_receive_settings = notify_on_receive_settings; |
|       t->notify_on_close = notify_on_close; |
|       read_action_locked(t, absl::OkStatus()); |
|       return; |
|     } |
|     // Already closed: report the close error to both closures and give up |
|     // the read ref without reading. |
|     if (notify_on_receive_settings != nullptr) { |
|       grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings, |
|                               t->closed_with_error); |
|     } |
|     if (notify_on_close != nullptr) { |
|       grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close, |
|                               t->closed_with_error); |
|     } |
|     GRPC_CHTTP2_UNREF_TRANSPORT(t, "reading_action"); |
|   }; |
|   t->combiner->Run(grpc_core::NewClosure(std::move(begin_reading)), |
|                    absl::OkStatus()); |
| } |