blob: c2de5da4161b98084fd21b9a44d53345e812710a [file] [log] [blame]
// Copyright 2021 gRPC authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <new>
#include <type_traits>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/promise_factory.h"
#include "src/core/lib/promise/intra_activity_waiter.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
template <typename T>
struct Pipe;
template <typename T>
class PipeSender;
template <typename T>
class PipeReceiver;
namespace pipe_detail {
template <typename T>
class Push;
template <typename T>
class Next;
// Center sits between a sender and a receiver to provide a one-deep buffer of
// Ts
template <typename T>
class Center {
// Initialize with one send ref (held by PipeSender) and one recv ref (held by
// PipeReceiver)
Center() {
send_refs_ = 1;
recv_refs_ = 1;
has_value_ = false;
// Add one ref to the send side of this object, and return this.
Center* RefSend() {
return this;
// Add one ref to the recv side of this object, and return this.
Center* RefRecv() {
return this;
// Drop a send side ref
// If no send refs remain, wake due to send closure
// If no refs remain, destroy this object
void UnrefSend() {
GPR_DEBUG_ASSERT(send_refs_ > 0);
if (0 == send_refs_) {
if (0 == recv_refs_) {
// Drop a recv side ref
// If no recv refs remain, wake due to recv closure
// If no refs remain, destroy this object
void UnrefRecv() {
GPR_DEBUG_ASSERT(recv_refs_ > 0);
if (0 == recv_refs_) {
if (0 == send_refs_) {
} else if (has_value_) {
// Try to push *value into the pipe.
// Return Pending if there is no space.
// Return true if the value was pushed.
// Return false if the recv end is closed.
Poll<bool> Push(T* value) {
GPR_DEBUG_ASSERT(send_refs_ != 0);
if (recv_refs_ == 0) return false;
if (has_value_) return on_empty_.pending();
has_value_ = true;
value_ = std::move(*value);
return true;
// Try to receive a value from the pipe.
// Return Pending if there is no value.
// Return the value if one was retrieved.
// Return nullopt if the send end is closed and no value had been pushed.
Poll<absl::optional<T>> Next() {
GPR_DEBUG_ASSERT(recv_refs_ != 0);
if (!has_value_) {
if (send_refs_ == 0) return absl::nullopt;
return on_full_.pending();
has_value_ = false;
return std::move(value_);
void ResetValue() {
// Fancy dance to move out of value in the off chance that we reclaim some
// memory earlier.
[](T) {}(std::move(value_));
has_value_ = false;
T value_;
// Number of sending objects.
// 0 => send is closed.
// 1 ref each for PipeSender and Push.
uint8_t send_refs_ : 2;
// Number of receiving objects.
// 0 => recv is closed.
// 1 ref each for PipeReceiver and Next.
uint8_t recv_refs_ : 2;
// True iff there is a value in the pipe.
bool has_value_ : 1;
IntraActivityWaiter on_empty_;
IntraActivityWaiter on_full_;
} // namespace pipe_detail
// Send end of a Pipe.
template <typename T>
class PipeSender {
PipeSender(const PipeSender&) = delete;
PipeSender& operator=(const PipeSender&) = delete;
PipeSender(PipeSender&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
PipeSender& operator=(PipeSender&& other) noexcept {
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
return *this;
~PipeSender() {
if (center_ != nullptr) center_->UnrefSend();
// Send a single message along the pipe.
// Returns a promise that will resolve to a bool - true if the message was
// sent, false if it could never be sent. Blocks the promise until the
// receiver is either closed or able to receive another message.
pipe_detail::Push<T> Push(T value);
friend struct Pipe<T>;
explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
// Receive end of a Pipe.
template <typename T>
class PipeReceiver {
PipeReceiver(const PipeReceiver&) = delete;
PipeReceiver& operator=(const PipeReceiver&) = delete;
PipeReceiver(PipeReceiver&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
PipeReceiver& operator=(PipeReceiver&& other) noexcept {
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
return *this;
~PipeReceiver() {
if (center_ != nullptr) center_->UnrefRecv();
// Receive a single message from the pipe.
// Returns a promise that will resolve to an optional<T> - with a value if a
// message was received, or no value if the other end of the pipe was closed.
// Blocks the promise until the receiver is either closed or a message is
// available.
pipe_detail::Next<T> Next();
friend struct Pipe<T>;
explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
pipe_detail::Center<T>* center_;
namespace pipe_detail {
// Implementation of PipeSender::Push promise.
template <typename T>
class Push {
Push(const Push&) = delete;
Push& operator=(const Push&) = delete;
Push(Push&& other) noexcept
: center_(other.center_), push_(std::move(other.push_)) {
other.center_ = nullptr;
Push& operator=(Push&& other) noexcept {
if (center_ != nullptr) center_->UnrefSend();
center_ = other.center_;
other.center_ = nullptr;
push_ = std::move(other.push_);
return *this;
~Push() {
if (center_ != nullptr) center_->UnrefSend();
Poll<bool> operator()() { return center_->Push(&push_); }
friend class PipeSender<T>;
explicit Push(pipe_detail::Center<T>* center, T push)
: center_(center), push_(std::move(push)) {}
Center<T>* center_;
T push_;
// Implementation of PipeReceiver::Next promise.
template <typename T>
class Next {
Next(const Next&) = delete;
Next& operator=(const Next&) = delete;
Next(Next&& other) noexcept : center_(other.center_) {
other.center_ = nullptr;
Next& operator=(Next&& other) noexcept {
if (center_ != nullptr) center_->UnrefRecv();
center_ = other.center_;
other.center_ = nullptr;
return *this;
~Next() {
if (center_ != nullptr) center_->UnrefRecv();
Poll<absl::optional<T>> operator()() { return center_->Next(); }
friend class PipeReceiver<T>;
explicit Next(pipe_detail::Center<T>* center) : center_(center) {}
Center<T>* center_;
} // namespace pipe_detail
template <typename T>
pipe_detail::Push<T> PipeSender<T>::Push(T value) {
return pipe_detail::Push<T>(center_->RefSend(), std::move(value));
template <typename T>
pipe_detail::Next<T> PipeReceiver<T>::Next() {
return pipe_detail::Next<T>(center_->RefRecv());
// A Pipe is an intra-Activity communications channel that transmits T's from
// one end to the other.
// It is only safe to use a Pipe within the context of a single Activity.
// No synchronization is performed internally.
// The primary Pipe data structure is allocated from an arena, so the activity
// must have an arena as part of its context.
// By performing that allocation we can ensure stable pointer to shared data
// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
// implementation.
// This type has been optimized with the expectation that there are relatively
// few pipes per activity. If this assumption does not hold then a design
// allowing inline filtering of pipe contents (instead of connecting pipes with
// polling code) would likely be more appropriate.
template <typename T>
struct Pipe {
Pipe() : Pipe(GetContext<Arena>()->New<pipe_detail::Center<T>>()) {}
Pipe(const Pipe&) = delete;
Pipe& operator=(const Pipe&) = delete;
Pipe(Pipe&&) noexcept = default;
Pipe& operator=(Pipe&&) noexcept = default;
PipeSender<T> sender;
PipeReceiver<T> receiver;
explicit Pipe(pipe_detail::Center<T>* center)
: sender(center), receiver(center) {}
} // namespace grpc_core