blob: 9ddd2b0256f6d570df027811e30827cbdbc46cd8 [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
#include <inttypes.h>
#include <string.h>
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include <grpc/grpc_posix.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/frame.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/insecure/insecure_credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/handshaker.h"
#include "src/core/lib/transport/handshaker_registry.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/transport/transport_fwd.h"
#include "src/core/lib/uri/uri_parser.h"
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
namespace grpc_core {
namespace {
const char kUnixUriPrefix[] = "unix:";
const char kUnixAbstractUriPrefix[] = "unix-abstract:";
class Chttp2ServerListener : public Server::ListenerInterface {
public:
static grpc_error_handle Create(Server* server, grpc_resolved_address* addr,
const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier,
int* port_num);
static grpc_error_handle CreateWithAcceptor(
Server* server, const char* name, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier);
// Do not instantiate directly. Use one of the factory methods above.
Chttp2ServerListener(Server* server, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier);
~Chttp2ServerListener() override;
void Start(Server* server,
const std::vector<grpc_pollset*>* pollsets) override;
channelz::ListenSocketNode* channelz_listen_socket_node() const override {
return channelz_listen_socket_.get();
}
void SetOnDestroyDone(grpc_closure* on_destroy_done) override;
void Orphan() override;
private:
class ConfigFetcherWatcher
: public grpc_server_config_fetcher::WatcherInterface {
public:
explicit ConfigFetcherWatcher(RefCountedPtr<Chttp2ServerListener> listener)
: listener_(std::move(listener)) {}
void UpdateConnectionManager(
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager) override;
void StopServing() override;
private:
RefCountedPtr<Chttp2ServerListener> listener_;
};
class ActiveConnection : public InternallyRefCounted<ActiveConnection> {
public:
class HandshakingState : public InternallyRefCounted<HandshakingState> {
public:
HandshakingState(RefCountedPtr<ActiveConnection> connection_ref,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor,
const ChannelArgs& args);
~HandshakingState() override;
void Orphan() override;
void Start(grpc_endpoint* endpoint, const ChannelArgs& args);
// Needed to be able to grab an external ref in ActiveConnection::Start()
using InternallyRefCounted<HandshakingState>::Ref;
private:
static void OnTimeout(void* arg, grpc_error_handle error);
static void OnReceiveSettings(void* arg, grpc_error_handle /* error */);
static void OnHandshakeDone(void* arg, grpc_error_handle error);
RefCountedPtr<ActiveConnection> const connection_;
grpc_pollset* const accepting_pollset_;
grpc_tcp_server_acceptor* acceptor_;
RefCountedPtr<HandshakeManager> handshake_mgr_
ABSL_GUARDED_BY(&connection_->mu_);
// State for enforcing handshake timeout on receiving HTTP/2 settings.
Timestamp const deadline_;
grpc_timer timer_ ABSL_GUARDED_BY(&connection_->mu_);
grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_);
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_);
grpc_pollset_set* const interested_parties_;
};
ActiveConnection(grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor,
const ChannelArgs& args, MemoryOwner memory_owner);
~ActiveConnection() override;
void Orphan() override;
void SendGoAway();
void Start(RefCountedPtr<Chttp2ServerListener> listener,
grpc_endpoint* endpoint, const ChannelArgs& args);
// Needed to be able to grab an external ref in
// Chttp2ServerListener::OnAccept()
using InternallyRefCounted<ActiveConnection>::Ref;
private:
static void OnClose(void* arg, grpc_error_handle error);
static void OnDrainGraceTimeExpiry(void* arg, grpc_error_handle error);
RefCountedPtr<Chttp2ServerListener> listener_;
Mutex mu_ ABSL_ACQUIRED_AFTER(&listener_->mu_);
// Set by HandshakingState before the handshaking begins and reset when
// handshaking is done.
OrphanablePtr<HandshakingState> handshaking_state_ ABSL_GUARDED_BY(&mu_);
// Set by HandshakingState when handshaking is done and a valid transport is
// created.
grpc_chttp2_transport* transport_ ABSL_GUARDED_BY(&mu_) = nullptr;
grpc_closure on_close_;
grpc_timer drain_grace_timer_;
grpc_closure on_drain_grace_time_expiry_;
bool drain_grace_timer_expiry_callback_pending_ ABSL_GUARDED_BY(&mu_) =
false;
bool shutdown_ ABSL_GUARDED_BY(&mu_) = false;
};
// To allow access to RefCounted<> like interface.
friend class RefCountedPtr<Chttp2ServerListener>;
// Should only be called once so as to start the TCP server.
void StartListening();
static void OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor);
static void TcpServerShutdownComplete(void* arg, grpc_error_handle error);
static void DestroyListener(Server* /*server*/, void* arg,
grpc_closure* destroy_done);
// The interface required by RefCountedPtr<> has been manually implemented
// here to take a ref on tcp_server_ instead. Note that, the handshaker needs
// tcp_server_ to exist for the lifetime of the handshake since it's needed by
// acceptor. Sharing refs between the listener and tcp_server_ is just an
// optimization to avoid taking additional refs on the listener, since
// TcpServerShutdownComplete already holds a ref to the listener.
void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
void IncrementRefCount(const DebugLocation& /* location */,
const char* /* reason */) {
IncrementRefCount();
}
RefCountedPtr<Chttp2ServerListener> Ref() GRPC_MUST_USE_RESULT {
IncrementRefCount();
return RefCountedPtr<Chttp2ServerListener>(this);
}
RefCountedPtr<Chttp2ServerListener> Ref(const DebugLocation& /* location */,
const char* /* reason */)
GRPC_MUST_USE_RESULT {
return Ref();
}
void Unref() { grpc_tcp_server_unref(tcp_server_); }
void Unref(const DebugLocation& /* location */, const char* /* reason */) {
Unref();
}
Server* const server_;
grpc_tcp_server* tcp_server_;
grpc_resolved_address resolved_address_;
Chttp2ServerArgsModifier const args_modifier_;
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
ChannelArgs args_;
Mutex mu_;
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager_ ABSL_GUARDED_BY(mu_);
// Signals whether grpc_tcp_server_start() has been called.
bool started_ ABSL_GUARDED_BY(mu_) = false;
// Signals whether grpc_tcp_server_start() has completed.
CondVar started_cv_ ABSL_GUARDED_BY(mu_);
// Signals whether new requests/connections are to be accepted.
bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
// Signals whether the application has triggered shutdown.
bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_
ABSL_GUARDED_BY(mu_);
grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_);
grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr;
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
MemoryQuotaRefPtr memory_quota_;
};
//
// Chttp2ServerListener::ConfigFetcherWatcher
//
void Chttp2ServerListener::ConfigFetcherWatcher::UpdateConnectionManager(
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager) {
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager_to_destroy;
class GracefulShutdownExistingConnections {
public:
~GracefulShutdownExistingConnections() {
// Send GOAWAYs on the transports so that they get disconnected when
// existing RPCs finish, and so that no new RPC is started on them.
for (auto& connection : connections_) {
connection.first->SendGoAway();
}
}
void set_connections(
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>>
connections) {
GPR_ASSERT(connections_.empty());
connections_ = std::move(connections);
}
private:
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections_;
} connections_to_shutdown;
{
MutexLock lock(&listener_->mu_);
connection_manager_to_destroy = listener_->connection_manager_;
listener_->connection_manager_ = std::move(connection_manager);
connections_to_shutdown.set_connections(std::move(listener_->connections_));
if (listener_->shutdown_) {
return;
}
listener_->is_serving_ = true;
if (listener_->started_) return;
}
int port_temp;
grpc_error_handle error = grpc_tcp_server_add_port(
listener_->tcp_server_, &listener_->resolved_address_, &port_temp);
if (!GRPC_ERROR_IS_NONE(error)) {
GRPC_ERROR_UNREF(error);
gpr_log(GPR_ERROR, "Error adding port to server: %s",
grpc_error_std_string(error).c_str());
// TODO(yashykt): We wouldn't need to assert here if we bound to the
// port earlier during AddPort.
GPR_ASSERT(0);
}
listener_->StartListening();
{
MutexLock lock(&listener_->mu_);
listener_->started_ = true;
listener_->started_cv_.SignalAll();
}
}
void Chttp2ServerListener::ConfigFetcherWatcher::StopServing() {
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
{
MutexLock lock(&listener_->mu_);
listener_->is_serving_ = false;
connections = std::move(listener_->connections_);
}
// Send GOAWAYs on the transports so that they disconnected when existing RPCs
// finish.
for (auto& connection : connections) {
connection.first->SendGoAway();
}
}
//
// Chttp2ServerListener::ActiveConnection::HandshakingState
//
Timestamp GetConnectionDeadline(const ChannelArgs& args) {
return ExecCtx::Get()->Now() +
std::max(
Duration::Milliseconds(1),
args.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)
.value_or(Duration::Seconds(120)));
}
Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
RefCountedPtr<ActiveConnection> connection_ref,
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
const ChannelArgs& args)
: connection_(std::move(connection_ref)),
accepting_pollset_(accepting_pollset),
acceptor_(acceptor),
handshake_mgr_(MakeRefCounted<HandshakeManager>()),
deadline_(GetConnectionDeadline(args)),
interested_parties_(grpc_pollset_set_create()) {
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
CoreConfiguration::Get().handshaker_registry().AddHandshakers(
HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get());
}
Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
grpc_pollset_set_destroy(interested_parties_);
gpr_free(acceptor_);
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::Orphan() {
{
MutexLock lock(&connection_->mu_);
if (handshake_mgr_ != nullptr) {
handshake_mgr_->Shutdown(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Listener stopped serving."));
}
}
Unref();
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::Start(
grpc_endpoint* endpoint, const ChannelArgs& channel_args) {
Ref().release(); // Held by OnHandshakeDone
RefCountedPtr<HandshakeManager> handshake_mgr;
{
MutexLock lock(&connection_->mu_);
if (handshake_mgr_ == nullptr) return;
handshake_mgr = handshake_mgr_;
}
handshake_mgr->DoHandshake(endpoint, channel_args, deadline_, acceptor_,
OnHandshakeDone, this);
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::OnTimeout(
void* arg, grpc_error_handle error) {
HandshakingState* self = static_cast<HandshakingState*>(arg);
// Note that we may be called with GRPC_ERROR_NONE when the timer fires
// or with an error indicating that the timer system is being shut down.
if (error != GRPC_ERROR_CANCELLED) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Did not receive HTTP/2 settings before handshake timeout");
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&self->connection_->mu_);
transport = self->connection_->transport_;
}
grpc_transport_perform_op(&transport->base, op);
}
self->Unref();
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::
OnReceiveSettings(void* arg, grpc_error_handle /* error */) {
HandshakingState* self = static_cast<HandshakingState*>(arg);
grpc_timer_cancel(&self->timer_);
self->Unref();
}
void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
void* arg, grpc_error_handle error) {
auto* args = static_cast<HandshakerArgs*>(arg);
HandshakingState* self = static_cast<HandshakingState*>(args->user_data);
OrphanablePtr<HandshakingState> handshaking_state_ref;
RefCountedPtr<HandshakeManager> handshake_mgr;
bool cleanup_connection = false;
{
MutexLock connection_lock(&self->connection_->mu_);
if (!GRPC_ERROR_IS_NONE(error) || self->connection_->shutdown_) {
std::string error_str = grpc_error_std_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str());
cleanup_connection = true;
if (GRPC_ERROR_IS_NONE(error) && args->endpoint != nullptr) {
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE);
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
}
} else {
// If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external
// code, so we can just clean up here without creating a transport.
if (args->endpoint != nullptr) {
grpc_transport* transport =
grpc_create_chttp2_transport(args->args, args->endpoint, false);
grpc_error_handle channel_init_err =
self->connection_->listener_->server_->SetupTransport(
transport, self->accepting_pollset_, args->args,
grpc_chttp2_transport_get_socket_node(transport));
if (GRPC_ERROR_IS_NONE(channel_init_err)) {
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
// Note: The reinterpret_cast<>s here are safe, because
// grpc_chttp2_transport is a C-style extension of
// grpc_transport, so this is morally equivalent of a
// static_cast<> to a derived class.
// TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
self->connection_->transport_ =
reinterpret_cast<grpc_chttp2_transport*>(transport);
GRPC_CHTTP2_REF_TRANSPORT(self->connection_->transport_,
"ActiveConnection"); // Held by connection_
self->Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
self, grpc_schedule_on_exec_ctx);
// If the listener has been configured with a config fetcher, we need
// to watch on the transport being closed so that we can an updated
// list of active connections.
grpc_closure* on_close = nullptr;
if (self->connection_->listener_->config_fetcher_watcher_ !=
nullptr) {
// Refs helds by OnClose()
self->connection_->Ref().release();
on_close = &self->connection_->on_close_;
} else {
// Remove the connection from the connections_ map since OnClose()
// will not be invoked when a config fetcher is set.
cleanup_connection = true;
}
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_,
on_close);
self->Ref().release(); // Held by OnTimeout().
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
} else {
// Failed to create channel from transport. Clean up.
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_error_std_string(channel_init_err).c_str());
GRPC_ERROR_UNREF(channel_init_err);
grpc_transport_destroy(transport);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
cleanup_connection = true;
}
} else {
cleanup_connection = true;
}
}
// Since the handshake manager is done, the connection no longer needs to
// shutdown the handshake when the listener needs to stop serving.
// Avoid calling the destructor of HandshakeManager and HandshakingState
// from within the critical region.
handshake_mgr = std::move(self->handshake_mgr_);
handshaking_state_ref = std::move(self->connection_->handshaking_state_);
}
gpr_free(self->acceptor_);
self->acceptor_ = nullptr;
OrphanablePtr<ActiveConnection> connection;
if (cleanup_connection) {
MutexLock listener_lock(&self->connection_->listener_->mu_);
auto it = self->connection_->listener_->connections_.find(
self->connection_.get());
if (it != self->connection_->listener_->connections_.end()) {
connection = std::move(it->second);
self->connection_->listener_->connections_.erase(it);
}
}
self->Unref();
}
//
// Chttp2ServerListener::ActiveConnection
//
Chttp2ServerListener::ActiveConnection::ActiveConnection(
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor,
const ChannelArgs& args, MemoryOwner memory_owner)
: handshaking_state_(memory_owner.MakeOrphanable<HandshakingState>(
Ref(), accepting_pollset, acceptor, args)) {
GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this,
grpc_schedule_on_exec_ctx);
}
Chttp2ServerListener::ActiveConnection::~ActiveConnection() {
if (transport_ != nullptr) {
GRPC_CHTTP2_UNREF_TRANSPORT(transport_, "ActiveConnection");
}
}
void Chttp2ServerListener::ActiveConnection::Orphan() {
OrphanablePtr<HandshakingState> handshaking_state;
{
MutexLock lock(&mu_);
shutdown_ = true;
// Reset handshaking_state_ since we have been orphaned by the listener
// signaling that the listener has stopped serving.
handshaking_state = std::move(handshaking_state_);
}
Unref();
}
void Chttp2ServerListener::ActiveConnection::SendGoAway() {
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&mu_);
if (transport_ != nullptr && !shutdown_) {
transport = transport_;
Ref().release(); // Ref held by OnDrainGraceTimeExpiry
GRPC_CLOSURE_INIT(&on_drain_grace_time_expiry_, OnDrainGraceTimeExpiry,
this, nullptr);
grpc_timer_init(
&drain_grace_timer_,
ExecCtx::Get()->Now() +
std::max(
Duration::Zero(),
listener_->args_
.GetDurationFromIntMillis(
GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
.value_or(Duration::Minutes(10))),
&on_drain_grace_time_expiry_);
drain_grace_timer_expiry_callback_pending_ = true;
shutdown_ = true;
}
}
if (transport != nullptr) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Server is stopping to serve requests.");
grpc_transport_perform_op(&transport->base, op);
}
}
void Chttp2ServerListener::ActiveConnection::Start(
RefCountedPtr<Chttp2ServerListener> listener, grpc_endpoint* endpoint,
const ChannelArgs& args) {
RefCountedPtr<HandshakingState> handshaking_state_ref;
listener_ = std::move(listener);
{
MutexLock lock(&mu_);
if (shutdown_) return;
// Hold a ref to HandshakingState to allow starting the handshake outside
// the critical region.
handshaking_state_ref = handshaking_state_->Ref();
}
handshaking_state_ref->Start(endpoint, args);
}
void Chttp2ServerListener::ActiveConnection::OnClose(
void* arg, grpc_error_handle /* error */) {
ActiveConnection* self = static_cast<ActiveConnection*>(arg);
OrphanablePtr<ActiveConnection> connection;
{
MutexLock listener_lock(&self->listener_->mu_);
MutexLock connection_lock(&self->mu_);
// The node was already deleted from the connections_ list if the connection
// is shutdown.
if (!self->shutdown_) {
auto it = self->listener_->connections_.find(self);
if (it != self->listener_->connections_.end()) {
connection = std::move(it->second);
self->listener_->connections_.erase(it);
}
self->shutdown_ = true;
}
// Cancel the drain_grace_timer_ if needed.
if (self->drain_grace_timer_expiry_callback_pending_) {
grpc_timer_cancel(&self->drain_grace_timer_);
}
}
self->Unref();
}
void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry(
void* arg, grpc_error_handle error) {
ActiveConnection* self = static_cast<ActiveConnection*>(arg);
// If the drain_grace_timer_ was not cancelled, disconnect the transport
// immediately.
if (GRPC_ERROR_IS_NONE(error)) {
grpc_chttp2_transport* transport = nullptr;
{
MutexLock lock(&self->mu_);
transport = self->transport_;
}
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Drain grace time expired. Closing connection immediately.");
grpc_transport_perform_op(&transport->base, op);
}
self->Unref();
}
//
// Chttp2ServerListener
//
grpc_error_handle Chttp2ServerListener::Create(
Server* server, grpc_resolved_address* addr, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier, int* port_num) {
Chttp2ServerListener* listener = nullptr;
// The bulk of this method is inside of a lambda to make cleanup
// easier without using goto.
grpc_error_handle error = [&]() {
grpc_error_handle error = GRPC_ERROR_NONE;
// Create Chttp2ServerListener.
listener = new Chttp2ServerListener(server, args, args_modifier);
error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
args.ToC().get(), &listener->tcp_server_);
if (!GRPC_ERROR_IS_NONE(error)) return error;
if (server->config_fetcher() != nullptr) {
listener->resolved_address_ = *addr;
// TODO(yashykt): Consider binding so as to be able to return the port
// number.
} else {
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
if (!GRPC_ERROR_IS_NONE(error)) return error;
}
// Create channelz node.
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
auto string_address = grpc_sockaddr_to_uri(addr);
if (!string_address.ok()) {
return GRPC_ERROR_CREATE_FROM_CPP_STRING(
string_address.status().ToString());
}
listener->channelz_listen_socket_ =
MakeRefCounted<channelz::ListenSocketNode>(
*string_address,
absl::StrCat("chttp2 listener ", *string_address));
}
// Register with the server only upon success
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return GRPC_ERROR_NONE;
}();
if (!GRPC_ERROR_IS_NONE(error)) {
if (listener != nullptr) {
if (listener->tcp_server_ != nullptr) {
// listener is deleted when tcp_server_ is shutdown.
grpc_tcp_server_unref(listener->tcp_server_);
} else {
delete listener;
}
}
}
return error;
}
grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
Server* server, const char* name, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier) {
Chttp2ServerListener* listener =
new Chttp2ServerListener(server, args, args_modifier);
grpc_error_handle error =
grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_,
args.ToC().get(), &listener->tcp_server_);
if (!GRPC_ERROR_IS_NONE(error)) {
delete listener;
return error;
}
// TODO(yangg) channelz
TcpServerFdHandler** arg_val = args.GetPointer<TcpServerFdHandler*>(name);
*arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return GRPC_ERROR_NONE;
}
Chttp2ServerListener::Chttp2ServerListener(
Server* server, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier)
: server_(server),
args_modifier_(args_modifier),
args_(args),
memory_quota_(args.GetObject<ResourceQuota>()->memory_quota()) {
GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete,
this, grpc_schedule_on_exec_ctx);
}
Chttp2ServerListener::~Chttp2ServerListener() {
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx::Get()->Flush();
if (on_destroy_done_ != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE);
ExecCtx::Get()->Flush();
}
}
/* Server callback: start listening on our ports */
void Chttp2ServerListener::Start(
Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
if (server_->config_fetcher() != nullptr) {
auto watcher = absl::make_unique<ConfigFetcherWatcher>(Ref());
config_fetcher_watcher_ = watcher.get();
server_->config_fetcher()->StartWatch(
grpc_sockaddr_to_string(&resolved_address_, false).value(),
std::move(watcher));
} else {
{
MutexLock lock(&mu_);
started_ = true;
is_serving_ = true;
}
StartListening();
}
}
void Chttp2ServerListener::StartListening() {
grpc_tcp_server_start(tcp_server_, &server_->pollsets(), OnAccept, this);
}
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
MutexLock lock(&mu_);
on_destroy_done_ = on_destroy_done;
}
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
ChannelArgs args = self->args_;
RefCountedPtr<grpc_server_config_fetcher::ConnectionManager>
connection_manager;
{
MutexLock lock(&self->mu_);
connection_manager = self->connection_manager_;
}
auto endpoint_cleanup = [&](grpc_error_handle error) {
grpc_endpoint_shutdown(tcp, error);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
};
if (self->server_->config_fetcher() != nullptr) {
if (connection_manager == nullptr) {
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No ConnectionManager configured. Closing connection.");
endpoint_cleanup(error);
return;
}
absl::StatusOr<ChannelArgs> args_result =
connection_manager->UpdateChannelArgsForConnection(args, tcp);
if (!args_result.ok()) {
gpr_log(GPR_DEBUG, "Closing connection: %s",
args_result.status().ToString().c_str());
endpoint_cleanup(
GRPC_ERROR_CREATE_FROM_CPP_STRING(args_result.status().ToString()));
return;
}
grpc_error_handle error = GRPC_ERROR_NONE;
args = self->args_modifier_(*args_result, &error);
if (!GRPC_ERROR_IS_NONE(error)) {
gpr_log(GPR_DEBUG, "Closing connection: %s",
grpc_error_std_string(error).c_str());
endpoint_cleanup(error);
return;
}
}
auto memory_owner = self->memory_quota_->CreateMemoryOwner(
absl::StrCat(grpc_endpoint_get_peer(tcp), ":server_channel"));
auto connection = memory_owner.MakeOrphanable<ActiveConnection>(
accepting_pollset, acceptor, args, std::move(memory_owner));
// We no longer own acceptor
acceptor = nullptr;
// Hold a ref to connection to allow starting handshake outside the
// critical region
RefCountedPtr<ActiveConnection> connection_ref = connection->Ref();
RefCountedPtr<Chttp2ServerListener> listener_ref;
{
MutexLock lock(&self->mu_);
// Shutdown the the connection if listener's stopped serving or if the
// connection manager has changed.
if (!self->shutdown_ && self->is_serving_ &&
connection_manager == self->connection_manager_) {
// This ref needs to be taken in the critical region after having made
// sure that the listener has not been Orphaned, so as to avoid
// heap-use-after-free issues where `Ref()` is invoked when the ref of
// tcp_server_ has already reached 0. (Ref() implementation of
// Chttp2ServerListener is grpc_tcp_server_ref().)
listener_ref = self->Ref();
self->connections_.emplace(connection.get(), std::move(connection));
}
}
if (connection != nullptr) {
endpoint_cleanup(GRPC_ERROR_NONE);
} else {
connection_ref->Start(std::move(listener_ref), tcp, args);
}
}
void Chttp2ServerListener::TcpServerShutdownComplete(void* arg,
grpc_error_handle error) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
self->channelz_listen_socket_.reset();
GRPC_ERROR_UNREF(error);
delete self;
}
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks) */
void Chttp2ServerListener::Orphan() {
// Cancel the watch before shutting down so as to avoid holding a ref to the
// listener in the watcher.
if (config_fetcher_watcher_ != nullptr) {
server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
}
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
grpc_tcp_server* tcp_server;
{
MutexLock lock(&mu_);
shutdown_ = true;
is_serving_ = false;
// Orphan the connections so that they can start cleaning up.
connections = std::move(connections_);
// If the listener is currently set to be serving but has not been started
// yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
// operation to finish to avoid causing races.
while (is_serving_ && !started_) {
started_cv_.Wait(&mu_);
}
tcp_server = tcp_server_;
}
grpc_tcp_server_shutdown_listeners(tcp_server);
grpc_tcp_server_unref(tcp_server);
}
} // namespace
//
// Chttp2ServerAddPort()
//
grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier,
int* port_num) {
if (addr == nullptr) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Invalid address: addr cannot be a nullptr.");
}
if (strncmp(addr, "external:", 9) == 0) {
return Chttp2ServerListener::CreateWithAcceptor(server, addr, args,
args_modifier);
}
*port_num = -1;
absl::StatusOr<std::vector<grpc_resolved_address>> resolved_or;
std::vector<grpc_error_handle> error_list;
std::string parsed_addr = URI::PercentDecode(addr);
absl::string_view parsed_addr_unprefixed{parsed_addr};
// Using lambda to avoid use of goto.
grpc_error_handle error = [&]() {
grpc_error_handle error = GRPC_ERROR_NONE;
if (absl::ConsumePrefix(&parsed_addr_unprefixed, kUnixUriPrefix)) {
resolved_or = grpc_resolve_unix_domain_address(parsed_addr_unprefixed);
} else if (absl::ConsumePrefix(&parsed_addr_unprefixed,
kUnixAbstractUriPrefix)) {
resolved_or =
grpc_resolve_unix_abstract_domain_address(parsed_addr_unprefixed);
} else {
resolved_or =
GetDNSResolver()->LookupHostnameBlocking(parsed_addr, "https");
}
if (!resolved_or.ok()) {
return absl_status_to_grpc_error(resolved_or.status());
}
// Create a listener for each resolved address.
for (auto& addr : *resolved_or) {
// If address has a wildcard port (0), use the same port as a previous
// listener.
if (*port_num != -1 && grpc_sockaddr_get_port(&addr) == 0) {
grpc_sockaddr_set_port(&addr, *port_num);
}
int port_temp = -1;
error = Chttp2ServerListener::Create(server, &addr, args, args_modifier,
&port_temp);
if (!GRPC_ERROR_IS_NONE(error)) {
error_list.push_back(error);
} else {
if (*port_num == -1) {
*port_num = port_temp;
} else {
GPR_ASSERT(*port_num == port_temp);
}
}
}
if (error_list.size() == resolved_or->size()) {
std::string msg = absl::StrFormat(
"No address added out of total %" PRIuPTR " resolved for '%s'",
resolved_or->size(), addr);
return GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
} else if (!error_list.empty()) {
std::string msg = absl::StrFormat(
"Only %" PRIuPTR " addresses added out of total %" PRIuPTR
" resolved",
resolved_or->size() - error_list.size(), resolved_or->size());
error = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
msg.c_str(), error_list.data(), error_list.size());
gpr_log(GPR_INFO, "WARNING: %s", grpc_error_std_string(error).c_str());
GRPC_ERROR_UNREF(error);
// we managed to bind some addresses: continue without error
}
return GRPC_ERROR_NONE;
}(); // lambda end
for (const grpc_error_handle& error : error_list) {
GRPC_ERROR_UNREF(error);
}
if (!GRPC_ERROR_IS_NONE(error)) *port_num = 0;
return error;
}
namespace {
ChannelArgs ModifyArgsForConnection(const ChannelArgs& args,
grpc_error_handle* error) {
auto* server_credentials = args.GetObject<grpc_server_credentials>();
if (server_credentials == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Could not find server credentials");
return args;
}
auto security_connector = server_credentials->create_security_connector(args);
if (security_connector == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("Unable to create secure server with credentials of type ",
server_credentials->type().name()));
return args;
}
return args.SetObject(security_connector);
}
} // namespace
} // namespace grpc_core
int grpc_server_add_http2_port(grpc_server* server, const char* addr,
grpc_server_credentials* creds) {
grpc_core::ExecCtx exec_ctx;
grpc_error_handle err = GRPC_ERROR_NONE;
grpc_core::RefCountedPtr<grpc_server_security_connector> sc;
int port_num = 0;
grpc_core::Server* core_server = grpc_core::Server::FromC(server);
grpc_core::ChannelArgs args = core_server->channel_args();
GRPC_API_TRACE("grpc_server_add_http2_port(server=%p, addr=%s, creds=%p)", 3,
(server, addr, creds));
// Create security context.
if (creds == nullptr) {
err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No credentials specified for secure server port (creds==NULL)");
goto done;
}
// TODO(yashykt): Ideally, we would not want to have different behavior here
// based on whether a config fetcher is configured or not. Currently, we have
// a feature for SSL credentials reloading with an application callback that
// assumes that there is a single security connector. If we delay the creation
// of the security connector to after the creation of the listener(s), we
// would have potentially multiple security connectors which breaks the
// assumption for SSL creds reloading. When the API for SSL creds reloading is
// rewritten, we would be able to make this workaround go away by removing
// that assumption. As an immediate drawback of this workaround, config
// fetchers need to be registered before adding ports to the server.
if (core_server->config_fetcher() != nullptr) {
// Create channel args.
args = args.SetObject(creds->Ref());
} else {
sc = creds->create_security_connector(grpc_core::ChannelArgs());
if (sc == nullptr) {
err = GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
"Unable to create secure server with credentials of type ",
creds->type().name()));
goto done;
}
args = args.SetObject(creds->Ref()).SetObject(sc);
}
// Add server port.
err = grpc_core::Chttp2ServerAddPort(
core_server, addr, args, grpc_core::ModifyArgsForConnection, &port_num);
done:
sc.reset(DEBUG_LOCATION, "server");
if (!GRPC_ERROR_IS_NONE(err)) {
gpr_log(GPR_ERROR, "%s", grpc_error_std_string(err).c_str());
GRPC_ERROR_UNREF(err);
}
return port_num;
}
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
grpc_server_credentials* creds) {
// For now, we only support insecure server credentials
if (creds == nullptr ||
creds->type() != grpc_core::InsecureServerCredentials::Type()) {
gpr_log(GPR_ERROR, "Failed to create channel due to invalid creds");
return;
}
grpc_core::ExecCtx exec_ctx;
grpc_core::Server* core_server = grpc_core::Server::FromC(server);
grpc_core::ChannelArgs server_args = core_server->channel_args();
std::string name = absl::StrCat("fd:", fd);
auto memory_quota =
server_args.GetObject<grpc_core::ResourceQuota>()->memory_quota();
grpc_endpoint* server_endpoint = grpc_tcp_create(
grpc_fd_create(fd, name.c_str(), true), server_args.ToC().get(), name);
grpc_transport* transport = grpc_create_chttp2_transport(
server_args, server_endpoint, false /* is_client */
);
grpc_error_handle error =
core_server->SetupTransport(transport, nullptr, server_args, nullptr);
if (GRPC_ERROR_IS_NONE(error)) {
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
} else {
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_error_std_string(error).c_str());
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
}
}
#else // !GPR_SUPPORT_CHANNELS_FROM_FD
void grpc_server_add_channel_from_fd(grpc_server* /* server */, int /* fd */,
grpc_server_credentials* /* creds */) {
GPR_ASSERT(0);
}
#endif // GPR_SUPPORT_CHANNELS_FROM_FD