blob: c3ec5ad447cb066596a9b1702f59662f6f68088f [file] [log] [blame]
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
#define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <stdlib.h>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include "absl/base/attributes.h"
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/interceptor_list.h"
#include "src/core/lib/promise/intra_activity_waiter.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/trace.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc_core {
namespace pipe_detail {
template <typename T>
class Center;
}
template <typename T>
struct Pipe;
// Result of Pipe::Next - represents a received value.
// If has_value() is false, the pipe was closed by the time we polled for the
// next value. No value was received, nor will there ever be.
// This type is movable but not copyable.
// Once the final move is destroyed the pipe will ack the read and unblock the
// send.
template <typename T>
class NextResult final {
public:
NextResult() : center_(nullptr) {}
explicit NextResult(RefCountedPtr<pipe_detail::Center<T>> center)
: center_(std::move(center)) {
GPR_ASSERT(center_ != nullptr);
}
explicit NextResult(bool cancelled)
: center_(nullptr), cancelled_(cancelled) {}
~NextResult();
NextResult(const NextResult&) = delete;
NextResult& operator=(const NextResult&) = delete;
NextResult(NextResult&& other) noexcept = default;
NextResult& operator=(NextResult&& other) noexcept = default;
using value_type = T;
void reset();
bool has_value() const;
// Only valid if has_value()
const T& value() const {
GPR_ASSERT(has_value());
return **this;
}
T& value() {
GPR_ASSERT(has_value());
return **this;
}
const T& operator*() const;
T& operator*();
// Only valid if !has_value()
bool cancelled() { return cancelled_; }
private:
RefCountedPtr<pipe_detail::Center<T>> center_;
bool cancelled_;
};
namespace pipe_detail {
template <typename T>
class Push;
template <typename T>
class Next;
// Center sits between a sender and a receiver to provide a one-deep buffer of
// Ts
template <typename T>
class Center : public InterceptorList<T> {
public:
// Initialize with one send ref (held by PipeSender) and one recv ref (held by
// PipeReceiver)
Center() {
refs_ = 2;
value_state_ = ValueState::kEmpty;
}
// Add one ref to this object, and return this.
void IncrementRefCount() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s", DebugOpString("IncrementRefCount").c_str());
}
refs_++;
GPR_DEBUG_ASSERT(refs_ != 0);
}
RefCountedPtr<Center> Ref() {
IncrementRefCount();
return RefCountedPtr<Center>(this);
}
// Drop a ref
// If no refs remain, destroy this object
void Unref() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s", DebugOpString("Unref").c_str());
}
GPR_DEBUG_ASSERT(refs_ > 0);
refs_--;
if (0 == refs_) {
this->~Center();
}
}
// Try to push *value into the pipe.
// Return Pending if there is no space.
// Return true if the value was pushed.
// Return false if the recv end is closed.
Poll<bool> Push(T* value) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("Push").c_str());
}
GPR_DEBUG_ASSERT(refs_ != 0);
switch (value_state_) {
case ValueState::kClosed:
case ValueState::kReadyClosed:
case ValueState::kCancelled:
return false;
case ValueState::kReady:
case ValueState::kAcked:
return on_empty_.pending();
case ValueState::kEmpty:
value_state_ = ValueState::kReady;
value_ = std::move(*value);
on_full_.Wake();
return true;
}
GPR_UNREACHABLE_CODE(return false);
}
Poll<bool> PollAck() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("PollAck").c_str());
}
GPR_DEBUG_ASSERT(refs_ != 0);
switch (value_state_) {
case ValueState::kClosed:
case ValueState::kReadyClosed:
case ValueState::kCancelled:
return false;
case ValueState::kReady:
case ValueState::kEmpty:
return on_empty_.pending();
case ValueState::kAcked:
value_state_ = ValueState::kEmpty;
on_empty_.Wake();
return true;
}
return true;
}
// Try to receive a value from the pipe.
// Return Pending if there is no value.
// Return the value if one was retrieved.
// Return nullopt if the send end is closed and no value had been pushed.
Poll<absl::optional<T>> Next() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("Next").c_str());
}
GPR_DEBUG_ASSERT(refs_ != 0);
switch (value_state_) {
case ValueState::kEmpty:
case ValueState::kAcked:
return on_full_.pending();
case ValueState::kReadyClosed:
this->ResetInterceptorList();
value_state_ = ValueState::kClosed;
ABSL_FALLTHROUGH_INTENDED;
case ValueState::kReady:
return std::move(value_);
case ValueState::kClosed:
case ValueState::kCancelled:
return absl::nullopt;
}
GPR_UNREACHABLE_CODE(return absl::nullopt);
}
void AckNext() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("AckNext").c_str());
}
switch (value_state_) {
case ValueState::kReady:
value_state_ = ValueState::kAcked;
on_empty_.Wake();
break;
case ValueState::kReadyClosed:
this->ResetInterceptorList();
value_state_ = ValueState::kClosed;
break;
case ValueState::kClosed:
case ValueState::kCancelled:
break;
case ValueState::kEmpty:
case ValueState::kAcked:
abort();
}
}
void MarkClosed() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("MarkClosed").c_str());
}
switch (value_state_) {
case ValueState::kEmpty:
case ValueState::kAcked:
this->ResetInterceptorList();
value_state_ = ValueState::kClosed;
on_full_.Wake();
break;
case ValueState::kReady:
value_state_ = ValueState::kReadyClosed;
break;
case ValueState::kReadyClosed:
case ValueState::kClosed:
case ValueState::kCancelled:
break;
}
}
void MarkCancelled() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s", DebugOpString("MarkCancelled").c_str());
}
switch (value_state_) {
case ValueState::kEmpty:
case ValueState::kAcked:
case ValueState::kReady:
case ValueState::kReadyClosed:
this->ResetInterceptorList();
value_state_ = ValueState::kCancelled;
on_full_.Wake();
break;
case ValueState::kClosed:
value_state_ = ValueState::kCancelled;
break;
case ValueState::kCancelled:
break;
}
}
bool cancelled() { return value_state_ == ValueState::kCancelled; }
T& value() { return value_; }
const T& value() const { return value_; }
std::string DebugTag() {
if (auto* activity = Activity::current()) {
return absl::StrCat(activity->DebugTag(), " PIPE[0x",
reinterpret_cast<uintptr_t>(this), "]: ");
} else {
return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: ");
}
}
private:
// State of value_.
enum class ValueState : uint8_t {
// No value is set, it's possible to send.
kEmpty,
// Value has been pushed but not acked, it's possible to receive.
kReady,
// Value has been received and acked, we can unblock senders and transition
// to empty.
kAcked,
// Pipe is closed successfully, no more values can be sent
kClosed,
// Pipe is closed successfully, no more values can be sent
// (but one value is queued and ready to be received)
kReadyClosed,
// Pipe is closed unsuccessfully, no more values can be sent
kCancelled,
};
std::string DebugOpString(std::string op) {
return absl::StrCat(DebugTag(), op, " refs=", refs_,
" value_state=", ValueStateName(value_state_),
" on_empty=", on_empty_.DebugString().c_str(),
" on_full=", on_full_.DebugString().c_str());
}
static const char* ValueStateName(ValueState state) {
switch (state) {
case ValueState::kEmpty:
return "Empty";
case ValueState::kReady:
return "Ready";
case ValueState::kAcked:
return "Acked";
case ValueState::kClosed:
return "Closed";
case ValueState::kReadyClosed:
return "ReadyClosed";
case ValueState::kCancelled:
return "Cancelled";
}
GPR_UNREACHABLE_CODE(return "unknown");
}
T value_;
// Number of refs
uint8_t refs_;
// Current state of the value.
ValueState value_state_;
IntraActivityWaiter on_empty_;
IntraActivityWaiter on_full_;
// Make failure to destruct show up in ASAN builds.
#ifndef NDEBUG
std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
#endif
};
} // namespace pipe_detail
// Send end of a Pipe.
template <typename T>
class PipeSender {
public:
using PushType = pipe_detail::Push<T>;
PipeSender(const PipeSender&) = delete;
PipeSender& operator=(const PipeSender&) = delete;
PipeSender(PipeSender&& other) noexcept = default;
PipeSender& operator=(PipeSender&& other) noexcept = default;
~PipeSender() {
if (center_ != nullptr) center_->MarkClosed();
}
void Close() {
if (center_ != nullptr) {
center_->MarkClosed();
center_.reset();
}
}
void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); }
// Send a single message along the pipe.
// Returns a promise that will resolve to a bool - true if the message was
// sent, false if it could never be sent. Blocks the promise until the
// receiver is either closed or able to receive another message.
PushType Push(T value);
template <typename Fn>
void InterceptAndMap(Fn f, DebugLocation from = {}) {
center_->PrependMap(std::move(f), from);
}
template <typename Fn, typename OnHalfClose>
void InterceptAndMap(Fn f, OnHalfClose cleanup_fn, DebugLocation from = {}) {
center_->PrependMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
}
private:
friend struct Pipe<T>;
explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
RefCountedPtr<pipe_detail::Center<T>> center_;
// Make failure to destruct show up in ASAN builds.
#ifndef NDEBUG
std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
#endif
};
// Receive end of a Pipe.
template <typename T>
class PipeReceiver {
public:
PipeReceiver(const PipeReceiver&) = delete;
PipeReceiver& operator=(const PipeReceiver&) = delete;
PipeReceiver(PipeReceiver&& other) noexcept = default;
PipeReceiver& operator=(PipeReceiver&& other) noexcept = default;
~PipeReceiver() {
if (center_ != nullptr) center_->MarkClosed();
}
void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); }
// Receive a single message from the pipe.
// Returns a promise that will resolve to an optional<T> - with a value if a
// message was received, or no value if the other end of the pipe was closed.
// Blocks the promise until the receiver is either closed or a message is
// available.
auto Next();
template <typename Fn>
void InterceptAndMap(Fn f, DebugLocation from = {}) {
center_->AppendMap(std::move(f), from);
}
template <typename Fn, typename OnHalfClose>
void InterceptAndMapWithHalfClose(Fn f, OnHalfClose cleanup_fn,
DebugLocation from = {}) {
center_->AppendMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
}
private:
friend struct Pipe<T>;
explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
RefCountedPtr<pipe_detail::Center<T>> center_;
// Make failure to destruct show up in ASAN builds.
#ifndef NDEBUG
std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
#endif
};
namespace pipe_detail {
// Implementation of PipeSender::Push promise.
template <typename T>
class Push {
public:
Push(const Push&) = delete;
Push& operator=(const Push&) = delete;
Push(Push&& other) noexcept = default;
Push& operator=(Push&& other) noexcept = default;
Poll<bool> operator()() {
if (center_ == nullptr) return false;
if (auto* p = absl::get_if<T>(&state_)) {
auto r = center_->Push(p);
if (auto* ok = r.value_if_ready()) {
state_.template emplace<AwaitingAck>();
if (!*ok) return false;
} else {
return Pending{};
}
}
GPR_DEBUG_ASSERT(absl::holds_alternative<AwaitingAck>(state_));
return center_->PollAck();
}
private:
struct AwaitingAck {};
friend class PipeSender<T>;
explicit Push(RefCountedPtr<pipe_detail::Center<T>> center, T push)
: center_(std::move(center)), state_(std::move(push)) {}
RefCountedPtr<Center<T>> center_;
absl::variant<T, AwaitingAck> state_;
};
// Implementation of PipeReceiver::Next promise.
template <typename T>
class Next {
public:
Next(const Next&) = delete;
Next& operator=(const Next&) = delete;
Next(Next&& other) noexcept = default;
Next& operator=(Next&& other) noexcept = default;
Poll<absl::optional<T>> operator()() { return center_->Next(); }
private:
friend class PipeReceiver<T>;
explicit Next(RefCountedPtr<Center<T>> center) : center_(std::move(center)) {}
RefCountedPtr<Center<T>> center_;
};
} // namespace pipe_detail
template <typename T>
pipe_detail::Push<T> PipeSender<T>::Push(T value) {
return pipe_detail::Push<T>(center_ == nullptr ? nullptr : center_->Ref(),
std::move(value));
}
template <typename T>
auto PipeReceiver<T>::Next() {
return Seq(
pipe_detail::Next<T>(center_->Ref()),
[center = center_->Ref()](absl::optional<T> value) {
bool open = value.has_value();
bool cancelled = center->cancelled();
return If(
open,
[center = std::move(center), value = std::move(value)]() mutable {
auto run_interceptors = center->Run(std::move(value));
return Map(std::move(run_interceptors),
[center = std::move(center)](
absl::optional<T> value) mutable {
if (value.has_value()) {
center->value() = std::move(*value);
return NextResult<T>(std::move(center));
} else {
center->MarkCancelled();
return NextResult<T>(true);
}
});
},
[cancelled]() { return NextResult<T>(cancelled); });
});
}
template <typename T>
using PipeReceiverNextType = decltype(std::declval<PipeReceiver<T>>().Next());
template <typename T>
bool NextResult<T>::has_value() const {
return center_ != nullptr;
}
template <typename T>
T& NextResult<T>::operator*() {
return center_->value();
}
template <typename T>
const T& NextResult<T>::operator*() const {
return center_->value();
}
template <typename T>
NextResult<T>::~NextResult() {
if (center_ != nullptr) center_->AckNext();
}
template <typename T>
void NextResult<T>::reset() {
if (center_ != nullptr) {
center_->AckNext();
center_.reset();
}
}
// A Pipe is an intra-Activity communications channel that transmits T's from
// one end to the other.
// It is only safe to use a Pipe within the context of a single Activity.
// No synchronization is performed internally.
// The primary Pipe data structure is allocated from an arena, so the activity
// must have an arena as part of its context.
// By performing that allocation we can ensure stable pointer to shared data
// allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
// implementation.
// This type has been optimized with the expectation that there are relatively
// few pipes per activity. If this assumption does not hold then a design
// allowing inline filtering of pipe contents (instead of connecting pipes with
// polling code) would likely be more appropriate.
template <typename T>
struct Pipe {
Pipe() : Pipe(GetContext<Arena>()) {}
explicit Pipe(Arena* arena) : Pipe(arena->New<pipe_detail::Center<T>>()) {}
Pipe(const Pipe&) = delete;
Pipe& operator=(const Pipe&) = delete;
Pipe(Pipe&&) noexcept = default;
Pipe& operator=(Pipe&&) noexcept = default;
PipeSender<T> sender;
PipeReceiver<T> receiver;
private:
explicit Pipe(pipe_detail::Center<T>* center)
: sender(center), receiver(center) {}
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H