blob: e9d6e9d8bcc03f5aba47d544f9eaccde3ce0c749 [file] [log] [blame]
// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifdef GRPC_USE_EVENT_ENGINE
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/event_engine/sockaddr.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/iomgr/event_engine/closure.h"
#include "src/core/lib/iomgr/event_engine/endpoint.h"
#include "src/core/lib/iomgr/event_engine/pollset.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/surface/init.h"
#include "src/core/lib/transport/error_utils.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
using ::grpc_event_engine::experimental::GrpcClosureToStatusCallback;
using ::grpc_event_engine::experimental::SliceAllocator;
using ::grpc_event_engine::experimental::SliceAllocatorFactory;
using ::grpc_event_engine::experimental::SliceBuffer;
} // namespace
class WrappedInternalSliceAllocator : public SliceAllocator {
public:
explicit WrappedInternalSliceAllocator(grpc_slice_allocator* slice_allocator)
: slice_allocator_(slice_allocator) {}
~WrappedInternalSliceAllocator() {
grpc_slice_allocator_destroy(slice_allocator_);
}
absl::Status Allocate(size_t size, SliceBuffer* dest,
SliceAllocator::AllocateCallback cb) override {
// TODO(nnoble): requires the SliceBuffer definition.
grpc_slice_allocator_allocate(
slice_allocator_, size, 1, grpc_slice_allocator_intent::kReadBuffer,
dest->RawSliceBuffer(),
[](void* arg, grpc_error_handle error) {
auto cb = static_cast<SliceAllocator::AllocateCallback*>(arg);
(*cb)(grpc_error_to_absl_status(error));
delete cb;
},
new SliceAllocator::AllocateCallback(cb));
return absl::OkStatus();
}
private:
grpc_slice_allocator* slice_allocator_;
};
class WrappedInternalSliceAllocatorFactory : public SliceAllocatorFactory {
public:
explicit WrappedInternalSliceAllocatorFactory(
grpc_slice_allocator_factory* slice_allocator_factory)
: slice_allocator_factory_(slice_allocator_factory) {}
~WrappedInternalSliceAllocatorFactory() {
grpc_slice_allocator_factory_destroy(slice_allocator_factory_);
}
std::unique_ptr<SliceAllocator> CreateSliceAllocator(
absl::string_view peer_name) override {
return absl::make_unique<WrappedInternalSliceAllocator>(
grpc_slice_allocator_factory_create_slice_allocator(
slice_allocator_factory_, peer_name));
};
private:
grpc_slice_allocator_factory* slice_allocator_factory_;
};
struct grpc_tcp_server {
explicit grpc_tcp_server(std::unique_ptr<EventEngine::Listener> listener)
: refcount(1, GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace) ? "tcp" : nullptr),
listener(std::move(listener)) {
shutdown_starting.head = nullptr;
shutdown_starting.tail = nullptr;
};
~grpc_tcp_server() {
grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &shutdown_starting);
grpc_core::ExecCtx::Get()->Flush();
}
grpc_core::RefCount refcount;
grpc_core::Mutex mu;
std::unique_ptr<EventEngine::Listener> listener;
grpc_closure_list shutdown_starting ABSL_GUARDED_BY(mu);
grpc_tcp_server_cb on_accept_internal;
void* on_accept_internal_arg;
};
namespace {
/// Converts a grpc_closure to an EventEngine Callback. The closure is expected
/// to already be initialized.
EventEngine::OnConnectCallback GrpcClosureToOnConnectCallback(
grpc_closure* closure, grpc_endpoint** endpoint_ptr) {
return [closure, endpoint_ptr](
absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> endpoint) {
grpc_core::ExecCtx exec_ctx;
if (endpoint.ok()) {
auto* grpc_endpoint_out =
reinterpret_cast<grpc_event_engine_endpoint*>(*endpoint_ptr);
grpc_endpoint_out->endpoint = std::move(*endpoint);
} else {
grpc_endpoint_destroy(*endpoint_ptr);
*endpoint_ptr = nullptr;
}
grpc_core::Closure::Run(DEBUG_LOCATION, closure,
absl_status_to_grpc_error(endpoint.status()));
exec_ctx.Flush();
grpc_pollset_ee_broadcast_event();
};
}
/// Usage note: this method does not take ownership of any pointer arguments.
void tcp_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
grpc_slice_allocator* slice_allocator,
grpc_pollset_set* /* interested_parties */,
const grpc_channel_args* channel_args,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
grpc_event_engine_endpoint* ee_endpoint =
reinterpret_cast<grpc_event_engine_endpoint*>(
grpc_tcp_create(channel_args, grpc_sockaddr_to_uri(addr)));
*endpoint = &ee_endpoint->base;
EventEngine::OnConnectCallback ee_on_connect =
GrpcClosureToOnConnectCallback(on_connect, endpoint);
auto ee_slice_allocator =
absl::make_unique<WrappedInternalSliceAllocator>(slice_allocator);
EventEngine::ResolvedAddress ra(reinterpret_cast<const sockaddr*>(addr->addr),
addr->len);
absl::Time ee_deadline =
grpc_core::ToAbslTime(deadline.as_timespec(GPR_CLOCK_MONOTONIC));
ChannelArgsEndpointConfig endpoint_config(channel_args);
absl::Status connected = GetDefaultEventEngine()->Connect(
ee_on_connect, ra, endpoint_config, std::move(ee_slice_allocator),
ee_deadline);
if (!connected.ok()) {
// EventEngine failed to start an asynchronous connect.
grpc_endpoint_destroy(*endpoint);
*endpoint = nullptr;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_connect,
absl_status_to_grpc_error(connected));
}
}
grpc_error_handle tcp_server_create(
grpc_closure* shutdown_complete, const grpc_channel_args* args,
grpc_slice_allocator_factory* slice_allocator_factory,
grpc_tcp_server** server) {
ChannelArgsEndpointConfig endpoint_config(args);
auto ee_slice_allocator_factory =
absl::make_unique<WrappedInternalSliceAllocatorFactory>(
slice_allocator_factory);
EventEngine* event_engine = GetDefaultEventEngine();
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> listener =
event_engine->CreateListener(
[server](std::unique_ptr<EventEngine::Endpoint> ee_endpoint,
const SliceAllocator& /*slice_allocator*/) {
grpc_core::ExecCtx exec_ctx;
GPR_ASSERT((*server)->on_accept_internal != nullptr);
grpc_event_engine_endpoint* iomgr_endpoint =
grpc_tcp_server_endpoint_create(std::move(ee_endpoint));
grpc_tcp_server_acceptor* acceptor =
static_cast<grpc_tcp_server_acceptor*>(
gpr_zalloc(sizeof(*acceptor)));
acceptor->from_server = *server;
acceptor->external_connection = false;
(*server)->on_accept_internal((*server)->on_accept_internal_arg,
&iomgr_endpoint->base, nullptr,
acceptor);
exec_ctx.Flush();
grpc_pollset_ee_broadcast_event();
},
GrpcClosureToStatusCallback(shutdown_complete), endpoint_config,
std::move(ee_slice_allocator_factory));
if (!listener.ok()) {
return absl_status_to_grpc_error(listener.status());
}
*server = new grpc_tcp_server(std::move(*listener));
return GRPC_ERROR_NONE;
}
void tcp_server_start(grpc_tcp_server* server,
const std::vector<grpc_pollset*>* /* pollsets */,
grpc_tcp_server_cb on_accept_cb, void* cb_arg) {
server->on_accept_internal = on_accept_cb;
server->on_accept_internal_arg = cb_arg;
// The iomgr API does not handle situations where the server cannot start, so
// a crash may be preferable for now.
GPR_ASSERT(server->listener->Start().ok());
}
grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
const grpc_resolved_address* addr,
int* out_port) {
EventEngine::ResolvedAddress ra(reinterpret_cast<const sockaddr*>(addr->addr),
addr->len);
auto port = s->listener->Bind(ra);
if (!port.ok()) {
return absl_status_to_grpc_error(port.status());
}
*out_port = *port;
return GRPC_ERROR_NONE;
}
grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
grpc_tcp_server* /* s */) {
// EventEngine-iomgr does not support fds.
return nullptr;
}
unsigned tcp_server_port_fd_count(grpc_tcp_server* /* s */,
unsigned /* port_index */) {
return 0;
}
int tcp_server_port_fd(grpc_tcp_server* /* s */, unsigned /* port_index */,
unsigned /* fd_index */) {
// Note: only used internally
return -1;
}
grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
s->refcount.Ref(DEBUG_LOCATION, "server ref");
return s;
}
void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
grpc_closure* shutdown_starting) {
grpc_core::MutexLock lock(&s->mu);
grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
GRPC_ERROR_NONE);
}
void tcp_server_unref(grpc_tcp_server* s) {
if (GPR_UNLIKELY(s->refcount.Unref(DEBUG_LOCATION, "server unref"))) {
delete s;
}
}
// No-op, all are handled on listener unref
void tcp_server_shutdown_listeners(grpc_tcp_server* /* s */) {}
} // namespace
grpc_tcp_client_vtable grpc_event_engine_tcp_client_vtable = {tcp_connect};
grpc_tcp_server_vtable grpc_event_engine_tcp_server_vtable = {
tcp_server_create, tcp_server_start,
tcp_server_add_port, tcp_server_create_fd_handler,
tcp_server_port_fd_count, tcp_server_port_fd,
tcp_server_ref, tcp_server_shutdown_starting_add,
tcp_server_unref, tcp_server_shutdown_listeners};
// Methods that are expected to exist elsewhere in the codebase.
struct grpc_fd {
int fd;
};
grpc_fd* grpc_fd_create(int /* fd */, const char* /* name */,
bool /* track_err */) {
return nullptr;
}
grpc_endpoint* grpc_tcp_client_create_from_fd(
grpc_fd* /* fd */, const grpc_channel_args* /* channel_args */,
absl::string_view /* addr_str */,
grpc_slice_allocator* slice_allocator /* slice_allocator */) {
grpc_slice_allocator_destroy(slice_allocator);
return nullptr;
}
#endif // GRPC_USE_EVENT_ENGINE