// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H
#include <grpc/support/port_platform.h>
// IWYU pragma: no_include <bits/types/struct_iovec.h>
#include <atomic>
#include <cstdint>
#include <memory>
#include <new>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/hash/hash.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/posix_engine/traced_buffer_list.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <sys/socket.h> // IWYU pragma: keep
#include <sys/types.h> // IWYU pragma: keep
#ifdef GRPC_MSG_IOVLEN_TYPE
typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
#else
typedef size_t msg_iovlen_type;
#endif
#endif // GRPC_POSIX_SOCKET_TCP
namespace grpc_event_engine {
namespace experimental {
#ifdef GRPC_POSIX_SOCKET_TCP
class TcpZerocopySendRecord {
public:
TcpZerocopySendRecord() { buf_.Clear(); }
~TcpZerocopySendRecord() { DebugAssertEmpty(); }
// TcpZerocopySendRecord contains a slice buffer holding the slices to be
// sent. Given the slices that we wish to send, and the current offset into
// the slice buffer (indicating which have already been sent), populate an
// iovec array that will be used for a zerocopy enabled sendmsg().
// unwind_slice_idx - input/output parameter. It indicates the index of the
// last slice whose contents were partially sent in the previous sendmsg.
// After this function returns, it gets updated to a new offset
// depending on the number of bytes which are decided to be sent in the
// current sendmsg.
// unwind_byte_idx - input/output parameter. It indicates the byte offset
// within the last slice whose contents were partially sent in the previous
// sendmsg. After this function returns, it gets updated to a new offset
// depending on the number of bytes which are decided to be sent in the
// current sendmsg.
// sending_length - total number of bytes to be sent in the current sendmsg.
// iov - An iovec array containing the bytes to be sent in the current
// sendmsg.
// Returns: the number of entries in the iovec array.
//
msg_iovlen_type PopulateIovs(size_t* unwind_slice_idx,
size_t* unwind_byte_idx, size_t* sending_length,
iovec* iov);
// A sendmsg() may not be able to send the bytes that we requested at this
// time, returning EAGAIN (possibly due to backpressure). In this case,
// unwind the offset into the slice buffer so we retry sending these bytes.
void UnwindIfThrottled(size_t unwind_slice_idx, size_t unwind_byte_idx) {
out_offset_.byte_idx = unwind_byte_idx;
out_offset_.slice_idx = unwind_slice_idx;
}
// Update the offset into the slice buffer based on how much we wanted to send
// vs. what sendmsg() actually sent (which may be lower, possibly due to
// backpressure).
void UpdateOffsetForBytesSent(size_t sending_length, size_t actually_sent);
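// A minimal sketch of the intended flush loop (illustrative only; `fd`,
// `kMaxIov` and the sendmsg() plumbing are assumed, not part of this API):
//
//   size_t unwind_slice_idx, unwind_byte_idx, sending_length;
//   iovec iov[kMaxIov];
//   msg_iovlen_type iovlen = record->PopulateIovs(
//       &unwind_slice_idx, &unwind_byte_idx, &sending_length, iov);
//   ssize_t sent = /* sendmsg(fd, ...) over iov[0..iovlen) */;
//   if (sent < 0 && errno == EAGAIN) {
//     record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
//   } else if (sent >= 0) {
//     record->UpdateOffsetForBytesSent(sending_length,
//                                      static_cast<size_t>(sent));
//   }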
// Indicates whether all underlying data has been sent or not.
bool AllSlicesSent() { return out_offset_.slice_idx == buf_.Count(); }
// Reset this structure for a new tcp_write() with zerocopy.
void PrepareForSends(
grpc_event_engine::experimental::SliceBuffer& slices_to_send) {
DebugAssertEmpty();
out_offset_.slice_idx = 0;
out_offset_.byte_idx = 0;
buf_.Swap(slices_to_send);
Ref();
}
// References: 1 reference per sendmsg(), and 1 for the tcp_write().
void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }
// Unref: called when we get an error queue notification for a sendmsg(),
// when a sendmsg() fails, or when the tcp_write() is done.
bool Unref() {
const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
GPR_DEBUG_ASSERT(prior > 0);
if (prior == 1) {
AllSendsComplete();
return true;
}
return false;
}
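// Illustrative reference accounting (not normative) for a tcp_write() that
// needs two sendmsg() calls:
//   PrepareForSends()  -> ref == 1  (tcp_write()'s reference)
//   NoteSend() #1      -> ref == 2
//   NoteSend() #2      -> ref == 3
//   completion #1      -> Unref(), ref == 2
//   completion #2      -> Unref(), ref == 1
//   tcp_write() done   -> Unref(), ref == 0 -> AllSendsComplete()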
private:
struct OutgoingOffset {
size_t slice_idx = 0;
size_t byte_idx = 0;
};
void DebugAssertEmpty() {
GPR_DEBUG_ASSERT(buf_.Count() == 0);
GPR_DEBUG_ASSERT(buf_.Length() == 0);
GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
}
// When all sendmsg() calls associated with this tcp_write() have been
// completed (i.e. we have received the notifications for each sequence number
// for each sendmsg()) and all reference counts have been dropped, drop our
// reference to the underlying data since we no longer need it.
void AllSendsComplete() {
GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
buf_.Clear();
}
grpc_event_engine::experimental::SliceBuffer buf_;
std::atomic<intptr_t> ref_{0};
OutgoingOffset out_offset_;
};
class TcpZerocopySendCtx {
public:
static constexpr int kDefaultMaxSends = 4;
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; // 16KB
explicit TcpZerocopySendCtx(
bool zerocopy_enabled, int max_sends = kDefaultMaxSends,
size_t send_bytes_threshold = kDefaultSendBytesThreshold)
: max_sends_(max_sends),
free_send_records_size_(max_sends),
threshold_bytes_(send_bytes_threshold) {
send_records_ = static_cast<TcpZerocopySendRecord*>(
gpr_malloc(max_sends * sizeof(*send_records_)));
free_send_records_ = static_cast<TcpZerocopySendRecord**>(
gpr_malloc(max_sends * sizeof(*free_send_records_)));
if (send_records_ == nullptr || free_send_records_ == nullptr) {
gpr_free(send_records_);
gpr_free(free_send_records_);
gpr_log(GPR_INFO, "Disabling TCP TX zerocopy due to memory pressure.\n");
memory_limited_ = true;
enabled_ = false;
} else {
for (int idx = 0; idx < max_sends_; ++idx) {
new (send_records_ + idx) TcpZerocopySendRecord();
free_send_records_[idx] = send_records_ + idx;
}
enabled_ = zerocopy_enabled;
}
}
~TcpZerocopySendCtx() {
if (send_records_ != nullptr) {
for (int idx = 0; idx < max_sends_; ++idx) {
send_records_[idx].~TcpZerocopySendRecord();
}
}
gpr_free(send_records_);
gpr_free(free_send_records_);
}
// True if we were unable to allocate the various bookkeeping structures at
// transport initialization time. If memory limited, we do not use zerocopy.
bool MemoryLimited() const { return memory_limited_; }
// TCP send zerocopy maintains an implicit sequence number for every
// successful sendmsg() with zerocopy enabled; the kernel later gives us an
// error queue notification with this sequence number indicating that the
// underlying data buffers that we sent can now be released. Once that
// notification is received, we can release the buffers associated with this
// zerocopy send record. Here, we associate the sequence number with the data
// buffers that were sent with the corresponding call to sendmsg().
void NoteSend(TcpZerocopySendRecord* record) {
record->Ref();
{
grpc_core::MutexLock lock(&mu_);
is_in_write_ = true;
AssociateSeqWithSendRecordLocked(last_send_, record);
}
++last_send_;
}
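// For example, the sending thread is expected to pair NoteSend() with the
// syscall roughly as follows (sketch only; the sendmsg() plumbing is
// assumed):
//
//   ctx.NoteSend(record);  // take a ref and bump the seq number *first*
//   ssize_t sent = /* sendmsg(fd, &msg, MSG_ZEROCOPY) */;
//   if (sent < 0) ctx.UndoSend();  // see UndoSend() below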
// If sendmsg() actually failed, though, we need to revert the sequence number
// that we speculatively bumped before calling sendmsg(). Note that we bump
// this sequence number and perform relevant bookkeeping (see: NoteSend())
// *before* calling sendmsg() since, if we called it *after* sendmsg(), then
// there is a possible race with the release notification which could occur on
// another thread before we do the necessary bookkeeping. Hence, we call
// NoteSend() *before* sendmsg() and provide this undo function.
void UndoSend() {
--last_send_;
if (ReleaseSendRecord(last_send_)->Unref()) {
// We should still be holding the ref taken by tcp_write().
GPR_DEBUG_ASSERT(0);
}
}
// Simply associate this send record (and the underlying sent data buffers)
// with the implicit sequence number for this zerocopy sendmsg().
void AssociateSeqWithSendRecordLocked(uint32_t seq,
TcpZerocopySendRecord* record)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
ctx_lookup_.emplace(seq, record);
}
// Get a send record for a send that we wish to do with zerocopy.
TcpZerocopySendRecord* GetSendRecord() {
grpc_core::MutexLock lock(&mu_);
return TryGetSendRecordLocked();
}
// A given send record corresponds to a single tcp_write() with zerocopy
// enabled. This can result in several sendmsg() calls to flush all of the
// data to wire. Each sendmsg() takes a reference on the
// TcpZerocopySendRecord, and corresponds to a single sequence number.
// ReleaseSendRecord releases a reference on TcpZerocopySendRecord for a
// single sequence number. This is called either when the relevant
// error queue notification (saying that we can discard the underlying
// buffers for this sendmsg()) is received from the kernel, or when
// sendmsg() was unsuccessful to begin with.
TcpZerocopySendRecord* ReleaseSendRecord(uint32_t seq) {
grpc_core::MutexLock lock(&mu_);
return ReleaseSendRecordLocked(seq);
}
// After all the references to a TcpZerocopySendRecord are released, we can
// add it back to the pool (of size max_sends_). Note that we can only have
// max_sends_ tcp_write() instances with zerocopy enabled in flight at the
// same time.
void PutSendRecord(TcpZerocopySendRecord* record) {
grpc_core::MutexLock lock(&mu_);
GPR_DEBUG_ASSERT(record >= send_records_ &&
record < send_records_ + max_sends_);
PutSendRecordLocked(record);
}
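// Putting it together, a zerocopy tcp_write() is expected to look roughly
// like this (illustrative only; error handling elided):
//
//   TcpZerocopySendRecord* record = ctx.GetSendRecord();
//   if (record == nullptr) { /* pool exhausted: fall back to copying */ }
//   record->PrepareForSends(slices);  // takes tcp_write()'s reference
//   while (!record->AllSlicesSent()) {
//     /* NoteSend() + sendmsg() + UpdateOffsetForBytesSent() ... */
//   }
//   if (record->Unref()) ctx.PutSendRecord(record);  // back to the pool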
// Indicate that we are disposing of this zerocopy context. This indicator
// will prevent new zerocopy writes from being issued.
void Shutdown() { shutdown_.store(true, std::memory_order_release); }
// Indicates that there are no inflight tcp_write() instances with zerocopy
// enabled.
bool AllSendRecordsEmpty() {
grpc_core::MutexLock lock(&mu_);
return free_send_records_size_ == max_sends_;
}
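// Illustrative shutdown/drain pattern (compare
// PosixEndpointImpl::ZerocopyDisableAndWaitForRemaining() below):
//   ctx.Shutdown();                       // reject new zerocopy writes
//   while (!ctx.AllSendRecordsEmpty()) {
//     /* keep processing error queue notifications */
//   }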
bool Enabled() const { return enabled_; }
// Only use zerocopy if we are sending at least this many bytes. The
// additional overhead of reading the error queue for notifications means that
// zerocopy is not useful for small transfers.
size_t ThresholdBytes() const { return threshold_bytes_; }
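// For example, a caller choosing between the zerocopy and copying send
// paths might check (sketch only):
//   bool use_zerocopy = ctx.Enabled() && !ctx.MemoryLimited() &&
//                       buf.Length() >= ctx.ThresholdBytes();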
// Expected to be called by the handler reading messages from the error
// queue. It is used to indicate that some optmem memory is now available. It
// returns true to tell the caller to mark the file descriptor as immediately
// writable.
//
// OptMem (controlled by the kernel option optmem_max) refers to the memory
// allocated to the cmsg list maintained by the kernel that contains "extra"
// packet information like SCM_RIGHTS or IP_TTL. Increasing this option allows
// the kernel to allocate more memory as needed for more control messages that
// need to be sent for each socket connected.
//
// If a write is currently in progress on the socket (i.e. we have issued a
// sendmsg() and are about to check its return value) then we set omem state
// to CHECK to let the sending thread know that some tcp_omem was
// concurrently freed even if sendmsg() returns ENOBUFS. In this case, since
// there is already an active send thread, we do not need to mark the
// socket writable, so we return false.
//
// If there was no write in progress on the socket, and the socket was not
// marked as FULL, then we need not mark the socket writable now that some
// tcp_omem memory is freed since it was not considered as blocked on
// tcp_omem to begin with. So in this case, return false.
//
// But, if a write was not in progress and the omem state was FULL, then we
// need to mark the socket writable since it is no longer blocked by
// tcp_omem. In this case, return true.
//
// Please refer to the STATE TRANSITION DIAGRAM below for more details.
//
bool UpdateZeroCopyOptMemStateAfterFree() {
grpc_core::MutexLock lock(&mu_);
if (is_in_write_) {
zcopy_enobuf_state_ = OptMemState::kCheck;
return false;
}
GPR_DEBUG_ASSERT(zcopy_enobuf_state_ != OptMemState::kCheck);
if (zcopy_enobuf_state_ == OptMemState::kFull) {
// A previous sendmsg attempt was blocked by ENOBUFS. Return true to
// mark the fd as writable so the next write attempt can be made.
zcopy_enobuf_state_ = OptMemState::kOpen;
return true;
} else if (zcopy_enobuf_state_ == OptMemState::kOpen) {
// No need to mark the fd as writable because the previous write
// attempt did not encounter ENOBUFS.
return false;
} else {
// This state should never be reached because it implies that the previous
// state was CHECK and is_in_write_ is false. This means that after the
// previous sendmsg returned and set is_in_write_ to false, it did
// not update the zerocopy state from CHECK to OPEN.
grpc_core::Crash("OMem state error!");
}
}
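// Illustrative call site in an error-queue handler (`MarkFdWritable` is a
// hypothetical stand-in for however the poller wakes up blocked writers):
//   /* ... after releasing buffers for acked zerocopy sends ... */
//   if (ctx.UpdateZeroCopyOptMemStateAfterFree()) {
//     MarkFdWritable(fd);  // a writer was blocked on tcp_omem; wake it
//   }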
// Expected to be called by the thread calling sendmsg after the syscall
// invocation is complete. If ENOBUFS is seen, it checks whether the error
// handler (TX zerocopy completions) has already run and freed up some OMem,
// and if so returns true, indicating that the write can be attempted again
// immediately.
// If ENOBUFS was seen but no TX zerocopy completions have been received
// between the sendmsg() and us taking this lock, then tcp_omem is still full
// from our point of view, so we do not signal that the socket is writable
// with respect to the availability of tcp_omem, and the function returns
// false. This indicates that another write should not be attempted
// immediately and that the calling thread should wait until the socket is
// writable again. If ENOBUFS was not seen, then again return false, because
// the next write should be attempted only when the socket is writable again.
//
// Please refer to the STATE TRANSITION DIAGRAM below for more details.
//
bool UpdateZeroCopyOptMemStateAfterSend(bool seen_enobuf, bool& constrained) {
grpc_core::MutexLock lock(&mu_);
is_in_write_ = false;
constrained = false;
if (seen_enobuf) {
if (ctx_lookup_.size() == 1) {
// There is no un-acked zerocopy record. Set constrained to true to
// indicate that we are resource constrained because we're seeing
// ENOBUFS even for the first record. This indicates that the process
// does not have the hard memlock ulimit (RLIMIT_MEMLOCK) configured
// correctly.
constrained = true;
}
if (zcopy_enobuf_state_ == OptMemState::kCheck) {
zcopy_enobuf_state_ = OptMemState::kOpen;
return true;
} else {
zcopy_enobuf_state_ = OptMemState::kFull;
}
} else if (zcopy_enobuf_state_ != OptMemState::kOpen) {
zcopy_enobuf_state_ = OptMemState::kOpen;
}
return false;
}
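// Illustrative call site on the sending thread (sketch only):
//   bool constrained = false;
//   bool retry_now = ctx.UpdateZeroCopyOptMemStateAfterSend(
//       saved_errno == ENOBUFS, constrained);
//   // If `constrained`, consider verifying RLIMIT_MEMLOCK / optmem_max.
//   // If `retry_now`, attempt the write again immediately; otherwise wait
//   // until the poller reports the fd writable.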
private:
// STATE TRANSITION DIAGRAM
//
// sendmsg succeeds       Tx-zero copy succeeds and there is no active sendmsg
//      ----<<--+  +------<<-------------------------------------+
//      |       |  |                                             |
//      |       |  v       sendmsg returns ENOBUFS               |
//      +-----> OPEN  ------------->>-------------------------> FULL
//                ^                                              |
//                |                                              |
//                | sendmsg completes                            |
//                +----<<---------- CHECK <-------<<-------------+
//                                        Tx-zero copy succeeds and there is
//                                        an active sendmsg
//
// OptMem (controlled by the kernel option optmem_max) refers to the memory
// allocated to the cmsg list maintained by the kernel that contains "extra"
// packet information like SCM_RIGHTS or IP_TTL. Increasing this option allows
// the kernel to allocate more memory as needed for more control messages that
// need to be sent for each socket connected. Each tx zero copy sendmsg has
// a corresponding entry added into the Optmem queue. The entry is popped
// from the Optmem queue when the zero copy send is complete.
enum class OptMemState : int8_t {
kOpen, // Everything is clear and omem is not full.
kFull, // The last sendmsg() has returned with an errno of ENOBUFS.
kCheck, // Error queue is read while is_in_write_ was true, so we should
// check this state after the sendmsg.
};
TcpZerocopySendRecord* ReleaseSendRecordLocked(uint32_t seq)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
auto iter = ctx_lookup_.find(seq);
GPR_DEBUG_ASSERT(iter != ctx_lookup_.end());
TcpZerocopySendRecord* record = iter->second;
ctx_lookup_.erase(iter);
return record;
}
TcpZerocopySendRecord* TryGetSendRecordLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (shutdown_.load(std::memory_order_acquire)) {
return nullptr;
}
if (free_send_records_size_ == 0) {
return nullptr;
}
free_send_records_size_--;
return free_send_records_[free_send_records_size_];
}
void PutSendRecordLocked(TcpZerocopySendRecord* record)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
GPR_DEBUG_ASSERT(free_send_records_size_ < max_sends_);
free_send_records_[free_send_records_size_] = record;
free_send_records_size_++;
}
TcpZerocopySendRecord* send_records_ ABSL_GUARDED_BY(mu_);
TcpZerocopySendRecord** free_send_records_ ABSL_GUARDED_BY(mu_);
int max_sends_;
int free_send_records_size_ ABSL_GUARDED_BY(mu_);
grpc_core::Mutex mu_;
uint32_t last_send_ = 0;
std::atomic<bool> shutdown_{false};
bool enabled_ = false;
size_t threshold_bytes_ = kDefaultSendBytesThreshold;
absl::flat_hash_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_
ABSL_GUARDED_BY(mu_);
bool memory_limited_ = false;
bool is_in_write_ ABSL_GUARDED_BY(mu_) = false;
OptMemState zcopy_enobuf_state_ ABSL_GUARDED_BY(mu_) = OptMemState::kOpen;
};
class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
public:
PosixEndpointImpl(
EventHandle* handle, PosixEngineClosure* on_done,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
grpc_event_engine::experimental::MemoryAllocator&& allocator,
const PosixTcpOptions& options);
~PosixEndpointImpl() override;
bool Read(
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args);
bool Write(
absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args);
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const {
return peer_address_;
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const {
return local_address_;
}
int GetWrappedFd() { return fd_; }
bool CanTrackErrors() const { return poller_->CanTrackErrors(); }
void MaybeShutdown(
absl::Status why,
absl::AnyInvocable<void(absl::StatusOr<int> release_fd)> on_release_fd);
private:
void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void HandleWrite(absl::Status status);
void HandleError(absl::Status status);
void HandleRead(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS;
bool HandleReadLocked(absl::Status& status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void MaybeMakeReadSlices() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
bool TcpDoRead(absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void FinishEstimate();
void AddToEstimate(size_t bytes);
void MaybePostReclaimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
void PerformReclamation() ABSL_LOCKS_EXCLUDED(read_mu_);
// Zero copy related helper methods.
TcpZerocopySendRecord* TcpGetSendZerocopyRecord(
grpc_event_engine::experimental::SliceBuffer& buf);
bool DoFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
bool TcpFlushZerocopy(TcpZerocopySendRecord* record, absl::Status& status);
bool TcpFlush(absl::Status& status);
void TcpShutdownTracedBufferList();
void UnrefMaybePutZerocopySendRecord(TcpZerocopySendRecord* record);
void ZerocopyDisableAndWaitForRemaining();
bool WriteWithTimestamps(struct msghdr* msg, size_t sending_length,
ssize_t* sent_length, int* saved_errno,
int additional_flags);
absl::Status TcpAnnotateError(absl::Status src_error);
#ifdef GRPC_LINUX_ERRQUEUE
bool ProcessErrors();
// Reads a cmsg to process zerocopy control messages.
void ProcessZerocopy(struct cmsghdr* cmsg);
// Reads a cmsg to derive timestamps from the control messages.
struct cmsghdr* ProcessTimestamp(msghdr* msg, struct cmsghdr* cmsg);
#endif // GRPC_LINUX_ERRQUEUE
grpc_core::Mutex read_mu_;
PosixSocketWrapper sock_;
int fd_;
bool is_first_read_ = true;
bool has_posted_reclaimer_ ABSL_GUARDED_BY(read_mu_) = false;
double target_length_;
int min_read_chunk_size_;
int max_read_chunk_size_;
int set_rcvlowat_ = 0;
double bytes_read_this_round_ = 0;
std::atomic<int> ref_count_{1};
// Garbage remaining after the last read.
grpc_event_engine::experimental::SliceBuffer last_read_buffer_;
grpc_event_engine::experimental::SliceBuffer* incoming_buffer_
ABSL_GUARDED_BY(read_mu_) = nullptr;
// bytes pending on the socket from the last read.
int inq_ = 1;
// cache whether kernel supports inq.
bool inq_capable_ = false;
grpc_event_engine::experimental::SliceBuffer* outgoing_buffer_ = nullptr;
// byte within outgoing_buffer's slices[0] to write next.
size_t outgoing_byte_idx_ = 0;
PosixEngineClosure* on_read_ = nullptr;
PosixEngineClosure* on_write_ = nullptr;
PosixEngineClosure* on_error_ = nullptr;
PosixEngineClosure* on_done_ = nullptr;
absl::AnyInvocable<void(absl::Status)> read_cb_ ABSL_GUARDED_BY(read_mu_);
absl::AnyInvocable<void(absl::Status)> write_cb_;
grpc_event_engine::experimental::EventEngine::ResolvedAddress peer_address_;
grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_;
// Maintain a shared_ptr to mem_quota_ to ensure the underlying basic memory
// quota is not deleted until the endpoint is destroyed.
grpc_core::MemoryQuotaRefPtr mem_quota_;
grpc_core::MemoryOwner memory_owner_;
grpc_core::MemoryAllocator::Reservation self_reservation_;
void* outgoing_buffer_arg_ = nullptr;
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_ = nullptr;
// A counter that tracks the number of bytes sent for timestamping purposes.
// It is initialized the first time the socket options for collecting
// timestamps are set, and is incremented with each byte sent.
int bytes_counter_ = -1;
// True if timestamping options are set on the socket.
#ifdef GRPC_LINUX_ERRQUEUE
bool socket_ts_enabled_ = false;
#endif // GRPC_LINUX_ERRQUEUE
// Cache whether we can set timestamping options.
bool ts_capable_ = true;
// Set to true if we do not want to be notified on errors anymore.
std::atomic<bool> stop_error_notification_{false};
std::unique_ptr<TcpZerocopySendCtx> tcp_zerocopy_send_ctx_;
TcpZerocopySendRecord* current_zerocopy_send_ = nullptr;
// A hint from upper layers specifying the minimum number of bytes that need
// to be read to make meaningful progress.
int min_progress_size_ = 1;
TracedBufferList traced_buffers_;
// The handle is owned by the PosixEndpointImpl object.
EventHandle* handle_;
PosixEventPoller* poller_;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
};
class PosixEndpoint : public PosixEndpointWithFdSupport {
public:
PosixEndpoint(
EventHandle* handle, PosixEngineClosure* on_shutdown,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
grpc_event_engine::experimental::MemoryAllocator&& allocator,
const PosixTcpOptions& options)
: impl_(new PosixEndpointImpl(handle, on_shutdown, std::move(engine),
std::move(allocator), options)) {}
bool Read(
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args) override {
return impl_->Read(std::move(on_read), buffer, args);
}
bool Write(
absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args) override {
return impl_->Write(std::move(on_writable), data, args);
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const override {
return impl_->GetPeerAddress();
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const override {
return impl_->GetLocalAddress();
}
int GetWrappedFd() override { return impl_->GetWrappedFd(); }
bool CanTrackErrors() override { return impl_->CanTrackErrors(); }
void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
on_release_fd) override {
if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
std::move(on_release_fd));
}
}
~PosixEndpoint() override {
if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
nullptr);
}
}
private:
PosixEndpointImpl* impl_;
std::atomic<bool> shutdown_{false};
};
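// Illustrative fd-release pattern for the Shutdown() override above
// (hypothetical callback body):
//   endpoint->Shutdown([](absl::StatusOr<int> release_fd) {
//     if (release_fd.ok()) {
//       /* endpoint gave up ownership; the caller now owns *release_fd */
//     }
//   });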
#else // GRPC_POSIX_SOCKET_TCP
class PosixEndpoint : public PosixEndpointWithFdSupport {
public:
PosixEndpoint() = default;
bool Read(absl::AnyInvocable<void(absl::Status)> /*on_read*/,
grpc_event_engine::experimental::SliceBuffer* /*buffer*/,
const grpc_event_engine::experimental::EventEngine::Endpoint::
ReadArgs* /*args*/) override {
grpc_core::Crash("PosixEndpoint::Read not supported on this platform");
}
bool Write(absl::AnyInvocable<void(absl::Status)> /*on_writable*/,
grpc_event_engine::experimental::SliceBuffer* /*data*/,
const grpc_event_engine::experimental::EventEngine::Endpoint::
WriteArgs* /*args*/) override {
grpc_core::Crash("PosixEndpoint::Write not supported on this platform");
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const override {
grpc_core::Crash(
"PosixEndpoint::GetPeerAddress not supported on this platform");
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const override {
grpc_core::Crash(
"PosixEndpoint::GetLocalAddress not supported on this platform");
}
int GetWrappedFd() override {
grpc_core::Crash(
"PosixEndpoint::GetWrappedFd not supported on this platform");
}
bool CanTrackErrors() override {
grpc_core::Crash(
"PosixEndpoint::CanTrackErrors not supported on this platform");
}
void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
/*on_release_fd*/) override {
grpc_core::Crash("PosixEndpoint::Shutdown not supported on this platform");
}
~PosixEndpoint() override = default;
};
#endif // GRPC_POSIX_SOCKET_TCP
// Create a PosixEndpoint.
// A shared_ptr of the EventEngine is passed to the endpoint to ensure that
// the EventEngine is alive for the lifetime of the endpoint. The ownership
// of the EventHandle is transferred to the endpoint.
std::unique_ptr<PosixEndpoint> CreatePosixEndpoint(
EventHandle* handle, PosixEngineClosure* on_shutdown,
std::shared_ptr<EventEngine> engine,
grpc_event_engine::experimental::MemoryAllocator&& allocator,
const PosixTcpOptions& options);
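// A minimal creation sketch (illustrative; `poller`, `fd`, `on_shutdown`,
// `engine`, `allocator` and `options` are assumed to already exist, and the
// exact CreateHandle() signature may differ):
//
//   EventHandle* handle =
//       poller->CreateHandle(fd, "endpoint", poller->CanTrackErrors());
//   auto endpoint = CreatePosixEndpoint(handle, on_shutdown, engine,
//                                       std::move(allocator), options);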
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENDPOINT_H